Files
NewWPSBot/Plugins/WPSBackpackSystem.py

568 lines
20 KiB
Python
Raw Permalink Normal View History

2025-11-08 15:34:44 +08:00
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
2025-11-13 20:31:17 +08:00
from PWF.Convention.Runtime.Config import *
2025-11-08 15:34:44 +08:00
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
2025-11-11 22:32:41 +08:00
from PWF.CoreModules.flags import get_internal_debug
2025-11-08 15:34:44 +08:00
2025-11-12 22:58:36 +08:00
from .WPSAPI import GuideEntry, GuideSection, WPSAPI
2025-11-08 15:34:44 +08:00
# ProjectConfig instance resolved through the Architecture registry; this
# module uses it for logging via `logger.Log(level, message)`.
logger: ProjectConfig = Architecture.Get(ProjectConfig)
class BackpackItemTier(Enum):
    """Rarity tiers for backpack items.

    Each member's value is a ``(display_name, color_hex)`` tuple. ``Enum``
    unpacks that tuple into ``__init__``, so both parts are exposed as
    attributes on every member.
    """

    COMMON = ("普通", "#C0C0C0")
    RARE = ("稀有", "#4DA6FF")
    EPIC = ("史诗", "#B266FF")
    LEGENDARY = ("传说", "#FF9933")

    def __init__(self, display_name: str, color_hex: str):
        self.display_name = display_name
        self.color_hex = color_hex

    @classmethod
    def from_string(cls, value: str) -> "BackpackItemTier":
        """Resolve a tier from its member name or its display name.

        Matching ignores surrounding whitespace, and the member name is
        compared case-insensitively, so ``"RARE"``, ``" rare "`` and
        ``"稀有"`` all resolve to ``RARE``. An existing member is returned
        unchanged.

        Raises:
            ValueError: if ``value`` does not identify any tier (including
                non-string input such as ``None``).
        """
        if isinstance(value, cls):
            return value
        if isinstance(value, str):
            candidate = value.strip()
            folded = candidate.casefold()
            for tier in cls:
                if folded == tier.name.casefold() or candidate == tier.display_name:
                    return tier
        raise ValueError(f"Unknown backpack item tier: {value}")

    def to_markdown_label(self, content: str) -> str:
        """Wrap ``content`` in a ``<font>`` tag coloured with this tier's colour."""
        return f'<font color="{self.color_hex}">{content}</font>'
@dataclass(frozen=True)
class BackpackItemDefinition:
    """Immutable description of one item type known to the backpack system."""

    # Unique identifier of the item.
    item_id: str
    # Human-readable item name.
    name: str
    # Rarity tier of the item.
    tier: BackpackItemTier
    # Free-form description text.
    description: str
2025-11-08 15:34:44 +08:00
@dataclass(frozen=True)
class BackpackUserItem:
    """Immutable view of one item stack held by a user."""

    # Identifier of the held item.
    item_id: str
    # Number of units the user holds.
    quantity: int
    # Definition describing the held item.
    definition: BackpackItemDefinition
