1
0
forked from sass/tipibot

Added Fienta integration

This commit is contained in:
AlacrisDevs
2026-04-29 22:38:47 +03:00
parent a4a447867f
commit 3c2b4342a2
12 changed files with 1336 additions and 29 deletions

868
core/lan_fienta.py Normal file
View File

@@ -0,0 +1,868 @@
"""Fienta registration sync for the LAN bot profile.
The module keeps Fienta data in PocketBase, assigns Discord roles, and mirrors
public tournament teams into the LAN live registration sheet.
"""
from __future__ import annotations
import asyncio
import datetime as dt
import logging
import re
import unicodedata
from dataclasses import dataclass, field
from typing import Any
import discord
import gspread
from google.oauth2.service_account import Credentials
import config
from . import pb_client
log = logging.getLogger("tipilan.fienta")
SCOPES = [
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
]
CS2_GENERAL_ROLE_ID = 1498736834656604251
LOL_GENERAL_ROLE_ID = 1498736949706490017
LANGUAGE_GENERAL_ROLE_ID = 1416417344984715366
CS2_CAPTAIN_ROLE_ID = 1498738332316860426
CS2_MANAGER_ROLE_ID = 1498738500558655679
EXISTING_LANGUAGE_ROLE_IDS = {
"EE": 1425781026482950245,
"LV": 1425781129528606740,
"FI": 1425781429618348073,
}
CONFIRMED_TEXT = "Kinnitatud"
PENDING_TEXT = "Kinnitamisel"
CANCELLED_STATUSES = {"CANCELLED", "REFUNDED", "EXPIRED", "VOIDED"}
BLOCKED_COUNTRY_CODES = {"BY", "RU"}
BLOCKED_COUNTRY_NAMES = {
"belarus",
"russia",
"russian federation",
"valgevene",
"venemaa",
"vene föderatsioon",
"vene foderatsioon",
}
TICKET_TYPES: dict[str, dict[str, Any]] = {
"595507": {
"game": "cs2",
"kind": "participant",
"sheet_public": True,
"main": True,
},
"595509": {
"game": "cs2",
"kind": "reserve",
"sheet_public": False,
"main": False,
},
"595510": {
"game": "cs2",
"kind": "manager",
"sheet_public": False,
"main": False,
},
"595912": {
"game": "lol",
"kind": "participant",
"sheet_public": True,
"main": True,
},
}
SHEET_CONFIG = {
"cs2": {"worksheet": "CS2", "start_row": 6, "end_row": 37, "cols": 6},
"lol": {"worksheet": "LoL", "start_row": 6, "end_row": 17, "cols": 5},
}
COUNTRY_CODE_BY_NAME = {
"afghanistan": "AF",
"albaania": "AL",
"albania": "AL",
"andorra": "AD",
"armeenia": "AM",
"armenia": "AM",
"austria": "AT",
"austria vabariik": "AT",
"azerbaijan": "AZ",
"aserbaidžaan": "AZ",
"belgia": "BE",
"belgium": "BE",
"bosnia ja hertsegoviina": "BA",
"bosnia and herzegovina": "BA",
"bulgaaria": "BG",
"bulgaria": "BG",
"canada": "CA",
"kanada": "CA",
"croatia": "HR",
"eesti": "EE",
"estonia": "EE",
"est": "EE",
"denmark": "DK",
"taani": "DK",
"finland": "FI",
"soome": "FI",
"france": "FR",
"prantsusmaa": "FR",
"georgia": "GE",
"gruusia": "GE",
"germany": "DE",
"saksamaa": "DE",
"greece": "GR",
"kreeka": "GR",
"hungary": "HU",
"ungari": "HU",
"iceland": "IS",
"island": "IS",
"ireland": "IE",
"iirimaa": "IE",
"italy": "IT",
"itaalia": "IT",
"japan": "JP",
"jaapan": "JP",
"kazakhstan": "KZ",
"kasahstan": "KZ",
"latvia": "LV",
"läti": "LV",
"lati": "LV",
"liechtenstein": "LI",
"lithuania": "LT",
"leedu": "LT",
"luxembourg": "LU",
"luksemburg": "LU",
"malta": "MT",
"moldova": "MD",
"montenegro": "ME",
"netherlands": "NL",
"holland": "NL",
"madalmaad": "NL",
"norway": "NO",
"norra": "NO",
"poland": "PL",
"poola": "PL",
"portugal": "PT",
"romania": "RO",
"rumeenia": "RO",
"serbia": "RS",
"slovakia": "SK",
"slovakkia": "SK",
"slovenia": "SI",
"sloveenia": "SI",
"spain": "ES",
"hispaania": "ES",
"sweden": "SE",
"rootsi": "SE",
"switzerland": "CH",
"šveits": "CH",
"sveits": "CH",
"turkey": "TR",
"türgi": "TR",
"ukraine": "UA",
"ukraina": "UA",
"united kingdom": "GB",
"suurbritannia": "GB",
"great britain": "GB",
"united states": "US",
"usa": "US",
"ameerika ühendriigid": "US",
"ameerika uhendriigid": "US",
"belarus": "BY",
"valgevene": "BY",
"russia": "RU",
"russian federation": "RU",
"venemaa": "RU",
}
COUNTRY_ROLE_COLOURS = {
"EE": 0x0072CE,
"LV": 0x9E3039,
"FI": 0x003580,
"LT": 0xFDB913,
"SE": 0x006AA7,
"DE": 0xDD0000,
"PL": 0xDC143C,
"UA": 0x0057B7,
"GB": 0x012169,
"US": 0x3C3B6E,
}
_client: gspread.Client | None = None
_spreadsheet: gspread.Spreadsheet | None = None
_sync_lock = asyncio.Lock()
@dataclass
class SyncSummary:
saved: int = 0
created: int = 0
updated: int = 0
roles_synced: int = 0
unmatched: int = 0
sheet_rows: int = 0
alerts: list[str] = field(default_factory=list)
def short(self) -> str:
return (
f"saved={self.saved}, created={self.created}, updated={self.updated}, "
f"roles={self.roles_synced}, unmatched={self.unmatched}, sheet_rows={self.sheet_rows}, "
f"alerts={len(self.alerts)}"
)
def _text_field(name: str, required: bool = False) -> dict:
return {
"name": name,
"type": "text",
"required": required,
"options": {"min": None, "max": None, "pattern": ""},
}
def _bool_field(name: str) -> dict:
return {"name": name, "type": "bool", "required": False}
def fienta_collection_payload() -> dict:
fields = [
_text_field("registration_key", required=True),
_text_field("order_id"),
_text_field("ticket_code"),
_text_field("order_status"),
_text_field("order_url"),
_text_field("payment_time"),
_text_field("game"),
_text_field("kind"),
_text_field("ticket_type_id"),
_text_field("ticket_title"),
_text_field("ticket_group_title"),
_text_field("team_name"),
_text_field("discord_username"),
_text_field("nickname"),
_text_field("country"),
_text_field("country_code"),
_text_field("riot_id"),
_text_field("steam64_id"),
_text_field("vrs_ranking"),
_bool_field("is_main"),
_bool_field("is_reserve"),
_bool_field("is_manager"),
_bool_field("is_captain"),
_bool_field("sheet_public"),
_bool_field("blocked_country"),
_bool_field("active"),
_bool_field("roles_synced"),
_text_field("last_sync_error"),
_text_field("updated_at"),
]
return {
"name": config.PB_FIENTA_COLLECTION_LAN,
"type": "base",
"fields": fields,
"listRule": None,
"viewRule": None,
"createRule": None,
"updateRule": None,
"deleteRule": None,
}
async def ensure_storage() -> bool:
"""Create the LAN Fienta collection when it does not exist."""
return await pb_client.ensure_collection(
config.PB_FIENTA_COLLECTION_LAN,
fienta_collection_payload(),
)
def _strip_accents(value: str) -> str:
normalized = unicodedata.normalize("NFKD", value)
return "".join(ch for ch in normalized if not unicodedata.combining(ch))
def _norm(value: Any) -> str:
return re.sub(r"\s+", " ", str(value or "")).strip()
def _norm_key(value: Any) -> str:
return _strip_accents(_norm(value)).casefold()
def _discord_key(value: Any) -> str:
return _norm(value).lstrip("@").casefold()
def _country_code(country: str) -> str:
raw = _norm(country)
if not raw:
return ""
key = _norm_key(raw)
if key in COUNTRY_CODE_BY_NAME:
return COUNTRY_CODE_BY_NAME[key]
for name, code in COUNTRY_CODE_BY_NAME.items():
if _norm_key(name) == key:
return code
upper = raw.upper()
if re.fullmatch(r"[A-Z]{2}", upper):
return upper
letters = re.sub(r"[^A-Z]", "", _strip_accents(upper))
return (letters[:2] or "XX").upper()
def _is_blocked_country(country: str, code: str) -> bool:
country_key = _norm_key(country)
return code in BLOCKED_COUNTRY_CODES or any(
_norm_key(name) == country_key for name in BLOCKED_COUNTRY_NAMES
)
def _role_safe_name(name: str) -> str:
cleaned = re.sub(r"\s+", " ", _norm(name)).strip("@# ")
return cleaned[:90] or "Unknown"
def _team_role_name(game: str, team_name: str) -> str:
prefix = "CS2" if game == "cs2" else "LoL"
return f"[{prefix}] {_role_safe_name(team_name)}"[:100]
def _language_role_name(code: str) -> str:
return f"[{code.upper()}]"
def _role_colour_for_country(code: str) -> discord.Color:
if code in COUNTRY_ROLE_COLOURS:
return discord.Color(COUNTRY_ROLE_COLOURS[code])
seed = sum(ord(ch) for ch in code)
hue = seed % 6
colours = [0x5865F2, 0x57F287, 0xFEE75C, 0xEB459E, 0xED4245, 0x00A8FC]
return discord.Color(colours[hue])
def _now_iso() -> str:
return dt.datetime.now(dt.timezone.utc).isoformat()
def _ticket_rows(payload: dict[str, Any]) -> list[dict[str, Any]]:
order = payload.get("order") or {}
order_id = _norm(order.get("id"))
status = _norm(order.get("status")).upper()
payment = order.get("payment") or {}
payment_time = _norm(payment.get("time"))
order_url = _norm(order.get("order_url"))
results: list[dict[str, Any]] = []
for ticket in order.get("tickets") or []:
ticket_code = _norm(ticket.get("code"))
for idx, row in enumerate(ticket.get("rows") or []):
ticket_type = row.get("ticket_type") or {}
ticket_type_id = _norm(ticket_type.get("id"))
mapping = TICKET_TYPES.get(ticket_type_id)
if not mapping:
continue
attendee = row.get("attendee") or {}
country = _norm(attendee.get("country"))
code = _country_code(country)
kind = str(mapping["kind"])
nickname = (
_norm(attendee.get("nickname_134815"))
or _norm(attendee.get("nickname_134816"))
or _norm(attendee.get("full_name"))
)
captain_raw = _norm(attendee.get("tiimi_kapten_134872")).casefold()
registration_key = f"{order_id}:{ticket_code}:{idx}"
results.append(
{
"registration_key": registration_key,
"order_id": order_id,
"ticket_code": ticket_code,
"order_status": status,
"order_url": order_url,
"payment_time": payment_time,
"game": mapping["game"],
"kind": kind,
"ticket_type_id": ticket_type_id,
"ticket_title": _norm(ticket_type.get("title")),
"ticket_group_title": _norm((ticket_type.get("ticket_type_group") or {}).get("title")),
"team_name": _norm(attendee.get("team_name_134821")),
"discord_username": _norm(attendee.get("discord_username_134871")),
"nickname": nickname,
"country": country,
"country_code": code,
"riot_id": _norm(attendee.get("riot_id_134870")),
"steam64_id": _norm(attendee.get("steam64_id_134819")),
"vrs_ranking": _norm(attendee.get("team_vrs_ranking_134825")),
"is_main": bool(mapping["main"]),
"is_reserve": kind == "reserve",
"is_manager": kind == "manager",
"is_captain": captain_raw in {"jah", "yes", "true", "1"},
"sheet_public": bool(mapping["sheet_public"]),
"blocked_country": _is_blocked_country(country, code),
"active": status == "COMPLETED",
"roles_synced": False,
"last_sync_error": "",
"updated_at": _now_iso(),
}
)
return results
async def process_payload(bot: discord.Client, payload: dict[str, Any]) -> SyncSummary:
"""Store a Fienta webhook payload and resync LAN roles/sheets."""
async with _sync_lock:
summary = SyncSummary()
await ensure_storage()
rows = _ticket_rows(payload)
if not rows:
summary.alerts.append("Fienta webhook did not contain any known tournament ticket rows.")
await _send_alerts(bot, summary.alerts)
return summary
for row in rows:
_, created = await pb_client.upsert_record_by_field(
config.PB_FIENTA_COLLECTION_LAN,
"registration_key",
row["registration_key"],
row,
)
summary.saved += 1
if created:
summary.created += 1
else:
summary.updated += 1
resync = await resync_all(bot, send_alerts=False)
summary.roles_synced += resync.roles_synced
summary.unmatched += resync.unmatched
summary.sheet_rows += resync.sheet_rows
summary.alerts.extend(resync.alerts)
await _send_alerts(bot, summary.alerts)
return summary
async def resync_all(bot: discord.Client, send_alerts: bool = True) -> SyncSummary:
"""Re-apply all stored Fienta registrations to Discord and Sheets."""
summary = SyncSummary()
await ensure_storage()
records = await _all_registration_records()
await _sync_roles(bot, records, summary)
sheet_rows, sheet_alerts = await asyncio.to_thread(_sync_public_sheets, records)
summary.sheet_rows += sheet_rows
summary.alerts.extend(sheet_alerts)
if send_alerts:
await _send_alerts(bot, summary.alerts)
return summary
async def sync_member_join(bot: discord.Client, member: discord.Member) -> SyncSummary:
"""Apply any stored registrations that match a newly joined member."""
if member.guild.id != config.GUILD_ID:
return SyncSummary()
summary = SyncSummary()
await ensure_storage()
target = _discord_key(member.name)
records = [
record
for record in await _all_registration_records()
if _discord_key(record.get("discord_username")) == target
]
if not records:
return summary
await _sync_roles(bot, records, summary, preloaded_member=member)
await _send_alerts(bot, summary.alerts)
return summary
async def count_records() -> int:
await ensure_storage()
return await pb_client.count_records_in(config.PB_FIENTA_COLLECTION_LAN)
async def _all_registration_records() -> list[dict[str, Any]]:
return await pb_client.list_all_records_in(config.PB_FIENTA_COLLECTION_LAN)
async def _sync_roles(
bot: discord.Client,
records: list[dict[str, Any]],
summary: SyncSummary,
preloaded_member: discord.Member | None = None,
) -> None:
guild = bot.get_guild(config.GUILD_ID)
if guild is None:
summary.alerts.append(f"LAN guild {config.GUILD_ID} is not available to the bot.")
return
if preloaded_member is None:
await _ensure_member_cache(guild)
captain_counts: dict[tuple[str, str], int] = {}
for record in records:
if (
record.get("game") == "cs2"
and record.get("is_main")
and record.get("is_captain")
and record.get("active")
):
key = ("cs2", _norm_key(record.get("team_name")))
captain_counts[key] = captain_counts.get(key, 0) + 1
for (_, team_key), count in captain_counts.items():
if count > 1:
team = next(
(_norm(r.get("team_name")) for r in records if _norm_key(r.get("team_name")) == team_key),
team_key,
)
summary.alerts.append(f"Multiple CS2 captains marked for team `{team}` ({count}).")
for record in records:
error = await _sync_record_roles(guild, record, summary, preloaded_member)
try:
await pb_client.update_record_in(
config.PB_FIENTA_COLLECTION_LAN,
record["id"],
{
"roles_synced": not bool(error),
"last_sync_error": error,
"updated_at": _now_iso(),
},
)
except Exception as exc:
summary.alerts.append(
f"Could not update sync state for `{record.get('registration_key')}`: {exc}"
)
async def _sync_record_roles(
guild: discord.Guild,
record: dict[str, Any],
summary: SyncSummary,
preloaded_member: discord.Member | None = None,
) -> str:
team_name = _norm(record.get("team_name"))
username = _norm(record.get("discord_username"))
game = _norm(record.get("game"))
status = _norm(record.get("order_status")).upper()
country = _norm(record.get("country"))
country_code = _norm(record.get("country_code"))
if status in CANCELLED_STATUSES:
summary.alerts.append(
f"Registration `{record.get('registration_key')}` is {status}; no automatic role removal was done."
)
return "inactive order"
if not record.get("active"):
summary.alerts.append(
f"Registration `{record.get('registration_key')}` is `{status or 'UNKNOWN'}`; roles not assigned yet."
)
return "order not completed"
if record.get("blocked_country"):
summary.alerts.append(
f"Blocked country registration skipped: `{username}` / `{team_name}` / `{country}`."
)
return "blocked country"
if not username:
summary.unmatched += 1
summary.alerts.append(f"Registration `{record.get('registration_key')}` has no Discord username.")
return "missing Discord username"
if not team_name:
summary.alerts.append(f"Registration `{record.get('registration_key')}` has no team name.")
return "missing team name"
member = preloaded_member or _find_member_by_username(guild, username)
if member is None:
summary.unmatched += 1
summary.alerts.append(f"No Discord member found for `{username}` ({game.upper()} `{team_name}`).")
return "Discord member not found"
roles: list[discord.Role] = []
general_role_id = CS2_GENERAL_ROLE_ID if game == "cs2" else LOL_GENERAL_ROLE_ID
general_role = guild.get_role(general_role_id)
if general_role is None:
return f"general role {general_role_id} not found"
roles.append(general_role)
team_role = await _get_or_create_role(
guild,
_team_role_name(game, team_name),
anchor=general_role,
colour=general_role.color if general_role.color.value else discord.Color.default(),
)
roles.append(team_role)
language_general = guild.get_role(LANGUAGE_GENERAL_ROLE_ID)
if language_general:
roles.append(language_general)
if country_code:
country_role = await _get_or_create_country_role(guild, country_code, language_general)
roles.append(country_role)
else:
summary.alerts.append(f"Missing country for `{username}` ({game.upper()} `{team_name}`).")
else:
summary.alerts.append(f"Language general role {LANGUAGE_GENERAL_ROLE_ID} not found.")
if game == "cs2" and record.get("is_main") and record.get("is_captain"):
captain_role = guild.get_role(CS2_CAPTAIN_ROLE_ID)
if captain_role:
roles.append(captain_role)
else:
summary.alerts.append(f"CS2 Captain role {CS2_CAPTAIN_ROLE_ID} not found.")
if game == "cs2" and record.get("is_manager"):
manager_role = guild.get_role(CS2_MANAGER_ROLE_ID)
if manager_role:
roles.append(manager_role)
else:
summary.alerts.append(f"CS2 Manager role {CS2_MANAGER_ROLE_ID} not found.")
missing = [role for role in _unique_roles(roles) if role not in member.roles]
if missing:
try:
await member.add_roles(*missing, reason="Fienta LAN registration sync")
except discord.Forbidden:
return "bot lacks permission to add roles"
except discord.HTTPException as exc:
return f"Discord role add failed: {exc}"
summary.roles_synced += 1
return ""
async def _ensure_member_cache(guild: discord.Guild) -> None:
try:
if not guild.chunked:
await guild.chunk(cache=True)
except Exception as exc:
log.warning("Could not chunk guild members for Fienta sync: %s", exc)
def _find_member_by_username(guild: discord.Guild, username: str) -> discord.Member | None:
target = _discord_key(username)
if not target:
return None
for member in guild.members:
candidates = [member.name, getattr(member, "global_name", None), member.display_name]
if any(_discord_key(candidate) == target for candidate in candidates if candidate):
return member
return None
def _unique_roles(roles: list[discord.Role]) -> list[discord.Role]:
seen: set[int] = set()
result: list[discord.Role] = []
for role in roles:
if role.id not in seen:
seen.add(role.id)
result.append(role)
return result
async def _get_or_create_country_role(
guild: discord.Guild,
country_code: str,
anchor: discord.Role,
) -> discord.Role:
code = country_code.upper()
existing_id = EXISTING_LANGUAGE_ROLE_IDS.get(code)
if existing_id:
role = guild.get_role(existing_id)
if role:
return role
return await _get_or_create_role(
guild,
_language_role_name(code),
anchor=anchor,
colour=_role_colour_for_country(code),
)
async def _get_or_create_role(
guild: discord.Guild,
name: str,
anchor: discord.Role,
colour: discord.Color,
) -> discord.Role:
role = discord.utils.get(guild.roles, name=name)
if role is None:
role = await guild.create_role(name=name, color=colour, reason="Fienta LAN registration sync")
await _move_role_under(guild, role, anchor)
return role
async def _move_role_under(guild: discord.Guild, role: discord.Role, anchor: discord.Role) -> None:
if role.position == max(anchor.position - 1, 1):
return
try:
await guild.edit_role_positions(positions={role: max(anchor.position - 1, 1)})
except discord.Forbidden:
log.warning("No permission to move role %s under %s", role.name, anchor.name)
except discord.HTTPException as exc:
log.warning("Could not move role %s under %s: %s", role.name, anchor.name, exc)
def _get_spreadsheet() -> gspread.Spreadsheet:
global _client, _spreadsheet
if _spreadsheet is not None:
return _spreadsheet
creds = Credentials.from_service_account_file(config.GOOGLE_CREDS_PATH, scopes=SCOPES)
_client = gspread.authorize(creds)
_spreadsheet = _client.open_by_key(config.SHEET_ID)
return _spreadsheet
def _sync_public_sheets(records: list[dict[str, Any]]) -> tuple[int, list[str]]:
alerts: list[str] = []
rows_written = 0
spreadsheet = _get_spreadsheet()
for game in ("cs2", "lol"):
cfg = SHEET_CONFIG[game]
try:
worksheet = spreadsheet.worksheet(cfg["worksheet"])
except gspread.WorksheetNotFound:
alerts.append(f"Worksheet `{cfg['worksheet']}` not found in LAN live sheet.")
continue
teams = _public_teams(records, game)
rows_written += _write_game_sheet(worksheet, game, teams, alerts)
return rows_written, alerts
def _public_teams(records: list[dict[str, Any]], game: str) -> list[dict[str, Any]]:
by_team: dict[str, dict[str, Any]] = {}
for record in records:
status = _norm(record.get("order_status")).upper()
if record.get("game") != game or not record.get("sheet_public"):
continue
if record.get("blocked_country") or status in CANCELLED_STATUSES:
continue
team_name = _norm(record.get("team_name"))
if not team_name:
continue
key = _norm_key(team_name)
team = by_team.setdefault(
key,
{
"team_name": team_name,
"lineup": [],
"vrs": "",
"payment_time": _norm(record.get("payment_time")),
"confirmed": True,
},
)
team["lineup"].append(
{
"nickname": _norm(record.get("nickname")),
"country": _norm(record.get("country")),
"country_code": _norm(record.get("country_code")),
}
)
if not team["vrs"] and record.get("vrs_ranking"):
team["vrs"] = _norm(record.get("vrs_ranking"))
if _norm(record.get("payment_time")) < team["payment_time"]:
team["payment_time"] = _norm(record.get("payment_time"))
if status != "COMPLETED":
team["confirmed"] = False
return sorted(by_team.values(), key=lambda t: (t["payment_time"], _norm_key(t["team_name"])))
def _write_game_sheet(
worksheet: gspread.Worksheet,
game: str,
teams: list[dict[str, Any]],
alerts: list[str],
) -> int:
cfg = SHEET_CONFIG[game]
start_row = int(cfg["start_row"])
end_row = int(cfg["end_row"])
capacity = end_row - start_row + 1
existing = worksheet.get(f"B{start_row}:B{end_row}")
by_name: dict[str, int] = {}
empty_rows: list[int] = []
for offset in range(capacity):
row_num = start_row + offset
value = ""
if offset < len(existing) and existing[offset]:
value = _norm(existing[offset][0])
if value:
by_name[_norm_key(value)] = row_num
else:
empty_rows.append(row_num)
rows_written = 0
for team in teams:
key = _norm_key(team["team_name"])
row_num = by_name.get(key)
if row_num is None:
if not empty_rows:
alerts.append(f"{game.upper()} live sheet is full; `{team['team_name']}` was not added.")
continue
row_num = empty_rows.pop(0)
by_name[key] = row_num
no = row_num - start_row + 1
lineup = "\n".join(_lineup_entry(player) for player in team["lineup"])
timestamp = _format_sheet_time(team["payment_time"])
status = CONFIRMED_TEXT if team["confirmed"] else PENDING_TEXT
if game == "cs2":
values = [[no, team["team_name"], lineup, team["vrs"], timestamp, status]]
worksheet.update(values, f"A{row_num}:F{row_num}", value_input_option="USER_ENTERED")
else:
values = [[no, team["team_name"], lineup, timestamp, status]]
worksheet.update(values, f"A{row_num}:E{row_num}", value_input_option="USER_ENTERED")
rows_written += 1
return rows_written
def _lineup_entry(player: dict[str, str]) -> str:
nickname = player.get("nickname") or "?"
country = player.get("country") or player.get("country_code") or "?"
return f"{nickname}, {country}"
def _format_sheet_time(raw: str) -> str:
if not raw:
return ""
try:
parsed = dt.datetime.fromisoformat(raw)
except ValueError:
return raw
return parsed.strftime("%d.%m.%Y %H:%M")
async def _send_alerts(bot: discord.Client, alerts: list[str]) -> None:
if not alerts or not config.FIENTA_ADMIN_ALERT_CHANNEL_ID:
return
try:
channel = bot.get_channel(config.FIENTA_ADMIN_ALERT_CHANNEL_ID)
if channel is None:
channel = await bot.fetch_channel(config.FIENTA_ADMIN_ALERT_CHANNEL_ID)
except Exception as exc:
log.warning("Could not fetch Fienta alert channel: %s", exc)
return
if not hasattr(channel, "send"):
return
header = "**Fienta LAN sync alerts**"
chunks: list[str] = []
current = header
for alert in alerts:
line = f"\n- {alert}"
if len(current) + len(line) > 1900:
chunks.append(current)
current = header + line
else:
current += line
chunks.append(current)
for chunk in chunks:
try:
await channel.send(chunk)
except Exception as exc:
log.warning("Could not send Fienta alert: %s", exc)
break

