"""Service layer for the crystal system.""" from __future__ import annotations import json from datetime import datetime, timedelta, timezone from typing import Dict, List, Mapping, Optional, Sequence, Tuple from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig from PWF.CoreModules.database import STATUS_PENDING, Database, get_db from PWF.CoreModules.plugin_interface import DatabaseModel from Plugins.WPSAlchemyGame import WPSAlchemyGame from Plugins.WPSBackpackSystem import BackpackItemTier, WPSBackpackSystem from Plugins.WPSStoreSystem import WPSStoreSystem from .crystal_models import ( CRYSTAL_BASE_ITEM_ID, CrystalColorDefinition, CrystalExchangeEntry, CrystalFinalFusion, CrystalItemDefinition, CrystalRecipeStage, CrystalWaitStage, ) CRYSTAL_TABLE_NAME = "crystal_records" def get_crystal_db_models() -> List[DatabaseModel]: """Return database models required for the crystal system.""" return [ DatabaseModel( table_name=CRYSTAL_TABLE_NAME, column_defs={ "record_id": "INTEGER PRIMARY KEY AUTOINCREMENT", "user_id": "INTEGER NOT NULL", "chat_id": "INTEGER NOT NULL", "flow_type": "TEXT NOT NULL", "color_key": "TEXT", "state": "TEXT NOT NULL", "input_items": "TEXT NOT NULL", "current_stage": "TEXT", "expected_end_time": "TEXT", "scheduled_task_id": "INTEGER", "metadata": "TEXT", "created_at": "TEXT NOT NULL", "updated_at": "TEXT NOT NULL", }, ), ] class CrystalService: """Encapsulates shared behaviours for the crystal system.""" def __init__(self) -> None: self._db: Database = get_db() self._config: ProjectConfig = Architecture.Get(ProjectConfig) self._logger = self._config # --------------------------------------------------------------------- # # Registration helpers # --------------------------------------------------------------------- # def register_items( self, backpack: WPSBackpackSystem, item_definitions: Mapping[str, CrystalItemDefinition], ) -> None: """Register crystal items into the backpack system.""" for definition in item_definitions.values(): try: backpack.register_item( definition.item_id, definition.name, definition.tier, definition.description, ) except Exception as exc: # pylint: disable=broad-except self._logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册水晶物品 {definition.item_id} 失败: {exc}{ConsoleFrontColor.RESET}", ) def register_exchange_modes( self, store: WPSStoreSystem, item_definitions: Mapping[str, CrystalItemDefinition], exchange_entries: Mapping[str, CrystalExchangeEntry], ) -> None: """Register barter-friendly items into store system (as metadata only).""" for entry in exchange_entries.values(): reward_def = item_definitions.get(entry.reward_item) if reward_def is None: continue metadata = {"exchange": "true", **entry.metadata} metadata["required_items"] = json.dumps(entry.required_items) try: store.register_mode( item_id=entry.reward_item, price=1, limit_amount=entry.reward_amount, metadata=metadata, ) except Exception: # Store可能会因模式重复而抛异常,这里忽略。 continue def register_chain_recipes( self, alchemy: WPSAlchemyGame, stages: Sequence[CrystalRecipeStage], ) -> None: """Register crystal chain recipes into alchemy system.""" for stage in stages: try: alchemy.register_recipe( stage.materials, stage.result_item, stage.fail_item, stage.base_success_rate, ) except Exception as exc: # pylint: disable=broad-except self._logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册水晶配方 {stage.identifier} 失败: {exc}{ConsoleFrontColor.RESET}", ) def register_final_fusion( self, alchemy: WPSAlchemyGame, fusion: CrystalFinalFusion, ) -> None: """Register the final crystal fusion recipe.""" try: alchemy.register_recipe( fusion.materials, fusion.result_item, fusion.fail_item, fusion.base_success_rate, ) except Exception as exc: # pylint: disable=broad-except self._logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册终端水晶配方 {fusion.identifier} 失败: {exc}{ConsoleFrontColor.RESET}", ) # --------------------------------------------------------------------- # # Wait-flow handling # --------------------------------------------------------------------- # def create_wait_record( self, color_def: CrystalColorDefinition, wait_stage: CrystalWaitStage, user_id: int, chat_id: int, ) -> Tuple[int, datetime]: """Create a wait-flow record and enqueue scheduler task.""" now = datetime.now(timezone.utc) expected_end = now + timedelta(minutes=wait_stage.delay_minutes) payload = { "color_key": color_def.color_key, "wait_stage": wait_stage.identifier, "produced_item": wait_stage.produced_item, } cursor = self._db.conn.cursor() cursor.execute( f""" INSERT INTO {CRYSTAL_TABLE_NAME} ( user_id, chat_id, flow_type, color_key, state, input_items, current_stage, expected_end_time, metadata, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( user_id, chat_id, "wait_flow", color_def.color_key, STATUS_PENDING, json.dumps(wait_stage.consumed_items), wait_stage.identifier, expected_end.isoformat(), json.dumps(payload), now.isoformat(), now.isoformat(), ), ) record_id = cursor.lastrowid self._db.conn.commit() return int(record_id), expected_end def update_task_binding(self, record_id: int, task_id: Optional[int]) -> None: """Update the scheduled task binding for a record.""" cursor = self._db.conn.cursor() cursor.execute( f"UPDATE {CRYSTAL_TABLE_NAME} SET scheduled_task_id = ? WHERE record_id = ?", (task_id, record_id), ) self._db.conn.commit() def mark_wait_flow_completed(self, record_id: int) -> Optional[Dict[str, str]]: """Scheduler callback when a wait flow is ready.""" cursor = self._db.conn.cursor() cursor.execute( f""" SELECT record_id, user_id, chat_id, metadata FROM {CRYSTAL_TABLE_NAME} WHERE record_id = ? AND flow_type = 'wait_flow' """, (record_id,), ) row = cursor.fetchone() if not row: return None metadata = json.loads(row["metadata"]) if row["metadata"] else {} produced_item = metadata.get("produced_item") user_id = int(row["user_id"]) chat_id = int(row["chat_id"]) payload = { "state": "completed", "updated_at": datetime.now(timezone.utc).isoformat(), } cursor.execute( f""" UPDATE {CRYSTAL_TABLE_NAME} SET state = :state, updated_at = :updated_at WHERE record_id = :record_id """, {"state": "completed", "updated_at": payload["updated_at"], "record_id": record_id}, ) self._db.conn.commit() if not produced_item: return None backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) try: backpack.add_item(user_id, produced_item, 1) except Exception as exc: # pylint: disable=broad-except self._logger.Log( "Error", f"{ConsoleFrontColor.RED}发放等待产物 {produced_item} 失败: {exc}{ConsoleFrontColor.RESET}", ) return None return { "produced_item": produced_item, "user_id": str(user_id), "chat_id": str(chat_id), } # --------------------------------------------------------------------- # # Recovery # --------------------------------------------------------------------- # def recover_pending_wait_flows(self) -> List[Dict[str, int]]: """Recover pending wait-flow tasks when the plugin is reloaded.""" cursor = self._db.conn.cursor() cursor.execute( f""" SELECT record_id, current_stage, expected_end_time, scheduled_task_id FROM {CRYSTAL_TABLE_NAME} WHERE flow_type = 'wait_flow' AND state = ? """, (STATUS_PENDING,), ) rows = cursor.fetchall() now = datetime.now(timezone.utc) tasks: List[Dict[str, int]] = [] for row in rows: record_id = int(row["record_id"]) expected_end = row["expected_end_time"] scheduled_task_id = row["scheduled_task_id"] delay_ms = 0 if expected_end: try: target_dt = datetime.fromisoformat(expected_end) remaining = target_dt - now delay_ms = max(int(remaining.total_seconds() * 1000), 0) except ValueError: delay_ms = 0 if scheduled_task_id: continue tasks.append({"record_id": record_id, "delay_ms": delay_ms}) return tasks _CRYSTAL_SERVICE: Optional[CrystalService] = None def get_crystal_service() -> CrystalService: """Return singleton crystal service instance.""" global _CRYSTAL_SERVICE if _CRYSTAL_SERVICE is None: _CRYSTAL_SERVICE = CrystalService() return _CRYSTAL_SERVICE