class WPSBackpackSystem(WPSAPI):
    """Core backpack plugin: item registration plus per-user item storage.

    Item definitions are stored in ``ITEMS_TABLE`` and mirrored in an
    in-memory cache keyed by ``item_id``. Per-user quantities are stored in
    ``USER_ITEMS_TABLE``. The plugin answers the ``背包`` / ``backpack``
    commands with a markdown listing of the caller's items.
    """

    ITEMS_TABLE = "backpack_items"
    USER_ITEMS_TABLE = "backpack_user_items"

    def __init__(self) -> None:
        super().__init__()
        # item_id -> definition; filled by _warm_item_cache, register_item
        # and _get_definition.
        self._item_cache: Dict[str, BackpackItemDefinition] = {}

    # region guide metadata

    def get_guide_subtitle(self) -> str:
        """Return the one-line subtitle shown for this plugin in the guide."""
        return "管理物品注册、背包存储与查询的核心系统"

    def get_guide_metadata(self) -> Dict[str, str]:
        """Return guide metadata: cached item count and the table names used."""
        return {
            "物品缓存数": str(len(self._item_cache)),
            "数据表": f"{self.ITEMS_TABLE}, {self.USER_ITEMS_TABLE}",
        }

    def collect_command_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entries describing the user-facing commands."""
        return (
            {
                "title": "背包",
                "identifier": "背包",
                "description": "以稀有度分组展示用户当前携带物品。",
                "metadata": {"别名": "backpack"},
            },
        )

    def collect_guide_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entries describing the programmatic API."""
        tier_labels = ", ".join(tier.display_name for tier in BackpackItemTier)
        return (
            {
                "title": "物品注册",
                "description": (
                    "`register_item(item_id, name, tier, description)` "
                    "将物品写入背包表,重复调用会更新名称、稀有度和描述。"
                ),
            },
            {
                "title": "稀有度体系",
                "description": f"支持稀有度:{tier_labels},调用 `to_markdown_label` 可渲染彩色标签。",
            },
            {
                "title": "库存操作",
                "description": (
                    "`add_item` / `set_item_quantity` / `_get_user_quantity` "
                    "确保用户物品数量保持非负,并自动创建记录。"
                ),
            },
        )

    def collect_additional_sections(self) -> Sequence[GuideSection]:
        """Append an item-catalogue section listing every registered item.

        The section is only added when at least one item is registered.
        """
        sections = list(super().collect_additional_sections())
        tier_icons = {
            BackpackItemTier.COMMON: "🪙",
            BackpackItemTier.RARE: "💠",
            BackpackItemTier.EPIC: "",
            BackpackItemTier.LEGENDARY: "🌟",
        }
        item_entries: List[GuideEntry] = [
            GuideEntry(
                title=definition.name,
                identifier=definition.item_id,
                description=definition.description or "(暂无描述)",
                category="背包物品",
                metadata={
                    "稀有度": definition.tier.display_name,
                },
                icon=tier_icons.get(definition.tier, "🎁"),
                tags=(definition.tier.display_name,),
                group=definition.tier.display_name,
            )
            for definition in self._iter_registered_items()
        ]
        if item_entries:
            sections.append(
                GuideSection(
                    title="物品图鉴",
                    entries=item_entries,
                    layout="grid",
                    section_id="backpack-items",
                    description="当前已注册的背包物品列表,按稀有度分组展示。",
                )
            )
        return tuple(sections)

    # endregion

    # region plugin lifecycle

    @override
    def dependencies(self) -> List[type]:
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        return True

    @override
    def register_db_model(self):
        """Declare the two tables this plugin needs."""
        from PWF.CoreModules.plugin_interface import DatabaseModel

        return [
            DatabaseModel(
                table_name=self.ITEMS_TABLE,
                column_defs={
                    "item_id": "TEXT PRIMARY KEY",
                    "name": "TEXT NOT NULL",
                    "tier": "TEXT NOT NULL",
                    "description": "TEXT NOT NULL DEFAULT ''",
                },
            ),
            DatabaseModel(
                table_name=self.USER_ITEMS_TABLE,
                column_defs={
                    "user_id": "INTEGER NOT NULL",
                    "item_id": "TEXT NOT NULL",
                    "quantity": "INTEGER NOT NULL DEFAULT 0",
                    # The composite primary key is passed as a "column" with
                    # an empty type so it lands in the table definition.
                    "PRIMARY KEY (user_id, item_id)": "",
                },
            ),
        ]

    @override
    def wake_up(self) -> None:
        """Register the commands and preload the item cache."""
        db = get_db()
        # NOTE(review): presumably adds the column to tables created before
        # `description` existed; confirm define_column semantics.
        db.define_column(self.ITEMS_TABLE, "description", "TEXT NOT NULL DEFAULT ''")
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSBackpackSystem 插件已加载{ConsoleFrontColor.RESET}",
        )
        self.register_plugin("背包")
        self.register_plugin("backpack")
        self._warm_item_cache()

    # endregion

    # region cache helpers

    @staticmethod
    def _definition_from_row(row) -> BackpackItemDefinition:
        """Build a definition from a row with item_id/name/tier/description.

        Raises:
            ValueError: if the stored tier is not a known ``BackpackItemTier``.
        """
        return BackpackItemDefinition(
            item_id=row["item_id"],
            name=row["name"],
            tier=BackpackItemTier.from_string(row["tier"]),
            description=row["description"] or "",
        )

    def _warm_item_cache(self) -> None:
        """Load every stored item definition into the in-memory cache.

        Rows whose tier cannot be parsed are skipped and logged, matching how
        ``get_user_items`` treats them, so one bad row cannot abort start-up.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"SELECT item_id, name, tier, description FROM {self.ITEMS_TABLE}"
        )
        for row in cursor.fetchall():
            try:
                definition = self._definition_from_row(row)
            except ValueError:
                logger.Log(
                    "Warning",
                    f"WPSBackpackSystem skipped item {row['item_id']!r}: "
                    f"unknown tier {row['tier']!r}",
                )
                continue
            self._item_cache[definition.item_id] = definition

    def _iter_registered_items(self) -> Sequence[BackpackItemDefinition]:
        """Return a snapshot of all cached definitions, warming the cache if empty.

        This is best-effort: if warming fails for any reason, an empty tuple
        is returned instead of propagating the error.
        """
        try:
            if not self._item_cache:
                self._warm_item_cache()
        except Exception:
            return ()
        return tuple(self._item_cache.values())

    # endregion

    # region public API

    def get_items_by_tier(
        self,
        tier: BackpackItemTier,
        *,
        blacklist: Optional[Collection[str]] = None,
    ) -> List[BackpackItemDefinition]:
        """Return the registered items of ``tier`` that are not blacklisted.

        Blacklist entries are stringified, stripped and lower-cased; blank
        entries are ignored. Item ids are compared lower-cased.
        """
        excluded: Set[str] = set()
        if blacklist:
            excluded = {
                text.lower()
                for text in (str(item_id).strip() for item_id in blacklist)
                if text
            }
        return [
            definition
            for definition in self._iter_registered_items()
            if definition.tier == tier
            and definition.item_id.lower() not in excluded
        ]

    def register_item(
        self,
        item_id: str,
        name: str,
        tier: BackpackItemTier,
        description: str,
    ) -> None:
        """Insert or update an item definition and refresh its cache entry.

        Raises:
            ValueError: if ``item_id`` or ``name`` is empty.
        """
        if not item_id or not name:
            raise ValueError("item_id and name must be provided")
        stored_description = description or ""
        connection = get_db().conn
        connection.cursor().execute(
            f"""
            INSERT INTO {self.ITEMS_TABLE} (item_id, name, tier, description)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(item_id) DO UPDATE
            SET name = excluded.name,
                tier = excluded.tier,
                description = excluded.description
            """,
            (item_id, name, tier.name, stored_description),
        )
        connection.commit()
        self._item_cache[item_id] = BackpackItemDefinition(
            item_id=item_id,
            name=name,
            tier=tier,
            description=stored_description,
        )

    def add_item(
        self,
        user_id: int,
        item_id: str,
        delta: int,
    ) -> int:
        """Add ``delta`` units of an item to a user and return the new quantity.

        A ``delta`` of 0 only reads the current quantity.

        Raises:
            ValueError: if ``delta`` is negative or the item is not registered.
        """
        if delta < 0:
            raise ValueError("delta must be positive when using add_item")
        definition = self._get_definition(item_id)
        if delta == 0:
            return self._get_user_quantity(user_id, item_id, definition)
        connection = get_db().conn
        connection.cursor().execute(
            f"""
            INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
            VALUES (?, ?, ?)
            ON CONFLICT(user_id, item_id) DO UPDATE
            SET quantity = {self.USER_ITEMS_TABLE}.quantity + excluded.quantity
            """,
            (user_id, item_id, delta),
        )
        connection.commit()
        return self._get_user_quantity(user_id, item_id, definition)

    def set_item_quantity(
        self,
        user_id: int,
        item_id: str,
        quantity: int,
    ) -> int:
        """Set a user's quantity of an item and return the stored quantity.

        Raises:
            ValueError: if ``quantity`` is negative or the item is not registered.
        """
        if quantity < 0:
            raise ValueError("quantity must be non-negative")
        definition = self._get_definition(item_id)
        connection = get_db().conn
        connection.cursor().execute(
            f"""
            INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
            VALUES (?, ?, ?)
            ON CONFLICT(user_id, item_id) DO UPDATE
            SET quantity = excluded.quantity
            """,
            (user_id, item_id, quantity),
        )
        connection.commit()
        return self._get_user_quantity(user_id, item_id, definition)

    def get_user_items(self, user_id: int) -> List[BackpackUserItem]:
        """Return the user's items with a positive quantity, largest stack first.

        Rows whose tier cannot be parsed are silently skipped.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT ui.item_id, ui.quantity, i.name, i.tier, i.description
            FROM {self.USER_ITEMS_TABLE} ui
            JOIN {self.ITEMS_TABLE} i ON ui.item_id = i.item_id
            WHERE ui.user_id = ? AND ui.quantity > 0
            ORDER BY ui.quantity DESC
            """,
            (user_id,),
        )
        result: List[BackpackUserItem] = []
        for row in cursor.fetchall():
            definition = self._item_cache.get(row["item_id"])
            if not definition:
                try:
                    definition = self._definition_from_row(row)
                except ValueError:
                    continue
            result.append(
                BackpackUserItem(
                    item_id=row["item_id"],
                    quantity=row["quantity"],
                    definition=definition,
                )
            )
        return result

    # endregion

    def _get_definition(self, item_id: str) -> BackpackItemDefinition:
        """Return the definition for ``item_id``, loading and caching it if needed.

        Raises:
            ValueError: if the item is not registered or its stored tier is
                unknown.
        """
        cached = self._item_cache.get(item_id)
        if cached is not None:
            return cached
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"SELECT item_id, name, tier, description FROM {self.ITEMS_TABLE} WHERE item_id = ?",
            (item_id,),
        )
        row = cursor.fetchone()
        if not row:
            raise ValueError(f"Item {item_id} not registered in backpack system")
        definition = self._definition_from_row(row)
        self._item_cache[item_id] = definition
        return definition

    def _get_user_quantity(
        self,
        user_id: int,
        item_id: str,
        definition: BackpackItemDefinition,
    ) -> int:
        """Return the stored quantity (0 if absent), pruning an existing zero row.

        ``definition`` is not used here; it is kept so the signature stays
        compatible with existing callers.
        """
        connection = get_db().conn
        cursor = connection.cursor()
        cursor.execute(
            f"""
            SELECT quantity
            FROM {self.USER_ITEMS_TABLE}
            WHERE user_id = ? AND item_id = ?
            """,
            (user_id, item_id),
        )
        row = cursor.fetchone()
        if row is None:
            # Nothing stored, so there is nothing to prune and no write needed.
            return 0
        quantity = int(row["quantity"])
        if quantity == 0:
            # Drop empty stacks so the table only keeps items actually held.
            cursor.execute(
                f"""
                DELETE FROM {self.USER_ITEMS_TABLE}
                WHERE user_id = ? AND item_id = ?
                """,
                (user_id, item_id),
            )
            connection.commit()
        return quantity

    def _format_backpack_markdown(self, user_items: List[BackpackUserItem]) -> str:
        """Render the items as a markdown list ordered by tier, quantity, name."""
        if not user_items:
            return "# 🎒 背包一览\n> 当前背包空空如也,去冒险获取一些物品吧!"
        # Compute the tier order once instead of once per sort key.
        tier_rank = {tier: index for index, tier in enumerate(BackpackItemTier)}
        ordered = sorted(
            user_items,
            key=lambda item: (
                tier_rank[item.definition.tier],
                -item.quantity,
                item.definition.name,
            ),
        )
        lines = ["# 🎒 背包一览"]
        for entry in ordered:
            tier = entry.definition.tier
            tier_label = tier.to_markdown_label(tier.display_name)
            lines.append(
                f"- {tier_label} · {entry.definition.name} × **{entry.quantity}**"
            )
        return "\n".join(lines)

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Handle the backpack command by sending the caller's item list."""
        # The command takes no arguments; the parsed text is currently unused.
        message = self.parse_message_after_at(message).strip()
        user_items = self.get_user_items(user_id)
        markdown = self._format_backpack_markdown(user_items)
        await self.send_markdown_message(markdown, chat_id, user_id)
        return None
