Files
NewWPSBot/Plugins/WPSStoreSystem.py
2025-11-08 23:26:40 +08:00

1012 lines
35 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import hashlib
import random
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, override
from uuid import uuid4
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from PWF.CoreModules.plugin_interface import DatabaseModel
from pydantic import BaseModel, Field
from .WPSAPI import WPSAPI
from .WPSBackpackSystem import BackpackItemDefinition, WPSBackpackSystem
from .WPSConfigSystem import WPSConfigAPI
logger: ProjectConfig = Architecture.Get(ProjectConfig)
class StoreItemSource(Enum):
SYSTEM = "system"
PERMANENT = "permanent"
PLAYER = "player"
class StoreMode(BaseModel):
mode_id: str
item_id: str
price: int
limit_amount: int
is_permanent: bool = False
metadata: Dict[str, str] = Field(default_factory=dict)
registered_at: datetime
class Config:
allow_mutation = False
class StoreEntry(BaseModel):
entry_id: int
mode_id: str
item_id: str
display_name: str
price: int
limit_amount: int
remaining_amount: int
source: StoreItemSource
created_at: datetime
registry_version: int
class Config:
allow_mutation = False
class PlayerListing(BaseModel):
user_id: int
item_id: str
price: int
quantity: int
created_at: datetime
status: str
class Config:
allow_mutation = False
class WPSStoreSystem(WPSAPI):
SYSTEM_TABLE = "store_system_items"
PLAYER_TABLE = "store_user_listings"
def __init__(self) -> None:
super().__init__()
self._mode_registry: Dict[str, StoreMode] = {}
self._registry_version: int = 0
self._default_hourly_count: int = int(
logger.FindItem("store_hourly_count", 5)
)
logger.SaveProperties()
self._permanent_mode_ids: set[str] = set()
@override
def dependencies(self) -> List[type]:
return [WPSAPI, WPSConfigAPI, WPSBackpackSystem]
@override
def is_enable_plugin(self) -> bool:
return True
@override
def register_db_model(self):
return [
DatabaseModel(
table_name=self.SYSTEM_TABLE,
column_defs={
"entry_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
"mode_id": "TEXT UNIQUE",
"item_id": "TEXT NOT NULL",
"display_name": "TEXT NOT NULL",
"price": "INTEGER NOT NULL",
"limit_amount": "INTEGER",
"remaining_amount": "INTEGER",
"source": "TEXT NOT NULL",
"created_at": "TEXT NOT NULL",
"registry_version": "INTEGER NOT NULL",
},
),
DatabaseModel(
table_name=self.PLAYER_TABLE,
column_defs={
"user_id": "INTEGER PRIMARY KEY",
"item_id": "TEXT NOT NULL",
"price": "INTEGER NOT NULL",
"quantity": "INTEGER NOT NULL",
"created_at": "TEXT NOT NULL",
"status": "TEXT NOT NULL",
},
),
]
@override
def wake_up(self) -> None:
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSStoreSystem 插件已加载{ConsoleFrontColor.RESET}",
)
for alias in ["store", "商店"]:
self.register_plugin(alias)
self._cleanup_state()
def _cleanup_state(self) -> None:
cursor = get_db().conn.cursor()
# 移除无效的系统商品条目
cursor.execute(
f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ? AND remaining_amount = 0",
(StoreItemSource.SYSTEM.value,),
)
# 清理玩家非激活记录
cursor.execute(
f"DELETE FROM {self.PLAYER_TABLE} WHERE status != ?",
("active",),
)
get_db().conn.commit()
# region 模式注册接口
def register_mode(
self,
item_id: str,
price: int,
limit_amount: int,
*,
is_permanent: bool = False,
metadata: Optional[Dict[str, str]] = None,
) -> str:
if not item_id:
raise ValueError("item_id must be provided")
if price <= 0:
raise ValueError("price must be positive")
if limit_amount == 0:
raise ValueError("limit_amount must not be zero")
if limit_amount < -1:
raise ValueError("limit_amount must be -1 or positive")
backpack = Architecture.Get(WPSBackpackSystem)
try:
item_def = backpack._get_definition(item_id) # type: ignore[attr-defined]
except Exception as exc:
raise ValueError(f"Item {item_id} not registered in backpack system") from exc
mode_id = uuid4().hex
metadata = dict(metadata or {})
self._mode_registry[mode_id] = StoreMode(
mode_id=mode_id,
item_id=item_id,
price=price,
limit_amount=limit_amount,
is_permanent=is_permanent,
metadata=metadata,
registered_at=datetime.now(timezone.utc),
)
if is_permanent:
self._permanent_mode_ids.add(mode_id)
self._bump_registry_version()
logger.Log(
"Info",
f"{ConsoleFrontColor.CYAN}Store mode {mode_id} registered for item {item_def.name} ({item_id}){ConsoleFrontColor.RESET}",
)
return mode_id
def register_permanent_mode(
self,
item_id: str,
price: int,
*,
limit_amount: int = -1,
metadata: Optional[Dict[str, str]] = None,
) -> str:
return self.register_mode(
item_id=item_id,
price=price,
limit_amount=limit_amount,
is_permanent=True,
metadata=metadata,
)
def unregister_mode(self, mode_id: str) -> bool:
mode = self._mode_registry.pop(mode_id, None)
if not mode:
return False
if mode.is_permanent and mode_id in self._permanent_mode_ids:
self._permanent_mode_ids.remove(mode_id)
self._bump_registry_version()
logger.Log(
"Info",
f"{ConsoleFrontColor.YELLOW}Store mode {mode_id} unregistered{ConsoleFrontColor.RESET}",
)
return True
def list_modes(self) -> List[StoreMode]:
return list(self._mode_registry.values())
def _bump_registry_version(self) -> None:
self._registry_version += 1
def _get_permanent_modes(self) -> List[StoreMode]:
return [
mode
for mode in self._mode_registry.values()
if mode.is_permanent
]
def _get_candidate_modes(self) -> List[StoreMode]:
return [
mode
for mode in self._mode_registry.values()
if not mode.is_permanent
]
# endregion
# region 刷新与数据同步
def _current_hour_key(self) -> str:
return datetime.now(timezone.utc).strftime("%Y%m%d%H")
def _ensure_hour_snapshot(self) -> None:
hour_key = self._current_hour_key()
if self._should_refresh(hour_key):
self._rebuild_system_entries(hour_key)
self._sync_permanent_entries()
def _should_refresh(self, hour_key: str) -> bool:
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT created_at, registry_version
FROM {self.SYSTEM_TABLE}
WHERE source = ?
LIMIT 1
""",
(StoreItemSource.SYSTEM.value,),
)
row = cursor.fetchone()
if not row:
return True
try:
created_at = datetime.fromisoformat(row["created_at"])
except ValueError:
return True
current_hour = created_at.strftime("%Y%m%d%H")
registry_version = int(row["registry_version"])
return current_hour != hour_key or registry_version != self._registry_version
def _rebuild_system_entries(self, hour_key: str) -> None:
cursor = get_db().conn.cursor()
cursor.execute(
f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ?",
(StoreItemSource.SYSTEM.value,),
)
candidates = self._get_candidate_modes()
if not candidates:
get_db().conn.commit()
return
seed = self._compute_seed(hour_key)
rng = random.Random(seed)
rng.shuffle(candidates)
selection = candidates[: self._default_hourly_count]
now = datetime.now(timezone.utc).isoformat()
backpack = Architecture.Get(WPSBackpackSystem)
for mode in selection:
definition = backpack._get_definition(mode.item_id) # type: ignore[attr-defined]
remaining = mode.limit_amount if mode.limit_amount >= 0 else -1
cursor.execute(
f"""
INSERT INTO {self.SYSTEM_TABLE} (
mode_id, item_id, display_name, price,
limit_amount, remaining_amount, source,
created_at, registry_version
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
mode.mode_id,
mode.item_id,
definition.name,
mode.price,
mode.limit_amount,
remaining,
StoreItemSource.SYSTEM.value,
now,
self._registry_version,
),
)
get_db().conn.commit()
def _sync_permanent_entries(self) -> None:
cursor = get_db().conn.cursor()
permanent_modes = self._get_permanent_modes()
permanent_ids = [mode.mode_id for mode in permanent_modes]
if permanent_ids:
placeholders = ", ".join("?" for _ in permanent_ids)
cursor.execute(
f"""
DELETE FROM {self.SYSTEM_TABLE}
WHERE source = ? AND mode_id NOT IN ({placeholders})
""",
(StoreItemSource.PERMANENT.value, *permanent_ids),
)
else:
cursor.execute(
f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ?",
(StoreItemSource.PERMANENT.value,),
)
now = datetime.now(timezone.utc).isoformat()
backpack = Architecture.Get(WPSBackpackSystem)
for mode in permanent_modes:
definition = backpack._get_definition(mode.item_id) # type: ignore[attr-defined]
cursor.execute(
f"""
INSERT INTO {self.SYSTEM_TABLE} (
mode_id, item_id, display_name, price,
limit_amount, remaining_amount, source,
created_at, registry_version
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(mode_id) DO UPDATE SET
item_id = excluded.item_id,
display_name = excluded.display_name,
price = excluded.price,
limit_amount = excluded.limit_amount,
remaining_amount = excluded.remaining_amount,
created_at = excluded.created_at,
registry_version = excluded.registry_version
""",
(
mode.mode_id,
mode.item_id,
definition.name,
mode.price,
mode.limit_amount,
-1 if mode.limit_amount == -1 else mode.limit_amount,
StoreItemSource.PERMANENT.value,
now,
self._registry_version,
),
)
get_db().conn.commit()
def _compute_seed(self, hour_key: str) -> int:
payload = f"{hour_key}|{self._registry_version}".encode("utf-8")
digest = hashlib.sha256(payload).digest()
return int.from_bytes(digest[:8], "big")
# endregion
# region 指令处理
STORE_COMMANDS = {"store", "商店"}
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
_ = self.parse_message_after_at(message).strip()
response = await self._handle_store(chat_id, user_id)
return await self.send_markdown_message(response, chat_id, user_id)
async def _handle_store(self, chat_id: int, user_id: int) -> str:
self._ensure_hour_snapshot()
system_entries, permanent_entries = self._fetch_system_entries()
player_listings = self._fetch_player_listings()
return self._format_store_markdown(
user_id=user_id,
system_entries=system_entries,
permanent_entries=permanent_entries,
player_listings=player_listings,
)
async def purchase_item(
self,
*,
chat_id: int,
user_id: int,
identifier: str,
quantity: int,
) -> str:
identifier = identifier.strip()
if not identifier:
return "❌ 购买指令格式错误请提供物品名称或ID"
if quantity <= 0:
return "❌ 购买数量必须大于0"
self._ensure_hour_snapshot()
system_entries, permanent_entries = self._fetch_system_entries()
player_listings = self._fetch_player_listings()
target = self._resolve_system_entry(identifier, system_entries, permanent_entries)
if target:
return await self._purchase_system_entry(
entry=target,
quantity=quantity,
chat_id=chat_id,
user_id=user_id,
)
listing = await self._resolve_player_listing(identifier, player_listings)
if listing:
return await self._purchase_player_listing(
listing=listing,
quantity=quantity,
chat_id=chat_id,
user_id=user_id,
)
return "❌ 未找到匹配的商品请确认名称或物品ID是否正确"
async def sell_item(
self,
*,
chat_id: int,
user_id: int,
identifier: str,
quantity: int,
) -> str:
identifier = identifier.strip()
if not identifier:
return "❌ 出售指令格式错误请提供物品名称或ID"
if quantity < 0:
return "❌ 出售数量必须大于或等于0"
backpack = Architecture.Get(WPSBackpackSystem)
item_id, definition = self._resolve_item(identifier)
if item_id is None:
return "❌ 未找到对应物品,请确认物品已注册且名称/ID 正确"
existing_listing = self._get_user_listing(user_id)
now_iso = datetime.now(timezone.utc).isoformat()
if existing_listing and existing_listing.status == "active":
if existing_listing.item_id != item_id:
return (
"❌ 你已在出售其他物品,如需更换请先使用 `出售 <当前物品ID> 0` 撤下"
)
if quantity == 0:
cursor = get_db().conn.cursor()
cursor.execute(
f"""
UPDATE {self.PLAYER_TABLE}
SET quantity = 0, status = ?, created_at = ?
WHERE user_id = ?
""",
("closed", now_iso, user_id),
)
get_db().conn.commit()
backpack.add_item(user_id, item_id, existing_listing.quantity)
return f"✅ 已撤下出售的 {definition.name},返还数量 {existing_listing.quantity}"
diff = quantity - existing_listing.quantity
if diff > 0:
available_qty = self._get_user_inventory_quantity(user_id, item_id)
if available_qty < diff:
return f"❌ 背包数量不足,需要额外 {diff}{definition.name}"
backpack.set_item_quantity(user_id, item_id, available_qty - diff)
elif diff < 0:
backpack.add_item(user_id, item_id, -diff)
try:
price = self._derive_player_price(item_id)
except ValueError as exc:
return f"❌ 当前物品 {definition.name} 未设置售价,无法出售({exc})"
cursor = get_db().conn.cursor()
cursor.execute(
f"""
UPDATE {self.PLAYER_TABLE}
SET quantity = ?, price = ?, created_at = ?, status = ?
WHERE user_id = ?
""",
(quantity, price, now_iso, "active", user_id),
)
get_db().conn.commit()
return f"✅ 已更新出售数量为 {quantity}{definition.name},售价 {price} 分/个"
if quantity == 0:
return " 当前没有在出售的物品"
available_qty = self._get_user_inventory_quantity(user_id, item_id)
if available_qty < quantity:
return f"❌ 背包数量不足,当前拥有 {available_qty}{definition.name}"
try:
price = self._derive_player_price(item_id)
except ValueError as exc:
return f"❌ 当前物品 {definition.name} 未设置售价,无法出售({exc})"
backpack.set_item_quantity(user_id, item_id, available_qty - quantity)
cursor = get_db().conn.cursor()
cursor.execute(
f"""
INSERT INTO {self.PLAYER_TABLE} (user_id, item_id, price, quantity, created_at, status)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
item_id = excluded.item_id,
price = excluded.price,
quantity = excluded.quantity,
created_at = excluded.created_at,
status = excluded.status
""",
(user_id, item_id, price, quantity, now_iso, "active"),
)
get_db().conn.commit()
return f"✅ 已上架 {definition.name} × {quantity},售价 {price} 分/个"
def _help_message(self) -> str:
return """# 🛒 商店指令帮助
- `商店`:查看当前系统商品与玩家出售列表
- `购买 <物品名称或ID> <数量>`:购买指定商品
- `出售 <物品名称或ID> <数量>`:上架自己的出售物品(限一种)
"""
def _fetch_system_entries(self) -> Tuple[List[StoreEntry], List[StoreEntry]]:
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT entry_id, mode_id, item_id, display_name, price,
limit_amount, remaining_amount, source,
created_at, registry_version
FROM {self.SYSTEM_TABLE}
ORDER BY source DESC, entry_id ASC
"""
)
rows = cursor.fetchall()
system_entries: List[StoreEntry] = []
permanent_entries: List[StoreEntry] = []
for row in rows:
created_at = self._parse_datetime(row["created_at"])
entry = StoreEntry(
entry_id=row["entry_id"],
mode_id=row["mode_id"],
item_id=row["item_id"],
display_name=row["display_name"],
price=row["price"],
limit_amount=row["limit_amount"],
remaining_amount=row["remaining_amount"],
source=StoreItemSource(row["source"]),
created_at=created_at,
registry_version=row["registry_version"],
)
if entry.source == StoreItemSource.PERMANENT:
permanent_entries.append(entry)
elif entry.source == StoreItemSource.SYSTEM:
system_entries.append(entry)
return system_entries, permanent_entries
def _fetch_player_listings(self) -> List[PlayerListing]:
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT user_id, item_id, price, quantity, created_at, status
FROM {self.PLAYER_TABLE}
WHERE status = ?
ORDER BY created_at ASC
""",
("active",),
)
rows = cursor.fetchall()
listings: List[PlayerListing] = []
for row in rows:
listings.append(
PlayerListing(
user_id=row["user_id"],
item_id=row["item_id"],
price=row["price"],
quantity=row["quantity"],
created_at=self._parse_datetime(row["created_at"]),
status=row["status"],
)
)
return listings
def _format_store_markdown(
self,
*,
user_id: int,
system_entries: List[StoreEntry],
permanent_entries: List[StoreEntry],
player_listings: List[PlayerListing],
) -> str:
lines: List[str] = ["# 🏬 每小时商店"]
if permanent_entries:
lines.append("## ♾️ 常驻商品")
for entry in permanent_entries:
limit_text = (
"不限量"
if entry.limit_amount == -1
else f"限购 {entry.limit_amount}"
)
lines.append(
f"- `{entry.item_id}` · {entry.display_name}|价格 {entry.price} 分|{limit_text}"
)
if system_entries:
lines.append("## ⏱️ 本时段商品")
for entry in system_entries:
if entry.remaining_amount == -1:
stock_text = "剩余:不限量"
else:
stock_text = f"剩余:{entry.remaining_amount}"
lines.append(
f"- `{entry.item_id}` · {entry.display_name}|价格 {entry.price} 分|{stock_text}"
)
if not permanent_entries and not system_entries:
lines.append("> ⚠️ 当前没有可售的系统商品")
lines.append("## 👥 玩家出售")
if player_listings:
for listing in player_listings:
owner = "" if listing.user_id == user_id else f"玩家 {listing.user_id}"
lines.append(
f"- {owner}`{listing.item_id}`|数量 {listing.quantity}|价格 {listing.price}"
)
else:
lines.append("> 当前暂无玩家出售信息")
lines.append("\n---\n" + self._help_message())
return "\n".join(lines)
def _parse_datetime(self, value: str) -> datetime:
try:
return datetime.fromisoformat(value)
except ValueError:
return datetime.now(timezone.utc)
# endregion
# region 购买逻辑
async def _purchase_system_entry(
self,
*,
entry: StoreEntry,
quantity: int,
chat_id: int,
user_id: int,
) -> str:
total_price = entry.price * quantity
config_api = Architecture.Get(WPSConfigAPI)
user_points = Architecture.Get(WPSConfigAPI).get_user_points(chat_id, user_id)
if user_points < total_price:
return f"❌ 积分不足,需要 {total_price} 分,当前仅有 {user_points}"
cursor = get_db().conn.cursor()
if entry.remaining_amount != -1:
cursor.execute(
f"""
UPDATE {self.SYSTEM_TABLE}
SET remaining_amount = remaining_amount - ?
WHERE entry_id = ? AND remaining_amount >= ?
""",
(quantity, entry.entry_id, quantity),
)
if cursor.rowcount == 0:
get_db().conn.rollback()
return "❌ 库存不足,无法完成购买"
get_db().conn.commit()
new_points = await config_api.adjust_user_points(
chat_id, user_id, -total_price, reason=f"购买 {entry.display_name}"
)
backpack = Architecture.Get(WPSBackpackSystem)
backpack.add_item(user_id, entry.item_id, quantity)
return (
f"✅ 成功购买 {entry.display_name} × {quantity},花费 {total_price}\n"
f"当前剩余积分:{new_points}"
)
async def _resolve_player_listing(
self,
identifier: str,
listings: List[PlayerListing],
) -> Optional[PlayerListing]:
if not listings:
return None
backpack = Architecture.Get(WPSBackpackSystem)
identifier_lower = identifier.lower()
for listing in listings:
if listing.quantity <= 0:
continue
if listing.item_id.lower() == identifier_lower:
return listing
try:
definition = backpack._get_definition(listing.item_id) # type: ignore[attr-defined]
except Exception:
continue
if definition.name.lower() == identifier_lower:
return listing
return None
async def _purchase_player_listing(
self,
*,
listing: PlayerListing,
quantity: int,
chat_id: int,
user_id: int,
) -> str:
if quantity <= 0:
return "❌ 购买数量必须大于0"
if listing.quantity < quantity:
return "❌ 玩家出售库存不足"
if listing.user_id == user_id:
return "❌ 无法购买自己上架的商品"
total_price = listing.price * quantity
config_api = Architecture.Get(WPSConfigAPI)
buyer_points = config_api.get_user_points(chat_id, user_id)
if buyer_points < total_price:
return f"❌ 积分不足,需要 {total_price} 分,当前仅有 {buyer_points}"
cursor = get_db().conn.cursor()
cursor.execute(
f"""
UPDATE {self.PLAYER_TABLE}
SET quantity = quantity - ?
WHERE user_id = ? AND quantity >= ?
""",
(quantity, listing.user_id, quantity),
)
if cursor.rowcount == 0:
get_db().conn.rollback()
return "❌ 玩家出售库存不足或已被其他人购买"
cursor.execute(
f"""
UPDATE {self.PLAYER_TABLE}
SET status = ?
WHERE user_id = ? AND quantity <= 0
""",
("closed", listing.user_id),
)
get_db().conn.commit()
buyer_new_points = await config_api.adjust_user_points(
chat_id, user_id, -total_price, reason="购买玩家商品"
)
await config_api.adjust_user_points(
chat_id,
listing.user_id,
total_price,
reason="玩家出售结算",
)
backpack = Architecture.Get(WPSBackpackSystem)
backpack.add_item(user_id, listing.item_id, quantity)
definition = backpack._get_definition(listing.item_id) # type: ignore[attr-defined]
return (
f"✅ 成功购买玩家商品 {definition.name} × {quantity},花费 {total_price}\n"
f"当前剩余积分:{buyer_new_points}"
)
def _resolve_system_entry(
self,
identifier: str,
system_entries: List[StoreEntry],
permanent_entries: List[StoreEntry],
) -> Optional[StoreEntry]:
identifier_lower = identifier.lower()
for entry in permanent_entries + system_entries:
if entry.item_id.lower() == identifier_lower:
return entry
if entry.display_name.lower() == identifier_lower:
return entry
return None
def _get_user_listing(self, user_id: int) -> Optional[PlayerListing]:
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT user_id, item_id, price, quantity, created_at, status
FROM {self.PLAYER_TABLE}
WHERE user_id = ?
""",
(user_id,),
)
row = cursor.fetchone()
if not row:
return None
return PlayerListing(
user_id=row["user_id"],
item_id=row["item_id"],
price=row["price"],
quantity=row["quantity"],
created_at=self._parse_datetime(row["created_at"]),
status=row["status"],
)
def _derive_player_price(self, item_id: str) -> int:
prices = [
mode.price
for mode in self._mode_registry.values()
if mode.item_id == item_id
]
if not prices:
raise ValueError(
f"Item {item_id} has no registered store mode to derive price"
)
return min(prices)
def _resolve_item(
self,
identifier: str,
) -> Tuple[Optional[str], Optional["BackpackItemDefinition"]]:
backpack = Architecture.Get(WPSBackpackSystem)
identifier_lower = identifier.lower()
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT item_id
FROM {WPSBackpackSystem.ITEMS_TABLE}
WHERE lower(item_id) = ? OR lower(name) = ?
LIMIT 1
""",
(identifier_lower, identifier_lower),
)
row = cursor.fetchone()
item_id = row["item_id"] if row else identifier
try:
definition = backpack._get_definition(item_id) # type: ignore[attr-defined]
return definition.item_id, definition
except Exception:
return None, None
def _get_user_inventory_quantity(self, user_id: int, item_id: str) -> int:
backpack = Architecture.Get(WPSBackpackSystem)
for item in backpack.get_user_items(user_id):
if item.item_id == item_id:
return item.quantity
return 0
def get_store_snapshot(self) -> Dict[str, Any]:
self._ensure_hour_snapshot()
system_entries, permanent_entries = self._fetch_system_entries()
player_listings = self._fetch_player_listings()
return {
"system": [self._entry_to_dict(entry) for entry in system_entries],
"permanent": [self._entry_to_dict(entry) for entry in permanent_entries],
"players": [self._listing_to_dict(listing) for listing in player_listings],
}
def get_user_listing_snapshot(self, user_id: int) -> Optional[Dict[str, Any]]:
listing = self._get_user_listing(user_id)
if not listing or listing.status != "active":
return None
return self._listing_to_dict(listing)
def _entry_to_dict(self, entry: StoreEntry) -> Dict[str, Any]:
return {
"entry_id": entry.entry_id,
"mode_id": entry.mode_id,
"item_id": entry.item_id,
"display_name": entry.display_name,
"price": entry.price,
"limit_amount": entry.limit_amount,
"remaining_amount": entry.remaining_amount,
"source": entry.source.value,
"created_at": entry.created_at.isoformat(),
"registry_version": entry.registry_version,
}
def _listing_to_dict(self, listing: PlayerListing) -> Dict[str, Any]:
return {
"user_id": listing.user_id,
"item_id": listing.item_id,
"price": listing.price,
"quantity": listing.quantity,
"created_at": listing.created_at.isoformat(),
"status": listing.status,
}
# endregion
# endregion
class WPSStoreBuyCommand(WPSAPI):
@override
def dependencies(self) -> list[type]:
return [WPSStoreSystem]
@override
def is_enable_plugin(self) -> bool:
return True
@override
def wake_up(self) -> None:
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSStoreBuyCommand 插件已加载{ConsoleFrontColor.RESET}",
)
for alias in ["buy", "购买"]:
self.register_plugin(alias)
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
message = self.parse_message_after_at(message).strip()
if not message:
return await self._send_error(
"❌ 购买指令格式错误,请使用:`购买 <物品名称或ID> <数量>`",
chat_id,
user_id,
)
tokens = [token.strip() for token in message.split() if token.strip()]
if len(tokens) < 2:
return await self._send_error(
"❌ 购买指令格式错误,请使用:`购买 <物品名称或ID> <数量>`",
chat_id,
user_id,
)
identifier = " ".join(tokens[:-1]).strip()
quantity_token = tokens[-1]
try:
quantity = int(quantity_token)
except ValueError:
return await self._send_error("❌ 购买数量必须是整数", chat_id, user_id)
store_api = Architecture.Get(WPSStoreSystem)
response = await store_api.purchase_item(
chat_id=chat_id,
user_id=user_id,
identifier=identifier,
quantity=quantity,
)
return await self.send_markdown_message(response, chat_id, user_id)
async def _send_error(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
return await self.send_markdown_message(message, chat_id, user_id)
class WPSStoreSellCommand(WPSAPI):
@override
def dependencies(self) -> list[type]:
return [WPSStoreSystem]
@override
def is_enable_plugin(self) -> bool:
return True
@override
def wake_up(self) -> None:
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSStoreSellCommand 插件已加载{ConsoleFrontColor.RESET}",
)
for alias in ["sell", "出售"]:
self.register_plugin(alias)
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
message = self.parse_message_after_at(message).strip()
if not message:
return await self._send_error(
"❌ 出售指令格式错误,请使用:`出售 <物品名称或ID> <数量>`",
chat_id,
user_id,
)
tokens = [token.strip() for token in message.split() if token.strip()]
if len(tokens) < 2:
return await self._send_error(
"❌ 出售指令格式错误,请使用:`出售 <物品名称或ID> <数量>`",
chat_id,
user_id,
)
identifier = " ".join(tokens[:-1]).strip()
quantity_token = tokens[-1]
try:
quantity = int(quantity_token)
except ValueError:
return await self._send_error("❌ 出售数量必须是整数", chat_id, user_id)
store_api = Architecture.Get(WPSStoreSystem)
response = await store_api.sell_item(
chat_id=chat_id,
user_id=user_id,
identifier=identifier,
quantity=quantity,
)
return await self.send_markdown_message(response, chat_id, user_id)
async def _send_error(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
return await self.send_markdown_message(message, chat_id, user_id)
__all__ = ["WPSStoreSystem"]