from __future__ import annotations import hashlib import random from datetime import datetime, timezone from enum import Enum from typing import Any, Dict, List, Optional, Tuple, override from uuid import uuid4 from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig from PWF.CoreModules.database import get_db from PWF.CoreModules.plugin_interface import DatabaseModel from pydantic import BaseModel, Field from .WPSAPI import WPSAPI from .WPSBackpackSystem import BackpackItemDefinition, WPSBackpackSystem from .WPSConfigSystem import WPSConfigAPI logger: ProjectConfig = Architecture.Get(ProjectConfig) class StoreItemSource(Enum): SYSTEM = "system" PERMANENT = "permanent" PLAYER = "player" class StoreMode(BaseModel): mode_id: str item_id: str price: int limit_amount: int is_permanent: bool = False metadata: Dict[str, str] = Field(default_factory=dict) registered_at: datetime class Config: allow_mutation = False class StoreEntry(BaseModel): entry_id: int mode_id: str item_id: str display_name: str price: int limit_amount: int remaining_amount: int source: StoreItemSource created_at: datetime registry_version: int class Config: allow_mutation = False class PlayerListing(BaseModel): user_id: int item_id: str item_name: str price: int quantity: int created_at: datetime status: str class Config: allow_mutation = False class WPSStoreSystem(WPSAPI): SYSTEM_TABLE = "store_system_items" PLAYER_TABLE = "store_user_listings" def __init__(self) -> None: super().__init__() self._mode_registry: Dict[str, StoreMode] = {} self._registry_version: int = 0 self._default_hourly_count: int = int( logger.FindItem("store_hourly_count", 5) ) logger.SaveProperties() self._permanent_mode_ids: set[str] = set() @override def dependencies(self) -> List[type]: return [WPSAPI, WPSConfigAPI, WPSBackpackSystem] @override def is_enable_plugin(self) -> bool: return True @override def register_db_model(self): return [ DatabaseModel( table_name=self.SYSTEM_TABLE, column_defs={ "entry_id": "INTEGER PRIMARY KEY AUTOINCREMENT", "mode_id": "TEXT UNIQUE", "item_id": "TEXT NOT NULL", "display_name": "TEXT NOT NULL", "price": "INTEGER NOT NULL", "limit_amount": "INTEGER", "remaining_amount": "INTEGER", "source": "TEXT NOT NULL", "created_at": "TEXT NOT NULL", "registry_version": "INTEGER NOT NULL", }, ), DatabaseModel( table_name=self.PLAYER_TABLE, column_defs={ "user_id": "INTEGER PRIMARY KEY", "item_id": "TEXT NOT NULL", "price": "INTEGER NOT NULL", "quantity": "INTEGER NOT NULL", "created_at": "TEXT NOT NULL", "status": "TEXT NOT NULL", }, ), ] @override def wake_up(self) -> None: logger.Log( "Info", f"{ConsoleFrontColor.GREEN}WPSStoreSystem 插件已加载{ConsoleFrontColor.RESET}", ) for alias in ["store", "商店"]: self.register_plugin(alias) self._cleanup_state() def _cleanup_state(self) -> None: cursor = get_db().conn.cursor() # 移除无效的系统商品条目 cursor.execute( f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ? AND remaining_amount = 0", (StoreItemSource.SYSTEM.value,), ) # 清理玩家非激活记录 cursor.execute( f"DELETE FROM {self.PLAYER_TABLE} WHERE status != ?", ("active",), ) get_db().conn.commit() # region 模式注册接口 def register_mode( self, item_id: str, price: int, limit_amount: int, *, is_permanent: bool = False, metadata: Optional[Dict[str, str]] = None, ) -> str: if not item_id: raise ValueError("item_id must be provided") if price <= 0: raise ValueError("price must be positive") if limit_amount == 0: raise ValueError("limit_amount must not be zero") if limit_amount < -1: raise ValueError("limit_amount must be -1 or positive") backpack = Architecture.Get(WPSBackpackSystem) try: item_def = backpack._get_definition(item_id) except Exception as exc: raise ValueError(f"Item {item_id} not registered in backpack system") from exc mode_id = uuid4().hex metadata = dict(metadata or {}) self._mode_registry[mode_id] = StoreMode( mode_id=mode_id, item_id=item_id, price=price, limit_amount=limit_amount, is_permanent=is_permanent, metadata=metadata, registered_at=datetime.now(timezone.utc), ) if is_permanent: self._permanent_mode_ids.add(mode_id) self._bump_registry_version() logger.Log( "Info", f"{ConsoleFrontColor.CYAN}Store mode {mode_id} registered for item {item_def.name} ({item_id}){ConsoleFrontColor.RESET}", ) return mode_id def register_permanent_mode( self, item_id: str, price: int, *, limit_amount: int = -1, metadata: Optional[Dict[str, str]] = None, ) -> str: return self.register_mode( item_id=item_id, price=price, limit_amount=limit_amount, is_permanent=True, metadata=metadata, ) def unregister_mode(self, mode_id: str) -> bool: mode = self._mode_registry.pop(mode_id, None) if not mode: return False if mode.is_permanent and mode_id in self._permanent_mode_ids: self._permanent_mode_ids.remove(mode_id) self._bump_registry_version() logger.Log( "Info", f"{ConsoleFrontColor.YELLOW}Store mode {mode_id} unregistered{ConsoleFrontColor.RESET}", ) return True def list_modes(self) -> List[StoreMode]: return list(self._mode_registry.values()) def _bump_registry_version(self) -> None: self._registry_version += 1 def _get_permanent_modes(self) -> List[StoreMode]: return [ mode for mode in self._mode_registry.values() if mode.is_permanent ] def _get_candidate_modes(self) -> List[StoreMode]: return [ mode for mode in self._mode_registry.values() if not mode.is_permanent ] # endregion # region 刷新与数据同步 def _current_hour_key(self) -> str: return datetime.now(timezone.utc).strftime("%Y%m%d%H") def _ensure_hour_snapshot(self) -> None: hour_key = self._current_hour_key() if self._should_refresh(hour_key): self._rebuild_system_entries(hour_key) self._sync_permanent_entries() def _should_refresh(self, hour_key: str) -> bool: cursor = get_db().conn.cursor() cursor.execute( f""" SELECT created_at, registry_version FROM {self.SYSTEM_TABLE} WHERE source = ? LIMIT 1 """, (StoreItemSource.SYSTEM.value,), ) row = cursor.fetchone() if not row: return True try: created_at = datetime.fromisoformat(row["created_at"]) except ValueError: return True current_hour = created_at.strftime("%Y%m%d%H") registry_version = int(row["registry_version"]) return current_hour != hour_key or registry_version != self._registry_version def _rebuild_system_entries(self, hour_key: str) -> None: cursor = get_db().conn.cursor() cursor.execute( f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ?", (StoreItemSource.SYSTEM.value,), ) candidates = self._get_candidate_modes() if not candidates: get_db().conn.commit() return seed = self._compute_seed(hour_key) rng = random.Random(seed) rng.shuffle(candidates) selection = candidates[: self._default_hourly_count] now = datetime.now(timezone.utc).isoformat() backpack = Architecture.Get(WPSBackpackSystem) for mode in selection: definition = backpack._get_definition(mode.item_id) remaining = mode.limit_amount if mode.limit_amount >= 0 else -1 cursor.execute( f""" INSERT INTO {self.SYSTEM_TABLE} ( mode_id, item_id, display_name, price, limit_amount, remaining_amount, source, created_at, registry_version ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( mode.mode_id, mode.item_id, definition.name, mode.price, mode.limit_amount, remaining, StoreItemSource.SYSTEM.value, now, self._registry_version, ), ) get_db().conn.commit() def _sync_permanent_entries(self) -> None: cursor = get_db().conn.cursor() permanent_modes = self._get_permanent_modes() permanent_ids = [mode.mode_id for mode in permanent_modes] if permanent_ids: placeholders = ", ".join("?" for _ in permanent_ids) cursor.execute( f""" DELETE FROM {self.SYSTEM_TABLE} WHERE source = ? AND mode_id NOT IN ({placeholders}) """, (StoreItemSource.PERMANENT.value, *permanent_ids), ) else: cursor.execute( f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ?", (StoreItemSource.PERMANENT.value,), ) now = datetime.now(timezone.utc).isoformat() backpack = Architecture.Get(WPSBackpackSystem) for mode in permanent_modes: definition = backpack._get_definition(mode.item_id) cursor.execute( f""" INSERT INTO {self.SYSTEM_TABLE} ( mode_id, item_id, display_name, price, limit_amount, remaining_amount, source, created_at, registry_version ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(mode_id) DO UPDATE SET item_id = excluded.item_id, display_name = excluded.display_name, price = excluded.price, limit_amount = excluded.limit_amount, remaining_amount = excluded.remaining_amount, created_at = excluded.created_at, registry_version = excluded.registry_version """, ( mode.mode_id, mode.item_id, definition.name, mode.price, mode.limit_amount, -1 if mode.limit_amount == -1 else mode.limit_amount, StoreItemSource.PERMANENT.value, now, self._registry_version, ), ) get_db().conn.commit() def _compute_seed(self, hour_key: str) -> int: payload = f"{hour_key}|{self._registry_version}".encode("utf-8") digest = hashlib.sha256(payload).digest() return int.from_bytes(digest[:8], "big") # endregion # region 指令处理 STORE_COMMANDS = {"store", "商店"} @override async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: _ = self.parse_message_after_at(message).strip() response = await self._handle_store(chat_id, user_id) return await self.send_markdown_message(response, chat_id, user_id) async def _handle_store(self, chat_id: int, user_id: int) -> str: self._ensure_hour_snapshot() system_entries, permanent_entries = self._fetch_system_entries() player_listings = self._fetch_player_listings() return self._format_store_markdown( chat_id=chat_id, user_id=user_id, system_entries=system_entries, permanent_entries=permanent_entries, player_listings=player_listings, ) async def purchase_item( self, *, chat_id: int, user_id: int, identifier: str, quantity: int, ) -> str: identifier = identifier.strip() if not identifier: return "❌ 购买指令格式错误,请提供物品名称或ID" if quantity <= 0: return "❌ 购买数量必须大于0" self._ensure_hour_snapshot() system_entries, permanent_entries = self._fetch_system_entries() player_listings = self._fetch_player_listings() matched_system_entries = self._resolve_system_entries( identifier=identifier, system_entries=system_entries, permanent_entries=permanent_entries, ) matched_player_listings = await self._resolve_player_listings( identifier=identifier, listings=player_listings, ) if not matched_system_entries and not matched_player_listings: return "❌ 未找到匹配的商品,请确认名称或物品ID是否正确" candidates: list[tuple[int, int, str, StoreEntry | PlayerListing]] = [] for idx, listing in enumerate(matched_player_listings): candidates.append((listing.price, 0, f"player-{idx}", listing)) for idx, entry in enumerate(system_entries): if entry in matched_system_entries["system"]: candidates.append((entry.price, 1, f"system-{idx}", entry)) for idx, entry in enumerate(permanent_entries): if entry in matched_system_entries["permanent"]: candidates.append((entry.price, 2, f"permanent-{idx}", entry)) if not candidates: return "❌ 未找到匹配的商品,请确认名称或物品ID是否正确" candidates.sort(key=lambda item: (item[0], item[1], item[2])) for price, source_priority, _, payload in candidates: if source_priority == 0: listing = payload # type: ignore[assignment] response = await self._purchase_player_listing( listing=listing, quantity=quantity, chat_id=chat_id, user_id=user_id, ) if not response.startswith("❌"): return response if "库存不足" in response: continue return response entry = payload # type: ignore[assignment] response = await self._purchase_system_entry( entry=entry, quantity=quantity, chat_id=chat_id, user_id=user_id, ) if not response.startswith("❌"): return response if "库存不足" in response: continue return response return "❌ 所有匹配的商品已售罄或库存不足" async def sell_item( self, *, chat_id: int, user_id: int, identifier: str, quantity: int, price: int, ) -> str: identifier = identifier.strip() if not identifier: return "❌ 出售指令格式错误,请提供物品名称或ID" if quantity < 0: return "❌ 出售数量必须大于或等于0" if price < 0: return "❌ 出售单价必须是非负整数" backpack = Architecture.Get(WPSBackpackSystem) item_id, definition = self._resolve_item(identifier) if item_id is None: return "❌ 未找到对应物品,请确认物品已注册且名称/ID 正确" existing_listing = self._get_user_listing(user_id) now_iso = datetime.now(timezone.utc).isoformat() if existing_listing and existing_listing.status == "active": if existing_listing.item_id != item_id: return ( "❌ 你已在出售其他物品,如需更换请先使用 `出售 <当前物品ID> 0` 撤下" ) if quantity == 0: cursor = get_db().conn.cursor() cursor.execute( f""" UPDATE {self.PLAYER_TABLE} SET quantity = 0, status = ?, created_at = ? WHERE user_id = ? """, ("closed", now_iso, user_id), ) get_db().conn.commit() backpack.add_item(user_id, item_id, existing_listing.quantity) return f"✅ 已撤下出售的 {definition.name},返还数量 {existing_listing.quantity}" diff = quantity - existing_listing.quantity if diff > 0: available_qty = self._get_user_inventory_quantity(user_id, item_id) if available_qty < diff: return f"❌ 背包数量不足,需要额外 {diff} 个 {definition.name}" backpack.set_item_quantity(user_id, item_id, available_qty - diff) elif diff < 0: backpack.add_item(user_id, item_id, -diff) cursor = get_db().conn.cursor() cursor.execute( f""" UPDATE {self.PLAYER_TABLE} SET quantity = ?, price = ?, created_at = ?, status = ? WHERE user_id = ? """, (quantity, price, now_iso, "active", user_id), ) get_db().conn.commit() return f"✅ 已更新出售数量为 {quantity} 个 {definition.name},售价 {price} 分/个" if quantity == 0: return "ℹ️ 当前没有在出售的物品" available_qty = self._get_user_inventory_quantity(user_id, item_id) if available_qty < quantity: return f"❌ 背包数量不足,当前拥有 {available_qty} 个 {definition.name}" backpack.set_item_quantity(user_id, item_id, available_qty - quantity) cursor = get_db().conn.cursor() cursor.execute( f""" INSERT INTO {self.PLAYER_TABLE} (user_id, item_id, price, quantity, created_at, status) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(user_id) DO UPDATE SET item_id = excluded.item_id, price = excluded.price, quantity = excluded.quantity, created_at = excluded.created_at, status = excluded.status """, (user_id, item_id, price, quantity, now_iso, "active"), ) get_db().conn.commit() return f"✅ 已上架 {definition.name} × {quantity},售价 {price} 分/个" def _help_message(self) -> str: return """# 🛒 商店指令帮助 - `商店`:查看当前系统商品与玩家出售列表 - `购买 <物品名称或ID> <数量>`:购买指定商品 - `出售 <物品名称或ID> <数量> <单价>`:上架或更新自己的出售物品(限一种) """ def _fetch_system_entries(self) -> Tuple[List[StoreEntry], List[StoreEntry]]: cursor = get_db().conn.cursor() cursor.execute( f""" SELECT entry_id, mode_id, item_id, display_name, price, limit_amount, remaining_amount, source, created_at, registry_version FROM {self.SYSTEM_TABLE} ORDER BY source DESC, entry_id ASC """ ) rows = cursor.fetchall() system_entries: List[StoreEntry] = [] permanent_entries: List[StoreEntry] = [] for row in rows: created_at = self._parse_datetime(row["created_at"]) entry = StoreEntry( entry_id=row["entry_id"], mode_id=row["mode_id"], item_id=row["item_id"], display_name=row["display_name"], price=row["price"], limit_amount=row["limit_amount"], remaining_amount=row["remaining_amount"], source=StoreItemSource(row["source"]), created_at=created_at, registry_version=row["registry_version"], ) if entry.source == StoreItemSource.PERMANENT: permanent_entries.append(entry) elif entry.source == StoreItemSource.SYSTEM: system_entries.append(entry) return system_entries, permanent_entries def _fetch_player_listings(self) -> List[PlayerListing]: cursor = get_db().conn.cursor() cursor.execute( f""" SELECT user_id, item_id, price, quantity, created_at, status FROM {self.PLAYER_TABLE} WHERE status = ? ORDER BY created_at ASC """, ("active",), ) rows = cursor.fetchall() listings: List[PlayerListing] = [] backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) for row in rows: item_id = row["item_id"] try: definition = backpack._get_definition(item_id) item_name = definition.name except Exception: item_name = item_id listings.append( PlayerListing( user_id=row["user_id"], item_id=item_id, item_name=item_name, price=row["price"], quantity=row["quantity"], created_at=self._parse_datetime(row["created_at"]), status=row["status"], ) ) return listings def _format_store_markdown( self, *, chat_id: int, user_id: int, system_entries: List[StoreEntry], permanent_entries: List[StoreEntry], player_listings: List[PlayerListing], ) -> str: lines: List[str] = ["# 🏬 每小时商店"] config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI) if permanent_entries: lines.append("## ♾️ 常驻商品") for entry in permanent_entries: limit_text = ( "不限量" if entry.limit_amount == -1 else f"限购 {entry.limit_amount}" ) lines.append( f"- {entry.display_name}|价格 {entry.price} 分|{limit_text}" ) if system_entries: lines.append("## ⏱️ 本时段商品") for entry in system_entries: if entry.remaining_amount == -1: stock_text = "剩余:不限量" else: stock_text = f"剩余:{entry.remaining_amount}" lines.append( f"- {entry.display_name}|价格 {entry.price} 分|{stock_text}" ) if not permanent_entries and not system_entries: lines.append("> ⚠️ 当前没有可售的系统商品") lines.append("## 👥 玩家出售") if player_listings: for listing in player_listings: owner = "你" if listing.user_id != user_id: owner = config_api.get_user_name(listing.user_id) lines.append( f"- {owner}|{listing.item_name}|数量 {listing.quantity}|价格 {listing.price} 分" ) else: lines.append("> 当前暂无玩家出售信息") lines.append("\n---\n" + self._help_message()) return "\n".join(lines) def _parse_datetime(self, value: str) -> datetime: try: return datetime.fromisoformat(value) except ValueError: return datetime.now(timezone.utc) # endregion # region 购买逻辑 async def _purchase_system_entry( self, *, entry: StoreEntry, quantity: int, chat_id: int, user_id: int, ) -> str: total_price = entry.price * quantity config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI) user_points = config_api.get_user_points(user_id) if user_points < total_price: return f"❌ 积分不足,需要 {total_price} 分,当前仅有 {user_points} 分" cursor = get_db().conn.cursor() if entry.remaining_amount != -1: cursor.execute( f""" UPDATE {self.SYSTEM_TABLE} SET remaining_amount = remaining_amount - ? WHERE entry_id = ? AND remaining_amount >= ? """, (quantity, entry.entry_id, quantity), ) if cursor.rowcount == 0: get_db().conn.rollback() return "❌ 库存不足,无法完成购买" get_db().conn.commit() new_points = await config_api.adjust_user_points( chat_id, user_id, -total_price, reason=f"购买 {entry.display_name}" ) backpack = Architecture.Get(WPSBackpackSystem) backpack.add_item(user_id, entry.item_id, quantity) return ( f"✅ 成功购买 {entry.display_name} × {quantity},花费 {total_price} 分\n" f"当前剩余积分:{new_points}" ) async def _resolve_player_listings( self, identifier: str, listings: List[PlayerListing], ) -> List[PlayerListing]: if not listings: return [] backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) identifier_lower = identifier.lower() matches: List[PlayerListing] = [] for listing in listings: if listing.quantity <= 0: continue if listing.item_id.lower() == identifier_lower: matches.append(listing) continue if listing.item_name.lower() == identifier_lower: matches.append(listing) continue try: definition = backpack._get_definition(listing.item_id) except Exception: continue if definition.name.lower() == identifier_lower: matches.append(listing) return matches async def _purchase_player_listing( self, *, listing: PlayerListing, quantity: int, chat_id: int, user_id: int, ) -> str: if quantity <= 0: return "❌ 购买数量必须大于0" if listing.quantity < quantity: return "❌ 玩家出售库存不足" if listing.user_id == user_id: return "❌ 无法购买自己上架的商品" total_price = listing.price * quantity config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI) buyer_points = config_api.get_user_points(user_id) if buyer_points < total_price: return f"❌ 积分不足,需要 {total_price} 分,当前仅有 {buyer_points} 分" cursor = get_db().conn.cursor() cursor.execute( f""" UPDATE {self.PLAYER_TABLE} SET quantity = quantity - ? WHERE user_id = ? AND quantity >= ? """, (quantity, listing.user_id, quantity), ) if cursor.rowcount == 0: get_db().conn.rollback() return "❌ 玩家出售库存不足或已被其他人购买" cursor.execute( f""" UPDATE {self.PLAYER_TABLE} SET status = ? WHERE user_id = ? AND quantity <= 0 """, ("closed", listing.user_id), ) get_db().conn.commit() buyer_new_points = await config_api.adjust_user_points( chat_id, user_id, -total_price, reason="购买玩家商品" ) await config_api.adjust_user_points( chat_id, listing.user_id, total_price, reason="玩家出售结算", ) backpack = Architecture.Get(WPSBackpackSystem) backpack.add_item(user_id, listing.item_id, quantity) definition = backpack._get_definition(listing.item_id) return ( f"✅ 成功购买玩家商品 {definition.name} × {quantity},花费 {total_price} 分\n" f"当前剩余积分:{buyer_new_points}" ) def _resolve_system_entries( self, identifier: str, system_entries: List[StoreEntry], permanent_entries: List[StoreEntry], ) -> Dict[str, List[StoreEntry]]: identifier_lower = identifier.lower() matched_system: List[StoreEntry] = [] matched_permanent: List[StoreEntry] = [] for entry in permanent_entries: if entry.item_id.lower() == identifier_lower or entry.display_name.lower() == identifier_lower: matched_permanent.append(entry) for entry in system_entries: if entry.item_id.lower() == identifier_lower or entry.display_name.lower() == identifier_lower: matched_system.append(entry) return {"system": matched_system, "permanent": matched_permanent} def _get_user_listing(self, user_id: int) -> Optional[PlayerListing]: cursor = get_db().conn.cursor() cursor.execute( f""" SELECT user_id, item_id, price, quantity, created_at, status FROM {self.PLAYER_TABLE} WHERE user_id = ? """, (user_id,), ) row = cursor.fetchone() if not row: return None item_id = row["item_id"] backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) try: definition = backpack._get_definition(item_id) item_name = definition.name except Exception: item_name = item_id return PlayerListing( user_id=row["user_id"], item_id=item_id, item_name=item_name, price=row["price"], quantity=row["quantity"], created_at=self._parse_datetime(row["created_at"]), status=row["status"], ) def _derive_player_price(self, item_id: str) -> int: prices = [ mode.price for mode in self._mode_registry.values() if mode.item_id == item_id ] if not prices: raise ValueError( f"Item {item_id} has no registered store mode to derive price" ) return min(prices) def _resolve_item( self, identifier: str, ) -> Tuple[Optional[str], Optional["BackpackItemDefinition"]]: backpack = Architecture.Get(WPSBackpackSystem) identifier_lower = identifier.lower() cursor = get_db().conn.cursor() cursor.execute( f""" SELECT item_id FROM {WPSBackpackSystem.ITEMS_TABLE} WHERE lower(item_id) = ? OR lower(name) = ? LIMIT 1 """, (identifier_lower, identifier_lower), ) row = cursor.fetchone() item_id = row["item_id"] if row else identifier try: definition = backpack._get_definition(item_id) return definition.item_id, definition except Exception: return None, None def _get_user_inventory_quantity(self, user_id: int, item_id: str) -> int: backpack = Architecture.Get(WPSBackpackSystem) for item in backpack.get_user_items(user_id): if item.item_id == item_id: return item.quantity return 0 def get_store_snapshot(self) -> Dict[str, Any]: self._ensure_hour_snapshot() system_entries, permanent_entries = self._fetch_system_entries() player_listings = self._fetch_player_listings() return { "system": [self._entry_to_dict(entry) for entry in system_entries], "permanent": [self._entry_to_dict(entry) for entry in permanent_entries], "players": [self._listing_to_dict(listing) for listing in player_listings], } def get_user_listing_snapshot(self, user_id: int) -> Optional[Dict[str, Any]]: listing = self._get_user_listing(user_id) if not listing or listing.status != "active": return None return self._listing_to_dict(listing) def _entry_to_dict(self, entry: StoreEntry) -> Dict[str, Any]: return { "entry_id": entry.entry_id, "mode_id": entry.mode_id, "item_id": entry.item_id, "display_name": entry.display_name, "price": entry.price, "limit_amount": entry.limit_amount, "remaining_amount": entry.remaining_amount, "source": entry.source.value, "created_at": entry.created_at.isoformat(), "registry_version": entry.registry_version, } def _listing_to_dict(self, listing: PlayerListing) -> Dict[str, Any]: return { "user_id": listing.user_id, "item_id": listing.item_id, "price": listing.price, "quantity": listing.quantity, "created_at": listing.created_at.isoformat(), "status": listing.status, } # endregion # endregion class WPSStoreBuyCommand(WPSAPI): @override def dependencies(self) -> list[type]: return [WPSStoreSystem] @override def is_enable_plugin(self) -> bool: return True @override def wake_up(self) -> None: logger.Log( "Info", f"{ConsoleFrontColor.GREEN}WPSStoreBuyCommand 插件已加载{ConsoleFrontColor.RESET}", ) for alias in ["buy", "购买"]: self.register_plugin(alias) @override async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: message = self.parse_message_after_at(message).strip() if not message: return await self._send_error( "❌ 购买指令格式错误,请使用:`购买 <物品名称或ID> <数量>`", chat_id, user_id, ) tokens = [token.strip() for token in message.split() if token.strip()] if len(tokens) < 2: return await self._send_error( "❌ 购买指令格式错误,请使用:`购买 <物品名称或ID> <数量>`", chat_id, user_id, ) identifier = " ".join(tokens[:-1]).strip() quantity_token = tokens[-1] try: quantity = int(quantity_token) except ValueError: return await self._send_error("❌ 购买数量必须是整数", chat_id, user_id) store_api: WPSStoreSystem = Architecture.Get(WPSStoreSystem) response = await store_api.purchase_item( chat_id=chat_id, user_id=user_id, identifier=identifier, quantity=quantity, ) return await self.send_markdown_message(response, chat_id, user_id) async def _send_error(self, message: str, chat_id: int, user_id: int) -> Optional[str]: return await self.send_markdown_message(message, chat_id, user_id) class WPSStoreSellCommand(WPSAPI): @override def dependencies(self) -> list[type]: return [WPSStoreSystem] @override def is_enable_plugin(self) -> bool: return True @override def wake_up(self) -> None: logger.Log( "Info", f"{ConsoleFrontColor.GREEN}WPSStoreSellCommand 插件已加载{ConsoleFrontColor.RESET}", ) for alias in ["sell", "出售"]: self.register_plugin(alias) @override async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: message = self.parse_message_after_at(message).strip() if not message: return await self._send_error( "❌ 出售指令格式错误,请使用:`出售 <物品名称或ID> <数量> <单价>`", chat_id, user_id, ) tokens = [token.strip() for token in message.split() if token.strip()] if len(tokens) < 3: return await self._send_error( "❌ 出售指令格式错误,请使用:`出售 <物品名称或ID> <数量> <单价>`", chat_id, user_id, ) identifier = " ".join(tokens[:-2]).strip() quantity_token = tokens[-2] price_token = tokens[-1] if not identifier: return await self._send_error( "❌ 出售指令格式错误,请提供物品名称或ID", chat_id, user_id, ) try: quantity = int(quantity_token) except ValueError: return await self._send_error("❌ 出售数量必须是整数", chat_id, user_id) try: price = int(price_token) except ValueError: return await self._send_error("❌ 出售单价必须是整数", chat_id, user_id) if price < 0: return await self._send_error("❌ 出售单价必须是非负整数", chat_id, user_id) store_api = Architecture.Get(WPSStoreSystem) response = await store_api.sell_item( chat_id=chat_id, user_id=user_id, identifier=identifier, quantity=quantity, price=price, ) return await self.send_markdown_message(response, chat_id, user_id) async def _send_error(self, message: str, chat_id: int, user_id: int) -> Optional[str]: return await self.send_markdown_message(message, chat_id, user_id) __all__ = ["WPSStoreSystem"]