"""Add new schema fields to the economy_users PocketBase collection. Run once after pulling the stats + jailbreak_used changes: python scripts/add_stats_fields.py Requirements: - PocketBase running and reachable at PB_URL - PB_ADMIN_EMAIL / PB_ADMIN_PASSWORD set in .env """ from __future__ import annotations import asyncio import os import sys from pathlib import Path import aiohttp from dotenv import load_dotenv sys.path.insert(0, str(Path(__file__).parent.parent)) load_dotenv() PB_URL = os.getenv("PB_URL", "http://127.0.0.1:8090") PB_ADMIN_EMAIL = os.getenv("PB_ADMIN_EMAIL", "") PB_ADMIN_PASSWORD = os.getenv("PB_ADMIN_PASSWORD", "") COLLECTION = "economy_users" # --------------------------------------------------------------------------- # New fields to add # --------------------------------------------------------------------------- _NEW_BOOL_FIELDS = [ "jailbreak_used", ] _NEW_NUMBER_FIELDS = [ "heist_global_cd_until", "peak_balance", "lifetime_earned", "lifetime_lost", "work_count", "beg_count", "total_wagered", "biggest_win", "biggest_loss", "slots_jackpots", "crimes_attempted", "crimes_succeeded", "times_jailed", "total_bail_paid", "heists_joined", "heists_won", "total_given", "total_received", "best_daily_streak", ] def _number_field(name: str) -> dict: return { "name": name, "type": "number", "required": False, "options": {"min": None, "max": None, "noDecimal": True}, } def _bool_field(name: str) -> dict: return {"name": name, "type": "bool", "required": False} async def main() -> None: timeout = aiohttp.ClientTimeout(total=15) async with aiohttp.ClientSession(timeout=timeout) as session: # ── Authenticate ──────────────────────────────────────────────────── async with session.post( f"{PB_URL}/api/collections/_superusers/auth-with-password", json={"identity": PB_ADMIN_EMAIL, "password": PB_ADMIN_PASSWORD}, ) as resp: if resp.status != 200: print(f"Auth failed ({resp.status}): {await resp.text()}") return token = (await resp.json())["token"] hdrs = {"Authorization": token} # ── Fetch current collection ───────────────────────────────────────── async with session.get( f"{PB_URL}/api/collections/{COLLECTION}", headers=hdrs ) as resp: if resp.status != 200: print(f"Could not fetch collection ({resp.status}): {await resp.text()}") return col = await resp.json() existing = {f["name"] for f in col.get("fields", [])} print(f"Existing fields ({len(existing)}): {sorted(existing)}\n") new_fields = [] for name in _NEW_BOOL_FIELDS: if name not in existing: new_fields.append(_bool_field(name)) print(f" + {name} (bool)") else: print(f" = {name} (already exists)") for name in _NEW_NUMBER_FIELDS: if name not in existing: new_fields.append(_number_field(name)) print(f" + {name} (number)") else: print(f" = {name} (already exists)") if not new_fields: print("\nNothing to add - schema already up to date.") return # ── Patch collection schema ────────────────────────────────────────── updated_fields = col.get("fields", []) + new_fields async with session.patch( f"{PB_URL}/api/collections/{COLLECTION}", json={"fields": updated_fields}, headers=hdrs, ) as resp: if resp.status != 200: print(f"\nSchema update failed ({resp.status}): {await resp.text()}") return print(f"\n✅ Added {len(new_fields)} field(s) successfully.") if __name__ == "__main__": asyncio.run(main())