898 lines
31 KiB
Python
898 lines
31 KiB
Python
"""TipiLAN Bot - Discord member management powered by Google Sheets."""
|
|
|
|
import asyncio
|
|
import collections
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import logging.handlers
|
|
import math
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import discord
|
|
from discord import app_commands
|
|
from discord.ext import tasks
|
|
|
|
import colorlog
|
|
import psutil
|
|
|
|
import config
|
|
import strings as S
|
|
import economy
|
|
import pb_client
|
|
import sheets
|
|
from dev_member_commands import register_dev_member_commands
|
|
from dev_member_runtime import handle_member_join, run_birthday_daily
|
|
from economy_admin_commands import register_economy_admin_commands
|
|
from economy_extra_commands import register_economy_extra_commands
|
|
from economy_fish_commands import register_economy_fish_commands
|
|
from economy_games_commands import register_economy_games_commands
|
|
from economy_income_commands import register_economy_income_commands
|
|
from economy_prestige_commands import register_prestige_commands
|
|
from economy_profile_commands import register_economy_profile_commands
|
|
from economy_support_commands import register_economy_support_commands
|
|
from ops_channel_commands import register_ops_channel_commands
|
|
from ops_admin_commands import register_ops_admin_commands
|
|
from member_sync import SyncResult
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Logging
|
|
# ---------------------------------------------------------------------------
|
|
_LOG_DIR = Path("logs") / config.BOT_PROFILE
|
|
_LOG_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
_fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
|
|
_txn_fmt = logging.Formatter("%(asctime)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
|
|
|
|
# Console handler - coloured output
|
|
_color_fmt = colorlog.ColoredFormatter(
|
|
"%(log_color)s%(asctime)s [%(levelname)-8s]%(reset)s %(cyan)s%(name)s%(reset)s: %(message)s",
|
|
log_colors={
|
|
"DEBUG": "white",
|
|
"INFO": "green",
|
|
"WARNING": "yellow,bold",
|
|
"ERROR": "red,bold",
|
|
"CRITICAL": "red,bg_white,bold",
|
|
},
|
|
)
|
|
_console = logging.StreamHandler()
|
|
_console.setFormatter(_color_fmt)
|
|
_console.setLevel(logging.INFO)
|
|
|
|
# General rotating file: logs/bot.log (5 MB x 5 backups)
|
|
_file_h = logging.handlers.RotatingFileHandler(
|
|
_LOG_DIR / "bot.log", maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8"
|
|
)
|
|
_file_h.setFormatter(_fmt)
|
|
_file_h.setLevel(logging.INFO)
|
|
|
|
logging.getLogger().setLevel(logging.INFO)
|
|
logging.getLogger().addHandler(_console)
|
|
logging.getLogger().addHandler(_file_h)
|
|
|
|
# Transaction log: logs/transactions.log (daily rotation, 30 days)
|
|
_txn_h = logging.handlers.TimedRotatingFileHandler(
|
|
_LOG_DIR / "transactions.log", when="midnight", backupCount=30, encoding="utf-8"
|
|
)
|
|
_txn_h.setFormatter(_txn_fmt)
|
|
_txn_h.setLevel(logging.INFO)
|
|
_txn_logger = logging.getLogger("tipiCOIN.txn")
|
|
_txn_logger.addHandler(_txn_h)
|
|
_txn_logger.propagate = False # don't double-log to console/bot.log
|
|
|
|
log = logging.getLogger("tipilan")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bot setup
|
|
# ---------------------------------------------------------------------------
|
|
intents = discord.Intents.default()
|
|
intents.members = True # Required: Server Members Intent must be ON in dev portal
|
|
|
|
bot = discord.Client(intents=intents)
|
|
tree = app_commands.CommandTree(bot)
|
|
|
|
GUILD_OBJ = discord.Object(id=config.GUILD_ID)
|
|
IS_DEV_PROFILE = config.BOT_PROFILE == "dev"
|
|
TALLINN_TZ = ZoneInfo("Europe/Tallinn")
|
|
_start_time = datetime.datetime.now()
|
|
_process = psutil.Process()
|
|
_DATA_DIR = Path("data") / config.BOT_PROFILE
|
|
_DATA_DIR.mkdir(parents=True, exist_ok=True)
|
|
_active_games: set[int] = set() # users with an in-progress interactive game
|
|
# heist global CD is persisted on the house record in PocketBase (see economy.get/set_heist_global_cd)
|
|
_spam_tracker: dict[int, collections.deque] = {} # user_id -> deque of recent income-cmd timestamps
|
|
_SPAM_WINDOW = 5.0 # seconds
|
|
_SPAM_THRESHOLD = 5 # income commands within window triggers jail
|
|
_gamble_cooldowns: dict[int, float] = {} # user_id -> monotonic timestamp of last gamble
|
|
_GAMBLE_CD = 30 # seconds (default gambling cooldown)
|
|
_GAMBLE_CD_360 = 25 # seconds (with monitor_360 item)
|
|
_BDAY_LOG = _DATA_DIR / "birthday_sent.json"
|
|
_RESTART_FILE = _DATA_DIR / "restart_channel.json"
|
|
_BOT_CONFIG = _DATA_DIR / "bot_config.json"
|
|
_PAUSED = False # maintenance mode: blocks non-admin commands when True
|
|
_DEV_ONLY_COMMANDS: tuple[str, ...] = ("birthdays", "check", "member")
|
|
|
|
|
|
def _apply_profile_command_filters() -> None:
|
|
"""Remove commands that should not exist for the active bot profile."""
|
|
if IS_DEV_PROFILE:
|
|
return
|
|
for name in _DEV_ONLY_COMMANDS:
|
|
removed = tree.remove_command(name)
|
|
if removed:
|
|
log.info("Profile '%s': removed dev-only command /%s", config.BOT_PROFILE, name)
|
|
|
|
|
|
def _load_bot_config() -> dict:
|
|
if _BOT_CONFIG.exists():
|
|
try:
|
|
return json.loads(_BOT_CONFIG.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
pass
|
|
return {"allowed_channels": []}
|
|
|
|
|
|
def _save_bot_config(cfg: dict) -> None:
|
|
_DATA_DIR.mkdir(exist_ok=True)
|
|
_BOT_CONFIG.write_text(json.dumps(cfg, indent=2), encoding="utf-8")
|
|
|
|
|
|
def _get_allowed_channels() -> list[int]:
|
|
return [int(c) for c in _load_bot_config().get("allowed_channels", [])]
|
|
|
|
|
|
def _set_allowed_channels(channel_ids: list[int]) -> None:
|
|
cfg = _load_bot_config()
|
|
cfg["allowed_channels"] = [str(c) for c in channel_ids]
|
|
_save_bot_config(cfg)
|
|
|
|
|
|
def _get_paused() -> bool:
|
|
return _PAUSED
|
|
|
|
|
|
def _set_paused(value: bool) -> None:
|
|
global _PAUSED
|
|
_PAUSED = value
|
|
|
|
|
|
def _member_cache_size() -> int:
|
|
return len(sheets.get_cache())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# EXP / Level role helpers
|
|
# ---------------------------------------------------------------------------
|
|
def _level_role_name(level: int) -> str:
|
|
return economy.level_role_name(level)
|
|
|
|
|
|
async def _apply_level_role(member: discord.Member, new_level: int, old_level: int) -> None:
|
|
"""Swap vanity role when the user crosses a tier boundary."""
|
|
new_role_name = _level_role_name(new_level)
|
|
old_role_name = _level_role_name(old_level)
|
|
if new_role_name == old_role_name:
|
|
return
|
|
guild = member.guild
|
|
old_role = discord.utils.find(lambda r: r.name == old_role_name, guild.roles)
|
|
if old_role and old_role in member.roles:
|
|
try:
|
|
await member.remove_roles(old_role, reason="Level up")
|
|
except discord.Forbidden:
|
|
pass
|
|
new_role = discord.utils.find(lambda r: r.name == new_role_name, guild.roles)
|
|
if new_role:
|
|
try:
|
|
await member.add_roles(new_role, reason="Level up")
|
|
except discord.Forbidden:
|
|
pass
|
|
|
|
|
|
async def _ensure_level_role(member: discord.Member, level: int) -> None:
|
|
"""Make sure the user has exactly the right vanity role + ECONOMY base role (idempotent)."""
|
|
correct_name = _level_role_name(level)
|
|
all_role_names = {name for _, name in economy.LEVEL_ROLES}
|
|
guild = member.guild
|
|
for role in member.roles:
|
|
if role.name in all_role_names and role.name != correct_name:
|
|
try:
|
|
await member.remove_roles(role, reason="Role sync")
|
|
except discord.Forbidden:
|
|
pass
|
|
correct_role = discord.utils.find(lambda r: r.name == correct_name, guild.roles)
|
|
if correct_role and correct_role not in member.roles:
|
|
try:
|
|
await member.add_roles(correct_role, reason="Role sync")
|
|
except discord.Forbidden:
|
|
pass
|
|
economy_role = discord.utils.find(lambda r: r.name == economy.ECONOMY_ROLE, guild.roles)
|
|
if economy_role and economy_role not in member.roles:
|
|
try:
|
|
await member.add_roles(economy_role, reason="Economy member")
|
|
except discord.Forbidden:
|
|
pass
|
|
|
|
|
|
async def _award_exp(interaction: discord.Interaction, amount: int) -> None:
|
|
"""Award EXP and post a public level-up notice if the user reaches a new tier."""
|
|
result = await economy.award_exp(interaction.user.id, amount)
|
|
member = interaction.guild.get_member(interaction.user.id) if interaction.guild else None
|
|
if member:
|
|
economy_role = discord.utils.find(lambda r: r.name == economy.ECONOMY_ROLE, interaction.guild.roles)
|
|
if economy_role and economy_role not in member.roles:
|
|
try:
|
|
await member.add_roles(economy_role, reason="Economy member")
|
|
except discord.Forbidden:
|
|
pass
|
|
if result["new_level"] <= result["old_level"]:
|
|
return
|
|
if member:
|
|
await _apply_level_role(member, result["new_level"], result["old_level"])
|
|
new_role = _level_role_name(result["new_level"])
|
|
old_role = _level_role_name(result["old_level"])
|
|
extra = S.MSG_LEVELUP_ROLE.format(role=new_role) if new_role != old_role else ""
|
|
try:
|
|
await interaction.followup.send(
|
|
S.MSG_LEVELUP.format(name=interaction.user.display_name, level=result["new_level"], extra=extra),
|
|
ephemeral=False,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
@tree.interaction_check
|
|
async def _log_command(interaction: discord.Interaction) -> bool:
|
|
"""Log every slash command invocation and enforce allowed-channel restriction."""
|
|
if interaction.command:
|
|
opts = interaction.data.get("options", [])
|
|
opts_str = " ".join(f"{o['name']}={o.get('value', '?')}" for o in opts) if opts else ""
|
|
log.info(
|
|
"CMD /%s user=%s (%s)%s",
|
|
interaction.command.name,
|
|
interaction.user.id,
|
|
interaction.user.display_name,
|
|
f" [{opts_str}]" if opts_str else "",
|
|
)
|
|
|
|
# DMs always pass
|
|
if interaction.guild is None:
|
|
return True
|
|
|
|
# Admins (manage_guild) can use commands in any channel (and bypass pause)
|
|
member = interaction.user
|
|
if hasattr(member, "guild_permissions") and member.guild_permissions.manage_guild:
|
|
return True
|
|
|
|
# Maintenance mode: block all non-admin commands
|
|
if _PAUSED:
|
|
await interaction.response.send_message(S.MSG_MAINTENANCE, ephemeral=True)
|
|
return False
|
|
|
|
allowed = _get_allowed_channels()
|
|
if not allowed:
|
|
return True # no restriction configured
|
|
|
|
if interaction.channel_id in allowed:
|
|
return True
|
|
|
|
mentions = " ".join(f"<#{cid}>" for cid in allowed)
|
|
await interaction.response.send_message(
|
|
S.ERR["channel_only"].format(channels=mentions),
|
|
ephemeral=True,
|
|
)
|
|
return False
|
|
|
|
|
|
def _load_bday_log() -> dict:
|
|
try:
|
|
return json.loads(_BDAY_LOG.read_text(encoding="utf-8"))
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
return {}
|
|
|
|
|
|
def _has_announced_today(discord_id: int) -> bool:
|
|
today = str(datetime.date.today())
|
|
return str(discord_id) in _load_bday_log().get(today, [])
|
|
|
|
|
|
def _mark_announced_today(discord_id: int) -> None:
|
|
log_data = _load_bday_log()
|
|
today = str(datetime.date.today())
|
|
today_list = log_data.setdefault(today, [])
|
|
uid = str(discord_id)
|
|
if uid not in today_list:
|
|
today_list.append(uid)
|
|
cutoff = str(datetime.date.today() - datetime.timedelta(days=2))
|
|
log_data = {k: v for k, v in log_data.items() if k >= cutoff}
|
|
_BDAY_LOG.write_text(json.dumps(log_data), encoding="utf-8")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Daily 09:00 Tallinn-time birthday task
|
|
# ---------------------------------------------------------------------------
|
|
@tasks.loop(time=datetime.time(hour=9, minute=0, tzinfo=ZoneInfo("Europe/Tallinn")))
|
|
async def birthday_daily():
|
|
"""Announce birthdays every day at 09:00 Tallinn time."""
|
|
if not IS_DEV_PROFILE:
|
|
return
|
|
await run_birthday_daily(
|
|
bot,
|
|
log,
|
|
has_announced_today=_has_announced_today,
|
|
mark_announced_today=_mark_announced_today,
|
|
)
|
|
|
|
|
|
@birthday_daily.before_loop
|
|
async def before_birthday_daily():
|
|
await bot.wait_until_ready()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rotating rich presence
|
|
# ---------------------------------------------------------------------------
|
|
_presence_index = 0
|
|
_economy_count: int = 0
|
|
_PRESENCES: list = [
|
|
lambda g: discord.Activity(
|
|
type=discord.ActivityType.watching,
|
|
name="/help - kõik käsklused",
|
|
state="Vaata, mida TipiBOTil pakkuda on",
|
|
),
|
|
lambda g: discord.Activity(
|
|
type=discord.ActivityType.watching,
|
|
name="/daily - päevane boonus TipiCOINe",
|
|
state="Streak boonused kuni x3.0 rohkem 🔥",
|
|
),
|
|
lambda g: discord.Activity(
|
|
type=discord.ActivityType.watching,
|
|
name=f"{_economy_count - 1 or '?'} mängijat võistlevad",
|
|
state="/leaderboard - kes on tipus?",
|
|
),
|
|
]
|
|
|
|
|
|
@tasks.loop(seconds=20)
|
|
async def _rotate_presence() -> None:
|
|
global _presence_index, _economy_count
|
|
guild = bot.get_guild(config.GUILD_ID)
|
|
try:
|
|
_economy_count = await pb_client.count_records()
|
|
except Exception as e:
|
|
log.warning("Presence: failed to fetch economy count: %s", e)
|
|
activity = _PRESENCES[_presence_index % len(_PRESENCES)](guild)
|
|
await bot.change_presence(status=discord.Status.online, activity=activity)
|
|
_presence_index += 1
|
|
|
|
|
|
@_rotate_presence.before_loop
|
|
async def _before_rotate_presence():
|
|
await bot.wait_until_ready()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Events
|
|
# ---------------------------------------------------------------------------
|
|
@bot.event
|
|
async def on_ready():
|
|
"""Load sheet data and sync slash commands on startup."""
|
|
log.info("Logged in as %s (ID: %s)", bot.user, bot.user.id)
|
|
economy.set_house(bot.user.id)
|
|
|
|
_apply_profile_command_filters()
|
|
|
|
# Pull sheet data into cache
|
|
if IS_DEV_PROFILE:
|
|
try:
|
|
data = sheets.refresh()
|
|
log.info("Loaded %d member rows from Google Sheets", len(data))
|
|
except Exception as e:
|
|
log.error("Failed to load sheet on startup: %s", e)
|
|
|
|
# Sync slash commands to the guild only; wipe any leftover global registrations
|
|
tree.copy_global_to(guild=GUILD_OBJ)
|
|
await tree.sync(guild=GUILD_OBJ)
|
|
tree.clear_commands(guild=None)
|
|
await tree.sync()
|
|
log.info("Slash commands synced to guild %s (global commands cleared)", config.GUILD_ID)
|
|
|
|
# Start daily birthday task
|
|
if IS_DEV_PROFILE and not birthday_daily.is_running():
|
|
birthday_daily.start()
|
|
log.info("Birthday daily task started (fires 09:00 Tallinn time)")
|
|
|
|
# Start rotating rich presence
|
|
if not _rotate_presence.is_running():
|
|
_rotate_presence.start()
|
|
log.info("Rich presence rotation started")
|
|
|
|
# Re-schedule any reminder tasks lost on restart
|
|
await _restore_reminders()
|
|
|
|
# Notify the channel where /restart was triggered
|
|
if _RESTART_FILE.exists():
|
|
try:
|
|
data = json.loads(_RESTART_FILE.read_text(encoding="utf-8"))
|
|
ch = await bot.fetch_channel(int(data["channel_id"]))
|
|
if ch:
|
|
await ch.send(S.MSG_RESTART_DONE)
|
|
except Exception as e:
|
|
log.warning("Could not send restart notification: %s", e)
|
|
finally:
|
|
_RESTART_FILE.unlink(missing_ok=True)
|
|
|
|
|
|
@bot.event
|
|
async def on_disconnect():
|
|
log.warning("Bot disconnected from Discord gateway")
|
|
|
|
|
|
@bot.event
|
|
async def on_resumed():
|
|
log.info("Bot reconnected to Discord (session resumed)")
|
|
|
|
|
|
@bot.event
|
|
async def on_member_join(member: discord.Member):
|
|
"""When someone joins, look them up in the sheet and sync."""
|
|
if not IS_DEV_PROFILE:
|
|
return
|
|
await handle_member_join(
|
|
member,
|
|
bot,
|
|
log,
|
|
has_announced_today=_has_announced_today,
|
|
mark_announced_today=_mark_announced_today,
|
|
log_sync_result=_log_sync_result,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Slash commands
|
|
# ---------------------------------------------------------------------------
|
|
if IS_DEV_PROFILE:
|
|
register_dev_member_commands(
|
|
tree,
|
|
bot,
|
|
log,
|
|
has_announced_today=_has_announced_today,
|
|
mark_announced_today=_mark_announced_today,
|
|
)
|
|
|
|
register_ops_admin_commands(
|
|
tree,
|
|
bot,
|
|
log,
|
|
process=_process,
|
|
start_time=_start_time,
|
|
log_dir=_LOG_DIR,
|
|
guild_obj=GUILD_OBJ,
|
|
restart_file=_RESTART_FILE,
|
|
get_member_cache_size=_member_cache_size,
|
|
get_paused=_get_paused,
|
|
set_paused=_set_paused,
|
|
count_economy_users=pb_client.count_records,
|
|
)
|
|
|
|
register_ops_channel_commands(
|
|
tree,
|
|
bot,
|
|
log,
|
|
get_allowed_channels=_get_allowed_channels,
|
|
set_allowed_channels=_set_allowed_channels,
|
|
)
|
|
|
|
|
|
@tree.command(name="ping", description=S.CMD["ping"])
|
|
async def cmd_ping(interaction: discord.Interaction):
|
|
await interaction.response.send_message(S.MSG_PONG)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# /help
|
|
# ---------------------------------------------------------------------------
|
|
_HELP_PAGE_SIZE = 10
|
|
_DEV_ONLY_HELP_TOKENS: tuple[str, ...] = tuple(f"/{name}" for name in _DEV_ONLY_COMMANDS)
|
|
|
|
|
|
def _visible_help_fields(category_key: str) -> list[tuple[str, str]]:
|
|
fields: list[tuple[str, str]] = list(S.HELP_CATEGORIES[category_key]["fields"])
|
|
if IS_DEV_PROFILE:
|
|
return fields
|
|
|
|
visible: list[tuple[str, str]] = []
|
|
for name, value in fields:
|
|
blob = f"{name}\n{value}".lower()
|
|
if any(tok in blob for tok in _DEV_ONLY_HELP_TOKENS):
|
|
continue
|
|
visible.append((name, value))
|
|
return visible
|
|
|
|
|
|
def _help_embed(category_key: str, page: int = 0) -> discord.Embed:
|
|
cat = S.HELP_CATEGORIES[category_key]
|
|
fields = _visible_help_fields(category_key)
|
|
total_pages = max(1, math.ceil(len(fields) / _HELP_PAGE_SIZE))
|
|
page = max(0, min(page, total_pages - 1))
|
|
page_fields = fields[page * _HELP_PAGE_SIZE : (page + 1) * _HELP_PAGE_SIZE]
|
|
title = cat["label"]
|
|
if total_pages > 1:
|
|
title += f" ({page + 1}/{total_pages})"
|
|
embed = discord.Embed(title=title, description=cat["description"], color=cat["color"])
|
|
for name, value in page_fields:
|
|
embed.add_field(name=name, value=value, inline=False)
|
|
embed.set_footer(text=S.HELP_UI["footer"])
|
|
return embed
|
|
|
|
|
|
class HelpView(discord.ui.View):
|
|
def __init__(self, is_admin: bool = False, category: str = "üldine", page: int = 0):
|
|
super().__init__(timeout=120)
|
|
self.is_admin = is_admin
|
|
self.category = category
|
|
self.page = page
|
|
self._rebuild()
|
|
|
|
def _rebuild(self) -> None:
|
|
self.clear_items()
|
|
fields = _visible_help_fields(self.category)
|
|
total_pages = max(1, math.ceil(len(fields) / _HELP_PAGE_SIZE))
|
|
self.page = max(0, min(self.page, total_pages - 1))
|
|
select_row = 0
|
|
if total_pages > 1:
|
|
select_row = 1
|
|
prev_btn = discord.ui.Button(
|
|
label="◀", style=discord.ButtonStyle.secondary,
|
|
disabled=(self.page == 0), row=0,
|
|
)
|
|
prev_btn.callback = self._prev
|
|
next_btn = discord.ui.Button(
|
|
label="▶", style=discord.ButtonStyle.secondary,
|
|
disabled=(self.page >= total_pages - 1), row=0,
|
|
)
|
|
next_btn.callback = self._next
|
|
self.add_item(prev_btn)
|
|
self.add_item(next_btn)
|
|
self.add_item(HelpSelect(self.is_admin, self.category, row=select_row))
|
|
|
|
async def _prev(self, interaction: discord.Interaction) -> None:
|
|
self.page = max(0, self.page - 1)
|
|
self._rebuild()
|
|
await interaction.response.edit_message(embed=_help_embed(self.category, self.page), view=self)
|
|
|
|
async def _next(self, interaction: discord.Interaction) -> None:
|
|
total = max(1, math.ceil(len(_visible_help_fields(self.category)) / _HELP_PAGE_SIZE))
|
|
self.page = min(total - 1, self.page + 1)
|
|
self._rebuild()
|
|
await interaction.response.edit_message(embed=_help_embed(self.category, self.page), view=self)
|
|
|
|
|
|
class HelpSelect(discord.ui.Select):
|
|
def __init__(self, is_admin: bool = False, current: str = "üldine", row: int = 0):
|
|
options = [
|
|
discord.SelectOption(
|
|
label=v["label"], value=k, description=v["description"],
|
|
default=(k == current),
|
|
)
|
|
for k, v in S.HELP_CATEGORIES.items()
|
|
if k != "admin" or is_admin
|
|
]
|
|
super().__init__(placeholder=S.HELP_UI["select_placeholder"], options=options, row=row)
|
|
|
|
async def callback(self, interaction: discord.Interaction) -> None:
|
|
view = self.view
|
|
view.category = self.values[0]
|
|
view.page = 0
|
|
view._rebuild()
|
|
await interaction.response.edit_message(embed=_help_embed(view.category, 0), view=view)
|
|
|
|
|
|
@tree.command(name="help", description=S.CMD["help"])
|
|
async def cmd_help(interaction: discord.Interaction):
|
|
perms = interaction.user.guild_permissions if interaction.guild else None
|
|
is_admin = bool(perms and (perms.manage_roles or perms.manage_guild))
|
|
await interaction.response.send_message(
|
|
embed=_help_embed("üldine"), view=HelpView(is_admin), ephemeral=True
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TipiBOT economy commands
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _coin(amount: int) -> str:
|
|
return f"**{amount:,}** {economy.COIN}"
|
|
|
|
|
|
def _cd_ts(remaining: datetime.timedelta) -> str:
|
|
"""Discord relative timestamp string for when a cooldown expires."""
|
|
expiry = int((datetime.datetime.now(datetime.timezone.utc) + remaining).timestamp())
|
|
return f"<t:{expiry}:R>"
|
|
|
|
|
|
register_economy_admin_commands(
|
|
tree,
|
|
bot,
|
|
log,
|
|
cd_ts=_cd_ts,
|
|
apply_level_role=_apply_level_role,
|
|
)
|
|
|
|
|
|
def _gamble_cd(uid: int, has_360: bool = False) -> datetime.timedelta | None:
|
|
"""Check and set gambling cooldown. Returns remaining time if on CD, else None."""
|
|
cd = float(_GAMBLE_CD_360 if has_360 else _GAMBLE_CD)
|
|
now = time.monotonic()
|
|
remaining = cd - (now - _gamble_cooldowns.get(uid, 0.0))
|
|
if remaining > 0:
|
|
return datetime.timedelta(seconds=remaining)
|
|
_gamble_cooldowns[uid] = now
|
|
return None
|
|
|
|
|
|
async def _check_cmd_rate(interaction: discord.Interaction) -> bool:
|
|
"""Record an income command use. Jails the user and returns True if spam is detected."""
|
|
uid = interaction.user.id
|
|
now = time.monotonic()
|
|
q = _spam_tracker.setdefault(uid, collections.deque())
|
|
q.append(now)
|
|
while q and now - q[0] > _SPAM_WINDOW:
|
|
q.popleft()
|
|
if len(q) >= _SPAM_THRESHOLD:
|
|
q.clear()
|
|
await economy.do_spam_jail(uid)
|
|
await interaction.response.send_message(S.MSG_SPAM_JAIL, ephemeral=True)
|
|
return True
|
|
return False
|
|
|
|
|
|
register_prestige_commands(
|
|
tree,
|
|
check_cmd_rate=_check_cmd_rate,
|
|
ensure_level_role=_ensure_level_role,
|
|
)
|
|
|
|
|
|
def _parse_amount(value: str, balance: int) -> tuple[int | None, str | None]:
|
|
"""Parse an amount string; 'all' resolves to the user's full balance.
|
|
Accepts plain integers and valid thousand-separated numbers (1,000 / 1.000 / 1 000).
|
|
Rejects decimals and ambiguous inputs like 1,1 or 1.5.
|
|
Returns (amount, None) on success or (None, error_msg) on failure."""
|
|
v = value.strip()
|
|
if v.lower() == "all":
|
|
return balance, None
|
|
# Strip valid thousand separators: groups of exactly 3 digits after separator
|
|
if re.fullmatch(r'\d{1,3}([,. ]\d{3})*', v):
|
|
v = re.sub(r'[,. ]', '', v)
|
|
try:
|
|
return int(v), None
|
|
except ValueError:
|
|
return None, S.ERR["invalid_amount"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Reminder system
|
|
# ---------------------------------------------------------------------------
|
|
_reminder_tasks: dict[tuple[int, str], asyncio.Task] = {}
|
|
|
|
|
|
def _cancel_reminder_task(user_id: int, cmd: str) -> None:
|
|
task = _reminder_tasks.pop((user_id, cmd), None)
|
|
if task and not task.done():
|
|
task.cancel()
|
|
|
|
|
|
def _schedule_reminder(user_id: int, cmd: str, delay: datetime.timedelta) -> None:
|
|
"""DM the user when their cooldown expires. Replaces any existing task."""
|
|
async def _remind():
|
|
await asyncio.sleep(delay.total_seconds())
|
|
user = bot.get_user(user_id)
|
|
if user is None:
|
|
try:
|
|
user = await bot.fetch_user(user_id)
|
|
except (discord.NotFound, discord.HTTPException):
|
|
user = None
|
|
if user:
|
|
try:
|
|
await user.send(
|
|
S.MSG_REMINDER.format(cmd=cmd)
|
|
)
|
|
except (discord.Forbidden, discord.HTTPException):
|
|
pass
|
|
_reminder_tasks.pop((user_id, cmd), None)
|
|
|
|
key = (user_id, cmd)
|
|
existing = _reminder_tasks.get(key)
|
|
if existing and not existing.done():
|
|
existing.cancel()
|
|
_reminder_tasks[key] = asyncio.create_task(_remind())
|
|
|
|
|
|
_REMINDER_COOLDOWN_KEYS: dict[str, str] = {
|
|
"daily": "last_daily",
|
|
"work": "last_work",
|
|
"beg": "last_beg",
|
|
"crime": "last_crime",
|
|
"rob": "last_rob",
|
|
"fish": "last_fish",
|
|
}
|
|
|
|
|
|
async def _restore_reminders() -> None:
|
|
"""Re-schedule reminder tasks lost when the bot restarted."""
|
|
now = datetime.datetime.now(datetime.timezone.utc)
|
|
restored = 0
|
|
for uid_str, user in (await economy.get_all_users_raw()).items():
|
|
reminders = user.get("reminders", [])
|
|
if not reminders:
|
|
continue
|
|
user_id = int(uid_str)
|
|
for cmd in reminders:
|
|
last_key = _REMINDER_COOLDOWN_KEYS.get(cmd)
|
|
if not last_key:
|
|
continue
|
|
last_str = user.get(last_key)
|
|
if not last_str:
|
|
continue
|
|
items = user.get("items", [])
|
|
if cmd == "work" and "monitor" in items:
|
|
cooldown = datetime.timedelta(minutes=40)
|
|
elif cmd == "beg" and "hiirematt" in items:
|
|
cooldown = datetime.timedelta(minutes=3)
|
|
elif cmd == "daily" and "korvaklapid" in items:
|
|
cooldown = datetime.timedelta(hours=18)
|
|
elif cmd == "fish" and "ussipurk" in items:
|
|
cooldown = datetime.timedelta(seconds=90)
|
|
else:
|
|
cooldown = economy.COOLDOWNS.get(cmd)
|
|
if not cooldown:
|
|
continue
|
|
last_dt = datetime.datetime.fromisoformat(last_str)
|
|
if last_dt.tzinfo is None:
|
|
last_dt = last_dt.replace(tzinfo=datetime.timezone.utc)
|
|
remaining = (last_dt + cooldown) - now
|
|
if remaining.total_seconds() > 0:
|
|
_schedule_reminder(user_id, cmd, remaining)
|
|
restored += 1
|
|
if restored:
|
|
log.info("Restored %d reminder task(s) after restart", restored)
|
|
|
|
|
|
async def _maybe_remind(user_id: int, cmd: str) -> None:
|
|
"""Schedule a DM reminder if the user has opted in for this command."""
|
|
user_data = await economy.get_user(user_id)
|
|
if cmd not in user_data.get("reminders", []):
|
|
return
|
|
items = set(user_data.get("items", []))
|
|
if cmd == "work" and "monitor" in items:
|
|
delay = datetime.timedelta(minutes=40)
|
|
elif cmd == "beg" and "hiirematt" in items:
|
|
delay = datetime.timedelta(minutes=3)
|
|
elif cmd == "daily" and "korvaklapid" in items:
|
|
delay = datetime.timedelta(hours=18)
|
|
elif cmd == "fish" and "ussipurk" in items:
|
|
delay = datetime.timedelta(seconds=90)
|
|
else:
|
|
delay = economy.COOLDOWNS.get(cmd, datetime.timedelta(hours=1))
|
|
_schedule_reminder(user_id, cmd, delay)
|
|
|
|
|
|
register_economy_support_commands(
|
|
tree,
|
|
parse_amount=_parse_amount,
|
|
coin=_coin,
|
|
cancel_reminder_task=_cancel_reminder_task,
|
|
)
|
|
|
|
register_economy_profile_commands(
|
|
tree,
|
|
coin=_coin,
|
|
cd_ts=_cd_ts,
|
|
ensure_level_role=_ensure_level_role,
|
|
)
|
|
|
|
register_economy_income_commands(
|
|
tree,
|
|
bot,
|
|
coin=_coin,
|
|
cd_ts=_cd_ts,
|
|
check_cmd_rate=_check_cmd_rate,
|
|
maybe_remind=_maybe_remind,
|
|
award_exp=_award_exp,
|
|
)
|
|
|
|
register_economy_fish_commands(
|
|
tree,
|
|
coin=_coin,
|
|
cd_ts=_cd_ts,
|
|
check_cmd_rate=_check_cmd_rate,
|
|
maybe_remind=_maybe_remind,
|
|
award_exp=_award_exp,
|
|
active_games=_active_games,
|
|
)
|
|
|
|
register_economy_games_commands(
|
|
tree,
|
|
coin=_coin,
|
|
cd_ts=_cd_ts,
|
|
parse_amount=_parse_amount,
|
|
gamble_cd=_gamble_cd,
|
|
award_exp=_award_exp,
|
|
active_games=_active_games,
|
|
)
|
|
|
|
register_economy_extra_commands(
|
|
tree,
|
|
bot,
|
|
coin=_coin,
|
|
cd_ts=_cd_ts,
|
|
parse_amount=_parse_amount,
|
|
ensure_level_role=_ensure_level_role,
|
|
active_games=_active_games,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Error handling for slash commands
|
|
# ---------------------------------------------------------------------------
|
|
@tree.error
|
|
async def on_app_command_error(interaction: discord.Interaction, error: app_commands.AppCommandError):
|
|
if isinstance(error, app_commands.MissingPermissions):
|
|
msg = S.ERR["missing_perms"]
|
|
else:
|
|
log.exception("Unhandled slash command error: %s", error)
|
|
msg = S.ERR["generic_error"].format(error=error)
|
|
|
|
try:
|
|
if interaction.response.is_done():
|
|
await interaction.followup.send(msg, ephemeral=True)
|
|
else:
|
|
await interaction.response.send_message(msg, ephemeral=True)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
def _log_sync_result(member: discord.Member, result: SyncResult):
|
|
if result.nickname_changed:
|
|
log.info(" → Nickname set for %s", member)
|
|
if result.roles_added:
|
|
log.info(" → Roles added for %s: %s", member, result.roles_added)
|
|
if result.birthday_soon:
|
|
log.info(" → Birthday coming up for %s", member)
|
|
for err in result.errors:
|
|
log.warning(" → %s", err)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
def _asyncio_exception_handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
|
|
exc = context.get("exception")
|
|
msg = context.get("message", "unknown asyncio error")
|
|
if exc:
|
|
log.error("Unhandled asyncio exception: %s", msg, exc_info=exc)
|
|
else:
|
|
log.error("Unhandled asyncio error: %s", msg)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
if not config.DISCORD_TOKEN:
|
|
profile_key = "DISCORD_TOKEN_ECONOMY" if config.BOT_PROFILE == "economy" else "DISCORD_TOKEN_DEV"
|
|
raise SystemExit(
|
|
f"{profile_key} pole seadistatud profiilile '{config.BOT_PROFILE}'. "
|
|
"Kopeeri .env.example failiks .env ja täida see."
|
|
)
|
|
|
|
async def _main() -> None:
|
|
loop = asyncio.get_event_loop()
|
|
loop.set_exception_handler(_asyncio_exception_handler)
|
|
await bot.start(config.DISCORD_TOKEN, reconnect=True)
|
|
|
|
asyncio.run(_main())
|