diff --git a/.tasks/2025-11-11_1_crystal-system.md b/.tasks/2025-11-11_1_crystal-system.md new file mode 100644 index 0000000..d5c583e --- /dev/null +++ b/.tasks/2025-11-11_1_crystal-system.md @@ -0,0 +1,34 @@ +# 背景 +文件名:2025-11-11_1 +创建于:2025-11-11_21:49:00 +创建者:ASUS +主分支:main +任务分支:无 +Yolo模式:Off + +# 任务描述 +新增高级水晶系统,依赖菜园系统与冒险系统,能够复用其已注册的前置物品;水晶插件需要向商店注册水晶商品,提供水晶变色(如消耗99炉灰并等待一小时获得黑色水晶卷轴),并向炼金系统注册物品配方(黑色水晶卷轴*2 + 水晶 => 黑水晶)等交互。 + +# 项目概览 +现有插件式框架包含菜园、冒险、炼金等系统,均提供接口供新插件注册物品、商店条目与炼金配方;PWF核心定义了插件基类、配置与依赖管理逻辑。 + +# 分析 +待梳理菜园系统、冒险系统及相关物品注册接口的实现细节,以确认水晶系统能够复用的资源与依赖顺序;同时需要了解商店与炼金系统的注册流程与数据模型约束。 + +# 提议的解决方案 +- 编写新的 `WPSCrystalSystem` 插件,依赖背包、商店、炼金等核心模块,初始化时完成水晶物品、配方与以物易物挂载,并通过调度器复用待机任务。 +- 使用链式炼金拆分颜色演化(炉灰→粉尘→卷轴→水晶),同时为每种颜色配置等待流程与最终融合配方,确保常量均可配置化。 +- 引入专属数据库表 `crystal_records` 记录变色/兑换流程状态,结合调度器在系统重启后恢复任务。 +- 提供指令入口 `水晶`,支持 `变色`、`兑换`、`列表` 等子命令,并返回 Markdown 状态。 + +# 当前执行步骤:"执行水晶系统实现" + +# 任务进度 +2025-11-11_21:49:00 +- 完成水晶模型、服务与主插件代码,注册物品与配方,并接入调度器、商店及指令交互。 +2025-11-11_22:30:00 +- 调整指令展示与兑换匹配逻辑,补充中文名称映射;基础材料获取途径尚待设计。 + +# 最终审查 +未完成 + diff --git a/Plugins/WPSCrystalSystem/__init__.py b/Plugins/WPSCrystalSystem/__init__.py new file mode 100644 index 0000000..7827e2f --- /dev/null +++ b/Plugins/WPSCrystalSystem/__init__.py @@ -0,0 +1,6 @@ +"""Crystal system plugin exports.""" + +from .crystal_plugin_base import WPSCrystalSystem + +__all__ = ["WPSCrystalSystem"] + diff --git a/Plugins/WPSCrystalSystem/crystal_models.py b/Plugins/WPSCrystalSystem/crystal_models.py new file mode 100644 index 0000000..84f4ad5 --- /dev/null +++ b/Plugins/WPSCrystalSystem/crystal_models.py @@ -0,0 +1,248 @@ +"""Core data models and default configuration for the crystal system.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Dict, List, Mapping, Optional, Sequence, Tuple + +from Plugins.WPSAlchemyGame import WPSAlchemyGame +from Plugins.WPSBackpackSystem import BackpackItemTier + + +@dataclass(frozen=True) +class CrystalItemDefinition: + """Definition of a crystal-related item.""" + + item_id: str + name: str + tier: BackpackItemTier + description: str + + +@dataclass(frozen=True) +class CrystalRecipeStage: + """A single stage recipe that is registered into the alchemy system.""" + + identifier: str + materials: Tuple[str, str, str] + result_item: str + fail_item: str + base_success_rate: float + description: str = "" + + +@dataclass(frozen=True) +class CrystalWaitStage: + """A stage describing a delayed transformation outside the alchemy system.""" + + identifier: str + consumed_items: Mapping[str, int] + produced_item: str + delay_minutes: int + message: str = "" + + +@dataclass(frozen=True) +class CrystalFinalFusion: + """Final fusion recipe that converts scrolls and base crystals to coloured crystals.""" + + identifier: str + materials: Tuple[str, str, str] + result_item: str + fail_item: str + base_success_rate: float + description: str = "" + + +@dataclass(frozen=True) +class CrystalExchangeEntry: + """Definition of a crystal barter/exchange offer.""" + + identifier: str + required_items: Mapping[str, int] + reward_item: str + reward_amount: int = 1 + metadata: Mapping[str, str] = field(default_factory=dict) + + +@dataclass(frozen=True) +class CrystalColorDefinition: + """Full definition for a colour pipeline.""" + + color_key: str + display_name: str + chain_stages: Sequence[CrystalRecipeStage] + wait_stage: CrystalWaitStage + final_fusion: CrystalFinalFusion + + +CRYSTAL_BASE_ITEM_ID = "crystal_base_core" +CRYSTAL_BASE_SCROLL_ID = "crystal_base_scroll" +CRYSTAL_TINT_POWDER_ID = "crystal_tint_powder" +CRYSTAL_RESONANCE_POWDER_ID = "crystal_resonance_powder" + + +def _build_black_color_definition() -> CrystalColorDefinition: + """Construct default chain definition for black crystal.""" + + stage1 = CrystalRecipeStage( + identifier="black_stage_1", + materials=( + WPSAlchemyGame.ASH_ITEM_ID, + WPSAlchemyGame.ASH_ITEM_ID, + CRYSTAL_TINT_POWDER_ID, + ), + result_item="crystal_black_dust_stage1", + fail_item=WPSAlchemyGame.ASH_ITEM_ID, + base_success_rate=0.9, + description="将炉灰与变色粉尘融合成初阶黑色粉尘。", + ) + stage2 = CrystalRecipeStage( + identifier="black_stage_2", + materials=( + stage1.result_item, + stage1.result_item, + CRYSTAL_RESONANCE_POWDER_ID, + ), + result_item="crystal_black_dust_stage2", + fail_item=WPSAlchemyGame.ASH_ITEM_ID, + base_success_rate=0.75, + description="压缩初阶粉尘并注入共鸣粉形成高阶粉尘。", + ) + scroll_stage = CrystalRecipeStage( + identifier="black_scroll_alchemy", + materials=( + stage2.result_item, + stage2.result_item, + CRYSTAL_BASE_SCROLL_ID, + ), + result_item="crystal_black_scroll", + fail_item=WPSAlchemyGame.ASH_ITEM_ID, + base_success_rate=0.65, + description="将高阶粉尘灌注到卷轴之上,得出黑色变色卷轴。", + ) + + wait_stage = CrystalWaitStage( + identifier="black_scroll_darkening", + consumed_items={ + WPSAlchemyGame.ASH_ITEM_ID: 99, + "crystal_black_scroll": 1, + }, + produced_item="crystal_black_scroll_charged", + delay_minutes=60, + message="黑色变色卷轴正在吸收阴焰,预计一小时后完成。", + ) + + final_fusion = CrystalFinalFusion( + identifier="black_crystal_fusion", + materials=( + wait_stage.produced_item, + wait_stage.produced_item, + CRYSTAL_BASE_ITEM_ID, + ), + result_item="crystal_black_core", + fail_item=WPSAlchemyGame.ASH_ITEM_ID, + base_success_rate=0.7, + description="两张黑色卷轴共鸣后唤醒基础晶核,形成黑水晶。", + ) + + return CrystalColorDefinition( + color_key="black", + display_name="黑色水晶", + chain_stages=[stage1, stage2, scroll_stage], + wait_stage=wait_stage, + final_fusion=final_fusion, + ) + + +DEFAULT_CRYSTAL_ITEMS: Dict[str, CrystalItemDefinition] = { + CRYSTAL_BASE_ITEM_ID: CrystalItemDefinition( + item_id=CRYSTAL_BASE_ITEM_ID, + name="未调谐晶核", + tier=BackpackItemTier.RARE, + description="天然形成的晶核,等待注入颜色与秩序。", + ), + CRYSTAL_BASE_SCROLL_ID: CrystalItemDefinition( + item_id=CRYSTAL_BASE_SCROLL_ID, + name="空白变色卷轴", + tier=BackpackItemTier.RARE, + description="允许运色粉尘封印其上,作为变色的载体。", + ), + CRYSTAL_TINT_POWDER_ID: CrystalItemDefinition( + item_id=CRYSTAL_TINT_POWDER_ID, + name="变色粉尘", + tier=BackpackItemTier.COMMON, + description="常见的变色媒介,可与炉灰混合产生初阶粉尘。", + ), + CRYSTAL_RESONANCE_POWDER_ID: CrystalItemDefinition( + item_id=CRYSTAL_RESONANCE_POWDER_ID, + name="共鸣粉末", + tier=BackpackItemTier.RARE, + description="能触发粉尘的共鸣性,有助于稳定高阶色谱。", + ), + "crystal_black_dust_stage1": CrystalItemDefinition( + item_id="crystal_black_dust_stage1", + name="黑色粉尘-初阶", + tier=BackpackItemTier.RARE, + description="微量黑色素的粉尘,需要进一步压缩。", + ), + "crystal_black_dust_stage2": CrystalItemDefinition( + item_id="crystal_black_dust_stage2", + name="黑色粉尘-高阶", + tier=BackpackItemTier.EPIC, + description="高纯度黑色粉尘,可用于制作黑色卷轴。", + ), + "crystal_black_scroll": CrystalItemDefinition( + item_id="crystal_black_scroll", + name="黑色变色卷轴", + tier=BackpackItemTier.EPIC, + description="封装黑色粉尘力量的卷轴,但仍需阴焰淬炼。", + ), + "crystal_black_scroll_charged": CrystalItemDefinition( + item_id="crystal_black_scroll_charged", + name="淬炼黑色卷轴", + tier=BackpackItemTier.EPIC, + description="经历阴焰淬炼的黑色卷轴,适合唤醒晶核。", + ), + "crystal_black_core": CrystalItemDefinition( + item_id="crystal_black_core", + name="黑水晶", + tier=BackpackItemTier.LEGENDARY, + description="浓缩黑暗能量的晶体,可在多系统中作为高级材料。", + ), +} + + +DEFAULT_CRYSTAL_COLOR_MAP: Dict[str, CrystalColorDefinition] = { + "black": _build_black_color_definition(), +} + + +DEFAULT_CRYSTAL_EXCHANGE_ENTRIES: Dict[str, CrystalExchangeEntry] = { + "exchange_black_scroll": CrystalExchangeEntry( + identifier="exchange_black_scroll", + required_items={ + "crystal_black_dust_stage2": 2, + "crystal_tint_powder": 5, + }, + reward_item="crystal_black_scroll", + metadata={ + "category": "scroll", + "display_name": "黑色变色卷轴兑换", + }, + ), + "exchange_black_core": CrystalExchangeEntry( + identifier="exchange_black_core", + required_items={ + "crystal_black_scroll_charged": 1, + CRYSTAL_BASE_ITEM_ID: 1, + "combat_material_crystal": 2, + }, + reward_item="crystal_black_core", + metadata={ + "category": "crystal", + "display_name": "黑水晶兑换", + }, + ), +} + diff --git a/Plugins/WPSCrystalSystem/crystal_plugin_base.py b/Plugins/WPSCrystalSystem/crystal_plugin_base.py new file mode 100644 index 0000000..8daf090 --- /dev/null +++ b/Plugins/WPSCrystalSystem/crystal_plugin_base.py @@ -0,0 +1,374 @@ +"""Crystal system primary plugin implementation.""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Dict, List, Mapping, Optional, Tuple + +from PWF.Convention.Runtime.Architecture import Architecture +from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig +from PWF.CoreModules.clock_scheduler import get_clock_scheduler +from PWF.CoreModules.plugin_interface import DatabaseModel + +from Plugins.WPSAPI import WPSAPI +from Plugins.WPSAlchemyGame import WPSAlchemyGame +from Plugins.WPSBackpackSystem import WPSBackpackSystem +from Plugins.WPSStoreSystem import WPSStoreSystem + +from .crystal_models import ( + DEFAULT_CRYSTAL_COLOR_MAP, + DEFAULT_CRYSTAL_EXCHANGE_ENTRIES, + DEFAULT_CRYSTAL_ITEMS, + CrystalColorDefinition, + CrystalExchangeEntry, + CrystalItemDefinition, +) +from .crystal_service import get_crystal_db_models, get_crystal_service + + +logger: ProjectConfig = Architecture.Get(ProjectConfig) + + +class WPSCrystalSystem(WPSAPI): + """Main crystal system plugin.""" + + _initialized: bool = False + + def __init__(self) -> None: + super().__init__() + self._service = get_crystal_service() + self._items: Dict[str, CrystalItemDefinition] = dict(DEFAULT_CRYSTAL_ITEMS) + self._colors: Dict[str, CrystalColorDefinition] = dict(DEFAULT_CRYSTAL_COLOR_MAP) + self._exchange_entries: Dict[str, CrystalExchangeEntry] = { + key.lower(): value for key, value in DEFAULT_CRYSTAL_EXCHANGE_ENTRIES.items() + } + + # ------------------------------------------------------------------ # + # Plugin lifecycle + # ------------------------------------------------------------------ # + + def dependencies(self) -> List[type]: + return [ + WPSAPI, + WPSBackpackSystem, + WPSStoreSystem, + WPSAlchemyGame, + ] + + def register_db_model(self) -> List[DatabaseModel]: + return get_crystal_db_models() + + def wake_up(self) -> None: + if WPSCrystalSystem._initialized: + return + WPSCrystalSystem._initialized = True + + backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) + store: WPSStoreSystem = Architecture.Get(WPSStoreSystem) + alchemy: WPSAlchemyGame = Architecture.Get(WPSAlchemyGame) + + self._ensure_prerequisite_items(backpack) + + self._service.register_items(backpack, self._items) + + for color_def in self._colors.values(): + self._service.register_chain_recipes(alchemy, color_def.chain_stages) + self._service.register_final_fusion(alchemy, color_def.final_fusion) + + self._service.register_exchange_modes(store, self._items, self._exchange_entries) + + self._recover_wait_tasks() + self.register_plugin("crystal") + self.register_plugin("水晶") + + logger.Log( + "Info", + f"{ConsoleFrontColor.GREEN}WPSCrystalSystem 插件已加载,当前颜色数:{len(self._colors)}{ConsoleFrontColor.RESET}", + ) + + # ------------------------------------------------------------------ # + # Scheduler recovery & callbacks + # ------------------------------------------------------------------ # + + def _recover_wait_tasks(self) -> None: + scheduler = get_clock_scheduler() + pending_tasks = self._service.recover_pending_wait_flows() + for task in pending_tasks: + delay_ms = task.get("delay_ms", 0) + record_id = task["record_id"] + task_id = scheduler.register_task( + plugin_module=self.__module__, + plugin_class=self.__class__.__name__, + callback_name="complete_wait_flow", + delay_ms=delay_ms, + kwargs={"record_id": record_id}, + ) + self._service.update_task_binding(record_id, task_id) + + async def complete_wait_flow(self, record_id: int) -> None: + info = self._service.mark_wait_flow_completed(record_id) + if not info: + return + chat_id = int(info["chat_id"]) + user_id = int(info["user_id"]) + produced_item = info["produced_item"] + message = ( + "# ⏱️ 水晶淬炼完成\n" + f"- 用户:`{user_id}`\n" + f"- 获得物品:`{produced_item}`\n" + f"- 时间:{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}" + ) + self._service.update_task_binding(record_id, None) + await self.send_markdown_message(message, chat_id, user_id) + + # ------------------------------------------------------------------ # + # Command handling + # ------------------------------------------------------------------ # + + async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: + payload = self.parse_message_after_at(message).strip() + if not payload: + return await self._send_help(chat_id, user_id) + tokens = payload.split() + if not tokens: + return await self._send_help(chat_id, user_id) + + action = tokens[0].lower() + if action in {"help", "帮助"}: + return await self._send_help(chat_id, user_id) + if action in {"变色", "color"}: + color_key = tokens[1] if len(tokens) > 1 else "" + return await self._handle_color_wait(chat_id, user_id, color_key) + if action in {"兑换", "exchange"}: + exchange_key = tokens[1] if len(tokens) > 1 else "" + return await self._handle_exchange(chat_id, user_id, exchange_key) + if action in {"列表", "list"}: + return await self._send_overview(chat_id, user_id) + return await self._send_help(chat_id, user_id) + + # ------------------------------------------------------------------ # + # Helper methods + # ------------------------------------------------------------------ # + + async def _send_help(self, chat_id: int, user_id: int) -> Optional[str]: + lines = [ + "# 💎 水晶系统指令", + "- `水晶 变色 <颜色>`:启动对应颜色的淬炼流程。", + "- `水晶 兑换 `:使用物品兑换指定奖励。", + "- `水晶 列表`:查看可用颜色与兑换信息。", + ] + return await self.send_markdown_message("\n".join(lines), chat_id, user_id) + + async def _send_overview(self, chat_id: int, user_id: int) -> Optional[str]: + lines = ["# 📦 水晶配置一览"] + backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) + for color_def in self._colors.values(): + wait_cost = self._format_item_requirements( + backpack, color_def.wait_stage.consumed_items + ) + lines.append( + f"- {color_def.display_name}|阶段:{len(color_def.chain_stages)}|等待消耗:{wait_cost}" + ) + lines.append("\n## 🔄 兑换项目") + for entry in self._exchange_entries.values(): + cost = self._format_item_requirements(backpack, entry.required_items) + reward_name = self._resolve_item_name(backpack, entry.reward_item) + display_name = entry.metadata.get("display_name") or reward_name + lines.append( + f"- {display_name}({entry.identifier})|消耗:{cost}|奖励:{reward_name}" + ) + return await self.send_markdown_message("\n".join(lines), chat_id, user_id) + + async def _handle_color_wait( + self, + chat_id: int, + user_id: int, + color_key: str, + ) -> Optional[str]: + if not color_key: + return await self.send_markdown_message("❌ 请指定颜色,例如 `水晶 变色 黑色`", chat_id, user_id) + color_def = self._resolve_color(color_key) + if not color_def: + return await self.send_markdown_message("❌ 未找到对应颜色配置。", chat_id, user_id) + backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) + missing = self._check_item_requirements( + backpack, user_id, color_def.wait_stage.consumed_items + ) + if missing: + formatted = ", ".join( + f"{self._resolve_item_name(backpack, item)}×{amount}" for item, amount in missing + ) + return await self.send_markdown_message( + f"❌ 材料不足:{formatted}", chat_id, user_id + ) + + # Deduct materials + self._deduct_items(backpack, user_id, color_def.wait_stage.consumed_items) + + record_id, expected_end = self._service.create_wait_record( + color_def, color_def.wait_stage, user_id, chat_id + ) + delay_ms = max( + int((expected_end - datetime.now(timezone.utc)).total_seconds() * 1000), 0 + ) + scheduler = get_clock_scheduler() + task_id = scheduler.register_task( + plugin_module=self.__module__, + plugin_class=self.__class__.__name__, + callback_name="complete_wait_flow", + delay_ms=delay_ms, + kwargs={"record_id": record_id}, + ) + self._service.update_task_binding(record_id, task_id) + + time_str = expected_end.strftime("%Y-%m-%d %H:%M") + msg = ( + "# 🔁 水晶淬炼已排程\n" + f"- 颜色:{color_def.display_name}\n" + f"- 预计完成:{time_str}\n" + "- 完成后会自动发放淬炼产物。" + ) + return await self.send_markdown_message(msg, chat_id, user_id) + + async def _handle_exchange( + self, + chat_id: int, + user_id: int, + exchange_key: str, + ) -> Optional[str]: + backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) + if not exchange_key: + example = self._default_exchange_example(backpack) + return await self.send_markdown_message( + f"❌ 请提供兑换ID或名称,例如 `水晶 兑换 {example}`", chat_id, user_id + ) + entry = self._resolve_exchange_entry(exchange_key, backpack) + if not entry: + available = ", ".join( + entry.metadata.get("display_name", entry.identifier) + for entry in self._exchange_entries.values() + ) + return await self.send_markdown_message( + f"❌ 未找到对应的兑换项目。可用选项:{available}", chat_id, user_id + ) + missing = self._check_item_requirements(backpack, user_id, entry.required_items) + if missing: + formatted = ", ".join( + f"{self._resolve_item_name(backpack, item)}×{amount}" for item, amount in missing + ) + return await self.send_markdown_message(f"❌ 兑换所需物品不足:{formatted}", chat_id, user_id) + self._deduct_items(backpack, user_id, entry.required_items) + backpack.add_item(user_id, entry.reward_item, entry.reward_amount) + reward_name = self._resolve_item_name(backpack, entry.reward_item) + display_name = entry.metadata.get("display_name") or reward_name + msg = ( + "# 🔄 兑换成功\n" + f"- 项目:{display_name}\n" + f"- 奖励:{reward_name} × {entry.reward_amount}" + ) + return await self.send_markdown_message(msg, chat_id, user_id) + + def _ensure_prerequisite_items(self, backpack: WPSBackpackSystem) -> None: + """Verify key prerequisite materials exist, log warnings otherwise.""" + + required_items = [ + "combat_material_crystal", + "combat_material_essence", + "garden_wine_maple", + "garden_wine_sage", + ] + missing = [] + for item_id in required_items: + try: + backpack.add_item(0, item_id, 0) + except Exception: + missing.append(item_id) + if missing: + logger.Log( + "Warning", + f"{ConsoleFrontColor.YELLOW}水晶系统启动时发现缺少前置物品: {', '.join(missing)}{ConsoleFrontColor.RESET}", + ) + + def _resolve_color(self, key: str) -> Optional[CrystalColorDefinition]: + lowered = key.lower() + if lowered in self._colors: + return self._colors[lowered] + for color_def in self._colors.values(): + if color_def.display_name.lower() == lowered: + return color_def + return None + + def _format_item_requirements( + self, backpack: WPSBackpackSystem, items: Mapping[str, int] + ) -> str: + return ( + ", ".join( + f"{self._resolve_item_name(backpack, item_id)}×{amount}" + for item_id, amount in items.items() + ) + or "无" + ) + + def _check_item_requirements( + self, + backpack: WPSBackpackSystem, + user_id: int, + requirements: Mapping[str, int], + ) -> List[Tuple[str, int]]: + missing: List[Tuple[str, int]] = [] + for item_id, amount in requirements.items(): + current = backpack.add_item(user_id, item_id, 0) + if current < amount: + missing.append((item_id, amount - current)) + return missing + + def _deduct_items( + self, + backpack: WPSBackpackSystem, + user_id: int, + requirements: Mapping[str, int], + ) -> None: + for item_id, amount in requirements.items(): + current = backpack.add_item(user_id, item_id, 0) + backpack.set_item_quantity(user_id, item_id, max(current - amount, 0)) + + def _resolve_item_name(self, backpack: WPSBackpackSystem, item_id: str) -> str: + try: + definition = backpack._get_definition(item_id) + return definition.name + except Exception: + return item_id + + def _resolve_exchange_entry( + self, key: str, backpack: WPSBackpackSystem + ) -> Optional[CrystalExchangeEntry]: + lowered = key.strip().lower() + if not lowered: + return None + direct = self._exchange_entries.get(lowered) + if direct: + return direct + for entry in self._exchange_entries.values(): + if entry.identifier.lower() == lowered: + return entry + display_name = entry.metadata.get("display_name") + if display_name and display_name.lower() == lowered: + return entry + if entry.reward_item.lower() == lowered: + return entry + reward_name = self._resolve_item_name(backpack, entry.reward_item) + if reward_name.lower() == lowered: + return entry + return None + + def _default_exchange_example(self, backpack: WPSBackpackSystem) -> str: + for entry in self._exchange_entries.values(): + display_name = entry.metadata.get("display_name") + if display_name: + return display_name + reward_name = self._resolve_item_name(backpack, entry.reward_item) + if reward_name: + return reward_name + return entry.identifier + return "exchange_id" + diff --git a/Plugins/WPSCrystalSystem/crystal_service.py b/Plugins/WPSCrystalSystem/crystal_service.py new file mode 100644 index 0000000..c36e418 --- /dev/null +++ b/Plugins/WPSCrystalSystem/crystal_service.py @@ -0,0 +1,310 @@ +"""Service layer for the crystal system.""" + +from __future__ import annotations + +import json +from datetime import datetime, timedelta, timezone +from typing import Dict, List, Mapping, Optional, Sequence, Tuple + +from PWF.Convention.Runtime.Architecture import Architecture +from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig +from PWF.CoreModules.database import STATUS_PENDING, Database, get_db +from PWF.CoreModules.plugin_interface import DatabaseModel + +from Plugins.WPSAlchemyGame import WPSAlchemyGame +from Plugins.WPSBackpackSystem import BackpackItemTier, WPSBackpackSystem +from Plugins.WPSStoreSystem import WPSStoreSystem + +from .crystal_models import ( + CRYSTAL_BASE_ITEM_ID, + CrystalColorDefinition, + CrystalExchangeEntry, + CrystalFinalFusion, + CrystalItemDefinition, + CrystalRecipeStage, + CrystalWaitStage, +) + + +CRYSTAL_TABLE_NAME = "crystal_records" + + +def get_crystal_db_models() -> List[DatabaseModel]: + """Return database models required for the crystal system.""" + + return [ + DatabaseModel( + table_name=CRYSTAL_TABLE_NAME, + column_defs={ + "record_id": "INTEGER PRIMARY KEY AUTOINCREMENT", + "user_id": "INTEGER NOT NULL", + "chat_id": "INTEGER NOT NULL", + "flow_type": "TEXT NOT NULL", + "color_key": "TEXT", + "state": "TEXT NOT NULL", + "input_items": "TEXT NOT NULL", + "current_stage": "TEXT", + "expected_end_time": "TEXT", + "scheduled_task_id": "INTEGER", + "metadata": "TEXT", + "created_at": "TEXT NOT NULL", + "updated_at": "TEXT NOT NULL", + }, + ), + ] + + +class CrystalService: + """Encapsulates shared behaviours for the crystal system.""" + + def __init__(self) -> None: + self._db: Database = get_db() + self._config: ProjectConfig = Architecture.Get(ProjectConfig) + self._logger = self._config + + # --------------------------------------------------------------------- # + # Registration helpers + # --------------------------------------------------------------------- # + + def register_items( + self, + backpack: WPSBackpackSystem, + item_definitions: Mapping[str, CrystalItemDefinition], + ) -> None: + """Register crystal items into the backpack system.""" + + for definition in item_definitions.values(): + try: + backpack.register_item( + definition.item_id, + definition.name, + definition.tier, + definition.description, + ) + except Exception as exc: # pylint: disable=broad-except + self._logger.Log( + "Warning", + f"{ConsoleFrontColor.YELLOW}注册水晶物品 {definition.item_id} 失败: {exc}{ConsoleFrontColor.RESET}", + ) + + def register_exchange_modes( + self, + store: WPSStoreSystem, + item_definitions: Mapping[str, CrystalItemDefinition], + exchange_entries: Mapping[str, CrystalExchangeEntry], + ) -> None: + """Register barter-friendly items into store system (as metadata only).""" + + for entry in exchange_entries.values(): + reward_def = item_definitions.get(entry.reward_item) + if reward_def is None: + continue + metadata = {"exchange": "true", **entry.metadata} + metadata["required_items"] = json.dumps(entry.required_items) + try: + store.register_mode( + item_id=entry.reward_item, + price=1, + limit_amount=entry.reward_amount, + metadata=metadata, + ) + except Exception: + # Store可能会因模式重复而抛异常,这里忽略。 + continue + + def register_chain_recipes( + self, + alchemy: WPSAlchemyGame, + stages: Sequence[CrystalRecipeStage], + ) -> None: + """Register crystal chain recipes into alchemy system.""" + + for stage in stages: + try: + alchemy.register_recipe( + stage.materials, + stage.result_item, + stage.fail_item, + stage.base_success_rate, + ) + except Exception as exc: # pylint: disable=broad-except + self._logger.Log( + "Warning", + f"{ConsoleFrontColor.YELLOW}注册水晶配方 {stage.identifier} 失败: {exc}{ConsoleFrontColor.RESET}", + ) + + def register_final_fusion( + self, + alchemy: WPSAlchemyGame, + fusion: CrystalFinalFusion, + ) -> None: + """Register the final crystal fusion recipe.""" + + try: + alchemy.register_recipe( + fusion.materials, + fusion.result_item, + fusion.fail_item, + fusion.base_success_rate, + ) + except Exception as exc: # pylint: disable=broad-except + self._logger.Log( + "Warning", + f"{ConsoleFrontColor.YELLOW}注册终端水晶配方 {fusion.identifier} 失败: {exc}{ConsoleFrontColor.RESET}", + ) + + # --------------------------------------------------------------------- # + # Wait-flow handling + # --------------------------------------------------------------------- # + + def create_wait_record( + self, + color_def: CrystalColorDefinition, + wait_stage: CrystalWaitStage, + user_id: int, + chat_id: int, + ) -> Tuple[int, datetime]: + """Create a wait-flow record and enqueue scheduler task.""" + + now = datetime.now(timezone.utc) + expected_end = now + timedelta(minutes=wait_stage.delay_minutes) + payload = { + "color_key": color_def.color_key, + "wait_stage": wait_stage.identifier, + "produced_item": wait_stage.produced_item, + } + cursor = self._db.conn.cursor() + cursor.execute( + f""" + INSERT INTO {CRYSTAL_TABLE_NAME} ( + user_id, chat_id, flow_type, color_key, state, input_items, + current_stage, expected_end_time, metadata, created_at, updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + user_id, + chat_id, + "wait_flow", + color_def.color_key, + STATUS_PENDING, + json.dumps(wait_stage.consumed_items), + wait_stage.identifier, + expected_end.isoformat(), + json.dumps(payload), + now.isoformat(), + now.isoformat(), + ), + ) + record_id = cursor.lastrowid + self._db.conn.commit() + return int(record_id), expected_end + + def update_task_binding(self, record_id: int, task_id: Optional[int]) -> None: + """Update the scheduled task binding for a record.""" + + cursor = self._db.conn.cursor() + cursor.execute( + f"UPDATE {CRYSTAL_TABLE_NAME} SET scheduled_task_id = ? WHERE record_id = ?", + (task_id, record_id), + ) + self._db.conn.commit() + + def mark_wait_flow_completed(self, record_id: int) -> Optional[Dict[str, str]]: + """Scheduler callback when a wait flow is ready.""" + + cursor = self._db.conn.cursor() + cursor.execute( + f""" + SELECT record_id, user_id, chat_id, metadata + FROM {CRYSTAL_TABLE_NAME} + WHERE record_id = ? AND flow_type = 'wait_flow' + """, + (record_id,), + ) + row = cursor.fetchone() + if not row: + return None + metadata = json.loads(row["metadata"]) if row["metadata"] else {} + produced_item = metadata.get("produced_item") + user_id = int(row["user_id"]) + chat_id = int(row["chat_id"]) + payload = { + "state": "completed", + "updated_at": datetime.now(timezone.utc).isoformat(), + } + cursor.execute( + f""" + UPDATE {CRYSTAL_TABLE_NAME} + SET state = :state, + updated_at = :updated_at + WHERE record_id = :record_id + """, + {"state": "completed", "updated_at": payload["updated_at"], "record_id": record_id}, + ) + self._db.conn.commit() + if not produced_item: + return None + backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) + try: + backpack.add_item(user_id, produced_item, 1) + except Exception as exc: # pylint: disable=broad-except + self._logger.Log( + "Error", + f"{ConsoleFrontColor.RED}发放等待产物 {produced_item} 失败: {exc}{ConsoleFrontColor.RESET}", + ) + return None + return { + "produced_item": produced_item, + "user_id": str(user_id), + "chat_id": str(chat_id), + } + + # --------------------------------------------------------------------- # + # Recovery + # --------------------------------------------------------------------- # + + def recover_pending_wait_flows(self) -> List[Dict[str, int]]: + """Recover pending wait-flow tasks when the plugin is reloaded.""" + + cursor = self._db.conn.cursor() + cursor.execute( + f""" + SELECT record_id, current_stage, expected_end_time, scheduled_task_id + FROM {CRYSTAL_TABLE_NAME} + WHERE flow_type = 'wait_flow' AND state = ? + """, + (STATUS_PENDING,), + ) + rows = cursor.fetchall() + now = datetime.now(timezone.utc) + tasks: List[Dict[str, int]] = [] + for row in rows: + record_id = int(row["record_id"]) + expected_end = row["expected_end_time"] + scheduled_task_id = row["scheduled_task_id"] + delay_ms = 0 + if expected_end: + try: + target_dt = datetime.fromisoformat(expected_end) + remaining = target_dt - now + delay_ms = max(int(remaining.total_seconds() * 1000), 0) + except ValueError: + delay_ms = 0 + if scheduled_task_id: + continue + tasks.append({"record_id": record_id, "delay_ms": delay_ms}) + return tasks + + +_CRYSTAL_SERVICE: Optional[CrystalService] = None + + +def get_crystal_service() -> CrystalService: + """Return singleton crystal service instance.""" + + global _CRYSTAL_SERVICE + if _CRYSTAL_SERVICE is None: + _CRYSTAL_SERVICE = CrystalService() + return _CRYSTAL_SERVICE +