from __future__ import annotations

from dataclasses import dataclass
from enum import Enum

from PWF.Convention.Runtime.Config import *
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from PWF.CoreModules.flags import get_internal_debug

from .WPSAPI import GuideEntry, GuideSection, WPSAPI

logger: ProjectConfig = Architecture.Get(ProjectConfig)


class BackpackItemTier(Enum):
    """Rarity tiers for backpack items.

    Each member's value is a ``(display_name, color_hex)`` pair which
    ``__init__`` unpacks onto the member as attributes.
    """

    COMMON = ("普通", "#C0C0C0")
    RARE = ("稀有", "#4DA6FF")
    EPIC = ("史诗", "#B266FF")
    LEGENDARY = ("传说", "#FF9933")

    def __init__(self, display_name: str, color_hex: str):
        self.display_name = display_name
        self.color_hex = color_hex

    @classmethod
    def from_string(cls, value: str) -> "BackpackItemTier":
        """Return the tier whose member name or display name equals *value*.

        Raises:
            ValueError: if no tier matches.
        """
        for tier in cls:
            if tier.name == value or tier.display_name == value:
                return tier
        raise ValueError(f"Unknown backpack item tier: {value}")

    def to_markdown_label(self, content: str) -> str:
        """Return the markdown label text for *content*."""
        # NOTE(review): this returns the content unchanged and never uses
        # ``color_hex``, although the guide text describes a coloured label.
        # Confirm whether colour markup is supposed to be applied here.
        return f'{content}'


@dataclass(frozen=True)
class BackpackItemDefinition:
    """Immutable description of a registered item."""

    item_id: str
    name: str
    tier: BackpackItemTier
    description: str


@dataclass(frozen=True)
class BackpackUserItem:
    """An item held by a user, paired with its definition."""

    item_id: str
    quantity: int
    definition: BackpackItemDefinition