2025-11-10 22:30:16 +08:00
class WPSItemDescription(WPSAPI):
    """Plugin for the ``查看`` / ``view`` commands: show one item's details.

    An item is looked up by id or name (case-insensitively) in the backpack
    item table. Sale information is attached when the item is a combat
    souvenir or a garden fruit.
    """

    def __init__(self) -> None:
        super().__init__()
        # Imported inside __init__ rather than at module level.
        # NOTE(review): presumably to avoid import cycles between plugins; confirm.
        from Plugins.WPSCombatSystem.combat_models import ADVENTURE_SOUVENIRS
        from Plugins.WPSGardenSystem.garden_models import GARDEN_CROPS
        from Plugins.WPSGardenSystem.garden_service import GardenConfig

        self._backpack: Optional[WPSBackpackSystem] = None
        # item_id -> sell price, taken from the third field of each souvenir tuple.
        self._souvenir_prices: Dict[str, int] = {
            item_id: sell_price
            for item_id, (_, __, sell_price, ___) in ADVENTURE_SOUVENIRS.items()
        }
        self._garden_crops = GARDEN_CROPS
        self._garden_sale_multiplier = GardenConfig.load().sale_multiplier

    @override
    def dependencies(self) -> List[type]:
        return [WPSBackpackSystem]

    @override
    def is_enable_plugin(self) -> bool:
        return True

    @override
    def wake_up(self) -> None:
        """Resolve the backpack system and register the commands."""
        self._backpack = Architecture.Get(WPSBackpackSystem)
        self.register_plugin("查看")
        self.register_plugin("view")

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Reply with the item's detail card, or an error message."""
        payload = self.parse_message_after_at(message).strip()
        if not payload:
            return await self.send_markdown_message("❌ 指令格式:`查看 <物品ID或名称>`", chat_id, user_id)
        definition = self._resolve_definition(payload)
        if not definition:
            return await self.send_markdown_message("❌ 未找到对应物品,请确认输入是否正确", chat_id, user_id)
        sale_info = self._get_sale_info(definition.item_id)
        markdown = self._format_markdown(definition, sale_info)
        return await self.send_markdown_message(markdown, chat_id, user_id)

    def _resolve_definition(self, identifier: str) -> Optional[BackpackItemDefinition]:
        """Find an item by id or name, case-insensitively; ``None`` if not found."""
        stripped = identifier.strip()
        lowered = stripped.lower()
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT item_id
            FROM {WPSBackpackSystem.ITEMS_TABLE}
            WHERE lower(item_id) = ? OR lower(name) = ?
            LIMIT 1
            """,
            (lowered, lowered),
        )
        row = cursor.fetchone()
        # Without a table match, fall back to the raw identifier so an item
        # present only in the backpack cache can still be resolved.
        item_id = row["item_id"] if row else stripped
        backpack = self._backpack or Architecture.Get(WPSBackpackSystem)
        try:
            return backpack._get_definition(item_id)
        except Exception:
            # Best-effort lookup: any failure is reported as "not found".
            return None

    def _get_sale_info(self, item_id: str) -> Optional[Dict[str, str]]:
        """Return category/price/note for sellable items, otherwise ``None``."""
        if item_id in self._souvenir_prices:
            price = self._souvenir_prices[item_id]
            return {
                "category": "战斗纪念品",
                "price": f"{price} 分/个",
                "note": "在战斗系统中出售可立即换取积分。",
            }
        for crop in self._garden_crops.values():
            if crop.fruit_id == item_id:
                price = crop.seed_price * self._garden_sale_multiplier
                return {
                    "category": "菜园果实",
                    "price": f"{price} 分/个",
                    "note": "可通过 `菜园 售出 <果实> <数量>` 换取积分。",
                }
        return None

    def _format_markdown(
        self,
        definition: BackpackItemDefinition,
        sale_info: Optional[Dict[str, str]],
    ) -> str:
        """Render the item's detail card as markdown."""
        tier_label = definition.tier.to_markdown_label(definition.tier.display_name)
        lines = [
            "# 🔍 物品详情",
            f"- 名称:{definition.name}",
            # The ID line now carries the same ":" separator as its siblings.
            f"- ID:`{definition.item_id}`",
            f"- 稀有度:{tier_label}",
            f"- 描述:{definition.description or '(暂无描述)'}",
        ]
        if sale_info:
            lines.append(f"- 分类:{sale_info['category']}")
            lines.append(f"- 系统售价:{sale_info['price']}")
            note = sale_info.get("note")
            if note:
                lines.append(f"- 提示:{note}")
        return "\n".join(lines)
