Files
NewWPSBot/Plugins/WPSBackpackSystem.py

568 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from PWF.Convention.Runtime.Config import *
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from PWF.CoreModules.flags import get_internal_debug
from .WPSAPI import GuideEntry, GuideSection, WPSAPI
logger: ProjectConfig = Architecture.Get(ProjectConfig)
class BackpackItemTier(Enum):
"""背包物品等级定义"""
COMMON = ("普通", "#C0C0C0")
RARE = ("稀有", "#4DA6FF")
EPIC = ("史诗", "#B266FF")
LEGENDARY = ("传说", "#FF9933")
def __init__(self, display_name: str, color_hex: str):
self.display_name = display_name
self.color_hex = color_hex
@classmethod
def from_string(cls, value: str) -> "BackpackItemTier":
for tier in cls:
if tier.name == value or tier.display_name == value:
return tier
raise ValueError(f"Unknown backpack item tier: {value}")
def to_markdown_label(self, content: str) -> str:
return f'<font color="{self.color_hex}">{content}</font>'
@dataclass(frozen=True)
class BackpackItemDefinition:
item_id: str
name: str
tier: BackpackItemTier
description: str
@dataclass(frozen=True)
class BackpackUserItem:
item_id: str
quantity: int
definition: BackpackItemDefinition
class WPSBackpackSystem(WPSAPI):
"""背包基础系统,负责物品注册与用户物品存取"""
ITEMS_TABLE = "backpack_items"
USER_ITEMS_TABLE = "backpack_user_items"
def get_guide_subtitle(self) -> str:
return "管理物品注册、背包存储与查询的核心系统"
def get_guide_metadata(self) -> Dict[str, str]:
return {
"物品缓存数": str(len(self._item_cache)),
"数据表": f"{self.ITEMS_TABLE}, {self.USER_ITEMS_TABLE}",
}
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "背包",
"identifier": "背包",
"description": "以稀有度分组展示用户当前携带物品。",
"metadata": {"别名": "backpack"},
},
)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
tier_labels = ", ".join(tier.display_name for tier in BackpackItemTier)
return (
{
"title": "物品注册",
"description": (
"`register_item(item_id, name, tier, description)` "
"将物品写入背包表,重复调用会更新名称、稀有度和描述。"
),
},
{
"title": "稀有度体系",
"description": f"支持稀有度:{tier_labels},调用 `to_markdown_label` 可渲染彩色标签。",
},
{
"title": "库存操作",
"description": (
"`add_item` / `set_item_quantity` / `_get_user_quantity` "
"确保用户物品数量保持非负,并自动创建记录。"
),
},
)
def __init__(self) -> None:
super().__init__()
self._item_cache: Dict[str, BackpackItemDefinition] = {}
@override
def dependencies(self) -> List[type]:
return [WPSAPI]
@override
def is_enable_plugin(self) -> bool:
return True
@override
def register_db_model(self):
from PWF.CoreModules.plugin_interface import DatabaseModel
models = [
DatabaseModel(
table_name=self.ITEMS_TABLE,
column_defs={
"item_id": "TEXT PRIMARY KEY",
"name": "TEXT NOT NULL",
"tier": "TEXT NOT NULL",
"description": "TEXT NOT NULL DEFAULT ''",
},
),
DatabaseModel(
table_name=self.USER_ITEMS_TABLE,
column_defs={
"user_id": "INTEGER NOT NULL",
"item_id": "TEXT NOT NULL",
"quantity": "INTEGER NOT NULL DEFAULT 0",
"PRIMARY KEY (user_id, item_id)": "",
},
),
]
return models
@override
def wake_up(self) -> None:
db = get_db()
db.define_column(self.ITEMS_TABLE, "description", "TEXT NOT NULL DEFAULT ''")
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSBackpackSystem 插件已加载{ConsoleFrontColor.RESET}",
)
self.register_plugin("背包")
self.register_plugin("backpack")
self._warm_item_cache()
def _warm_item_cache(self) -> None:
cursor = get_db().conn.cursor()
cursor.execute(
f"SELECT item_id, name, tier, description FROM {self.ITEMS_TABLE}"
)
rows = cursor.fetchall()
for row in rows:
tier = BackpackItemTier.from_string(row["tier"])
description = row["description"] or ""
self._item_cache[row["item_id"]] = BackpackItemDefinition(
item_id=row["item_id"],
name=row["name"],
tier=tier,
description=description,
)
def _iter_registered_items(self) -> Sequence[BackpackItemDefinition]:
try:
if not self._item_cache:
self._warm_item_cache()
except Exception:
return ()
return tuple(self._item_cache.values())
def get_items_by_tier(
self,
tier: BackpackItemTier,
*,
blacklist: Optional[Collection[str]] = None,
) -> List[BackpackItemDefinition]:
normalized_blacklist: Set[str] = set()
if blacklist:
normalized_blacklist = {
str(item_id).strip().lower()
for item_id in blacklist
if str(item_id).strip()
}
items: List[BackpackItemDefinition] = []
for definition in self._iter_registered_items():
if definition.tier != tier:
continue
if normalized_blacklist and definition.item_id.lower() in normalized_blacklist:
continue
items.append(definition)
return items
def collect_additional_sections(self) -> Sequence[GuideSection]:
sections = list(super().collect_additional_sections())
item_entries: List[GuideEntry] = []
tier_icons = {
BackpackItemTier.COMMON: "🪙",
BackpackItemTier.RARE: "💠",
BackpackItemTier.EPIC: "",
BackpackItemTier.LEGENDARY: "🌟",
}
for definition in self._iter_registered_items():
item_entries.append(
GuideEntry(
title=definition.name,
identifier=definition.item_id,
description=definition.description or "(暂无描述)",
category="背包物品",
metadata={
"稀有度": definition.tier.display_name,
},
icon=tier_icons.get(definition.tier, "🎁"),
tags=(definition.tier.display_name,),
group=definition.tier.display_name,
)
)
if item_entries:
sections.append(
GuideSection(
title="物品图鉴",
entries=item_entries,
layout="grid",
section_id="backpack-items",
description="当前已注册的背包物品列表,按稀有度分组展示。",
)
)
return tuple(sections)
# region 对外接口
def register_item(
self,
item_id: str,
name: str,
tier: BackpackItemTier,
description: str,
) -> None:
if not item_id or not name:
raise ValueError("item_id and name must be provided")
cursor = get_db().conn.cursor()
cursor.execute(
f"""
INSERT INTO {self.ITEMS_TABLE} (item_id, name, tier, description)
VALUES (?, ?, ?, ?)
ON CONFLICT(item_id) DO UPDATE
SET name = excluded.name,
tier = excluded.tier,
description = excluded.description
""",
(item_id, name, tier.name, description or ""),
)
get_db().conn.commit()
self._item_cache[item_id] = BackpackItemDefinition(
item_id=item_id,
name=name,
tier=tier,
description=description or "",
)
def add_item(
self,
user_id: int,
item_id: str,
delta: int,
) -> int:
if delta == 0:
definition = self._get_definition(item_id)
return self._get_user_quantity(user_id, item_id, definition)
if delta < 0:
raise ValueError("delta must be positive when using add_item")
definition = self._get_definition(item_id)
cursor = get_db().conn.cursor()
cursor.execute(
f"""
INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
VALUES (?, ?, ?)
ON CONFLICT(user_id, item_id) DO UPDATE
SET quantity = {self.USER_ITEMS_TABLE}.quantity + excluded.quantity
""",
(user_id, item_id, delta),
)
get_db().conn.commit()
return self._get_user_quantity(user_id, item_id, definition)
def set_item_quantity(
self,
user_id: int,
item_id: str,
quantity: int,
) -> int:
if quantity < 0:
raise ValueError("quantity must be non-negative")
definition = self._get_definition(item_id)
cursor = get_db().conn.cursor()
cursor.execute(
f"""
INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
VALUES (?, ?, ?)
ON CONFLICT(user_id, item_id) DO UPDATE
SET quantity = excluded.quantity
""",
(user_id, item_id, quantity),
)
get_db().conn.commit()
return self._get_user_quantity(user_id, item_id, definition)
def get_user_items(self, user_id: int) -> List[BackpackUserItem]:
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT ui.item_id, ui.quantity, i.name, i.tier, i.description
FROM {self.USER_ITEMS_TABLE} ui
JOIN {self.ITEMS_TABLE} i ON ui.item_id = i.item_id
WHERE ui.user_id = ? AND ui.quantity > 0
ORDER BY ui.quantity DESC
""",
(user_id,),
)
rows = cursor.fetchall()
result: List[BackpackUserItem] = []
for row in rows:
definition = self._item_cache.get(row["item_id"])
if not definition:
try:
definition = BackpackItemDefinition(
item_id=row["item_id"],
name=row["name"],
tier=BackpackItemTier.from_string(row["tier"]),
description=row["description"] or "",
)
except ValueError:
continue
result.append(
BackpackUserItem(
item_id=row["item_id"],
quantity=row["quantity"],
definition=definition,
)
)
return result
# endregion
def _get_definition(self, item_id: str) -> BackpackItemDefinition:
if item_id in self._item_cache:
return self._item_cache[item_id]
cursor = get_db().conn.cursor()
cursor.execute(
f"SELECT item_id, name, tier, description FROM {self.ITEMS_TABLE} WHERE item_id = ?",
(item_id,),
)
row = cursor.fetchone()
if not row:
raise ValueError(f"Item {item_id} not registered in backpack system")
definition = BackpackItemDefinition(
item_id=row["item_id"],
name=row["name"],
tier=BackpackItemTier.from_string(row["tier"]),
description=row["description"] or "",
)
self._item_cache[item_id] = definition
return definition
def _get_user_quantity(
self,
user_id: int,
item_id: str,
definition: BackpackItemDefinition,
) -> int:
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT quantity
FROM {self.USER_ITEMS_TABLE}
WHERE user_id = ? AND item_id = ?
""",
(user_id, item_id),
)
row = cursor.fetchone()
quantity = int(row["quantity"]) if row else 0
if quantity == 0:
cursor.execute(
f"""
DELETE FROM {self.USER_ITEMS_TABLE}
WHERE user_id = ? AND item_id = ?
""",
(user_id, item_id),
)
get_db().conn.commit()
return quantity
def _format_backpack_markdown(self, user_items: List[BackpackUserItem]) -> str:
if not user_items:
return "# 🎒 背包一览\n> 当前背包空空如也,去冒险获取一些物品吧!"
lines = ["# 🎒 背包一览"]
sorted_items = sorted(
user_items,
key=lambda item: (
list(BackpackItemTier).index(item.definition.tier),
-item.quantity,
item.definition.name,
),
)
for entry in sorted_items:
tier_label = entry.definition.tier.to_markdown_label(
entry.definition.tier.display_name
)
lines.append(
f"- {tier_label} · {entry.definition.name} × **{entry.quantity}**"
)
return "\n".join(lines)
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
message = self.parse_message_after_at(message).strip()
user_items = self.get_user_items(user_id)
markdown = self._format_backpack_markdown(user_items)
await self.send_markdown_message(markdown, chat_id, user_id)
return None
class WPSItemDescription(WPSAPI):
@override
def dependencies(self) -> List[type]:
return [WPSBackpackSystem]
@override
def is_enable_plugin(self) -> bool:
return True
def __init__(self) -> None:
super().__init__()
from Plugins.WPSCombatSystem.combat_models import ADVENTURE_SOUVENIRS
from Plugins.WPSGardenSystem.garden_models import GARDEN_CROPS
from Plugins.WPSGardenSystem.garden_service import GardenConfig
self._backpack: Optional[WPSBackpackSystem] = None
self._souvenir_prices: Dict[str, int] = {
item_id: sell_price
for item_id, (_, __, sell_price, ___) in ADVENTURE_SOUVENIRS.items()
}
self._garden_crops = GARDEN_CROPS
self._garden_sale_multiplier = GardenConfig.load().sale_multiplier
@override
def wake_up(self) -> None:
self._backpack = Architecture.Get(WPSBackpackSystem)
self.register_plugin("查看")
self.register_plugin("view")
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
if not payload:
return await self.send_markdown_message("❌ 指令格式:`查看 <物品ID或名称>`", chat_id, user_id)
definition = self._resolve_definition(payload)
if not definition:
return await self.send_markdown_message("❌ 未找到对应物品,请确认输入是否正确", chat_id, user_id)
sale_info = self._get_sale_info(definition.item_id)
markdown = self._format_markdown(definition, sale_info)
return await self.send_markdown_message(markdown, chat_id, user_id)
def _resolve_definition(self, identifier: str) -> Optional[BackpackItemDefinition]:
lowered = identifier.strip().lower()
db = get_db().conn.cursor()
db.execute(
f"""
SELECT item_id
FROM {WPSBackpackSystem.ITEMS_TABLE}
WHERE lower(item_id) = ? OR lower(name) = ?
LIMIT 1
""",
(lowered, lowered),
)
row = db.fetchone()
item_id = row["item_id"] if row else identifier.strip()
backpack = self._backpack or Architecture.Get(WPSBackpackSystem)
try:
return backpack._get_definition(item_id)
except Exception:
return None
def _get_sale_info(self, item_id: str) -> Optional[Dict[str, str]]:
if item_id in self._souvenir_prices:
price = self._souvenir_prices[item_id]
return {
"category": "战斗纪念品",
"price": f"{price} 分/个",
"note": "在战斗系统中出售可立即换取积分。",
}
for crop in self._garden_crops.values():
if crop.fruit_id == item_id:
price = crop.seed_price * self._garden_sale_multiplier
return {
"category": "菜园果实",
"price": f"{price} 分/个",
"note": "可通过 `菜园 售出 <果实> <数量>` 换取积分。",
}
return None
def _format_markdown(
self,
definition: BackpackItemDefinition,
sale_info: Optional[Dict[str, str]],
) -> str:
tier_label = definition.tier.to_markdown_label(definition.tier.display_name)
lines = [
"# 🔍 物品详情",
f"- 名称:{definition.name}",
f"- ID`{definition.item_id}`",
f"- 稀有度:{tier_label}",
f"- 描述:{definition.description or '(暂无描述)'}",
]
if sale_info:
lines.append(f"- 分类:{sale_info['category']}")
lines.append(f"- 系统售价:{sale_info['price']}")
note = sale_info.get("note")
if note:
lines.append(f"- 提示:{note}")
return "\n".join(lines)
if get_internal_debug():
class WPSDebugGiveItem(WPSAPI):
@override
def dependencies(self) -> List[type]:
return [WPSBackpackSystem]
@override
def is_enable_plugin(self) -> bool:
return True
@override
def wake_up(self) -> None:
self.register_plugin("give")
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
if payload == "":
return await self.send_markdown_message("❌ 指令格式:`give <物品ID> [数量]`", chat_id, user_id)
tokens = [token.strip() for token in payload.split() if token.strip()]
if not tokens or not tokens[0]:
return await self.send_markdown_message("❌ 指令格式:`give <物品ID> [数量]`", chat_id, user_id)
item_id = tokens[0]
quantity = int(tokens[1]) if len(tokens) > 1 else 1
if quantity <= 0:
return await self.send_markdown_message("❌ 数量必须大于0", chat_id, user_id)
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
backpack.add_item(user_id, item_id, quantity)
return await self.send_markdown_message(f"✅ 成功给予 {quantity}{item_id} 给用户 {user_id}", chat_id, user_id)
__all__ = [
"WPSBackpackSystem",
"BackpackItemTier",
"BackpackItemDefinition",
"WPSItemDescription",
]