class WPSBackpackSystem(WPSAPI):
    """Core backpack system: item registration and per-user item storage."""

    ITEMS_TABLE = "backpack_items"
    USER_ITEMS_TABLE = "backpack_user_items"

    def get_guide_subtitle(self) -> str:
        """Return the subtitle shown for this plugin in the guide."""
        return "管理物品注册、背包存储与查询的核心系统"

    def get_guide_metadata(self) -> Dict[str, str]:
        """Return guide metadata: cached item count and the table names."""
        return {
            "物品缓存数": str(len(self._item_cache)),
            "数据表": f"{self.ITEMS_TABLE}, {self.USER_ITEMS_TABLE}",
        }

    def collect_command_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entries describing the chat commands."""
        return (
            {
                "title": "背包",
                "identifier": "背包",
                "description": "以稀有度分组展示用户当前携带物品。",
                "metadata": {"别名": "backpack"},
            },
        )

    def collect_guide_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entries describing the programmatic API."""
        tier_labels = ", ".join(tier.display_name for tier in BackpackItemTier)
        return (
            {
                "title": "物品注册",
                "description": (
                    "`register_item(item_id, name, tier, description)` "
                    "将物品写入背包表,重复调用会更新名称、稀有度和描述。"
                ),
            },
            {
                "title": "稀有度体系",
                "description": f"支持稀有度:{tier_labels},调用 `to_markdown_label` 可渲染彩色标签。",
            },
            {
                "title": "库存操作",
                "description": (
                    "`add_item` / `set_item_quantity` / `_get_user_quantity` "
                    "确保用户物品数量保持非负,并自动创建记录。"
                ),
            },
        )

    def __init__(self) -> None:
        super().__init__()
        # In-memory copy of the items table, keyed by item_id.
        self._item_cache: Dict[str, BackpackItemDefinition] = {}

    @override
    def dependencies(self) -> List[type]:
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        return True

    @override
    def register_db_model(self):
        """Return the table models for the item and user-item tables."""
        from PWF.CoreModules.plugin_interface import DatabaseModel

        models = [
            DatabaseModel(
                table_name=self.ITEMS_TABLE,
                column_defs={
                    "item_id": "TEXT PRIMARY KEY",
                    "name": "TEXT NOT NULL",
                    "tier": "TEXT NOT NULL",
                    "description": "TEXT NOT NULL DEFAULT ''",
                },
            ),
            DatabaseModel(
                table_name=self.USER_ITEMS_TABLE,
                column_defs={
                    "user_id": "INTEGER NOT NULL",
                    "item_id": "TEXT NOT NULL",
                    "quantity": "INTEGER NOT NULL DEFAULT 0",
                    # Composite key is passed as a column-def entry with an
                    # empty type string.
                    "PRIMARY KEY (user_id, item_id)": "",
                },
            ),
        ]
        return models

    @override
    def wake_up(self) -> None:
        """Define the description column, register commands, warm the cache."""
        db = get_db()
        db.define_column(self.ITEMS_TABLE, "description", "TEXT NOT NULL DEFAULT ''")
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSBackpackSystem 插件已加载{ConsoleFrontColor.RESET}",
        )
        self.register_plugin("背包")
        self.register_plugin("backpack")
        self._warm_item_cache()

    def _warm_item_cache(self) -> None:
        """Load every row of the items table into ``_item_cache``.

        Raises:
            ValueError: if a stored tier string matches no ``BackpackItemTier``.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"SELECT item_id, name, tier, description FROM {self.ITEMS_TABLE}"
        )
        rows = cursor.fetchall()
        for row in rows:
            tier = BackpackItemTier.from_string(row["tier"])
            description = row["description"] or ""
            self._item_cache[row["item_id"]] = BackpackItemDefinition(
                item_id=row["item_id"],
                name=row["name"],
                tier=tier,
                description=description,
            )

    def _iter_registered_items(self) -> Sequence[BackpackItemDefinition]:
        """Return all cached definitions, warming the cache if it is empty.

        Any exception raised while warming the cache yields an empty tuple.
        """
        try:
            if not self._item_cache:
                self._warm_item_cache()
        except Exception:
            # NOTE(review): deliberate best effort — presumably so guide
            # rendering keeps working when the database is not ready.
            # The error is not logged; confirm this is intended.
            return ()
        return tuple(self._item_cache.values())

    def get_items_by_tier(
        self,
        tier: BackpackItemTier,
        *,
        blacklist: Optional[Collection[str]] = None,
    ) -> List[BackpackItemDefinition]:
        """Return registered items of *tier*, excluding blacklisted ids.

        Args:
            tier: the rarity tier to select.
            blacklist: item ids to exclude. Entries are stripped and compared
                case-insensitively; blank entries are ignored.
        """
        normalized_blacklist: Set[str] = set()
        if blacklist:
            normalized_blacklist = {
                str(item_id).strip().lower()
                for item_id in blacklist
                if str(item_id).strip()
            }
        items: List[BackpackItemDefinition] = []
        for definition in self._iter_registered_items():
            if definition.tier != tier:
                continue
            if normalized_blacklist and definition.item_id.lower() in normalized_blacklist:
                continue
            items.append(definition)
        return items

    def collect_additional_sections(self) -> Sequence[GuideSection]:
        """Append an item catalogue section to the inherited guide sections.

        The section is added only when at least one item is registered.
        """
        sections = list(super().collect_additional_sections())
        item_entries: List[GuideEntry] = []
        tier_icons = {
            BackpackItemTier.COMMON: "🪙",
            BackpackItemTier.RARE: "💠",
            BackpackItemTier.EPIC: "⚡",
            BackpackItemTier.LEGENDARY: "🌟",
        }
        for definition in self._iter_registered_items():
            item_entries.append(
                GuideEntry(
                    title=definition.name,
                    identifier=definition.item_id,
                    description=definition.description or "(暂无描述)",
                    category="背包物品",
                    metadata={
                        "稀有度": definition.tier.display_name,
                    },
                    icon=tier_icons.get(definition.tier, "🎁"),
                    tags=(definition.tier.display_name,),
                    group=definition.tier.display_name,
                )
            )
        if item_entries:
            sections.append(
                GuideSection(
                    title="物品图鉴",
                    entries=item_entries,
                    layout="grid",
                    section_id="backpack-items",
                    description="当前已注册的背包物品列表,按稀有度分组展示。",
                )
            )
        return tuple(sections)

    # region Public API
    def register_item(
        self,
        item_id: str,
        name: str,
        tier: BackpackItemTier,
        description: str,
    ) -> None:
        """Insert or update an item definition and refresh its cache entry.

        Registering an existing ``item_id`` again overwrites its name, tier
        and description. A falsy *description* is stored as an empty string.

        Raises:
            ValueError: if *item_id* or *name* is empty.
        """
        if not item_id or not name:
            raise ValueError("item_id and name must be provided")
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            INSERT INTO {self.ITEMS_TABLE} (item_id, name, tier, description)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(item_id) DO UPDATE
            SET name = excluded.name,
                tier = excluded.tier,
                description = excluded.description
            """,
            (item_id, name, tier.name, description or ""),
        )
        get_db().conn.commit()
        self._item_cache[item_id] = BackpackItemDefinition(
            item_id=item_id,
            name=name,
            tier=tier,
            description=description or "",
        )

    def add_item(
        self,
        user_id: int,
        item_id: str,
        delta: int,
    ) -> int:
        """Add *delta* units of an item to a user and return the new quantity.

        A *delta* of ``0`` only reads the current quantity.

        Raises:
            ValueError: if *delta* is negative or *item_id* is not registered.
        """
        if delta == 0:
            definition = self._get_definition(item_id)
            return self._get_user_quantity(user_id, item_id, definition)
        if delta < 0:
            raise ValueError("delta must be positive when using add_item")
        definition = self._get_definition(item_id)
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
            VALUES (?, ?, ?)
            ON CONFLICT(user_id, item_id) DO UPDATE
            SET quantity = {self.USER_ITEMS_TABLE}.quantity + excluded.quantity
            """,
            (user_id, item_id, delta),
        )
        get_db().conn.commit()
        return self._get_user_quantity(user_id, item_id, definition)

    def set_item_quantity(
        self,
        user_id: int,
        item_id: str,
        quantity: int,
    ) -> int:
        """Set a user's quantity of an item and return the stored quantity.

        Raises:
            ValueError: if *quantity* is negative or *item_id* is not
                registered.
        """
        if quantity < 0:
            raise ValueError("quantity must be non-negative")
        definition = self._get_definition(item_id)
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
            VALUES (?, ?, ?)
            ON CONFLICT(user_id, item_id) DO UPDATE
            SET quantity = excluded.quantity
            """,
            (user_id, item_id, quantity),
        )
        get_db().conn.commit()
        return self._get_user_quantity(user_id, item_id, definition)

    def get_user_items(self, user_id: int) -> List[BackpackUserItem]:
        """Return the user's items with positive quantity, largest first.

        Definitions come from the cache when present; otherwise they are
        built from the joined row, and rows whose tier string is unknown are
        skipped.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT ui.item_id, ui.quantity, i.name, i.tier, i.description
            FROM {self.USER_ITEMS_TABLE} ui
            JOIN {self.ITEMS_TABLE} i ON ui.item_id = i.item_id
            WHERE ui.user_id = ? AND ui.quantity > 0
            ORDER BY ui.quantity DESC
            """,
            (user_id,),
        )
        rows = cursor.fetchall()
        result: List[BackpackUserItem] = []
        for row in rows:
            definition = self._item_cache.get(row["item_id"])
            if not definition:
                try:
                    definition = BackpackItemDefinition(
                        item_id=row["item_id"],
                        name=row["name"],
                        tier=BackpackItemTier.from_string(row["tier"]),
                        description=row["description"] or "",
                    )
                except ValueError:
                    # Unknown tier string in the database: skip this row.
                    continue
            result.append(
                BackpackUserItem(
                    item_id=row["item_id"],
                    quantity=row["quantity"],
                    definition=definition,
                )
            )
        return result

    # endregion

    def _get_definition(self, item_id: str) -> BackpackItemDefinition:
        """Return the definition for *item_id*, from cache or the database.

        A definition read from the database is stored in the cache.

        Raises:
            ValueError: if the item is not registered, or its stored tier
                string matches no ``BackpackItemTier``.
        """
        if item_id in self._item_cache:
            return self._item_cache[item_id]
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"SELECT item_id, name, tier, description FROM {self.ITEMS_TABLE} WHERE item_id = ?",
            (item_id,),
        )
        row = cursor.fetchone()
        if not row:
            raise ValueError(f"Item {item_id} not registered in backpack system")
        definition = BackpackItemDefinition(
            item_id=row["item_id"],
            name=row["name"],
            tier=BackpackItemTier.from_string(row["tier"]),
            description=row["description"] or "",
        )
        self._item_cache[item_id] = definition
        return definition

    def _get_user_quantity(
        self,
        user_id: int,
        item_id: str,
        definition: BackpackItemDefinition,
    ) -> int:
        """Return the user's quantity of an item, or 0 when no row exists.

        Side effect: when the quantity is 0 a DELETE for that
        ``(user_id, item_id)`` pair is issued and committed.

        *definition* is accepted but not read by this method.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT quantity FROM {self.USER_ITEMS_TABLE}
            WHERE user_id = ? AND item_id = ?
            """,
            (user_id, item_id),
        )
        row = cursor.fetchone()
        quantity = int(row["quantity"]) if row else 0
        if quantity == 0:
            cursor.execute(
                f"""
                DELETE FROM {self.USER_ITEMS_TABLE}
                WHERE user_id = ? AND item_id = ?
                """,
                (user_id, item_id),
            )
            get_db().conn.commit()
        return quantity

    def _format_backpack_markdown(self, user_items: List[BackpackUserItem]) -> str:
        """Render the user's items as a markdown list.

        Items are ordered by tier (enum declaration order), then by
        descending quantity, then by name.
        """
        if not user_items:
            return "# 🎒 背包一览\n> 当前背包空空如也,去冒险获取一些物品吧!"
        # Build the tier ordering once instead of re-listing and scanning the
        # enum for every sort key.
        tier_rank = {tier: index for index, tier in enumerate(BackpackItemTier)}
        lines = ["# 🎒 背包一览"]
        sorted_items = sorted(
            user_items,
            key=lambda item: (
                tier_rank[item.definition.tier],
                -item.quantity,
                item.definition.name,
            ),
        )
        for entry in sorted_items:
            tier_label = entry.definition.tier.to_markdown_label(
                entry.definition.tier.display_name
            )
            lines.append(
                f"- {tier_label} · {entry.definition.name} × **{entry.quantity}**"
            )
        return "\n".join(lines)

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Handle the backpack command by sending the user's item list."""
        # The parsed payload is not used by the rest of this handler.
        message = self.parse_message_after_at(message).strip()
        user_items = self.get_user_items(user_id)
        markdown = self._format_backpack_markdown(user_items)
        await self.send_markdown_message(markdown, chat_id, user_id)
        return None


