1
0
forked from sass/tipibot

Feature: Clean up the codebase

This commit is contained in:
Rene Arumetsa
2026-04-20 23:01:51 +03:00
parent 17102ae202
commit 77a3badd41
18 changed files with 31 additions and 33 deletions

1729
core/economy.py Normal file

File diff suppressed because it is too large Load Diff

213
core/member_sync.py Normal file
View File

@@ -0,0 +1,213 @@
"""Member synchronization logic - roles, nicknames, birthdays."""
from __future__ import annotations
import logging
from datetime import datetime, date
from dataclasses import dataclass, field
import discord
import config
from . import sheets
log = logging.getLogger(__name__)
_PLACEHOLDER = {"-", "x", "n/a", "none", "ei"}
def _is_placeholder(val: str) -> bool:
"""Return True if a sheet cell value is an intentional empty marker."""
return not val.strip() or val.strip().lower() in _PLACEHOLDER
# Maps sheet role values to Discord role names where they differ
ROLE_NAME_MAP: dict[str, str] = {
"Juht": "Tiimijuht",
"Admin": "+",
}
@dataclass
class SyncResult:
"""Tracks what happened during a sync operation."""
nickname_changed: bool = False
roles_added: list[str] = field(default_factory=list)
roles_removed: list[str] = field(default_factory=list)
birthday_soon: bool = False
not_found: bool = False
errors: list[str] = field(default_factory=list)
synced: bool = False # True when no errors; caller writes this to the sheet
@property
def changed(self) -> bool:
return self.nickname_changed or self.roles_added or self.roles_removed
def _format_nickname(full_name: str) -> str:
"""Format a nickname from a full name: first name + last name initial.
Examples:
'Mari Tamm' -> 'Mari T'
'Mari-Liis Tamm' -> 'Mari-Liis T'
'Jaan' -> 'Jaan'
"""
parts = full_name.strip().split()
if len(parts) < 2:
return full_name.strip()
first = parts[0] # preserves hyphenated names like Mari-Liis
last_initial = parts[-1][0].upper()
return f"{first} {last_initial}"
def _parse_roles(roles_str: str) -> list[str]:
"""Parse comma-separated role names from the sheet."""
if not roles_str:
return []
return [r.strip().rstrip(".,;:!?") for r in str(roles_str).split(",") if r.strip()]
def _parse_birthday(raw: str) -> date | None:
"""Parse a birthday string robustly. Returns None for garbage/placeholder values."""
raw = str(raw).strip()
if _is_placeholder(raw):
return None
today = date.today()
for fmt, has_year in [("%d/%m/%Y", True), ("%Y-%m-%d", True), ("%m-%d", False)]:
try:
parsed = datetime.strptime(raw, fmt).date()
if has_year and not (1920 <= parsed.year <= today.year):
return None # unrealistic year (formula artefact, future year, etc.)
return parsed if has_year else parsed.replace(year=today.year)
except ValueError:
continue
return None
def _is_birthday_soon(birthday_str: str, window_days: int | None = None) -> bool:
"""Check if a birthday falls within the configured window."""
bday = _parse_birthday(birthday_str)
if bday is None:
return False
window = window_days or config.BIRTHDAY_WINDOW_DAYS
today = date.today()
this_year_bday = bday.replace(year=today.year)
if this_year_bday < today:
this_year_bday = bday.replace(year=today.year + 1)
delta = (this_year_bday - today).days
return 0 <= delta <= window
def is_birthday_today(birthday_str: str) -> bool:
"""Return True if today is the member's birthday (any supported date format)."""
bday = _parse_birthday(birthday_str)
if bday is None:
return False
today = date.today()
return bday.month == today.month and bday.day == today.day
async def sync_member(
member: discord.Member,
guild: discord.Guild,
) -> SyncResult:
"""Synchronize a single member's nickname and roles based on sheet data.
Also populates Discord ID in the sheet if missing, and updates username.
Returns a SyncResult describing what happened.
"""
result = SyncResult()
# Look up member in sheet cache
row = sheets.find_member(member.id, member.name)
if row is None:
result.not_found = True
return result
# --- Backfill User ID if missing ---
raw_id = str(row.get("User ID", "")).strip()
if not raw_id or raw_id == "0":
sheets.set_user_id(member.name, member.id)
# --- Update Discord username in sheet if it changed ---
sheet_username = str(row.get("Discord", "")).strip()
if sheet_username.lower() != member.name.lower():
sheets.update_username(member.id, member.name)
# --- Nickname (Nimi = real name, formatted as first name + last initial) ---
nimi = str(row.get("Nimi", "")).strip()
desired_nick = _format_nickname(nimi) if nimi else None
if desired_nick and member.nick != desired_nick:
try:
await member.edit(nick=desired_nick)
result.nickname_changed = True
except discord.Forbidden:
log.debug("No permission to set nickname for %s (likely admin), skipping", member)
except discord.HTTPException as e:
result.errors.append(f"Hüüdnime viga kasutajale {member}: {e}")
# --- Roles (from Organisatsioon, Valdkond, Roll columns; values may be comma-separated) ---
desired_role_names: list[str] = []
for col in ("Organisatsioon", "Valdkond", "Roll"):
val = str(row.get(col, "")).strip()
if not _is_placeholder(val):
desired_role_names.extend(
r for r in _parse_roles(val)
if not _is_placeholder(r)
)
desired_roles: list[discord.Role] = []
for rname in desired_role_names:
rname = ROLE_NAME_MAP.get(rname, rname)
role = discord.utils.get(guild.roles, name=rname)
if role:
desired_roles.append(role)
else:
result.errors.append(f"Rolli '{rname}' ei leitud serverist")
# Base roles that every synced member must have
for rid in config.BASE_ROLE_IDS:
role = guild.get_role(rid)
if role:
desired_roles.append(role)
else:
result.errors.append(f"Baasrolli ID {rid} ei leitud serverist")
# Roles to add (desired but member doesn't have)
to_add = [r for r in desired_roles if r not in member.roles]
# (we currently only ADD the desired roles, not remove extras - safe default)
if to_add:
try:
await member.add_roles(*to_add, reason="Sheet sync")
result.roles_added = [r.name for r in to_add]
except discord.Forbidden:
log.debug("No permission to add roles for %s (likely admin), skipping", member)
except discord.HTTPException as e:
result.errors.append(f"Rolli viga kasutajale {member}: {e}")
# --- Birthday check ---
birthday_str = str(row.get("Sünnipäev", "")).strip()
if not _is_placeholder(birthday_str) and _is_birthday_soon(birthday_str):
result.birthday_soon = True
# --- Mark synced (caller is responsible for writing to sheet) ---
result.synced = not bool(result.errors)
return result
async def announce_birthday(
member: discord.Member,
bot: discord.Client,
) -> None:
"""Send a birthday-coming-soon announcement to the configured channel."""
channel = bot.get_channel(config.BIRTHDAY_CHANNEL_ID)
if channel is None:
log.warning("Birthday channel %s not found", config.BIRTHDAY_CHANNEL_ID)
return
row = sheets.find_member(member.id, member.name)
bday_str = str(row.get("Sünnipäev", "")) if row else "?"
await channel.send(
f"🎂 @here **{member.display_name}** sünnipäev on täna! 🥳 Palju-palju õnne sünnipäevaks! 🎉"
)

