"""Crystal system primary plugin implementation.""" from __future__ import annotations from datetime import datetime, timezone from PWF.Convention.Runtime.Architecture import * from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig from PWF.CoreModules.clock_scheduler import get_clock_scheduler from PWF.CoreModules.plugin_interface import DatabaseModel from Plugins.WPSAPI import GuideEntry, WPSAPI from Plugins.WPSAlchemyGame import WPSAlchemyGame from Plugins.WPSBackpackSystem import BackpackItemTier, WPSBackpackSystem from Plugins.WPSCombatSystem.combat_models import SPARK_DUST_ITEM_ID from Plugins.WPSStoreSystem import WPSStoreSystem from Plugins.WPSGardenSystem import ( GardenCropDefinition, GardenExtraReward, WPSGardenBase, register_crop, ) from .crystal_models import ( DEFAULT_CRYSTAL_COLOR_MAP, DEFAULT_CRYSTAL_EXCHANGE_ENTRIES, DEFAULT_CRYSTAL_ITEMS, CrystalColorDefinition, CrystalExchangeEntry, CrystalItemDefinition, build_default_crystal_crops, CRYSTAL_BASE_ITEM_ID, CRYSTAL_BASE_SCROLL_ID, CRYSTAL_TREE_FRUIT_ID, CRYSTAL_TREE_SEED_ID, CRYSTAL_TINT_POWDER_ID, CRYSTAL_RESONANCE_POWDER_ID, ) from .crystal_service import get_crystal_db_models, get_crystal_service logger: ProjectConfig = Architecture.Get(ProjectConfig) class WPSCrystalSystem(WPSAPI): """Main crystal system plugin.""" _initialized: bool = False def __init__(self) -> None: super().__init__() self._service = get_crystal_service() self._items: Dict[str, CrystalItemDefinition] = dict(DEFAULT_CRYSTAL_ITEMS) self._colors: Dict[str, CrystalColorDefinition] = dict(DEFAULT_CRYSTAL_COLOR_MAP) self._exchange_entries: Dict[str, CrystalExchangeEntry] = { key.lower(): value for key, value in DEFAULT_CRYSTAL_EXCHANGE_ENTRIES.items() } def get_guide_subtitle(self) -> str: return "水晶养成、颜色淬炼与兑换拓展系统" def get_guide_metadata(self) -> Dict[str, str]: return { "颜色数量": str(len(self._colors)), "水晶物品": str(len(self._items)), "兑换项目": str(len(self._exchange_entries)), } def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "水晶", "identifier": "水晶", "description": "查看系统配置或执行变色与兑换等操作。", "metadata": {"别名": "crystal"}, }, ) def collect_item_entries(self) -> Sequence[GuideEntry]: return ( { "title": "基础水晶物品", "description": f"{len(self._items)} 个水晶组件,可由背包/商店系统持有与交易。", }, { "title": "颜色链路", "description": f"{len(self._colors)} 条变色链(包含等待阶段与最终融合)。", }, ) def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "变色流程", "description": "`水晶 变色 <颜色>` 进入等待流程,完成后获得对应水晶部件。", }, { "title": "兑换指令", "description": "`水晶 兑换 ` 消耗配置好的材料换取奖励物品。", }, { "title": "菜园扩展", "description": "系统会向菜园注册水晶树作物,使果实与水晶体系互相联动。", }, ) # ------------------------------------------------------------------ # # Plugin lifecycle # ------------------------------------------------------------------ # def dependencies(self) -> List[type]: return [ WPSAPI, WPSBackpackSystem, WPSStoreSystem, WPSAlchemyGame, WPSGardenBase, ] def register_db_model(self) -> List[DatabaseModel]: return get_crystal_db_models() def wake_up(self) -> None: if WPSCrystalSystem._initialized: return WPSCrystalSystem._initialized = True backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) store: WPSStoreSystem = Architecture.Get(WPSStoreSystem) alchemy: WPSAlchemyGame = Architecture.Get(WPSAlchemyGame) self._ensure_prerequisite_items(backpack) self._service.register_items(backpack, self._items) for color_def in self._colors.values(): self._service.register_chain_recipes(alchemy, color_def.chain_stages) self._service.register_final_fusion(alchemy, color_def.final_fusion) # 水晶系统在商店中出售的物品存在价格问题 # self._service.register_exchange_modes(store, self._items, self._exchange_entries) self._register_garden_extensions(backpack, store, alchemy) self._recover_wait_tasks() self.register_plugin("crystal") self.register_plugin("水晶") logger.Log( "Info", f"{ConsoleFrontColor.GREEN}WPSCrystalSystem 插件已加载,当前颜色数:{len(self._colors)}{ConsoleFrontColor.RESET}", ) # ------------------------------------------------------------------ # # Scheduler recovery & callbacks # ------------------------------------------------------------------ # def _recover_wait_tasks(self) -> None: scheduler = get_clock_scheduler() pending_tasks = self._service.recover_pending_wait_flows() for task in pending_tasks: delay_ms = task.get("delay_ms", 0) record_id = task["record_id"] task_id = scheduler.register_task( plugin_module=self.__module__, plugin_class=self.__class__.__name__, callback_name="complete_wait_flow", delay_ms=delay_ms, kwargs={"record_id": record_id}, ) self._service.update_task_binding(record_id, task_id) async def complete_wait_flow(self, record_id: int) -> None: info = self._service.mark_wait_flow_completed(record_id) if not info: return chat_id = int(info["chat_id"]) user_id = int(info["user_id"]) produced_item = info["produced_item"] message = ( "# ⏱️ 水晶淬炼完成\n" f"- 用户:`{user_id}`\n" f"- 获得物品:`{produced_item}`\n" f"- 时间:{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}" ) self._service.update_task_binding(record_id, None) await self.send_markdown_message(message, chat_id, user_id) # ------------------------------------------------------------------ # # Command handling # ------------------------------------------------------------------ # async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: payload = self.parse_message_after_at(message).strip() if not payload: return await self._send_help(chat_id, user_id) tokens = payload.split() if not tokens: return await self._send_help(chat_id, user_id) action = tokens[0].lower() if action in {"help", "帮助"}: return await self._send_help(chat_id, user_id) if action in {"变色", "color"}: color_key = tokens[1] if len(tokens) > 1 else "" return await self._handle_color_wait(chat_id, user_id, color_key) if action in {"兑换", "exchange"}: exchange_key = tokens[1] if len(tokens) > 1 else "" return await self._handle_exchange(chat_id, user_id, exchange_key) if action in {"列表", "list"}: return await self._send_overview(chat_id, user_id) return await self._send_help(chat_id, user_id) # ------------------------------------------------------------------ # # Helper methods # ------------------------------------------------------------------ # async def _send_help(self, chat_id: int, user_id: int) -> Optional[str]: lines = [ "# 💎 水晶系统指令", "- `水晶 变色 <颜色>`:启动对应颜色的淬炼流程。", "- `水晶 兑换 `:使用物品兑换指定奖励。", "- `水晶 列表`:查看可用颜色与兑换信息。", ] return await self.send_markdown_message("\n".join(lines), chat_id, user_id) async def _send_overview(self, chat_id: int, user_id: int) -> Optional[str]: lines = ["# 📦 水晶配置一览"] backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) for color_def in self._colors.values(): wait_cost = self._format_item_requirements( backpack, color_def.wait_stage.consumed_items ) lines.append( f"- {color_def.display_name}|阶段:{len(color_def.chain_stages)}|等待消耗:{wait_cost}" ) lines.append("\n## 🔄 兑换项目") for entry in self._exchange_entries.values(): cost = self._format_item_requirements(backpack, entry.required_items) reward_name = self._resolve_item_name(backpack, entry.reward_item) display_name = entry.metadata.get("display_name") or reward_name lines.append( f"- {display_name}({entry.identifier})|消耗:{cost}|奖励:{reward_name}" ) return await self.send_markdown_message("\n".join(lines), chat_id, user_id) async def _handle_color_wait( self, chat_id: int, user_id: int, color_key: str, ) -> Optional[str]: if not color_key: return await self.send_markdown_message("❌ 请指定颜色,例如 `水晶 变色 黑色`", chat_id, user_id) color_def = self._resolve_color(color_key) if not color_def: return await self.send_markdown_message("❌ 未找到对应颜色配置。", chat_id, user_id) backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) missing = self._check_item_requirements( backpack, user_id, color_def.wait_stage.consumed_items ) if missing: formatted = ", ".join( f"{self._resolve_item_name(backpack, item)}×{amount}" for item, amount in missing ) return await self.send_markdown_message( f"❌ 材料不足:{formatted}", chat_id, user_id ) # Deduct materials self._deduct_items(backpack, user_id, color_def.wait_stage.consumed_items) record_id, expected_end = self._service.create_wait_record( color_def, color_def.wait_stage, user_id, chat_id ) delay_ms = max( int((expected_end - datetime.now(timezone.utc)).total_seconds() * 1000), 0 ) scheduler = get_clock_scheduler() task_id = scheduler.register_task( plugin_module=self.__module__, plugin_class=self.__class__.__name__, callback_name="complete_wait_flow", delay_ms=delay_ms, kwargs={"record_id": record_id}, ) self._service.update_task_binding(record_id, task_id) time_str = expected_end.strftime("%Y-%m-%d %H:%M") msg = ( "# 🔁 水晶淬炼已排程\n" f"- 颜色:{color_def.display_name}\n" f"- 预计完成:{time_str}\n" "- 完成后会自动发放淬炼产物。" ) return await self.send_markdown_message(msg, chat_id, user_id) async def _handle_exchange( self, chat_id: int, user_id: int, exchange_key: str, ) -> Optional[str]: backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) if not exchange_key: example = self._default_exchange_example(backpack) return await self.send_markdown_message( f"❌ 请提供兑换ID或名称,例如 `水晶 兑换 {example}`", chat_id, user_id ) entry = self._resolve_exchange_entry(exchange_key, backpack) if not entry: available = ", ".join( entry.metadata.get("display_name", entry.identifier) for entry in self._exchange_entries.values() ) return await self.send_markdown_message( f"❌ 未找到对应的兑换项目。可用选项:{available}", chat_id, user_id ) missing = self._check_item_requirements(backpack, user_id, entry.required_items) if missing: formatted = ", ".join( f"{self._resolve_item_name(backpack, item)}×{amount}" for item, amount in missing ) return await self.send_markdown_message(f"❌ 兑换所需物品不足:{formatted}", chat_id, user_id) self._deduct_items(backpack, user_id, entry.required_items) backpack.add_item(user_id, entry.reward_item, entry.reward_amount) reward_name = self._resolve_item_name(backpack, entry.reward_item) display_name = entry.metadata.get("display_name") or reward_name msg = ( "# 🔄 兑换成功\n" f"- 项目:{display_name}\n" f"- 奖励:{reward_name} × {entry.reward_amount}" ) return await self.send_markdown_message(msg, chat_id, user_id) def _ensure_prerequisite_items(self, backpack: WPSBackpackSystem) -> None: """Verify key prerequisite materials exist, log warnings otherwise.""" required_items = [ "combat_material_crystal", "combat_material_essence", "garden_wine_maple", "garden_wine_sage", ] missing = [] for item_id in required_items: try: backpack.add_item(0, item_id, 0) except Exception: missing.append(item_id) if missing: logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}水晶系统启动时发现缺少前置物品: {', '.join(missing)}{ConsoleFrontColor.RESET}", ) def _resolve_color(self, key: str) -> Optional[CrystalColorDefinition]: lowered = key.lower() if lowered in self._colors: return self._colors[lowered] for color_def in self._colors.values(): if color_def.display_name.lower() == lowered: return color_def return None def _format_item_requirements( self, backpack: WPSBackpackSystem, items: Mapping[str, int] ) -> str: return ( ", ".join( f"{self._resolve_item_name(backpack, item_id)}×{amount}" for item_id, amount in items.items() ) or "无" ) def _check_item_requirements( self, backpack: WPSBackpackSystem, user_id: int, requirements: Mapping[str, int], ) -> List[Tuple[str, int]]: missing: List[Tuple[str, int]] = [] for item_id, amount in requirements.items(): current = backpack.add_item(user_id, item_id, 0) if current < amount: missing.append((item_id, amount - current)) return missing def _deduct_items( self, backpack: WPSBackpackSystem, user_id: int, requirements: Mapping[str, int], ) -> None: for item_id, amount in requirements.items(): current = backpack.add_item(user_id, item_id, 0) backpack.set_item_quantity(user_id, item_id, max(current - amount, 0)) def _resolve_item_name(self, backpack: WPSBackpackSystem, item_id: str) -> str: try: definition = backpack._get_definition(item_id) return definition.name except Exception: return item_id def _resolve_exchange_entry( self, key: str, backpack: WPSBackpackSystem ) -> Optional[CrystalExchangeEntry]: lowered = key.strip().lower() if not lowered: return None direct = self._exchange_entries.get(lowered) if direct: return direct for entry in self._exchange_entries.values(): if entry.identifier.lower() == lowered: return entry display_name = entry.metadata.get("display_name") if display_name and display_name.lower() == lowered: return entry if entry.reward_item.lower() == lowered: return entry reward_name = self._resolve_item_name(backpack, entry.reward_item) if reward_name.lower() == lowered: return entry return None def _default_exchange_example(self, backpack: WPSBackpackSystem) -> str: for entry in self._exchange_entries.values(): display_name = entry.metadata.get("display_name") if display_name: return display_name reward_name = self._resolve_item_name(backpack, entry.reward_item) if reward_name: return reward_name return entry.identifier return "exchange_id" def _register_garden_extensions( self, backpack: WPSBackpackSystem, store: WPSStoreSystem, alchemy: WPSAlchemyGame, ) -> None: crops = build_default_crystal_crops() if not crops: return garden_service = WPSGardenBase.service() limit_amount = getattr(garden_service.config, "seed_store_limit", 5) sale_multiplier = getattr(garden_service.config, "sale_multiplier", 10) tier_map = { "common": BackpackItemTier.COMMON, "rare": BackpackItemTier.RARE, "epic": BackpackItemTier.EPIC, "legendary": BackpackItemTier.LEGENDARY, } for crop in crops: try: register_crop(crop, overwrite=False) except Exception as exc: # pylint: disable=broad-except logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册水晶作物 {crop.seed_id} 失败: {exc}{ConsoleFrontColor.RESET}", ) continue tier = tier_map.get(crop.tier.lower(), BackpackItemTier.RARE) seed_name = self._item_display_name( crop.seed_id, f"{crop.display_name}的种子", backpack ) seed_desc = f"{crop.display_name}的种子,可在菜园中孕育晶体能量。" fruit_name = self._item_display_name( crop.fruit_id, f"{crop.display_name}的果实", backpack ) fruit_desc = f"{crop.display_name}成熟后的果实,能够作为高级炼金材料。" self._safe_register_backpack_item(backpack, crop.seed_id, seed_name, tier, seed_desc) self._safe_register_backpack_item( backpack, crop.fruit_id, fruit_name, tier, fruit_desc, ) self._safe_register_store_mode(store, crop.seed_id, crop.seed_price, limit_amount) if crop.extra_item_id: self._safe_register_backpack_item( backpack, crop.extra_item_id, f"{crop.display_name}的特产", BackpackItemTier.RARE, f"{crop.display_name}衍生的稀有特产,可在特定配方中使用。", ) if crop.wine_item_id: self._try_register_wine_recipe(alchemy, crop) self._register_tint_powder_recipe(alchemy) self._register_resonance_powder_recipe(alchemy) self._register_base_scroll_recipe(alchemy) self._register_base_core_recipe(alchemy) def _safe_register_backpack_item( self, backpack: WPSBackpackSystem, item_id: str, name: str, tier: BackpackItemTier, description: str, ) -> None: try: backpack.register_item(item_id, name, tier, description) except Exception: pass def _safe_register_store_mode( self, store: WPSStoreSystem, item_id: str, price: int, limit_amount: int, ) -> None: try: store.register_mode( item_id=item_id, price=price, limit_amount=limit_amount, ) except Exception: pass def _try_register_wine_recipe( self, alchemy: WPSAlchemyGame, crop: GardenCropDefinition, ) -> None: if not crop.wine_item_id: return try: alchemy.register_recipe( (crop.fruit_id, crop.fruit_id, crop.fruit_id), crop.wine_item_id, "garden_item_rot_fruit", 0.75, ) except Exception: pass def _register_tint_powder_recipe(self, alchemy: WPSAlchemyGame) -> None: try: alchemy.register_recipe( (CRYSTAL_TREE_FRUIT_ID, CRYSTAL_TREE_FRUIT_ID, "garden_item_rot_fruit"), CRYSTAL_TINT_POWDER_ID, WPSAlchemyGame.ASH_ITEM_ID, 0.8, ) except Exception as exc: # pylint: disable=broad-except logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册 {CRYSTAL_TREE_FRUIT_ID} 炼金配方失败: {exc}{ConsoleFrontColor.RESET}", ) def _register_resonance_powder_recipe(self, alchemy: WPSAlchemyGame) -> None: try: alchemy.register_recipe( (SPARK_DUST_ITEM_ID, SPARK_DUST_ITEM_ID, SPARK_DUST_ITEM_ID), CRYSTAL_RESONANCE_POWDER_ID, WPSAlchemyGame.ASH_ITEM_ID, 0.45, ) except Exception as exc: # pylint: disable=broad-except logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册闪烁粉尘共鸣配方失败: {exc}{ConsoleFrontColor.RESET}", ) def _register_base_scroll_recipe(self, alchemy: WPSAlchemyGame) -> None: """注册空白变色卷轴配方:三种木材合成""" try: alchemy.register_recipe( ("garden_wood_ginkgo", "garden_wood_sakura", "garden_wood_maple"), CRYSTAL_BASE_SCROLL_ID, WPSAlchemyGame.ASH_ITEM_ID, 0.7, ) except Exception as exc: # pylint: disable=broad-except logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册空白变色卷轴配方失败: {exc}{ConsoleFrontColor.RESET}", ) def _register_base_core_recipe(self, alchemy: WPSAlchemyGame) -> None: """注册未调谐晶核配方:水晶树果实 + 胜利之树果实 + 腐败果实""" try: alchemy.register_recipe( (CRYSTAL_TREE_FRUIT_ID, "combat_fruit_victory_tree", "garden_item_rot_fruit"), CRYSTAL_BASE_ITEM_ID, WPSAlchemyGame.ASH_ITEM_ID, 0.6, ) except Exception as exc: # pylint: disable=broad-except logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册未调谐晶核配方失败: {exc}{ConsoleFrontColor.RESET}", ) def _item_display_name( self, item_id: str, default_name: str, backpack: WPSBackpackSystem, ) -> str: try: definition = backpack._get_definition(item_id) return definition.name except Exception: return default_name