class WPSItemDescription(WPSAPI):
    """Plugin for the "查看" / "view" commands, which show one item's details."""

    @override
    def dependencies(self) -> List[type]:
        return [WPSBackpackSystem]

    @override
    def is_enable_plugin(self) -> bool:
        return True

    def __init__(self) -> None:
        super().__init__()
        from Plugins.WPSCombatSystem.combat_models import ADVENTURE_SOUVENIRS
        from Plugins.WPSGardenSystem.garden_models import GARDEN_CROPS
        from Plugins.WPSGardenSystem.garden_service import GardenConfig

        self._backpack: Optional[WPSBackpackSystem] = None
        # Each souvenir value is unpacked as a 4-tuple whose third element is
        # the sell price.
        self._souvenir_prices: Dict[str, int] = {
            item_id: sell_price
            for item_id, (_, __, sell_price, ___) in ADVENTURE_SOUVENIRS.items()
        }
        self._garden_crops = GARDEN_CROPS
        self._garden_sale_multiplier = GardenConfig.load().sale_multiplier

    @override
    def wake_up(self) -> None:
        """Resolve the backpack system and register the commands."""
        self._backpack = Architecture.Get(WPSBackpackSystem)
        self.register_plugin("查看")
        self.register_plugin("view")

    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Look up the item named in the message and send its details."""
        payload = self.parse_message_after_at(message).strip()
        if not payload:
            return await self.send_markdown_message("❌ 指令格式:`查看 <物品ID或名称>`", chat_id, user_id)
        definition = self._resolve_definition(payload)
        if not definition:
            return await self.send_markdown_message("❌ 未找到对应物品,请确认输入是否正确", chat_id, user_id)
        sale_info = self._get_sale_info(definition.item_id)
        markdown = self._format_markdown(definition, sale_info)
        return await self.send_markdown_message(markdown, chat_id, user_id)

    def _resolve_definition(self, identifier: str) -> Optional[BackpackItemDefinition]:
        """Resolve an item id or name (case-insensitive) to its definition.

        Returns ``None`` when the lookup through the backpack system fails.
        """
        lowered = identifier.strip().lower()
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT item_id FROM {WPSBackpackSystem.ITEMS_TABLE}
            WHERE lower(item_id) = ? OR lower(name) = ?
            LIMIT 1
            """,
            (lowered, lowered),
        )
        row = cursor.fetchone()
        # With no matching row, fall back to the raw identifier so the
        # backpack system's own lookup is still consulted.
        item_id = row["item_id"] if row else identifier.strip()
        backpack = self._backpack or Architecture.Get(WPSBackpackSystem)
        try:
            return backpack._get_definition(item_id)
        except Exception:
            return None

    def _get_sale_info(self, item_id: str) -> Optional[Dict[str, str]]:
        """Return sale category, price and note for an item, if it is sellable.

        Souvenir prices are checked first, then garden crops whose
        ``fruit_id`` equals *item_id*. Returns ``None`` when neither matches.
        """
        if item_id in self._souvenir_prices:
            price = self._souvenir_prices[item_id]
            return {
                "category": "战斗纪念品",
                "price": f"{price} 分/个",
                "note": "在战斗系统中出售可立即换取积分。",
            }
        for crop in self._garden_crops.values():
            if crop.fruit_id == item_id:
                price = crop.seed_price * self._garden_sale_multiplier
                return {
                    "category": "菜园果实",
                    "price": f"{price} 分/个",
                    "note": "可通过 `菜园 售出 <果实> <数量>` 换取积分。",
                }
        return None

    def _format_markdown(
        self,
        definition: BackpackItemDefinition,
        sale_info: Optional[Dict[str, str]],
    ) -> str:
        """Render an item's details, plus sale info when given, as markdown."""
        tier_label = definition.tier.to_markdown_label(definition.tier.display_name)
        lines = [
            "# 🔍 物品详情",
            f"- 名称:{definition.name}",
            f"- ID:`{definition.item_id}`",
            f"- 稀有度:{tier_label}",
            f"- 描述:{definition.description or '(暂无描述)'}",
        ]
        if sale_info:
            lines.append(f"- 分类:{sale_info['category']}")
            lines.append(f"- 系统售价:{sale_info['price']}")
            note = sale_info.get("note")
            if note:
                lines.append(f"- 提示:{note}")
        return "\n".join(lines)