2025-11-11 22:32:41 +08:00
if get_internal_debug():

    class WPSDebugGiveItem(WPSAPI):
        """Debug-only ``give <item_id> [quantity]`` command.

        The class is only defined when ``get_internal_debug()`` is truthy.
        It adds items to the calling user's own backpack.
        """

        @override
        def dependencies(self) -> List[type]:
            return [WPSBackpackSystem]

        @override
        def is_enable_plugin(self) -> bool:
            return True

        @override
        def wake_up(self) -> None:
            self.register_plugin("give")

        @override
        async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
            """Parse ``<item_id> [quantity]`` and give the item to the caller.

            Malformed input and unknown items produce an error reply instead
            of an unhandled exception.
            """
            usage = "❌ 指令格式:`give <物品ID> [数量]`"
            # split() without arguments already trims and drops empty tokens.
            tokens = self.parse_message_after_at(message).split()
            if not tokens:
                return await self.send_markdown_message(usage, chat_id, user_id)
            item_id = tokens[0]
            quantity = 1
            if len(tokens) > 1:
                try:
                    quantity = int(tokens[1])
                except ValueError:
                    # Non-numeric quantity: show the usage hint instead of crashing.
                    return await self.send_markdown_message(usage, chat_id, user_id)
            if quantity <= 0:
                return await self.send_markdown_message("❌ 数量必须大于0", chat_id, user_id)
            backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
            try:
                backpack.add_item(user_id, item_id, quantity)
            except ValueError as exc:
                # add_item raises ValueError for items that are not registered.
                return await self.send_markdown_message(f"❌ {exc}", chat_id, user_id)
            return await self.send_markdown_message(
                f"✅ 成功给予 {quantity} × {item_id} 给用户 {user_id}", chat_id, user_id
            )
2025-11-10 22:30:16 +08:00
# Public API of this module. BackpackUserItem is exported because it is the
# element type returned by WPSBackpackSystem.get_user_items.
__all__ = [
    "WPSBackpackSystem",
    "BackpackItemTier",
    "BackpackItemDefinition",
    "BackpackUserItem",
    "WPSItemDescription",
]
2025-11-08 15:34:44 +08:00