154
core/pb_client.py Normal file
View File

@@ -0,0 +1,154 @@
"""Async PocketBase REST client for TipiLAN Bot.
Handles admin authentication (auto-refreshed), and CRUD operations on the
economy_users collection. Uses aiohttp, which discord.py already depends on.
Environment variables (set in .env):
PB_URL Base URL of PocketBase (default: http://127.0.0.1:8090)
PB_ADMIN_EMAIL PocketBase admin e-mail
PB_ADMIN_PASSWORD PocketBase admin password
PB_ECONOMY_COLLECTION_DEV / PB_ECONOMY_COLLECTION_ECONOMY
"""
from __future__ import annotations
import asyncio
import logging
import time
from typing import Any
import aiohttp
import config
_log = logging.getLogger("tipiCOIN.pb")
PB_URL = config.PB_URL
PB_ADMIN_EMAIL = config.PB_ADMIN_EMAIL
PB_ADMIN_PASSWORD = config.PB_ADMIN_PASSWORD
ECONOMY_COLLECTION = config.PB_ECONOMY_COLLECTION
_TIMEOUT = aiohttp.ClientTimeout(total=10)
# ---------------------------------------------------------------------------
# Persistent session (created once, reused for the lifetime of the process)
# ---------------------------------------------------------------------------
_session: aiohttp.ClientSession | None = None
def _get_session() -> aiohttp.ClientSession:
global _session
if _session is None or _session.closed:
_session = aiohttp.ClientSession(timeout=_TIMEOUT)
return _session
# ---------------------------------------------------------------------------
# Auth token cache
# ---------------------------------------------------------------------------
_token: str = ""
_token_expiry: float = 0.0
_auth_lock = asyncio.Lock()
async def _ensure_auth() -> str:
global _token, _token_expiry
async with _auth_lock:
if time.monotonic() < _token_expiry:
return _token
session = _get_session()
async with session.post(
f"{PB_URL}/api/collections/_superusers/auth-with-password",
json={"identity": PB_ADMIN_EMAIL, "password": PB_ADMIN_PASSWORD},
) as resp:
if resp.status != 200:
text = await resp.text()
raise RuntimeError(f"PocketBase auth failed ({resp.status}): {text}")
data = await resp.json()
_token = data["token"]
_token_expiry = time.monotonic() + 13 * 24 * 3600 # refresh well before expiry
_log.debug("PocketBase admin token refreshed")
return _token
async def _hdrs() -> dict[str, str]:
return {"Authorization": await _ensure_auth()}
# ---------------------------------------------------------------------------
# CRUD helpers
# ---------------------------------------------------------------------------
async def get_record(user_id: str) -> dict[str, Any] | None:
"""Fetch one economy record by Discord user_id. Returns None if not found."""
session = _get_session()
async with session.get(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records",
params={"filter": f'user_id="{user_id}"', "perPage": 1},
headers=await _hdrs(),
) as resp:
resp.raise_for_status()
data = await resp.json()
items = data.get("items", [])
return items[0] if items else None
async def create_record(record: dict[str, Any]) -> dict[str, Any]:
"""Create a new economy record. Returns the created record (includes PB id)."""
session = _get_session()
async with session.post(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records",
json=record,
headers=await _hdrs(),
) as resp:
if resp.status not in (200, 201):
text = await resp.text()
raise RuntimeError(f"PocketBase create failed ({resp.status}): {text}")
return await resp.json()
async def update_record(record_id: str, data: dict[str, Any]) -> dict[str, Any]:
"""PATCH an existing record by its PocketBase record id."""
session = _get_session()
async with session.patch(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records/{record_id}",
json=data,
headers=await _hdrs(),
) as resp:
resp.raise_for_status()
return await resp.json()
async def count_records() -> int:
"""Return the total number of records in the collection (single cheap request)."""
session = _get_session()
async with session.get(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records",
params={"perPage": 1, "page": 1},
headers=await _hdrs(),
) as resp:
resp.raise_for_status()
data = await resp.json()
return int(data.get("totalItems", 0))
async def list_all_records(page_size: int = 500) -> list[dict[str, Any]]:
"""Fetch every record in the collection, handling PocketBase pagination."""
results: list[dict[str, Any]] = []
page = 1
session = _get_session()
hdrs = await _hdrs()
while True:
async with session.get(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records",
params={"perPage": page_size, "page": page},
headers=hdrs,
) as resp:
resp.raise_for_status()
data = await resp.json()
batch = data.get("items", [])
results.extend(batch)
if len(batch) < page_size:
break
page += 1
return results

