Files
NewWPSBot/Plugins/WPSBackpackSystem.py
2025-11-08 15:34:44 +08:00

306 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Tuple, override
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from .WPSAPI import WPSAPI
# Module-wide ProjectConfig instance obtained through Architecture.Get();
# this module uses it only for logging (via logger.Log).
logger: ProjectConfig = Architecture.Get(ProjectConfig)
class BackpackItemTier(Enum):
    """Rarity tiers for backpack items.

    Each member's value is a ``(display_name, color_hex)`` tuple; ``__init__``
    unpacks it into the ``display_name`` and ``color_hex`` attributes.
    Members are declared from the lowest tier to the highest.
    """

    COMMON = ("普通", "#C0C0C0")
    RARE = ("稀有", "#4DA6FF")
    EPIC = ("史诗", "#B266FF")
    LEGENDARY = ("传说", "#FF9933")

    def __init__(self, display_name: str, color_hex: str):
        self.display_name = display_name
        self.color_hex = color_hex

    @classmethod
    def from_string(cls, value: str) -> "BackpackItemTier":
        """Resolve a tier from its member name or its display name.

        Args:
            value: A member name (matched case-insensitively, e.g. ``"RARE"``
                or ``"rare"``), a display name, or an existing tier instance.
                Surrounding whitespace is ignored.

        Returns:
            The matching tier.

        Raises:
            ValueError: If ``value`` does not identify any tier.
        """
        # Passing a tier through unchanged lets callers normalise mixed input.
        if isinstance(value, cls):
            return value
        if isinstance(value, str):
            text = value.strip()
            folded = text.upper()
            for tier in cls:
                if tier.name == folded or tier.display_name == text:
                    return tier
        raise ValueError(f"Unknown backpack item tier: {value}")

    def to_markdown_label(self, content: str) -> str:
        """Wrap ``content`` in a ``<font>`` tag coloured with this tier's colour."""
        return f'<font color="{self.color_hex}">{content}</font>'
@dataclass(frozen=True)
class BackpackItemDefinition:
    """Immutable description of an item type that can be held in a backpack."""

    # Unique identifier of the item type.
    item_id: str
    # Human-readable item name.
    name: str
    # Rarity tier of the item.
    tier: BackpackItemTier
@dataclass(frozen=True)
class BackpackUserItem:
    """Immutable snapshot of one item stack in a user's backpack."""

    # Identifier of the held item type.
    item_id: str
    # Number of units held.
    quantity: int
    # Definition (name and tier) of the held item type.
    definition: BackpackItemDefinition