if get_internal_debug():

    class WPSDebugGiveItem(WPSAPI):
        """Debug-only ``give`` command that adds items to the caller's backpack."""

        @override
        def dependencies(self) -> List[type]:
            return [WPSBackpackSystem]

        @override
        def is_enable_plugin(self) -> bool:
            return True

        @override
        def wake_up(self) -> None:
            self.register_plugin("give")

        async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
            """Handle ``give <item_id> [quantity]``; quantity defaults to 1.

            Malformed input and unregistered items are answered with an error
            message instead of raising.
            """
            payload = self.parse_message_after_at(message).strip()
            # str.split() with no separator already drops empty tokens.
            tokens = payload.split()
            if not tokens:
                return await self.send_markdown_message("❌ 指令格式:`give <物品ID> [数量]`", chat_id, user_id)
            item_id = tokens[0]
            try:
                quantity = int(tokens[1]) if len(tokens) > 1 else 1
            except ValueError:
                # Non-numeric quantity, e.g. "give apple abc".
                return await self.send_markdown_message("❌ 数量必须是整数", chat_id, user_id)
            if quantity <= 0:
                return await self.send_markdown_message("❌ 数量必须大于0", chat_id, user_id)
            backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
            try:
                backpack.add_item(user_id, item_id, quantity)
            except ValueError as exc:
                # add_item raises ValueError for an unregistered item id.
                return await self.send_markdown_message(f"❌ {exc}", chat_id, user_id)
            return await self.send_markdown_message(f"✅ 成功给予 {quantity} 个 {item_id} 给用户 {user_id}", chat_id, user_id)


__all__ = [
    "WPSBackpackSystem",
    "BackpackItemTier",
    "BackpackItemDefinition",
    "BackpackUserItem",
    "WPSItemDescription",
]