216
core/sheets.py Normal file
View File

@@ -0,0 +1,216 @@
"""Google Sheets integration - read/write member data via gspread."""
import gspread
from google.oauth2.service_account import Credentials
import config
# Scopes needed: read + write to Sheets
SCOPES = [
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
]
# ---------------------------------------------------------------------------
# Expected sheet columns (header row):
# Nimi | Organisatsioon | Meil | Discord | User ID | Sünnipäev |
# Telefon | Valdkond | Roll | Discordis synced? | Groupi lisatud?
#
# - Nimi : member's real name (used as Discord nickname)
# - Organisatsioon : organisation value - maps to a Discord role
# - Meil : email address (read-only for bot)
# - Discord : Discord username for initial matching
# - User ID : numeric Discord user ID (bot can populate this)
# - Sünnipäev : birthday date string (YYYY-MM-DD or MM-DD)
# - Telefon : phone number (read-only for bot)
# - Valdkond : field/area value - maps to a Discord role
# - Roll : role value - maps to a Discord role
# - Discordis synced? : TRUE/FALSE - bot writes this after confirming sync
# - Groupi lisatud? : group membership flag (managed externally)
# ---------------------------------------------------------------------------
EXPECTED_HEADERS = [
"Nimi",
"Organisatsioon",
"Meil",
"Discord",
"User ID",
"Sünnipäev",
"Telefon",
"Valdkond",
"Roll",
"Discordis synced?",
"Groupi lisatud?",
]
_client: gspread.Client | None = None
_worksheet: gspread.Worksheet | None = None
# In-memory cache: list of dicts (one per row)
_cache: list[dict] = []
def _get_worksheet() -> gspread.Worksheet:
"""Authenticate and return the first worksheet of the configured sheet."""
global _client, _worksheet
creds = Credentials.from_service_account_file(config.GOOGLE_CREDS_PATH, scopes=SCOPES)
_client = gspread.authorize(creds)
spreadsheet = _client.open_by_key(config.SHEET_ID)
_worksheet = spreadsheet.sheet1
return _worksheet
def _ensure_headers(ws: gspread.Worksheet) -> None:
"""If the sheet is empty or missing headers, write them (headers are in row 1)."""
existing = ws.row_values(1)
if existing != EXPECTED_HEADERS:
for col_idx, header in enumerate(EXPECTED_HEADERS, start=1):
if col_idx > len(existing) or existing[col_idx - 1] != header:
ws.update_cell(1, col_idx, header)
def refresh() -> list[dict]:
"""Pull all rows from the sheet into the in-memory cache.
Returns the cache (list of dicts keyed by header names).
"""
global _cache
ws = _get_worksheet()
_ensure_headers(ws)
# head=1: row 1 is the header; row 2 is a formula/stats row - skip it
records = ws.get_all_records(head=1)
_cache = records[1:] # drop the formula row (row 2) from the cache
return _cache
def get_cache() -> list[dict]:
"""Return the current in-memory cache without re-querying."""
return _cache
def find_member_by_id(discord_id: int) -> dict | None:
"""Look up a member row by Discord user ID."""
for row in _cache:
raw = row.get("User ID", "")
if str(raw).strip() == str(discord_id):
return row
return None
def find_member_by_username(username: str) -> dict | None:
"""Look up a member row by Discord username (case-insensitive)."""
for row in _cache:
if str(row.get("Discord", "")).strip().lower() == username.lower():
return row
return None
def find_member(discord_id: int, username: str) -> dict | None:
"""Try ID first, fall back to username."""
return find_member_by_id(discord_id) or find_member_by_username(username)
# ---- Write helpers --------------------------------------------------------
def _row_index_for_member(discord_id: int | None = None, username: str | None = None) -> int | None:
"""Return the 1-based sheet row index for a member (header = row 2, data from row 3)."""
for idx, row in enumerate(_cache):
if discord_id and str(row.get("User ID", "")).strip() == str(discord_id):
return idx + 3 # +3 because header is row 2, data starts row 3, idx is 0-based
if username and str(row.get("Discord", "")).strip().lower() == username.lower():
return idx + 3
return None
def update_cell_for_member(
discord_id: int | None,
username: str | None,
column_name: str,
value: str,
) -> bool:
"""Write a value to a specific column for a member row.
Returns True if the write succeeded.
"""
ws = _worksheet or _get_worksheet()
row_idx = _row_index_for_member(discord_id=discord_id, username=username)
if row_idx is None:
return False
try:
col_idx = EXPECTED_HEADERS.index(column_name) + 1
except ValueError:
return False
ws.update([[value]], gspread.utils.rowcol_to_a1(row_idx, col_idx),
value_input_option="USER_ENTERED")
# Keep cache in sync
cache_idx = row_idx - 3
if 0 <= cache_idx < len(_cache):
_cache[cache_idx][column_name] = value
return True
def batch_set_synced(updates: list[tuple[int, bool]]) -> None:
"""Batch-write 'Discordis synced?' for multiple members in a single API call."""
ws = _worksheet or _get_worksheet()
col_idx = EXPECTED_HEADERS.index("Discordis synced?") + 1
cells = []
for discord_id, synced in updates:
row_idx = _row_index_for_member(discord_id=discord_id)
if row_idx is None:
continue
cells.append(gspread.Cell(row_idx, col_idx, "TRUE" if synced else "FALSE"))
cache_idx = row_idx - 3
if 0 <= cache_idx < len(_cache):
_cache[cache_idx]["Discordis synced?"] = "TRUE" if synced else "FALSE"
if cells:
ws.update_cells(cells, value_input_option="USER_ENTERED")
def set_user_id(username: str, discord_id: int) -> bool:
"""Write a Discord user ID for a row matched by Discord username."""
return update_cell_for_member(
discord_id=None,
username=username,
column_name="User ID",
value=str(discord_id),
)
def set_synced(discord_id: int, synced: bool) -> bool:
"""Mark a member as synced (TRUE) or not (FALSE)."""
return update_cell_for_member(
discord_id=discord_id,
username=None,
column_name="Discordis synced?",
value="TRUE" if synced else "FALSE",
)
def update_username(discord_id: int, new_username: str) -> bool:
"""Update the Discord column for a member (keeps sheet in sync with Discord)."""
return update_cell_for_member(
discord_id=discord_id,
username=None,
column_name="Discord",
value=new_username,
)
def add_new_member_row(username: str, discord_id: int) -> None:
"""Append a new row to the sheet with Discord username and User ID pre-filled.
All other columns are left empty for manual entry by an admin.
"""
ws = _worksheet or _get_worksheet()
row = [""] * len(EXPECTED_HEADERS)
row[EXPECTED_HEADERS.index("Discord")] = username
row[EXPECTED_HEADERS.index("User ID")] = str(discord_id)
row[EXPECTED_HEADERS.index("Discordis synced?")] = "FALSE"
ws.append_row(row, value_input_option="USER_ENTERED")
# Add to local cache so subsequent find_member() calls work in the same session
new_entry = {h: row[i] for i, h in enumerate(EXPECTED_HEADERS)}
_cache.append(new_entry)