"""Backpack plugin: item registry and per-user inventory storage."""
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Tuple, override

from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db

from .WPSAPI import WPSAPI

logger: ProjectConfig = Architecture.Get(ProjectConfig)


def _log_warning(message: str) -> None:
    """Emit *message* through the project logger at warning level."""
    # NOTE(review): only the "Info" level is exercised elsewhere in this
    # module; confirm "Warning" is an accepted level name for ProjectConfig.Log.
    logger.Log("Warning", message)


class BackpackItemTier(Enum):
    """Tier (rarity) definitions for backpack items.

    Each member's value is a ``(display_name, color_hex)`` tuple which
    ``__init__`` unpacks into attributes of the same names.
    """

    COMMON = ("普通", "#C0C0C0")
    RARE = ("稀有", "#4DA6FF")
    EPIC = ("史诗", "#B266FF")
    LEGENDARY = ("传说", "#FF9933")

    def __init__(self, display_name: str, color_hex: str):
        self.display_name = display_name
        self.color_hex = color_hex

    @classmethod
    def from_string(cls, value: str) -> "BackpackItemTier":
        """Return the tier whose member name or display name equals *value*.

        Raises:
            ValueError: if no tier matches.
        """
        for tier in cls:
            if tier.name == value or tier.display_name == value:
                return tier
        raise ValueError(f"Unknown backpack item tier: {value}")

    def to_markdown_label(self, content: str) -> str:
        """Return the markdown label for *content*.

        Currently the content is returned as-is.
        """
        # NOTE(review): color_hex is not applied here; confirm whether
        # colored markup was intended before changing the output.
        return f'{content}'


@dataclass(frozen=True)
class BackpackItemDefinition:
    """Immutable description of a registered item."""

    item_id: str  # primary key in the items table
    name: str  # name shown in the backpack listing
    tier: BackpackItemTier


@dataclass(frozen=True)
class BackpackUserItem:
    """One inventory entry: an item definition plus the owned quantity."""

    item_id: str
    quantity: int
    definition: BackpackItemDefinition