class WPSBackpackSystem(WPSAPI):
    """Core backpack plugin: registers item definitions and stores per-user item counts.

    Item definitions live in ``ITEMS_TABLE`` and are mirrored in an in-memory
    cache; per-user quantities live in ``USER_ITEMS_TABLE``.  All database
    access goes through ``get_db().conn``, whose rows are read by column name.
    """

    ITEMS_TABLE = "backpack_items"
    USER_ITEMS_TABLE = "backpack_user_items"

    def __init__(self) -> None:
        super().__init__()
        # item_id -> definition; filled by _warm_item_cache, register_item
        # and lazily by _get_definition.
        self._item_cache: Dict[str, BackpackItemDefinition] = {}

    @override
    def dependencies(self) -> List[type]:
        """Return the plugin types this plugin depends on."""
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        """Report that this plugin is always enabled."""
        return True

    @override
    def register_db_model(self):
        """Declare the two tables used by the backpack system."""
        from PWF.CoreModules.plugin_interface import DatabaseModel

        return [
            DatabaseModel(
                table_name=self.ITEMS_TABLE,
                column_defs={
                    "item_id": "TEXT PRIMARY KEY",
                    "name": "TEXT NOT NULL",
                    "tier": "TEXT NOT NULL",
                },
            ),
            DatabaseModel(
                table_name=self.USER_ITEMS_TABLE,
                column_defs={
                    "user_id": "INTEGER NOT NULL",
                    "item_id": "TEXT NOT NULL",
                    "quantity": "INTEGER NOT NULL DEFAULT 0",
                    # Table-level constraint smuggled in as a "column" with an
                    # empty type. NOTE(review): presumably DatabaseModel emits
                    # key and value verbatim into CREATE TABLE - confirm.
                    "PRIMARY KEY (user_id, item_id)": "",
                },
            ),
        ]

    @override
    def wake_up(self) -> None:
        """Log start-up, register the plugin keywords and preload the item cache."""
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSBackpackSystem 插件已加载{ConsoleFrontColor.RESET}",
        )
        self.register_plugin("背包")
        self.register_plugin("backpack")
        self._warm_item_cache()

    def _warm_item_cache(self) -> None:
        """Load every stored item definition into the in-memory cache.

        Rows whose tier cannot be parsed are logged and skipped so that one
        corrupt row cannot abort plugin start-up.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(f"SELECT item_id, name, tier FROM {self.ITEMS_TABLE}")
        for row in cursor.fetchall():
            definition = self._definition_from_row(row)
            if definition is not None:
                self._item_cache[row["item_id"]] = definition

    def _definition_from_row(self, row) -> Optional[BackpackItemDefinition]:
        """Build a definition from a row exposing item_id/name/tier, or None if the tier is invalid."""
        try:
            tier = BackpackItemTier.from_string(row["tier"])
        except ValueError as error:
            logger.Log(
                "Warning",
                f"WPSBackpackSystem skipped item {row['item_id']!r}: {error}",
            )
            return None
        return BackpackItemDefinition(
            item_id=row["item_id"],
            name=row["name"],
            tier=tier,
        )

    # region Public API
    def register_item(
        self,
        item_id: str,
        name: str,
        tier: BackpackItemTier,
    ) -> None:
        """Create or update an item definition and refresh the cache entry.

        Args:
            item_id: Unique identifier of the item; must be non-empty.
            name: Display name of the item; must be non-empty.
            tier: Rarity tier; stored in the database by member name.

        Raises:
            ValueError: If ``item_id`` or ``name`` is empty.
        """
        if not item_id or not name:
            raise ValueError("item_id and name must be provided")
        connection = get_db().conn
        cursor = connection.cursor()
        # Upsert: re-registering an existing item_id overwrites name and tier.
        cursor.execute(
            f"""
            INSERT INTO {self.ITEMS_TABLE} (item_id, name, tier)
            VALUES (?, ?, ?)
            ON CONFLICT(item_id) DO UPDATE
            SET name = excluded.name,
                tier = excluded.tier
            """,
            (item_id, name, tier.name),
        )
        connection.commit()
        self._item_cache[item_id] = BackpackItemDefinition(item_id, name, tier)

    def add_item(
        self,
        user_id: int,
        item_id: str,
        delta: int,
    ) -> int:
        """Increase a user's quantity of an item.

        Args:
            user_id: Owner of the backpack.
            item_id: Identifier of a registered item.
            delta: Non-negative amount to add; ``0`` only reads the quantity.

        Returns:
            The user's quantity of the item after the operation.

        Raises:
            ValueError: If ``delta`` is negative or the item is not registered.
        """
        # The sign check comes first so a negative delta is reported as such
        # even when the item is also unknown.
        if delta < 0:
            raise ValueError("delta must be positive when using add_item")
        definition = self._get_definition(item_id)
        if delta == 0:
            return self._get_user_quantity(user_id, item_id, definition)
        connection = get_db().conn
        cursor = connection.cursor()
        cursor.execute(
            f"""
            INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
            VALUES (?, ?, ?)
            ON CONFLICT(user_id, item_id) DO UPDATE
            SET quantity = {self.USER_ITEMS_TABLE}.quantity + excluded.quantity
            """,
            (user_id, item_id, delta),
        )
        connection.commit()
        return self._get_user_quantity(user_id, item_id, definition)

    def set_item_quantity(
        self,
        user_id: int,
        item_id: str,
        quantity: int,
    ) -> int:
        """Overwrite a user's quantity of an item.

        Args:
            user_id: Owner of the backpack.
            item_id: Identifier of a registered item.
            quantity: New non-negative quantity.

        Returns:
            The stored quantity after the update.

        Raises:
            ValueError: If ``quantity`` is negative or the item is not registered.
        """
        if quantity < 0:
            raise ValueError("quantity must be non-negative")
        definition = self._get_definition(item_id)
        connection = get_db().conn
        cursor = connection.cursor()
        cursor.execute(
            f"""
            INSERT INTO {self.USER_ITEMS_TABLE} (user_id, item_id, quantity)
            VALUES (?, ?, ?)
            ON CONFLICT(user_id, item_id) DO UPDATE
            SET quantity = excluded.quantity
            """,
            (user_id, item_id, quantity),
        )
        connection.commit()
        # The read-back below also removes the row again when quantity is 0.
        return self._get_user_quantity(user_id, item_id, definition)

    def get_user_items(self, user_id: int) -> List[BackpackUserItem]:
        """Return the user's items with a positive quantity, largest stacks first.

        Items whose stored tier cannot be parsed (and that are not already
        cached) are logged and left out of the result.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT ui.item_id, ui.quantity, i.name, i.tier
            FROM {self.USER_ITEMS_TABLE} ui
            JOIN {self.ITEMS_TABLE} i ON ui.item_id = i.item_id
            WHERE ui.user_id = ? AND ui.quantity > 0
            ORDER BY ui.quantity DESC
            """,
            (user_id,),
        )
        result: List[BackpackUserItem] = []
        for row in cursor.fetchall():
            # Prefer the cached definition; fall back to the joined columns.
            definition = self._item_cache.get(row["item_id"])
            if definition is None:
                definition = self._definition_from_row(row)
            if definition is None:
                continue
            result.append(
                BackpackUserItem(
                    item_id=row["item_id"],
                    quantity=row["quantity"],
                    definition=definition,
                )
            )
        return result

    # endregion
    def _get_definition(self, item_id: str) -> BackpackItemDefinition:
        """Return the definition for ``item_id``, loading and caching it on a cache miss.

        Raises:
            ValueError: If the item is not in ``ITEMS_TABLE`` or its stored
                tier cannot be parsed.
        """
        cached = self._item_cache.get(item_id)
        if cached is not None:
            return cached
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"SELECT item_id, name, tier FROM {self.ITEMS_TABLE} WHERE item_id = ?",
            (item_id,),
        )
        row = cursor.fetchone()
        if not row:
            raise ValueError(f"Item {item_id} not registered in backpack system")
        definition = BackpackItemDefinition(
            item_id=row["item_id"],
            name=row["name"],
            tier=BackpackItemTier.from_string(row["tier"]),
        )
        self._item_cache[item_id] = definition
        return definition

    def _get_user_quantity(
        self,
        user_id: int,
        item_id: str,
        definition: BackpackItemDefinition,
    ) -> int:
        """Read a user's quantity of an item, pruning the row when it is zero.

        Args:
            user_id: Owner of the backpack.
            item_id: Identifier of the item.
            definition: Not used by this method; kept for signature compatibility.

        Returns:
            The stored quantity, or ``0`` when no row exists.
        """
        connection = get_db().conn
        cursor = connection.cursor()
        cursor.execute(
            f"""
            SELECT quantity
            FROM {self.USER_ITEMS_TABLE}
            WHERE user_id = ? AND item_id = ?
            """,
            (user_id, item_id),
        )
        row = cursor.fetchone()
        quantity = int(row["quantity"]) if row else 0
        if quantity == 0:
            # Side effect on read: zero-quantity rows are deleted so the table
            # only keeps items the user actually holds.
            cursor.execute(
                f"""
                DELETE FROM {self.USER_ITEMS_TABLE}
                WHERE user_id = ? AND item_id = ?
                """,
                (user_id, item_id),
            )
            connection.commit()
        return quantity

    def _format_backpack_markdown(self, user_items: List[BackpackUserItem]) -> str:
        """Render the backpack as a markdown list.

        Items are ordered by tier declaration order, then by descending
        quantity, then by name.
        """
        if not user_items:
            return "# 🎒 背包一览\n> 当前背包空空如也,去冒险获取一些物品吧!"
        # Build the tier ranking once instead of once per compared item.
        tier_rank = {tier: position for position, tier in enumerate(BackpackItemTier)}
        sorted_items = sorted(
            user_items,
            key=lambda item: (
                tier_rank[item.definition.tier],
                -item.quantity,
                item.definition.name,
            ),
        )
        lines = ["# 🎒 背包一览"]
        for entry in sorted_items:
            tier = entry.definition.tier
            tier_label = tier.to_markdown_label(tier.display_name)
            lines.append(
                f"- {tier_label} · {entry.definition.name} × **{entry.quantity}**"
            )
        return "\n".join(lines)

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Send the caller's backpack contents as a markdown message.

        The parsed message text is not inspected: every message routed to this
        plugin produces the backpack overview.  Always returns ``None``.
        """
        message = self.parse_message_after_at(message).strip()
        user_items = self.get_user_items(user_id)
        markdown = self._format_backpack_markdown(user_items)
        await self.send_markdown_message(markdown, chat_id, user_id)
        return None
# Public names of this module. BackpackUserItem is exported because it is the
# element type returned by WPSBackpackSystem.get_user_items.
__all__ = [
    "WPSBackpackSystem",
    "BackpackItemTier",
    "BackpackItemDefinition",
    "BackpackUserItem",
]