1
0
forked from sass/tipibot
Files
tipibot/ops_admin_commands.py
2026-04-04 21:06:43 +03:00

135 lines
4.9 KiB
Python

from __future__ import annotations
import asyncio
import datetime
import json
import logging
import os
import subprocess
import sys
from collections.abc import Awaitable, Callable
from pathlib import Path
import discord
from discord import app_commands
import strings as S
def register_ops_admin_commands(
tree: app_commands.CommandTree,
bot: discord.Client,
log: logging.Logger,
process,
start_time: datetime.datetime,
log_dir: Path,
guild_obj: discord.Object,
restart_file: Path,
get_member_cache_size: Callable[[], int],
get_paused: Callable[[], bool],
set_paused: Callable[[bool], None],
count_economy_users: Callable[[], Awaitable[int]],
) -> None:
@tree.command(name="status", description=S.CMD["status"])
@app_commands.guild_only()
@app_commands.default_permissions(manage_guild=True)
async def cmd_status(interaction: discord.Interaction):
mem = process.memory_info()
cpu = process.cpu_percent(interval=0.1)
uptime = datetime.datetime.now() - start_time
hours, rem = divmod(int(uptime.total_seconds()), 3600)
minutes, seconds = divmod(rem, 60)
tasks_count = len(asyncio.all_tasks())
latency_ms = round(bot.latency * 1000, 1)
cache_size = get_member_cache_size()
user_count = await count_economy_users()
embed = discord.Embed(title=S.STATUS_UI["title"], color=0x57F287)
embed.add_field(
name=S.STATUS_UI["uptime_field"],
value=S.STATUS_UI["uptime_val"].format(hours=hours, minutes=minutes, seconds=seconds),
inline=True,
)
embed.add_field(
name=S.STATUS_UI["latency_field"],
value=S.STATUS_UI["latency_val"].format(ms=latency_ms),
inline=True,
)
embed.add_field(
name=S.STATUS_UI["ram_field"],
value=S.STATUS_UI["ram_val"].format(mb=f"{mem.rss / 1024**2:.1f}"),
inline=True,
)
embed.add_field(
name=S.STATUS_UI["cpu_field"],
value=S.STATUS_UI["cpu_val"].format(percent=f"{cpu:.1f}"),
inline=True,
)
embed.add_field(
name=S.STATUS_UI["tasks_field"],
value=str(tasks_count),
inline=True,
)
embed.add_field(
name=S.STATUS_UI["eco_players_field"],
value=str(user_count),
inline=True,
)
embed.add_field(
name=S.STATUS_UI["members_cache_field"],
value=str(cache_size),
inline=True,
)
log_lines = [
S.STATUS_UI["log_line"].format(name=p.name, size_kb=f"{p.stat().st_size / 1024:.1f}")
for p in sorted(log_dir.glob("*.log*"))
if p.is_file()
]
embed.add_field(
name=S.STATUS_UI["log_files_field"],
value="\n".join(log_lines) or S.STATUS_UI["none"],
inline=False,
)
await interaction.response.send_message(embed=embed, ephemeral=True)
@tree.command(name="sync", description=S.CMD["sync"])
@app_commands.guild_only()
@app_commands.default_permissions(manage_guild=True)
async def cmd_sync(interaction: discord.Interaction):
await interaction.response.defer(ephemeral=True)
tree.copy_global_to(guild=guild_obj)
await tree.sync(guild=guild_obj)
tree.clear_commands(guild=None)
await tree.sync()
await interaction.followup.send(S.MSG_SYNC_DONE, ephemeral=True)
log.info("/sync triggered by %s", interaction.user)
@tree.command(name="restart", description=S.CMD["restart"])
@app_commands.guild_only()
@app_commands.default_permissions(manage_guild=True)
async def cmd_restart(interaction: discord.Interaction):
restart_file.write_text(json.dumps({"channel_id": interaction.channel_id}), encoding="utf-8")
await interaction.response.send_message(S.MSG_RESTARTING, ephemeral=True)
log.info("/restart triggered by %s", interaction.user)
subprocess.Popen([sys.executable] + sys.argv, cwd=os.getcwd())
await bot.close()
@tree.command(name="shutdown", description=S.CMD["shutdown"])
@app_commands.guild_only()
@app_commands.default_permissions(manage_guild=True)
async def cmd_shutdown(interaction: discord.Interaction):
await interaction.response.send_message(S.MSG_SHUTTING_DOWN, ephemeral=True)
log.info("/shutdown triggered by %s", interaction.user)
await bot.close()
@tree.command(name="pause", description=S.CMD["pause"])
@app_commands.guild_only()
@app_commands.default_permissions(manage_guild=True)
async def cmd_pause(interaction: discord.Interaction):
paused = not get_paused()
set_paused(paused)
msg = S.MSG_PAUSED if paused else S.MSG_UNPAUSED
log.info("/pause toggled → %s by %s", "PAUSED" if paused else "UNPAUSED", interaction.user)
await interaction.response.send_message(msg, ephemeral=True)