Files
NewWPSBot/Plugins/WPSCrystalSystem/crystal_plugin_base.py

375 lines
15 KiB
Python
Raw Normal View History

2025-11-11 23:48:50 +08:00
"""Crystal system primary plugin implementation."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Dict, List, Mapping, Optional, Tuple
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.clock_scheduler import get_clock_scheduler
from PWF.CoreModules.plugin_interface import DatabaseModel
from Plugins.WPSAPI import WPSAPI
from Plugins.WPSAlchemyGame import WPSAlchemyGame
from Plugins.WPSBackpackSystem import WPSBackpackSystem
from Plugins.WPSStoreSystem import WPSStoreSystem
from .crystal_models import (
DEFAULT_CRYSTAL_COLOR_MAP,
DEFAULT_CRYSTAL_EXCHANGE_ENTRIES,
DEFAULT_CRYSTAL_ITEMS,
CrystalColorDefinition,
CrystalExchangeEntry,
CrystalItemDefinition,
)
from .crystal_service import get_crystal_db_models, get_crystal_service
logger: ProjectConfig = Architecture.Get(ProjectConfig)
class WPSCrystalSystem(WPSAPI):
"""Main crystal system plugin."""
_initialized: bool = False
def __init__(self) -> None:
super().__init__()
self._service = get_crystal_service()
self._items: Dict[str, CrystalItemDefinition] = dict(DEFAULT_CRYSTAL_ITEMS)
self._colors: Dict[str, CrystalColorDefinition] = dict(DEFAULT_CRYSTAL_COLOR_MAP)
self._exchange_entries: Dict[str, CrystalExchangeEntry] = {
key.lower(): value for key, value in DEFAULT_CRYSTAL_EXCHANGE_ENTRIES.items()
}
# ------------------------------------------------------------------ #
# Plugin lifecycle
# ------------------------------------------------------------------ #
def dependencies(self) -> List[type]:
return [
WPSAPI,
WPSBackpackSystem,
WPSStoreSystem,
WPSAlchemyGame,
]
def register_db_model(self) -> List[DatabaseModel]:
return get_crystal_db_models()
def wake_up(self) -> None:
if WPSCrystalSystem._initialized:
return
WPSCrystalSystem._initialized = True
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
store: WPSStoreSystem = Architecture.Get(WPSStoreSystem)
alchemy: WPSAlchemyGame = Architecture.Get(WPSAlchemyGame)
self._ensure_prerequisite_items(backpack)
self._service.register_items(backpack, self._items)
for color_def in self._colors.values():
self._service.register_chain_recipes(alchemy, color_def.chain_stages)
self._service.register_final_fusion(alchemy, color_def.final_fusion)
self._service.register_exchange_modes(store, self._items, self._exchange_entries)
self._recover_wait_tasks()
self.register_plugin("crystal")
self.register_plugin("水晶")
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSCrystalSystem 插件已加载,当前颜色数:{len(self._colors)}{ConsoleFrontColor.RESET}",
)
# ------------------------------------------------------------------ #
# Scheduler recovery & callbacks
# ------------------------------------------------------------------ #
def _recover_wait_tasks(self) -> None:
scheduler = get_clock_scheduler()
pending_tasks = self._service.recover_pending_wait_flows()
for task in pending_tasks:
delay_ms = task.get("delay_ms", 0)
record_id = task["record_id"]
task_id = scheduler.register_task(
plugin_module=self.__module__,
plugin_class=self.__class__.__name__,
callback_name="complete_wait_flow",
delay_ms=delay_ms,
kwargs={"record_id": record_id},
)
self._service.update_task_binding(record_id, task_id)
async def complete_wait_flow(self, record_id: int) -> None:
info = self._service.mark_wait_flow_completed(record_id)
if not info:
return
chat_id = int(info["chat_id"])
user_id = int(info["user_id"])
produced_item = info["produced_item"]
message = (
"# ⏱️ 水晶淬炼完成\n"
f"- 用户:`{user_id}`\n"
f"- 获得物品:`{produced_item}`\n"
f"- 时间:{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}"
)
self._service.update_task_binding(record_id, None)
await self.send_markdown_message(message, chat_id, user_id)
# ------------------------------------------------------------------ #
# Command handling
# ------------------------------------------------------------------ #
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
if not payload:
return await self._send_help(chat_id, user_id)
tokens = payload.split()
if not tokens:
return await self._send_help(chat_id, user_id)
action = tokens[0].lower()
if action in {"help", "帮助"}:
return await self._send_help(chat_id, user_id)
if action in {"变色", "color"}:
color_key = tokens[1] if len(tokens) > 1 else ""
return await self._handle_color_wait(chat_id, user_id, color_key)
if action in {"兑换", "exchange"}:
exchange_key = tokens[1] if len(tokens) > 1 else ""
return await self._handle_exchange(chat_id, user_id, exchange_key)
if action in {"列表", "list"}:
return await self._send_overview(chat_id, user_id)
return await self._send_help(chat_id, user_id)
# ------------------------------------------------------------------ #
# Helper methods
# ------------------------------------------------------------------ #
async def _send_help(self, chat_id: int, user_id: int) -> Optional[str]:
lines = [
"# 💎 水晶系统指令",
"- `水晶 变色 <颜色>`:启动对应颜色的淬炼流程。",
"- `水晶 兑换 <ID>`:使用物品兑换指定奖励。",
"- `水晶 列表`:查看可用颜色与兑换信息。",
]
return await self.send_markdown_message("\n".join(lines), chat_id, user_id)
async def _send_overview(self, chat_id: int, user_id: int) -> Optional[str]:
lines = ["# 📦 水晶配置一览"]
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
for color_def in self._colors.values():
wait_cost = self._format_item_requirements(
backpack, color_def.wait_stage.consumed_items
)
lines.append(
f"- {color_def.display_name}|阶段:{len(color_def.chain_stages)}|等待消耗:{wait_cost}"
)
lines.append("\n## 🔄 兑换项目")
for entry in self._exchange_entries.values():
cost = self._format_item_requirements(backpack, entry.required_items)
reward_name = self._resolve_item_name(backpack, entry.reward_item)
display_name = entry.metadata.get("display_name") or reward_name
lines.append(
f"- {display_name}{entry.identifier})|消耗:{cost}|奖励:{reward_name}"
)
return await self.send_markdown_message("\n".join(lines), chat_id, user_id)
async def _handle_color_wait(
self,
chat_id: int,
user_id: int,
color_key: str,
) -> Optional[str]:
if not color_key:
return await self.send_markdown_message("❌ 请指定颜色,例如 `水晶 变色 黑色`", chat_id, user_id)
color_def = self._resolve_color(color_key)
if not color_def:
return await self.send_markdown_message("❌ 未找到对应颜色配置。", chat_id, user_id)
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
missing = self._check_item_requirements(
backpack, user_id, color_def.wait_stage.consumed_items
)
if missing:
formatted = ", ".join(
f"{self._resolve_item_name(backpack, item)}×{amount}" for item, amount in missing
)
return await self.send_markdown_message(
f"❌ 材料不足:{formatted}", chat_id, user_id
)
# Deduct materials
self._deduct_items(backpack, user_id, color_def.wait_stage.consumed_items)
record_id, expected_end = self._service.create_wait_record(
color_def, color_def.wait_stage, user_id, chat_id
)
delay_ms = max(
int((expected_end - datetime.now(timezone.utc)).total_seconds() * 1000), 0
)
scheduler = get_clock_scheduler()
task_id = scheduler.register_task(
plugin_module=self.__module__,
plugin_class=self.__class__.__name__,
callback_name="complete_wait_flow",
delay_ms=delay_ms,
kwargs={"record_id": record_id},
)
self._service.update_task_binding(record_id, task_id)
time_str = expected_end.strftime("%Y-%m-%d %H:%M")
msg = (
"# 🔁 水晶淬炼已排程\n"
f"- 颜色:{color_def.display_name}\n"
f"- 预计完成:{time_str}\n"
"- 完成后会自动发放淬炼产物。"
)
return await self.send_markdown_message(msg, chat_id, user_id)
async def _handle_exchange(
self,
chat_id: int,
user_id: int,
exchange_key: str,
) -> Optional[str]:
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
if not exchange_key:
example = self._default_exchange_example(backpack)
return await self.send_markdown_message(
f"❌ 请提供兑换ID或名称例如 `水晶 兑换 {example}`", chat_id, user_id
)
entry = self._resolve_exchange_entry(exchange_key, backpack)
if not entry:
available = ", ".join(
entry.metadata.get("display_name", entry.identifier)
for entry in self._exchange_entries.values()
)
return await self.send_markdown_message(
f"❌ 未找到对应的兑换项目。可用选项:{available}", chat_id, user_id
)
missing = self._check_item_requirements(backpack, user_id, entry.required_items)
if missing:
formatted = ", ".join(
f"{self._resolve_item_name(backpack, item)}×{amount}" for item, amount in missing
)
return await self.send_markdown_message(f"❌ 兑换所需物品不足:{formatted}", chat_id, user_id)
self._deduct_items(backpack, user_id, entry.required_items)
backpack.add_item(user_id, entry.reward_item, entry.reward_amount)
reward_name = self._resolve_item_name(backpack, entry.reward_item)
display_name = entry.metadata.get("display_name") or reward_name
msg = (
"# 🔄 兑换成功\n"
f"- 项目:{display_name}\n"
f"- 奖励:{reward_name} × {entry.reward_amount}"
)
return await self.send_markdown_message(msg, chat_id, user_id)
def _ensure_prerequisite_items(self, backpack: WPSBackpackSystem) -> None:
"""Verify key prerequisite materials exist, log warnings otherwise."""
required_items = [
"combat_material_crystal",
"combat_material_essence",
"garden_wine_maple",
"garden_wine_sage",
]
missing = []
for item_id in required_items:
try:
backpack.add_item(0, item_id, 0)
except Exception:
missing.append(item_id)
if missing:
logger.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}水晶系统启动时发现缺少前置物品: {', '.join(missing)}{ConsoleFrontColor.RESET}",
)
def _resolve_color(self, key: str) -> Optional[CrystalColorDefinition]:
lowered = key.lower()
if lowered in self._colors:
return self._colors[lowered]
for color_def in self._colors.values():
if color_def.display_name.lower() == lowered:
return color_def
return None
def _format_item_requirements(
self, backpack: WPSBackpackSystem, items: Mapping[str, int]
) -> str:
return (
", ".join(
f"{self._resolve_item_name(backpack, item_id)}×{amount}"
for item_id, amount in items.items()
)
or ""
)
def _check_item_requirements(
self,
backpack: WPSBackpackSystem,
user_id: int,
requirements: Mapping[str, int],
) -> List[Tuple[str, int]]:
missing: List[Tuple[str, int]] = []
for item_id, amount in requirements.items():
current = backpack.add_item(user_id, item_id, 0)
if current < amount:
missing.append((item_id, amount - current))
return missing
def _deduct_items(
self,
backpack: WPSBackpackSystem,
user_id: int,
requirements: Mapping[str, int],
) -> None:
for item_id, amount in requirements.items():
current = backpack.add_item(user_id, item_id, 0)
backpack.set_item_quantity(user_id, item_id, max(current - amount, 0))
def _resolve_item_name(self, backpack: WPSBackpackSystem, item_id: str) -> str:
try:
definition = backpack._get_definition(item_id)
return definition.name
except Exception:
return item_id
def _resolve_exchange_entry(
self, key: str, backpack: WPSBackpackSystem
) -> Optional[CrystalExchangeEntry]:
lowered = key.strip().lower()
if not lowered:
return None
direct = self._exchange_entries.get(lowered)
if direct:
return direct
for entry in self._exchange_entries.values():
if entry.identifier.lower() == lowered:
return entry
display_name = entry.metadata.get("display_name")
if display_name and display_name.lower() == lowered:
return entry
if entry.reward_item.lower() == lowered:
return entry
reward_name = self._resolve_item_name(backpack, entry.reward_item)
if reward_name.lower() == lowered:
return entry
return None
def _default_exchange_example(self, backpack: WPSBackpackSystem) -> str:
for entry in self._exchange_entries.values():
display_name = entry.metadata.get("display_name")
if display_name:
return display_name
reward_name = self._resolve_item_name(backpack, entry.reward_item)
if reward_name:
return reward_name
return entry.identifier
return "exchange_id"