# Source listing: tipibot/bot.py (forked from sass/tipibot)
# Snapshot: 2026-04-20 12:09:39 +03:00 - 898 lines, 31 KiB, Python

"""TipiLAN Bot - Discord member management powered by Google Sheets."""
import asyncio
import collections
import datetime
import json
import logging
import logging.handlers
import math
import re
import time
from pathlib import Path
from zoneinfo import ZoneInfo
import discord
from discord import app_commands
from discord.ext import tasks
import colorlog
import psutil
import config
import strings as S
import economy
import pb_client
import sheets
from dev_member_commands import register_dev_member_commands
from dev_member_runtime import handle_member_join, run_birthday_daily
from economy_admin_commands import register_economy_admin_commands
from economy_extra_commands import register_economy_extra_commands
from economy_fish_commands import register_economy_fish_commands
from economy_games_commands import register_economy_games_commands
from economy_income_commands import register_economy_income_commands
from economy_prestige_commands import register_prestige_commands
from economy_profile_commands import register_economy_profile_commands
from economy_support_commands import register_economy_support_commands
from ops_channel_commands import register_ops_channel_commands
from ops_admin_commands import register_ops_admin_commands
from member_sync import SyncResult
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
# Log files live in a per-profile directory (logs/<BOT_PROFILE>/), created at import time.
_LOG_DIR = Path("logs") / config.BOT_PROFILE
_LOG_DIR.mkdir(parents=True, exist_ok=True)
# Plain formatter for bot.log; terser "timestamp | message" formatter for the transaction log.
_fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
_txn_fmt = logging.Formatter("%(asctime)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
# Console handler - coloured output, one colour per log level
_color_fmt = colorlog.ColoredFormatter(
    "%(log_color)s%(asctime)s [%(levelname)-8s]%(reset)s %(cyan)s%(name)s%(reset)s: %(message)s",
    log_colors={
        "DEBUG": "white",
        "INFO": "green",
        "WARNING": "yellow,bold",
        "ERROR": "red,bold",
        "CRITICAL": "red,bg_white,bold",
    },
)
_console = logging.StreamHandler()
_console.setFormatter(_color_fmt)
_console.setLevel(logging.INFO)
# General rotating file: logs/<profile>/bot.log (5 MB x 5 backups)
_file_h = logging.handlers.RotatingFileHandler(
    _LOG_DIR / "bot.log", maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8"
)
_file_h.setFormatter(_fmt)
_file_h.setLevel(logging.INFO)
# Both handlers are attached to the root logger, so every logger that propagates ends up in both.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(_console)
logging.getLogger().addHandler(_file_h)
# Transaction log: logs/<profile>/transactions.log (rotated at midnight, 30 backups kept)
_txn_h = logging.handlers.TimedRotatingFileHandler(
    _LOG_DIR / "transactions.log", when="midnight", backupCount=30, encoding="utf-8"
)
_txn_h.setFormatter(_txn_fmt)
_txn_h.setLevel(logging.INFO)
# Dedicated logger name for transactions; it only writes to its own file handler.
_txn_logger = logging.getLogger("tipiCOIN.txn")
_txn_logger.addHandler(_txn_h)
_txn_logger.propagate = False  # don't double-log to console/bot.log
# Module logger used throughout this file and handed to the register_* helpers.
log = logging.getLogger("tipilan")
# ---------------------------------------------------------------------------
# Bot setup
# ---------------------------------------------------------------------------
intents = discord.Intents.default()
intents.members = True  # Required: Server Members Intent must be ON in dev portal
# A bare Client plus an explicit CommandTree (slash commands only).
bot = discord.Client(intents=intents)
tree = app_commands.CommandTree(bot)
# Lightweight guild reference used when syncing commands to the configured guild.
GUILD_OBJ = discord.Object(id=config.GUILD_ID)
# The "dev" profile enables the sheet-backed member features (see on_ready / on_member_join).
IS_DEV_PROFILE = config.BOT_PROFILE == "dev"
TALLINN_TZ = ZoneInfo("Europe/Tallinn")
# Naive local timestamp captured at import; passed to register_ops_admin_commands as start_time.
_start_time = datetime.datetime.now()
# Handle on the current process; passed to register_ops_admin_commands as process.
_process = psutil.Process()
# Per-profile data directory (data/<BOT_PROFILE>/) holding the JSON state files below.
_DATA_DIR = Path("data") / config.BOT_PROFILE
_DATA_DIR.mkdir(parents=True, exist_ok=True)
_active_games: set[int] = set()  # users with an in-progress interactive game
# heist global CD is persisted on the house record in PocketBase (see economy.get/set_heist_global_cd)
_spam_tracker: dict[int, collections.deque] = {}  # user_id -> deque of recent income-cmd timestamps
_SPAM_WINDOW = 5.0  # seconds
_SPAM_THRESHOLD = 5  # income commands within window triggers jail
_gamble_cooldowns: dict[int, float] = {}  # user_id -> monotonic timestamp of last gamble
_GAMBLE_CD = 30  # seconds (default gambling cooldown)
_GAMBLE_CD_360 = 25  # seconds (with monitor_360 item)
_BDAY_LOG = _DATA_DIR / "birthday_sent.json"  # per-day list of user ids already announced
_RESTART_FILE = _DATA_DIR / "restart_channel.json"  # marker read (and deleted) by on_ready
_BOT_CONFIG = _DATA_DIR / "bot_config.json"  # persisted settings such as allowed_channels
_PAUSED = False  # maintenance mode: blocks non-admin commands when True
# Command names removed from the tree when the bot is not running the dev profile.
_DEV_ONLY_COMMANDS: tuple[str, ...] = ("birthdays", "check", "member")
def _apply_profile_command_filters() -> None:
    """Strip the dev-only slash commands from the tree unless the dev profile is active."""
    if not IS_DEV_PROFILE:
        for cmd_name in _DEV_ONLY_COMMANDS:
            # remove_command hands back the removed command, or a falsy value if it was absent
            if tree.remove_command(cmd_name):
                log.info("Profile '%s': removed dev-only command /%s", config.BOT_PROFILE, cmd_name)
def _load_bot_config() -> dict:
    """Load the persisted bot configuration from the bot_config.json file.

    Returns:
        The parsed configuration dict, or ``{"allowed_channels": []}`` when the
        file is missing, unreadable, not valid JSON, or does not contain a JSON
        object. Anything other than a missing file is logged as a warning so a
        corrupted config does not go unnoticed.
    """
    default: dict = {"allowed_channels": []}
    try:
        raw = _BOT_CONFIG.read_text(encoding="utf-8")
    except FileNotFoundError:
        # Nothing has been saved yet - not an error.
        return default
    except (OSError, UnicodeDecodeError) as exc:
        log.warning("Could not read bot config %s: %s", _BOT_CONFIG, exc)
        return default
    try:
        cfg = json.loads(raw)
    except json.JSONDecodeError as exc:
        log.warning("Bot config %s is not valid JSON: %s", _BOT_CONFIG, exc)
        return default
    if not isinstance(cfg, dict):
        # Callers use dict methods (.get / item assignment) on the result.
        log.warning("Bot config %s does not contain a JSON object; ignoring it", _BOT_CONFIG)
        return default
    return cfg
def _save_bot_config(cfg: dict) -> None:
    """Write *cfg* to the bot_config.json file as indented UTF-8 JSON.

    The data directory is (re)created first. It is nested two levels deep
    (data/<profile>), so ``parents=True`` is required for the call to succeed
    when the parent directory is missing as well.
    """
    _DATA_DIR.mkdir(parents=True, exist_ok=True)
    _BOT_CONFIG.write_text(json.dumps(cfg, indent=2), encoding="utf-8")
def _get_allowed_channels() -> list[int]:
    """Return the configured allowed channel ids as ints (empty list = no restriction set)."""
    stored_ids = _load_bot_config().get("allowed_channels", [])
    return list(map(int, stored_ids))
def _set_allowed_channels(channel_ids: list[int]) -> None:
    """Store *channel_ids* (as strings) under ``allowed_channels`` and save the config."""
    settings = _load_bot_config()
    settings["allowed_channels"] = list(map(str, channel_ids))
    _save_bot_config(settings)
def _get_paused() -> bool:
return _PAUSED
def _set_paused(value: bool) -> None:
global _PAUSED
_PAUSED = value
def _member_cache_size() -> int:
    """Return the number of entries currently held in the sheets cache."""
    cached = sheets.get_cache()
    return len(cached)
# ---------------------------------------------------------------------------
# EXP / Level role helpers
# ---------------------------------------------------------------------------
def _level_role_name(level: int) -> str:
    """Return the vanity role name for *level* (delegates to ``economy.level_role_name``)."""
    role_name = economy.level_role_name(level)
    return role_name
async def _apply_level_role(member: discord.Member, new_level: int, old_level: int) -> None:
    """Replace the member's vanity role when the level change crosses a role boundary.

    Does nothing when both levels map to the same role name. Missing roles and
    ``discord.Forbidden`` errors are ignored (best effort).
    """
    target_name = _level_role_name(new_level)
    previous_name = _level_role_name(old_level)
    if target_name != previous_name:
        server = member.guild

        # Drop the outgoing tier role first, if the member actually holds it.
        previous = discord.utils.find(lambda role: role.name == previous_name, server.roles)
        if previous and previous in member.roles:
            try:
                await member.remove_roles(previous, reason="Level up")
            except discord.Forbidden:
                pass

        # Then grant the incoming tier role when the guild defines one.
        target = discord.utils.find(lambda role: role.name == target_name, server.roles)
        if target:
            try:
                await member.add_roles(target, reason="Level up")
            except discord.Forbidden:
                pass
async def _ensure_level_role(member: discord.Member, level: int) -> None:
    """Bring the member's vanity role and the base economy role in line with *level*.

    Removes every other level role the member holds, then adds the correct
    level role and the ``economy.ECONOMY_ROLE`` role when they exist in the
    guild and are missing on the member. ``discord.Forbidden`` is ignored.
    """
    wanted_name = _level_role_name(level)
    vanity_names = {role_name for _, role_name in economy.LEVEL_ROLES}
    server = member.guild

    # Pass 1: strip vanity roles that belong to a different tier.
    for held in member.roles:
        if held.name == wanted_name or held.name not in vanity_names:
            continue
        try:
            await member.remove_roles(held, reason="Role sync")
        except discord.Forbidden:
            pass

    # Pass 2: grant the tier role for this level if it is not there yet.
    tier_role = discord.utils.find(lambda role: role.name == wanted_name, server.roles)
    if tier_role and tier_role not in member.roles:
        try:
            await member.add_roles(tier_role, reason="Role sync")
        except discord.Forbidden:
            pass

    # Pass 3: make sure the base economy role is present as well.
    base_role = discord.utils.find(lambda role: role.name == economy.ECONOMY_ROLE, server.roles)
    if base_role and base_role not in member.roles:
        try:
            await member.add_roles(base_role, reason="Economy member")
        except discord.Forbidden:
            pass
async def _award_exp(interaction: discord.Interaction, amount: int) -> None:
    """Grant EXP to the invoking user and announce a level-up publicly when one occurs.

    Also makes sure the member carries the base economy role. The level-up
    notice is sent as a non-ephemeral follow-up; any failure to send it is ignored.
    """
    outcome = await economy.award_exp(interaction.user.id, amount)

    server = interaction.guild
    member = server.get_member(interaction.user.id) if server else None
    if member:
        base_role = discord.utils.find(lambda role: role.name == economy.ECONOMY_ROLE, server.roles)
        if base_role and base_role not in member.roles:
            try:
                await member.add_roles(base_role, reason="Economy member")
            except discord.Forbidden:
                pass

    # No level gained -> nothing to announce.
    if not outcome["new_level"] > outcome["old_level"]:
        return

    if member:
        await _apply_level_role(member, outcome["new_level"], outcome["old_level"])

    role_after = _level_role_name(outcome["new_level"])
    role_before = _level_role_name(outcome["old_level"])
    if role_after != role_before:
        extra = S.MSG_LEVELUP_ROLE.format(role=role_after)
    else:
        extra = ""

    try:
        notice = S.MSG_LEVELUP.format(
            name=interaction.user.display_name, level=outcome["new_level"], extra=extra
        )
        await interaction.followup.send(notice, ephemeral=False)
    except Exception:
        pass
async def _log_command(interaction: discord.Interaction) -> bool:
    """Log every slash command invocation and enforce pause / allowed-channel rules.

    Order of checks: DMs always pass; members with ``manage_guild`` always pass
    (any channel, even while paused); maintenance mode blocks everyone else;
    finally the allowed-channel list is applied when one is configured.

    Returns:
        True when the command may run. False after an ephemeral refusal
        message has been sent to the user.
    """
    if interaction.command:
        # interaction.data is optional on the Interaction object; treat "absent" as "no options"
        opts = (interaction.data or {}).get("options", [])
        opts_str = " ".join(f"{o['name']}={o.get('value', '?')}" for o in opts) if opts else ""
        log.info(
            "CMD /%s user=%s (%s)%s",
            interaction.command.name,
            interaction.user.id,
            interaction.user.display_name,
            f" [{opts_str}]" if opts_str else "",
        )
    # DMs always pass
    if interaction.guild is None:
        return True
    # Admins (manage_guild) can use commands in any channel (and bypass pause)
    member = interaction.user
    if hasattr(member, "guild_permissions") and member.guild_permissions.manage_guild:
        return True
    # Maintenance mode: block all non-admin commands
    if _PAUSED:
        await interaction.response.send_message(S.MSG_MAINTENANCE, ephemeral=True)
        return False
    allowed = _get_allowed_channels()
    if not allowed:
        return True  # no restriction configured
    if interaction.channel_id in allowed:
        return True
    mentions = " ".join(f"<#{cid}>" for cid in allowed)
    await interaction.response.send_message(
        S.ERR["channel_only"].format(channels=mentions),
        ephemeral=True,
    )
    return False


# CommandTree.interaction_check is an overridable coroutine method, not a
# decorator: using it as "@tree.interaction_check" merely calls the default
# check with the function as its argument and never installs it. Overriding the
# attribute on the tree instance makes the tree await this function instead.
tree.interaction_check = _log_command
def _load_bday_log() -> dict:
    """Read the birthday announcement log; a missing or invalid file yields an empty dict."""
    try:
        raw = _BDAY_LOG.read_text(encoding="utf-8")
        return json.loads(raw)
    except (json.JSONDecodeError, FileNotFoundError):
        return {}
def _has_announced_today(discord_id: int) -> bool:
    """Return True if *discord_id* is already listed under today's date in the birthday log."""
    date_key = str(datetime.date.today())
    announced_ids = _load_bday_log().get(date_key, [])
    return str(discord_id) in announced_ids
def _mark_announced_today(discord_id: int) -> None:
    """Record *discord_id* under today's date and rewrite the birthday log.

    Entries dated more than two days ago are dropped before the file is written.
    """
    entries = _load_bday_log()
    today = datetime.date.today()

    member_key = str(discord_id)
    announced = entries.setdefault(str(today), [])
    if member_key not in announced:
        announced.append(member_key)

    # ISO date strings sort chronologically, so a plain string comparison prunes old days.
    oldest_kept = str(today - datetime.timedelta(days=2))
    recent = {day: ids for day, ids in entries.items() if day >= oldest_kept}
    _BDAY_LOG.write_text(json.dumps(recent), encoding="utf-8")
# ---------------------------------------------------------------------------
# Daily 09:00 Tallinn-time birthday task
# ---------------------------------------------------------------------------
@tasks.loop(time=datetime.time(hour=9, minute=0, tzinfo=ZoneInfo("Europe/Tallinn")))
async def birthday_daily():
    """Run the daily birthday announcement at 09:00 Tallinn time (dev profile only)."""
    if IS_DEV_PROFILE:
        await run_birthday_daily(
            bot,
            log,
            has_announced_today=_has_announced_today,
            mark_announced_today=_mark_announced_today,
        )
@birthday_daily.before_loop
async def before_birthday_daily():
    """Hold the birthday loop back until the client reports ready."""
    await bot.wait_until_ready()
# ---------------------------------------------------------------------------
# Rotating rich presence
# ---------------------------------------------------------------------------
_presence_index = 0
_economy_count: int = 0
_PRESENCES: list = [
lambda g: discord.Activity(
type=discord.ActivityType.watching,
name="/help - kõik käsklused",
state="Vaata, mida TipiBOTil pakkuda on",
),
lambda g: discord.Activity(
type=discord.ActivityType.watching,
name="/daily - päevane boonus TipiCOINe",
state="Streak boonused kuni x3.0 rohkem 🔥",
),
lambda g: discord.Activity(
type=discord.ActivityType.watching,
name=f"{_economy_count - 1 or '?'} mängijat võistlevad",
state="/leaderboard - kes on tipus?",
),
]
@tasks.loop(seconds=20)
async def _rotate_presence() -> None:
    """Refresh the economy record count and switch to the next rich-presence entry."""
    global _presence_index, _economy_count
    guild = bot.get_guild(config.GUILD_ID)

    # A failed count fetch is only logged; the previous count stays in use.
    try:
        _economy_count = await pb_client.count_records()
    except Exception as exc:
        log.warning("Presence: failed to fetch economy count: %s", exc)

    slot = _presence_index % len(_PRESENCES)
    build_activity = _PRESENCES[slot]
    await bot.change_presence(status=discord.Status.online, activity=build_activity(guild))
    # Only advance once the presence update went through.
    _presence_index = _presence_index + 1
@_rotate_presence.before_loop
async def _before_rotate_presence():
    """Hold the presence loop back until the client reports ready."""
    await bot.wait_until_ready()
# ---------------------------------------------------------------------------
# Events
# ---------------------------------------------------------------------------
@bot.event
async def on_ready():
    """Load sheet data and sync slash commands on startup.

    Also starts the background loops, re-schedules reminder DMs and, when a
    restart marker file exists, posts the restart-done message to the channel
    recorded in it.

    NOTE(review): discord.py may dispatch on_ready more than once per process;
    the loop starts are guarded with is_running(), but the command sync and
    reminder restore run again each time - confirm that is acceptable.
    """
    log.info("Logged in as %s (ID: %s)", bot.user, bot.user.id)
    # Hand the bot's own user id to the economy module (the "house" account).
    economy.set_house(bot.user.id)
    # Drop dev-only commands before anything is synced to Discord.
    _apply_profile_command_filters()
    # Pull sheet data into cache
    if IS_DEV_PROFILE:
        try:
            data = sheets.refresh()
            log.info("Loaded %d member rows from Google Sheets", len(data))
        except Exception as e:
            # Startup continues without sheet data; the failure is only logged.
            log.error("Failed to load sheet on startup: %s", e)
    # Sync slash commands to the guild only; wipe any leftover global registrations
    tree.copy_global_to(guild=GUILD_OBJ)
    await tree.sync(guild=GUILD_OBJ)
    tree.clear_commands(guild=None)
    await tree.sync()
    log.info("Slash commands synced to guild %s (global commands cleared)", config.GUILD_ID)
    # Start daily birthday task
    if IS_DEV_PROFILE and not birthday_daily.is_running():
        birthday_daily.start()
        log.info("Birthday daily task started (fires 09:00 Tallinn time)")
    # Start rotating rich presence
    if not _rotate_presence.is_running():
        _rotate_presence.start()
        log.info("Rich presence rotation started")
    # Re-schedule any reminder tasks lost on restart
    await _restore_reminders()
    # Notify the channel where /restart was triggered
    if _RESTART_FILE.exists():
        try:
            data = json.loads(_RESTART_FILE.read_text(encoding="utf-8"))
            ch = await bot.fetch_channel(int(data["channel_id"]))
            if ch:
                await ch.send(S.MSG_RESTART_DONE)
        except Exception as e:
            log.warning("Could not send restart notification: %s", e)
        finally:
            # Always remove the marker so the notice is attempted at most once.
            _RESTART_FILE.unlink(missing_ok=True)
@bot.event
async def on_disconnect():
    """Log a warning whenever the gateway connection drops."""
    log.warning("Bot disconnected from Discord gateway")
@bot.event
async def on_resumed():
    """Log when a dropped gateway session has been resumed."""
    log.info("Bot reconnected to Discord (session resumed)")
@bot.event
async def on_member_join(member: discord.Member):
    """Delegate a new member's sheet lookup and sync to ``handle_member_join`` (dev profile only)."""
    if IS_DEV_PROFILE:
        await handle_member_join(
            member,
            bot,
            log,
            has_announced_today=_has_announced_today,
            mark_announced_today=_mark_announced_today,
            log_sync_result=_log_sync_result,
        )
# ---------------------------------------------------------------------------
# Slash commands
# ---------------------------------------------------------------------------
# Sheet-backed member commands are registered only when running the dev profile.
if IS_DEV_PROFILE:
    register_dev_member_commands(
        tree,
        bot,
        log,
        has_announced_today=_has_announced_today,
        mark_announced_today=_mark_announced_today,
    )
# Operational/admin commands; the module-level state they need (process handle,
# start time, log directory, restart marker, pause flag accessors, counters)
# is injected here as arguments.
register_ops_admin_commands(
    tree,
    bot,
    log,
    process=_process,
    start_time=_start_time,
    log_dir=_LOG_DIR,
    guild_obj=GUILD_OBJ,
    restart_file=_RESTART_FILE,
    get_member_cache_size=_member_cache_size,
    get_paused=_get_paused,
    set_paused=_set_paused,
    count_economy_users=pb_client.count_records,
)
# Commands that read and modify the allowed-channel list kept in the bot config.
register_ops_channel_commands(
    tree,
    bot,
    log,
    get_allowed_channels=_get_allowed_channels,
    set_allowed_channels=_set_allowed_channels,
)
@tree.command(name="ping", description=S.CMD["ping"])
async def cmd_ping(interaction: discord.Interaction):
    """Answer /ping with the configured pong message."""
    await interaction.response.send_message(S.MSG_PONG)
# ---------------------------------------------------------------------------
# /help
# ---------------------------------------------------------------------------
# Maximum number of help fields shown on a single embed page.
_HELP_PAGE_SIZE = 10
# Slash-prefixed names of the dev-only commands ("/birthdays", "/check", "/member").
# Help fields whose text mentions any of them are hidden outside the dev profile.
_DEV_ONLY_HELP_TOKENS: tuple[str, ...] = tuple(f"/{name}" for name in _DEV_ONLY_COMMANDS)
def _visible_help_fields(category_key: str) -> list[tuple[str, str]]:
    """Return the (name, value) help fields of a category that apply to the active profile.

    The dev profile sees every field. Other profiles get the list with every
    field removed whose name or value mentions a dev-only command token
    (case-insensitive substring match).
    """
    fields: list[tuple[str, str]] = list(S.HELP_CATEGORIES[category_key]["fields"])
    if IS_DEV_PROFILE:
        return fields

    def _mentions_dev_command(name: str, value: str) -> bool:
        haystack = f"{name}\n{value}".lower()
        return any(token in haystack for token in _DEV_ONLY_HELP_TOKENS)

    return [(name, value) for name, value in fields if not _mentions_dev_command(name, value)]
def _help_embed(category_key: str, page: int = 0) -> discord.Embed:
    """Build the help embed for one page of a category.

    *page* is clamped into the valid range; the page indicator is appended to
    the title only when the category spans more than one page.
    """
    category = S.HELP_CATEGORIES[category_key]
    all_fields = _visible_help_fields(category_key)

    page_count = max(1, math.ceil(len(all_fields) / _HELP_PAGE_SIZE))
    page = min(max(page, 0), page_count - 1)
    first = page * _HELP_PAGE_SIZE
    shown = all_fields[first:first + _HELP_PAGE_SIZE]

    heading = category["label"]
    if page_count > 1:
        heading = heading + f" ({page + 1}/{page_count})"

    embed = discord.Embed(title=heading, description=category["description"], color=category["color"])
    for field_name, field_value in shown:
        embed.add_field(name=field_name, value=field_value, inline=False)
    embed.set_footer(text=S.HELP_UI["footer"])
    return embed
class HelpView(discord.ui.View):
    """Interactive /help message: a category dropdown plus pager buttons when needed.

    Attributes are plain state: ``is_admin`` (whether the admin category is
    offered), ``category`` (key into ``S.HELP_CATEGORIES``) and ``page``
    (zero-based page index within the category).
    """

    def __init__(self, is_admin: bool = False, category: str = "üldine", page: int = 0):
        super().__init__(timeout=120)
        self.is_admin = is_admin
        self.category = category
        self.page = page
        self._rebuild()

    def _total_pages(self) -> int:
        """Return the number of pages in the current category (at least 1)."""
        return max(1, math.ceil(len(_visible_help_fields(self.category)) / _HELP_PAGE_SIZE))

    def _rebuild(self) -> None:
        """Recreate all components for the current category/page."""
        self.clear_items()
        total_pages = self._total_pages()
        self.page = max(0, min(self.page, total_pages - 1))
        select_row = 0
        if total_pages > 1:
            # Pager occupies row 0, so the dropdown moves down to row 1.
            select_row = 1
            # A button needs a non-empty label (or an emoji) to be accepted by
            # Discord, so the pager buttons carry arrow glyphs.
            prev_btn = discord.ui.Button(
                label="◀", style=discord.ButtonStyle.secondary,
                disabled=(self.page == 0), row=0,
            )
            prev_btn.callback = self._prev
            next_btn = discord.ui.Button(
                label="▶", style=discord.ButtonStyle.secondary,
                disabled=(self.page >= total_pages - 1), row=0,
            )
            next_btn.callback = self._next
            self.add_item(prev_btn)
            self.add_item(next_btn)
        self.add_item(HelpSelect(self.is_admin, self.category, row=select_row))

    async def _prev(self, interaction: discord.Interaction) -> None:
        """Go one page back (not below the first page) and redraw the message."""
        self.page = max(0, self.page - 1)
        self._rebuild()
        await interaction.response.edit_message(embed=_help_embed(self.category, self.page), view=self)

    async def _next(self, interaction: discord.Interaction) -> None:
        """Go one page forward (not past the last page) and redraw the message."""
        self.page = min(self._total_pages() - 1, self.page + 1)
        self._rebuild()
        await interaction.response.edit_message(embed=_help_embed(self.category, self.page), view=self)
class HelpSelect(discord.ui.Select):
    """Dropdown listing the help categories; the "admin" entry is offered to admins only."""

    def __init__(self, is_admin: bool = False, current: str = "üldine", row: int = 0):
        choices = []
        for key, meta in S.HELP_CATEGORIES.items():
            if key == "admin" and not is_admin:
                continue
            choices.append(
                discord.SelectOption(
                    label=meta["label"],
                    value=key,
                    description=meta["description"],
                    default=(key == current),
                )
            )
        super().__init__(placeholder=S.HELP_UI["select_placeholder"], options=choices, row=row)

    async def callback(self, interaction: discord.Interaction) -> None:
        """Switch the owning view to the chosen category, starting at its first page."""
        owner = self.view
        owner.category = self.values[0]
        owner.page = 0
        owner._rebuild()
        await interaction.response.edit_message(embed=_help_embed(owner.category, 0), view=owner)
@tree.command(name="help", description=S.CMD["help"])
async def cmd_help(interaction: discord.Interaction):
    """Send the ephemeral help browser, opened on the "üldine" category.

    The admin category is offered when the invoker has ``manage_roles`` or
    ``manage_guild`` in the guild; in DMs it is never offered.
    """
    is_admin = False
    if interaction.guild:
        perms = interaction.user.guild_permissions
        is_admin = bool(perms and (perms.manage_roles or perms.manage_guild))
    await interaction.response.send_message(
        embed=_help_embed("üldine"),
        view=HelpView(is_admin),
        ephemeral=True,
    )
# ---------------------------------------------------------------------------
# TipiBOT economy commands
# ---------------------------------------------------------------------------
def _coin(amount: int) -> str:
    """Format *amount* in bold with thousands separators, followed by the coin symbol."""
    grouped = format(amount, ",")
    return f"**{grouped}** {economy.COIN}"
def _cd_ts(remaining: datetime.timedelta) -> str:
"""Discord relative timestamp string for when a cooldown expires."""
expiry = int((datetime.datetime.now(datetime.timezone.utc) + remaining).timestamp())
return f"<t:{expiry}:R>"
# Economy admin commands; they receive the cooldown-timestamp formatter and the
# level-role swapper defined in this module.
register_economy_admin_commands(
    tree,
    bot,
    log,
    cd_ts=_cd_ts,
    apply_level_role=_apply_level_role,
)
def _gamble_cd(uid: int, has_360: bool = False) -> datetime.timedelta | None:
    """Check the gambling cooldown for *uid* and start a new one when the user is free.

    Args:
        uid: Discord user id.
        has_360: Use the shorter ``_GAMBLE_CD_360`` cooldown instead of ``_GAMBLE_CD``.

    Returns:
        The time still remaining when the user is on cooldown (the stored
        timestamp is left untouched), otherwise ``None`` after recording "now"
        as the user's latest gamble.
    """
    cd = float(_GAMBLE_CD_360 if has_360 else _GAMBLE_CD)
    now = time.monotonic()
    last = _gamble_cooldowns.get(uid)
    # The monotonic clock has no defined zero point, so "never gambled" must be
    # tracked as a missing entry rather than as timestamp 0.0 - otherwise a
    # clock reading below the cooldown would put first-time users on cooldown.
    if last is not None:
        remaining = cd - (now - last)
        if remaining > 0:
            return datetime.timedelta(seconds=remaining)
    _gamble_cooldowns[uid] = now
    return None
async def _check_cmd_rate(interaction: discord.Interaction) -> bool:
    """Track an income-command use and jail the user when they are spamming.

    Returns True (after jailing the user and sending the ephemeral spam notice)
    when at least ``_SPAM_THRESHOLD`` uses fall inside the last ``_SPAM_WINDOW``
    seconds; otherwise returns False.
    """
    user_id = interaction.user.id
    stamp = time.monotonic()

    recent = _spam_tracker.setdefault(user_id, collections.deque())
    recent.append(stamp)
    # Forget uses that have slid out of the window.
    while recent and stamp - recent[0] > _SPAM_WINDOW:
        recent.popleft()

    if len(recent) < _SPAM_THRESHOLD:
        return False

    # Start counting from scratch once the user has been punished.
    recent.clear()
    await economy.do_spam_jail(user_id)
    await interaction.response.send_message(S.MSG_SPAM_JAIL, ephemeral=True)
    return True
# Prestige commands; they share this module's spam check and level-role sync helper.
register_prestige_commands(
    tree,
    check_cmd_rate=_check_cmd_rate,
    ensure_level_role=_ensure_level_role,
)
def _parse_amount(value: str, balance: int) -> tuple[int | None, str | None]:
"""Parse an amount string; 'all' resolves to the user's full balance.
Accepts plain integers and valid thousand-separated numbers (1,000 / 1.000 / 1 000).
Rejects decimals and ambiguous inputs like 1,1 or 1.5.
Returns (amount, None) on success or (None, error_msg) on failure."""
v = value.strip()
if v.lower() == "all":
return balance, None
# Strip valid thousand separators: groups of exactly 3 digits after separator
if re.fullmatch(r'\d{1,3}([,. ]\d{3})*', v):
v = re.sub(r'[,. ]', '', v)
try:
return int(v), None
except ValueError:
return None, S.ERR["invalid_amount"]
# ---------------------------------------------------------------------------
# Reminder system
# ---------------------------------------------------------------------------
_reminder_tasks: dict[tuple[int, str], asyncio.Task] = {}
def _cancel_reminder_task(user_id: int, cmd: str) -> None:
task = _reminder_tasks.pop((user_id, cmd), None)
if task and not task.done():
task.cancel()
def _schedule_reminder(user_id: int, cmd: str, delay: datetime.timedelta) -> None:
    """Schedule a DM to *user_id* once *delay* has elapsed, replacing any pending one for *cmd*."""
    slot = (user_id, cmd)

    async def _remind():
        await asyncio.sleep(delay.total_seconds())

        # Prefer the cached user; fall back to an API fetch when it is not cached.
        recipient = bot.get_user(user_id)
        if recipient is None:
            try:
                recipient = await bot.fetch_user(user_id)
            except (discord.NotFound, discord.HTTPException):
                recipient = None

        if recipient:
            try:
                await recipient.send(S.MSG_REMINDER.format(cmd=cmd))
            except (discord.Forbidden, discord.HTTPException):
                # DMs closed or delivery failed - the reminder is simply dropped.
                pass

        # The reminder has run its course; release its slot.
        _reminder_tasks.pop(slot, None)

    previous = _reminder_tasks.get(slot)
    if previous and not previous.done():
        previous.cancel()
    _reminder_tasks[slot] = asyncio.create_task(_remind())
# Command name -> key in the raw user record that stores when the command was
# last used. Only commands listed here can have their reminders restored.
_REMINDER_COOLDOWN_KEYS: dict[str, str] = {
    "daily": "last_daily",
    "work": "last_work",
    "beg": "last_beg",
    "crime": "last_crime",
    "rob": "last_rob",
    "fish": "last_fish",
}
# Cooldowns shortened by shop items: command -> (item id, cooldown while that item is owned).
_ITEM_REMINDER_COOLDOWNS: dict[str, tuple[str, datetime.timedelta]] = {
    "work": ("monitor", datetime.timedelta(minutes=40)),
    "beg": ("hiirematt", datetime.timedelta(minutes=3)),
    "daily": ("korvaklapid", datetime.timedelta(hours=18)),
    "fish": ("ussipurk", datetime.timedelta(seconds=90)),
}


def _reminder_cooldown(cmd: str, items) -> datetime.timedelta | None:
    """Return the cooldown for *cmd* given the owned *items*, or None if the command has none.

    An item-shortened cooldown wins when the user owns the matching item;
    otherwise the value from ``economy.COOLDOWNS`` is used.
    """
    boosted = _ITEM_REMINDER_COOLDOWNS.get(cmd)
    if boosted is not None and boosted[0] in items:
        return boosted[1]
    return economy.COOLDOWNS.get(cmd)


async def _restore_reminders() -> None:
    """Re-schedule reminder tasks lost when the bot restarted.

    For every user with opted-in reminders, the remaining cooldown is computed
    from the stored "last used" timestamp; reminders whose cooldown has not yet
    expired are scheduled again. Records with an unparsable timestamp are
    skipped with a warning instead of aborting the whole restore.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    restored = 0
    for uid_str, user in (await economy.get_all_users_raw()).items():
        reminders = user.get("reminders") or []
        if not reminders:
            continue
        user_id = int(uid_str)
        items = user.get("items") or []
        for cmd in reminders:
            last_key = _REMINDER_COOLDOWN_KEYS.get(cmd)
            if not last_key:
                continue
            last_str = user.get(last_key)
            if not last_str:
                continue
            cooldown = _reminder_cooldown(cmd, items)
            if not cooldown:
                continue
            try:
                last_dt = datetime.datetime.fromisoformat(last_str)
            except (TypeError, ValueError):
                log.warning(
                    "Skipping reminder restore for user %s /%s: bad timestamp %r",
                    user_id, cmd, last_str,
                )
                continue
            # Naive timestamps are interpreted as UTC so they can be compared with `now`.
            if last_dt.tzinfo is None:
                last_dt = last_dt.replace(tzinfo=datetime.timezone.utc)
            remaining = (last_dt + cooldown) - now
            if remaining.total_seconds() > 0:
                _schedule_reminder(user_id, cmd, remaining)
                restored += 1
    if restored:
        log.info("Restored %d reminder task(s) after restart", restored)


async def _maybe_remind(user_id: int, cmd: str) -> None:
    """Schedule a DM reminder if the user has opted in for this command.

    The delay is the command's (possibly item-shortened) cooldown; commands
    without a known cooldown fall back to one hour.
    """
    user_data = await economy.get_user(user_id)
    if cmd not in (user_data.get("reminders") or []):
        return
    delay = _reminder_cooldown(cmd, set(user_data.get("items") or []))
    if delay is None:
        delay = datetime.timedelta(hours=1)
    _schedule_reminder(user_id, cmd, delay)
# Economy command groups. Each register_* call attaches its commands to the
# tree and receives the helpers from this module that it needs (formatting,
# amount parsing, spam check, reminders, EXP awarding, shared game state).
register_economy_support_commands(
    tree,
    parse_amount=_parse_amount,
    coin=_coin,
    cancel_reminder_task=_cancel_reminder_task,
)
register_economy_profile_commands(
    tree,
    coin=_coin,
    cd_ts=_cd_ts,
    ensure_level_role=_ensure_level_role,
)
register_economy_income_commands(
    tree,
    bot,
    coin=_coin,
    cd_ts=_cd_ts,
    check_cmd_rate=_check_cmd_rate,
    maybe_remind=_maybe_remind,
    award_exp=_award_exp,
)
# The same _active_games set object is handed to the fish, games and extra
# command modules.
register_economy_fish_commands(
    tree,
    coin=_coin,
    cd_ts=_cd_ts,
    check_cmd_rate=_check_cmd_rate,
    maybe_remind=_maybe_remind,
    award_exp=_award_exp,
    active_games=_active_games,
)
register_economy_games_commands(
    tree,
    coin=_coin,
    cd_ts=_cd_ts,
    parse_amount=_parse_amount,
    gamble_cd=_gamble_cd,
    award_exp=_award_exp,
    active_games=_active_games,
)
register_economy_extra_commands(
    tree,
    bot,
    coin=_coin,
    cd_ts=_cd_ts,
    parse_amount=_parse_amount,
    ensure_level_role=_ensure_level_role,
    active_games=_active_games,
)
# ---------------------------------------------------------------------------
# Error handling for slash commands
# ---------------------------------------------------------------------------
@tree.error
async def on_app_command_error(interaction: discord.Interaction, error: app_commands.AppCommandError):
    """Report a failed slash command to the user and log unexpected errors.

    Missing-permission errors get the dedicated message and are not logged.
    Every other error is logged with its traceback and answered with the
    generic error message. The reply is ephemeral and best effort: a failure
    to deliver it is logged, never raised.
    """
    if isinstance(error, app_commands.MissingPermissions):
        msg = S.ERR["missing_perms"]
    else:
        # Pass the exception explicitly so the traceback is logged regardless of
        # whether this handler happens to run inside an active "except" block.
        log.error("Unhandled slash command error: %s", error, exc_info=error)
        msg = S.ERR["generic_error"].format(error=error)
    try:
        # An interaction can only be answered once; later messages must be follow-ups.
        if interaction.response.is_done():
            await interaction.followup.send(msg, ephemeral=True)
        else:
            await interaction.response.send_message(msg, ephemeral=True)
    except Exception as send_exc:
        log.warning("Could not deliver slash command error message: %s", send_exc)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _log_sync_result(member: discord.Member, result: SyncResult):
    """Write the outcome of a member sync to the log: changes as info, errors as warnings."""
    if result.nickname_changed:
        log.info(" → Nickname set for %s", member)

    added = result.roles_added
    if added:
        log.info(" → Roles added for %s: %s", member, added)

    if result.birthday_soon:
        log.info(" → Birthday coming up for %s", member)

    for problem in result.errors:
        log.warning("%s", problem)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def _asyncio_exception_handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
    """Log errors reported to the event loop's exception handler, with traceback when available."""
    message = context.get("message", "unknown asyncio error")
    error = context.get("exception")
    if not error:
        log.error("Unhandled asyncio error: %s", message)
        return
    log.error("Unhandled asyncio exception: %s", message, exc_info=error)
if __name__ == "__main__":
    # Refuse to start without a token, naming the environment variable for the active profile.
    if not config.DISCORD_TOKEN:
        profile_key = "DISCORD_TOKEN_ECONOMY" if config.BOT_PROFILE == "economy" else "DISCORD_TOKEN_DEV"
        raise SystemExit(
            f"{profile_key} pole seadistatud profiilile '{config.BOT_PROFILE}'. "
            "Kopeeri .env.example failiks .env ja täida see."
        )

    async def _main() -> None:
        """Install the asyncio exception handler and run the bot until it stops."""
        # Inside a coroutine the running loop is the one to configure.
        asyncio.get_running_loop().set_exception_handler(_asyncio_exception_handler)
        # The context manager closes the client on the way out, including on
        # cancellation, so the connection is shut down cleanly.
        async with bot:
            await bot.start(config.DISCORD_TOKEN, reconnect=True)

    asyncio.run(_main())