class WPSBackpackSystem(WPSAPI):
    """Core backpack system: item registration and per-user item storage."""

    ITEMS_TABLE = "backpack_items"
    USER_ITEMS_TABLE = "backpack_user_items"

    def __init__(self) -> None:
        super().__init__()
        # item_id -> definition; filled by wake_up() and kept in sync by
        # register_item() / _get_definition().
        self._item_cache: Dict[str, BackpackItemDefinition] = {}

    @override
    def dependencies(self) -> List[type]:
        """Return the plugin types this plugin depends on."""
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        """Report that this plugin is always enabled."""
        return True

    @override
    def register_db_model(self):
        """Declare the item table and the per-user inventory table."""
        from PWF.CoreModules.plugin_interface import DatabaseModel

        return [
            DatabaseModel(
                table_name=self.ITEMS_TABLE,
                column_defs={
                    "item_id": "TEXT PRIMARY KEY",
                    "name": "TEXT NOT NULL",
                    "tier": "TEXT NOT NULL",
                },
            ),
            DatabaseModel(
                table_name=self.USER_ITEMS_TABLE,
                column_defs={
                    "user_id": "INTEGER NOT NULL",
                    "item_id": "TEXT NOT NULL",
                    "quantity": "INTEGER NOT NULL DEFAULT 0",
                    # Composite key passed as a "column" with an empty
                    # definition; presumably DatabaseModel emits it as a
                    # table constraint -- verify against DatabaseModel.
                    "PRIMARY KEY (user_id, item_id)": "",
                },
            ),
        ]

    @override
    def wake_up(self) -> None:
        """Log start-up, register the command keywords and warm the cache."""
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSBackpackSystem 插件已加载{ConsoleFrontColor.RESET}",
        )
        self.register_plugin("背包")
        self.register_plugin("backpack")
        self._warm_item_cache()

    @staticmethod
    def _definition_from_row(row) -> BackpackItemDefinition:
        """Build a definition from a row exposing item_id, name and tier.

        Raises:
            ValueError: if the stored tier string matches no BackpackItemTier.
        """
        return BackpackItemDefinition(
            item_id=row["item_id"],
            name=row["name"],
            tier=BackpackItemTier.from_string(row["tier"]),
        )

    def _warm_item_cache(self) -> None:
        """Load every stored item definition into the in-memory cache.

        Rows whose tier cannot be parsed are skipped with a warning so that a
        single bad row does not abort plugin start-up; get_user_items()
        tolerates the same rows.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(f"SELECT item_id, name, tier FROM {self.ITEMS_TABLE}")
        for row in cursor.fetchall():
            try:
                definition = self._definition_from_row(row)
            except ValueError as exc:
                _log_warning(
                    f"Skipping backpack item {row['item_id']!r} while warming cache: {exc}"
                )
                continue
            self._item_cache[definition.item_id] = definition

    # region Public API
    def register_item(
        self,
        item_id: str,
        name: str,
        tier: BackpackItemTier,
    ) -> None:
        """Insert or update an item definition and refresh the cache.

        The tier is stored by its member name (e.g. ``"RARE"``).

        Raises:
            ValueError: if *item_id* or *name* is empty.
        """
        if not item_id or not name:
            raise ValueError("item_id and name must be provided")
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            INSERT INTO {self.ITEMS_TABLE} (item_id, name, tier)
            VALUES (?, ?, ?)
            ON CONFLICT(item_id) DO UPDATE
            SET name = excluded.name,
                tier = excluded.tier
            """,
            (item_id, name, tier.name),
        )
        get_db().conn.commit()
        self._item_cache[item_id] = BackpackItemDefinition(item_id, name, tier)

    def add_item(
        self,
        user_id: int,
        item_id: str,
        delta: int,
    ) -> int:
        """Add *delta* units of an item to a user and return the new quantity.

        A *delta* of 0 performs no write and just returns the current
        quantity.

        Raises:
            ValueError: if *delta* is negative or *item_id* is not registered.
        """
        # The sign check comes first so a negative delta is reported even for
        # an unregistered item.
        if delta < 0:
            raise ValueError("delta must be positive when using add_item")
        definition = self._get_definition(item_id)
        if delta != 0:
            cursor = get_db().conn.cursor()
            cursor.execute(
                f"""
                INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
                VALUES (?, ?, ?)
                ON CONFLICT(user_id, item_id) DO UPDATE
                SET quantity = {self.USER_ITEMS_TABLE}.quantity + excluded.quantity
                """,
                (user_id, item_id, delta),
            )
            get_db().conn.commit()
        return self._get_user_quantity(user_id, item_id, definition)

    def set_item_quantity(
        self,
        user_id: int,
        item_id: str,
        quantity: int,
    ) -> int:
        """Overwrite a user's quantity of an item and return the stored value.

        Raises:
            ValueError: if *quantity* is negative or *item_id* is not
                registered.
        """
        if quantity < 0:
            raise ValueError("quantity must be non-negative")
        definition = self._get_definition(item_id)
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
            VALUES (?, ?, ?)
            ON CONFLICT(user_id, item_id) DO UPDATE
            SET quantity = excluded.quantity
            """,
            (user_id, item_id, quantity),
        )
        get_db().conn.commit()
        return self._get_user_quantity(user_id, item_id, definition)

    def get_user_items(self, user_id: int) -> List[BackpackUserItem]:
        """Return the user's items with quantity > 0, largest quantity first.

        A cached definition is preferred over the joined row data. Rows whose
        tier cannot be parsed are skipped with a warning.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT ui.item_id, ui.quantity, i.name, i.tier
            FROM {self.USER_ITEMS_TABLE} ui
            JOIN {self.ITEMS_TABLE} i ON ui.item_id = i.item_id
            WHERE ui.user_id = ?
              AND ui.quantity > 0
            ORDER BY ui.quantity DESC
            """,
            (user_id,),
        )
        result: List[BackpackUserItem] = []
        for row in cursor.fetchall():
            definition = self._item_cache.get(row["item_id"])
            if definition is None:
                try:
                    definition = self._definition_from_row(row)
                except ValueError as exc:
                    _log_warning(
                        f"Skipping backpack item {row['item_id']!r} for user {user_id}: {exc}"
                    )
                    continue
            result.append(
                BackpackUserItem(
                    item_id=row["item_id"],
                    quantity=row["quantity"],
                    definition=definition,
                )
            )
        return result

    # endregion

    def _get_definition(self, item_id: str) -> BackpackItemDefinition:
        """Return the definition for *item_id*, loading and caching it on a miss.

        Raises:
            ValueError: if the item is not registered, or its stored tier is
                unknown.
        """
        cached = self._item_cache.get(item_id)
        if cached is not None:
            return cached
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"SELECT item_id, name, tier FROM {self.ITEMS_TABLE} WHERE item_id = ?",
            (item_id,),
        )
        row = cursor.fetchone()
        if not row:
            raise ValueError(f"Item {item_id} not registered in backpack system")
        definition = self._definition_from_row(row)
        self._item_cache[item_id] = definition
        return definition

    def _get_user_quantity(
        self,
        user_id: int,
        item_id: str,
        definition: BackpackItemDefinition,
    ) -> int:
        """Return the stored quantity of an item for a user (0 if absent).

        When the quantity is 0, a DELETE for that (user, item) pair is issued
        and committed, so zero-quantity rows do not remain. *definition* is
        accepted but not used.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT quantity FROM {self.USER_ITEMS_TABLE}
            WHERE user_id = ? AND item_id = ?
            """,
            (user_id, item_id),
        )
        row = cursor.fetchone()
        quantity = int(row["quantity"]) if row else 0
        if quantity == 0:
            cursor.execute(
                f"""
                DELETE FROM {self.USER_ITEMS_TABLE}
                WHERE user_id = ? AND item_id = ?
                """,
                (user_id, item_id),
            )
            get_db().conn.commit()
        return quantity

    def _format_backpack_markdown(self, user_items: List[BackpackUserItem]) -> str:
        """Render the inventory as a markdown list.

        Entries are ordered by tier (enum declaration order), then by
        descending quantity, then by name. An empty inventory yields a fixed
        placeholder message.
        """
        if not user_items:
            return "# 🎒 背包一览\n> 当前背包空空如也,去冒险获取一些物品吧!"

        # Build the tier -> rank table once instead of once per sorted item.
        tier_rank = {tier: rank for rank, tier in enumerate(BackpackItemTier)}
        sorted_items = sorted(
            user_items,
            key=lambda item: (
                tier_rank[item.definition.tier],
                -item.quantity,
                item.definition.name,
            ),
        )

        lines = ["# 🎒 背包一览"]
        for entry in sorted_items:
            tier_label = entry.definition.tier.to_markdown_label(
                entry.definition.tier.display_name
            )
            lines.append(
                f"- {tier_label} · {entry.definition.name} × **{entry.quantity}**"
            )
        return "\n".join(lines)

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Send the caller's backpack listing as a markdown message.

        Always returns None; the listing goes out via send_markdown_message.
        """
        # The parsed text is not inspected: the listing is sent whatever the
        # message says.
        message = self.parse_message_after_at(message).strip()
        user_items = self.get_user_items(user_id)
        markdown = self._format_backpack_markdown(user_items)
        await self.send_markdown_message(markdown, chat_id, user_id)
        return None


__all__ = [
    "WPSBackpackSystem",
    "BackpackItemTier",
    "BackpackItemDefinition",
    "BackpackUserItem",
]