diff --git a/.tasks/2025-11-08_3_backpack.md b/.tasks/2025-11-08_3_backpack.md index 3ed02c8..5ea2196 100644 --- a/.tasks/2025-11-08_3_backpack.md +++ b/.tasks/2025-11-08_3_backpack.md @@ -26,6 +26,20 @@ PWF插件框架, 通过`PluginInterface`管理插件生命周期, `WPSAPI`提供 # 任务进度 +2025-11-08_15:11:43 +- 已修改: Plugins/WPSBackpackSystem.py +- 更改: 定义背包系统插件结构, 建立物品等级枚举、数据库表模型、跨插件接口与指令回调 +- 原因: 实现背包系统基础能力供其他插件注册物品与查询用户背包 +- 阻碍因素: 无 +- 状态: 未确认 + +2025-11-08_15:29:00 +- 已修改: PWF/CoreModules/plugin_interface.py Plugins/WPSBackpackSystem.py +- 更改: 调整插件加载逻辑确保在wake_up前创建数据库表, 移除插件内部重复建表调用 +- 原因: 解决首次运行背包插件报错“no such table: backpack_items” +- 阻碍因素: 无 +- 状态: 成功 + # 最终审查 diff --git a/PWF b/PWF index 6c53a3a..4d3d841 160000 --- a/PWF +++ b/PWF @@ -1 +1 @@ -Subproject commit 6c53a3a18f0d461ebbbe20bed33783c86561f428 +Subproject commit 4d3d841ddac30a060c333cb248d629437234d635 diff --git a/Plugins/WPSAPI.py b/Plugins/WPSAPI.py index 76bca0c..bca3616 100644 --- a/Plugins/WPSAPI.py +++ b/Plugins/WPSAPI.py @@ -180,7 +180,10 @@ class BasicWPSInterface(PluginInterface): if webhook_url == "" or webhook_url == None: return None - result = await self.get_message_sender_function(webhook_url, self.get_message_sender_type())(message) + result = await self.get_message_sender_function(webhook_url, self.get_message_sender_type())(f""" +--- +{message} +""") if get_internal_verbose(): logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Result: {result}") return None diff --git a/Plugins/WPSBackpackSystem.py b/Plugins/WPSBackpackSystem.py index e69de29..18ed6b9 100644 --- a/Plugins/WPSBackpackSystem.py +++ b/Plugins/WPSBackpackSystem.py @@ -0,0 +1,305 @@ +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Dict, List, Optional, Tuple, override + +from PWF.Convention.Runtime.Architecture import Architecture +from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig +from PWF.CoreModules.database import get_db + +from .WPSAPI import WPSAPI + +logger: ProjectConfig = Architecture.Get(ProjectConfig) + + +class BackpackItemTier(Enum): + """背包物品等级定义""" + + COMMON = ("普通", "#C0C0C0") + RARE = ("稀有", "#4DA6FF") + EPIC = ("史诗", "#B266FF") + LEGENDARY = ("传说", "#FF9933") + + def __init__(self, display_name: str, color_hex: str): + self.display_name = display_name + self.color_hex = color_hex + + @classmethod + def from_string(cls, value: str) -> "BackpackItemTier": + for tier in cls: + if tier.name == value or tier.display_name == value: + return tier + raise ValueError(f"Unknown backpack item tier: {value}") + + def to_markdown_label(self, content: str) -> str: + return f'{content}' + + +@dataclass(frozen=True) +class BackpackItemDefinition: + item_id: str + name: str + tier: BackpackItemTier + + +@dataclass(frozen=True) +class BackpackUserItem: + item_id: str + quantity: int + definition: BackpackItemDefinition + + +class WPSBackpackSystem(WPSAPI): + """背包基础系统,负责物品注册与用户物品存取""" + + ITEMS_TABLE = "backpack_items" + USER_ITEMS_TABLE = "backpack_user_items" + + def __init__(self) -> None: + super().__init__() + self._item_cache: Dict[str, BackpackItemDefinition] = {} + + @override + def dependencies(self) -> List[type]: + return [WPSAPI] + + @override + def is_enable_plugin(self) -> bool: + return True + + @override + def register_db_model(self): + from PWF.CoreModules.plugin_interface import DatabaseModel + + return [ + DatabaseModel( + table_name=self.ITEMS_TABLE, + column_defs={ + "item_id": "TEXT PRIMARY KEY", + "name": "TEXT NOT NULL", + "tier": "TEXT NOT NULL", + }, + ), + DatabaseModel( + table_name=self.USER_ITEMS_TABLE, + column_defs={ + "user_id": "INTEGER NOT NULL", + "item_id": "TEXT NOT NULL", + "quantity": "INTEGER NOT NULL DEFAULT 0", + "PRIMARY KEY (user_id, item_id)": "", + }, + ), + ] + + @override + def wake_up(self) -> None: + logger.Log( + "Info", + f"{ConsoleFrontColor.GREEN}WPSBackpackSystem 插件已加载{ConsoleFrontColor.RESET}", + ) + self.register_plugin("背包") + self.register_plugin("backpack") + self._warm_item_cache() + + def _warm_item_cache(self) -> None: + cursor = get_db().conn.cursor() + cursor.execute( + f"SELECT item_id, name, tier FROM {self.ITEMS_TABLE}" + ) + rows = cursor.fetchall() + for row in rows: + tier = BackpackItemTier.from_string(row["tier"]) + self._item_cache[row["item_id"]] = BackpackItemDefinition( + item_id=row["item_id"], + name=row["name"], + tier=tier, + ) + + # region 对外接口 + + def register_item( + self, + item_id: str, + name: str, + tier: BackpackItemTier, + ) -> None: + if not item_id or not name: + raise ValueError("item_id and name must be provided") + cursor = get_db().conn.cursor() + cursor.execute( + f""" + INSERT INTO {self.ITEMS_TABLE} (item_id, name, tier) + VALUES (?, ?, ?) + ON CONFLICT(item_id) DO UPDATE + SET name = excluded.name, + tier = excluded.tier + """, + (item_id, name, tier.name), + ) + get_db().conn.commit() + self._item_cache[item_id] = BackpackItemDefinition(item_id, name, tier) + + def add_item( + self, + user_id: int, + item_id: str, + delta: int, + ) -> int: + if delta == 0: + definition = self._get_definition(item_id) + return self._get_user_quantity(user_id, item_id, definition) + if delta < 0: + raise ValueError("delta must be positive when using add_item") + definition = self._get_definition(item_id) + cursor = get_db().conn.cursor() + cursor.execute( + f""" + INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity) + VALUES (?, ?, ?) + ON CONFLICT(user_id, item_id) DO UPDATE + SET quantity = {self.USER_ITEMS_TABLE}.quantity + excluded.quantity + """, + (user_id, item_id, delta), + ) + get_db().conn.commit() + return self._get_user_quantity(user_id, item_id, definition) + + def set_item_quantity( + self, + user_id: int, + item_id: str, + quantity: int, + ) -> int: + if quantity < 0: + raise ValueError("quantity must be non-negative") + definition = self._get_definition(item_id) + cursor = get_db().conn.cursor() + cursor.execute( + f""" + INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity) + VALUES (?, ?, ?) + ON CONFLICT(user_id, item_id) DO UPDATE + SET quantity = excluded.quantity + """, + (user_id, item_id, quantity), + ) + get_db().conn.commit() + return self._get_user_quantity(user_id, item_id, definition) + + def get_user_items(self, user_id: int) -> List[BackpackUserItem]: + cursor = get_db().conn.cursor() + cursor.execute( + f""" + SELECT ui.item_id, ui.quantity, i.name, i.tier + FROM {self.USER_ITEMS_TABLE} ui + JOIN {self.ITEMS_TABLE} i ON ui.item_id = i.item_id + WHERE ui.user_id = ? AND ui.quantity > 0 + ORDER BY ui.quantity DESC + """, + (user_id,), + ) + rows = cursor.fetchall() + result: List[BackpackUserItem] = [] + for row in rows: + definition = self._item_cache.get(row["item_id"]) + if not definition: + try: + definition = BackpackItemDefinition( + item_id=row["item_id"], + name=row["name"], + tier=BackpackItemTier.from_string(row["tier"]), + ) + except ValueError: + continue + result.append( + BackpackUserItem( + item_id=row["item_id"], + quantity=row["quantity"], + definition=definition, + ) + ) + return result + + # endregion + + def _get_definition(self, item_id: str) -> BackpackItemDefinition: + if item_id in self._item_cache: + return self._item_cache[item_id] + cursor = get_db().conn.cursor() + cursor.execute( + f"SELECT item_id, name, tier FROM {self.ITEMS_TABLE} WHERE item_id = ?", + (item_id,), + ) + row = cursor.fetchone() + if not row: + raise ValueError(f"Item {item_id} not registered in backpack system") + definition = BackpackItemDefinition( + item_id=row["item_id"], + name=row["name"], + tier=BackpackItemTier.from_string(row["tier"]), + ) + self._item_cache[item_id] = definition + return definition + + def _get_user_quantity( + self, + user_id: int, + item_id: str, + definition: BackpackItemDefinition, + ) -> int: + cursor = get_db().conn.cursor() + cursor.execute( + f""" + SELECT quantity + FROM {self.USER_ITEMS_TABLE} + WHERE user_id = ? AND item_id = ? + """, + (user_id, item_id), + ) + row = cursor.fetchone() + quantity = int(row["quantity"]) if row else 0 + if quantity == 0: + cursor.execute( + f""" + DELETE FROM {self.USER_ITEMS_TABLE} + WHERE user_id = ? AND item_id = ? + """, + (user_id, item_id), + ) + get_db().conn.commit() + return quantity + + def _format_backpack_markdown(self, user_items: List[BackpackUserItem]) -> str: + if not user_items: + return "# 🎒 背包一览\n> 当前背包空空如也,去冒险获取一些物品吧!" + + lines = ["# 🎒 背包一览"] + sorted_items = sorted( + user_items, + key=lambda item: ( + list(BackpackItemTier).index(item.definition.tier), + -item.quantity, + item.definition.name, + ), + ) + for entry in sorted_items: + tier_label = entry.definition.tier.to_markdown_label( + entry.definition.tier.display_name + ) + lines.append( + f"- {tier_label} · {entry.definition.name} × **{entry.quantity}**" + ) + return "\n".join(lines) + + @override + async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: + message = self.parse_message_after_at(message).strip() + user_items = self.get_user_items(user_id) + markdown = self._format_backpack_markdown(user_items) + await self.send_markdown_message(markdown, chat_id, user_id) + return None + + +__all__ = ["WPSBackpackSystem", "BackpackItemTier", "BackpackItemDefinition"] +