View File

@@ -7,7 +7,7 @@ Environment variables (set in .env):
PB_URL Base URL of PocketBase (default: http://127.0.0.1:8090)
PB_ADMIN_EMAIL PocketBase admin e-mail
PB_ADMIN_PASSWORD PocketBase admin password
PB_ECONOMY_COLLECTION_DEV / PB_ECONOMY_COLLECTION_ECONOMY
PB_ECONOMY_COLLECTION_DEV / PB_ECONOMY_COLLECTION_ECONOMY / PB_ECONOMY_COLLECTION_LAN
"""
from __future__ import annotations
@@ -75,16 +75,28 @@ async def _hdrs() -> dict[str, str]:
return {"Authorization": await _ensure_auth()}
def _escape_filter_value(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
# ---------------------------------------------------------------------------
# CRUD helpers
# ---------------------------------------------------------------------------
async def get_record(user_id: str) -> dict[str, Any] | None:
"""Fetch one economy record by Discord user_id. Returns None if not found."""
return await get_first_record(
ECONOMY_COLLECTION,
f'user_id="{_escape_filter_value(user_id)}"',
)
async def get_first_record(collection: str, filter_expr: str) -> dict[str, Any] | None:
"""Fetch one record from any collection by a PocketBase filter expression."""
session = _get_session()
async with session.get(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records",
params={"filter": f'user_id="{user_id}"', "perPage": 1},
f"{PB_URL}/api/collections/{collection}/records",
params={"filter": filter_expr, "perPage": 1},
headers=await _hdrs(),
) as resp:
resp.raise_for_status()
@@ -93,11 +105,22 @@ async def get_record(user_id: str) -> dict[str, Any] | None:
return items[0] if items else None
async def get_record_by_field(collection: str, field: str, value: str) -> dict[str, Any] | None:
"""Fetch one record where `field` exactly equals `value`."""
escaped = _escape_filter_value(value)
return await get_first_record(collection, f'{field}="{escaped}"')
async def create_record(record: dict[str, Any]) -> dict[str, Any]:
"""Create a new economy record. Returns the created record (includes PB id)."""
return await create_record_in(ECONOMY_COLLECTION, record)
async def create_record_in(collection: str, record: dict[str, Any]) -> dict[str, Any]:
"""Create a new record in any collection. Returns the created record."""
session = _get_session()
async with session.post(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records",
f"{PB_URL}/api/collections/{collection}/records",
json=record,
headers=await _hdrs(),
) as resp:
@@ -109,9 +132,14 @@ async def create_record(record: dict[str, Any]) -> dict[str, Any]:
async def update_record(record_id: str, data: dict[str, Any]) -> dict[str, Any]:
"""PATCH an existing record by its PocketBase record id."""
return await update_record_in(ECONOMY_COLLECTION, record_id, data)
async def update_record_in(collection: str, record_id: str, data: dict[str, Any]) -> dict[str, Any]:
"""PATCH an existing record in any collection by its PocketBase record id."""
session = _get_session()
async with session.patch(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records/{record_id}",
f"{PB_URL}/api/collections/{collection}/records/{record_id}",
json=data,
headers=await _hdrs(),
) as resp:
@@ -121,9 +149,14 @@ async def update_record(record_id: str, data: dict[str, Any]) -> dict[str, Any]:
async def count_records() -> int:
"""Return the total number of records in the collection (single cheap request)."""
return await count_records_in(ECONOMY_COLLECTION)
async def count_records_in(collection: str) -> int:
"""Return the total number of records in any collection."""
session = _get_session()
async with session.get(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records",
f"{PB_URL}/api/collections/{collection}/records",
params={"perPage": 1, "page": 1},
headers=await _hdrs(),
) as resp:
@@ -134,13 +167,18 @@ async def count_records() -> int:
async def list_all_records(page_size: int = 500) -> list[dict[str, Any]]:
"""Fetch every record in the collection, handling PocketBase pagination."""
return await list_all_records_in(ECONOMY_COLLECTION, page_size=page_size)
async def list_all_records_in(collection: str, page_size: int = 500) -> list[dict[str, Any]]:
"""Fetch every record in any collection, handling PocketBase pagination."""
results: list[dict[str, Any]] = []
page = 1
session = _get_session()
hdrs = await _hdrs()
while True:
async with session.get(
f"{PB_URL}/api/collections/{ECONOMY_COLLECTION}/records",
f"{PB_URL}/api/collections/{collection}/records",
params={"perPage": page_size, "page": page},
headers=hdrs,
) as resp:
@@ -152,3 +190,51 @@ async def list_all_records(page_size: int = 500) -> list[dict[str, Any]]:
break
page += 1
return results
async def upsert_record_by_field(
collection: str,
field: str,
value: str,
data: dict[str, Any],
) -> tuple[dict[str, Any], bool]:
"""Create or update a record. Returns (record, created)."""
existing = await get_record_by_field(collection, field, value)
if existing:
return await update_record_in(collection, existing["id"], data), False
return await create_record_in(collection, data), True
async def get_collection(collection: str) -> dict[str, Any] | None:
"""Fetch collection metadata, returning None if it doesn't exist."""
session = _get_session()
async with session.get(
f"{PB_URL}/api/collections/{collection}",
headers=await _hdrs(),
) as resp:
if resp.status == 404:
return None
resp.raise_for_status()
return await resp.json()
async def create_collection(payload: dict[str, Any]) -> dict[str, Any]:
"""Create a PocketBase collection from a full collection payload."""
session = _get_session()
async with session.post(
f"{PB_URL}/api/collections",
json=payload,
headers=await _hdrs(),
) as resp:
if resp.status not in (200, 201):
text = await resp.text()
raise RuntimeError(f"PocketBase collection create failed ({resp.status}): {text}")
return await resp.json()
async def ensure_collection(collection: str, payload: dict[str, Any]) -> bool:
"""Create `collection` when missing. Returns True if created."""
if await get_collection(collection):
return False
await create_collection(payload)
return True