Files
NewWPSBot/Plugins/WPSStoreSystem.py
2025-11-12 22:58:36 +08:00

1175 lines
41 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import hashlib
import random
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, override
from uuid import uuid4
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from PWF.CoreModules.plugin_interface import DatabaseModel
from pydantic import BaseModel, Field
from .WPSAPI import WPSAPI
from .WPSBackpackSystem import BackpackItemDefinition, WPSBackpackSystem
from .WPSConfigSystem import WPSConfigAPI
# Shared ProjectConfig instance resolved from the Architecture registry.
# In this module it is used both for logging (`logger.Log`) and for reading
# and persisting configuration values (`logger.FindItem`, `logger.SaveProperties`).
logger: ProjectConfig = Architecture.Get(ProjectConfig)
class StoreItemSource(Enum):
    """Origin tag of a store row; the string value is what gets stored in the `source` column."""

    # Hourly rotating entries written by WPSStoreSystem._rebuild_system_entries.
    SYSTEM = "system"
    # Always-available entries written by WPSStoreSystem._sync_permanent_entries.
    PERMANENT = "permanent"
    # Not written or read by any code visible in this file.
    PLAYER = "player"
class StoreMode(BaseModel):
    """In-memory registration of an item that the store may offer.

    Instances are created by ``WPSStoreSystem.register_mode`` and kept in its
    mode registry; they are not persisted directly.
    """

    # Registry key; register_mode generates it as uuid4().hex.
    mode_id: str
    # Backpack item id this mode sells.
    item_id: str
    # Unit price; register_mode rejects values <= 0.
    price: int
    # Stock limit: -1 means unlimited, otherwise a positive count
    # (register_mode rejects 0 and anything below -1).
    limit_amount: int
    # True for modes that are always listed instead of taking part in the hourly draw.
    is_permanent: bool = False
    # Free-form string metadata supplied by the registering caller.
    metadata: Dict[str, str] = Field(default_factory=dict)
    # Registration timestamp; register_mode passes datetime.now(timezone.utc).
    registered_at: datetime

    class Config:
        # NOTE(review): `allow_mutation` is the pydantic v1 config key for
        # immutable instances; under pydantic v2 the equivalent is `frozen`.
        # Confirm which major version this project runs on.
        allow_mutation = False
class StoreEntry(BaseModel):
    """One row of the system store table as loaded by ``_fetch_system_entries``."""

    # AUTOINCREMENT primary key of the row.
    entry_id: int
    # Id of the StoreMode this row was generated from.
    mode_id: str
    # Backpack item id being sold.
    item_id: str
    # Item name captured from the backpack definition when the row was written.
    display_name: str
    # Unit price.
    price: int
    # Configured stock limit; -1 means unlimited.
    limit_amount: int
    # Remaining stock; -1 means unlimited.
    remaining_amount: int
    # Whether the row is an hourly (SYSTEM) or always-on (PERMANENT) entry.
    source: StoreItemSource
    # Timestamp the row was written (parsed from an ISO-8601 string).
    created_at: datetime
    # Value of the store's registry version when the row was written.
    registry_version: int

    class Config:
        # NOTE(review): `allow_mutation` is the pydantic v1 config key for
        # immutable instances; under pydantic v2 the equivalent is `frozen`.
        # Confirm which major version this project runs on.
        allow_mutation = False
class PlayerListing(BaseModel):
    """A player's listing row from the player table, plus the resolved item name."""

    # Seller id; also the primary key of the player table (one listing per user).
    user_id: int
    # Backpack item id being sold.
    item_id: str
    # Name from the backpack definition; the loaders fall back to item_id
    # when the definition lookup fails.
    item_name: str
    # Unit price asked by the seller.
    price: int
    # Units still for sale.
    quantity: int
    # Timestamp of the last create/update of the listing.
    created_at: datetime
    # Listing state; the code in this file writes "active" and "closed".
    status: str

    class Config:
        # NOTE(review): `allow_mutation` is the pydantic v1 config key for
        # immutable instances; under pydantic v2 the equivalent is `frozen`.
        # Confirm which major version this project runs on.
        allow_mutation = False
class WPSStoreSystem(WPSAPI):
    """Store plugin combining hourly system stock, permanent stock and player listings.

    Other code registers "modes" (an item id with a price and a stock limit)
    through ``register_mode`` / ``register_permanent_mode``.  When the UTC hour
    or the registry version changes, a seeded random subset of the
    non-permanent modes is written to ``SYSTEM_TABLE``; permanent modes are
    synced into the same table on every snapshot.  Each user may additionally
    hold a single listing row in ``PLAYER_TABLE``.
    """

    SYSTEM_TABLE = "store_system_items"
    PLAYER_TABLE = "store_user_listings"

    def __init__(self) -> None:
        """Initialise the in-memory mode registry and read the hourly item count."""
        super().__init__()
        self._mode_registry: Dict[str, StoreMode] = {}
        # Bumped on every (un)registration; stored with each row so that a
        # registry change forces a rebuild of the hourly selection.
        self._registry_version: int = 0
        # Clamp to zero: a negative value would turn the selection slice in
        # _rebuild_system_entries into "all but the last n" candidates.
        self._default_hourly_count: int = max(
            0, int(logger.FindItem("store_hourly_count", 5))
        )
        # Presumably persists the default written by FindItem - TODO confirm.
        logger.SaveProperties()
        self._permanent_mode_ids: set[str] = set()

    def get_guide_subtitle(self) -> str:
        """Return the one-line subtitle shown for this plugin in the guide."""
        return "系统商品与玩家寄售的统一商店"

    def get_guide_metadata(self) -> Dict[str, str]:
        """Return registry counters and table names for the guide page."""
        return {
            "已注册模式": str(len(self._mode_registry)),
            "永久模式": str(len(self._permanent_mode_ids)),
            "数据表": f"{self.SYSTEM_TABLE}, {self.PLAYER_TABLE}",
        }

    # NOTE(review): `Sequence` and `GuideEntry` are not imported in the visible
    # import block.  This is harmless at runtime because of
    # `from __future__ import annotations`, but type checkers will flag it.
    def collect_command_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entries describing the chat commands of this plugin."""
        return (
            {
                "title": "store",
                "identifier": "store",
                "description": "查看系统刷新的商品列表,包含系统和玩家寄售。",
                "metadata": {"别名": "商店"},
            },
        )

    def collect_guide_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entries describing how the store works."""
        return (
            {
                "title": "模式注册",
                "description": "`register_mode/register_permanent_mode` 将背包物品以指定库存与价格投放至系统表。",
            },
            {
                "title": "刷新机制",
                "description": "每小时根据 `store_hourly_count` 配置刷新系统库存,同时同步永久模式。",
            },
            {
                "title": "玩家寄售",
                "description": "`sell_item` 将玩家物品挂售至寄售表,`purchase_item` 支持购买系统或玩家商品。",
            },
        )

    @override
    def dependencies(self) -> List[type]:
        """Return the plugin types this plugin depends on."""
        return [WPSAPI, WPSConfigAPI, WPSBackpackSystem]

    @override
    def is_enable_plugin(self) -> bool:
        """Report this plugin as enabled."""
        return True

    @override
    def register_db_model(self):
        """Declare the system-entry table and the player-listing table."""
        return [
            DatabaseModel(
                table_name=self.SYSTEM_TABLE,
                column_defs={
                    "entry_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
                    "mode_id": "TEXT UNIQUE",
                    "item_id": "TEXT NOT NULL",
                    "display_name": "TEXT NOT NULL",
                    "price": "INTEGER NOT NULL",
                    "limit_amount": "INTEGER",
                    "remaining_amount": "INTEGER",
                    "source": "TEXT NOT NULL",
                    "created_at": "TEXT NOT NULL",
                    "registry_version": "INTEGER NOT NULL",
                },
            ),
            DatabaseModel(
                table_name=self.PLAYER_TABLE,
                column_defs={
                    # user_id is the primary key, so each user has at most one listing row.
                    "user_id": "INTEGER PRIMARY KEY",
                    "item_id": "TEXT NOT NULL",
                    "price": "INTEGER NOT NULL",
                    "quantity": "INTEGER NOT NULL",
                    "created_at": "TEXT NOT NULL",
                    "status": "TEXT NOT NULL",
                },
            ),
        ]

    @override
    def wake_up(self) -> None:
        """Log the load, register the command aliases and purge stale rows."""
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSStoreSystem 插件已加载{ConsoleFrontColor.RESET}",
        )
        for alias in ["store", "商店"]:
            self.register_plugin(alias)
        self._cleanup_state()

    def _cleanup_state(self) -> None:
        """Delete sold-out hourly entries and every non-active player listing."""
        cursor = get_db().conn.cursor()
        # Remove hourly system entries whose stock is exhausted.
        cursor.execute(
            f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ? AND remaining_amount = 0",
            (StoreItemSource.SYSTEM.value,),
        )
        # Remove player listings that are no longer active.
        cursor.execute(
            f"DELETE FROM {self.PLAYER_TABLE} WHERE status != ?",
            ("active",),
        )
        get_db().conn.commit()

    # region Mode registration API
    def register_mode(
        self,
        item_id: str,
        price: int,
        limit_amount: int,
        *,
        is_permanent: bool = False,
        metadata: Optional[Dict[str, str]] = None,
    ) -> str:
        """Register an item as purchasable and return the generated mode id.

        Args:
            item_id: Backpack item id; must be known to WPSBackpackSystem.
            price: Unit price, strictly positive.
            limit_amount: Stock limit; -1 for unlimited, otherwise positive.
            is_permanent: List the item permanently instead of entering it
                into the hourly draw.
            metadata: Optional free-form string metadata (copied).

        Raises:
            ValueError: On an empty item id, a non-positive price, an invalid
                limit, or an item the backpack system cannot resolve.
        """
        if not item_id:
            raise ValueError("item_id must be provided")
        if price <= 0:
            raise ValueError("price must be positive")
        if limit_amount == 0:
            raise ValueError("limit_amount must not be zero")
        if limit_amount < -1:
            raise ValueError("limit_amount must be -1 or positive")
        backpack = Architecture.Get(WPSBackpackSystem)
        try:
            item_def = backpack._get_definition(item_id)
        except Exception as exc:
            raise ValueError(f"Item {item_id} not registered in backpack system") from exc
        mode_id = uuid4().hex
        metadata = dict(metadata or {})
        self._mode_registry[mode_id] = StoreMode(
            mode_id=mode_id,
            item_id=item_id,
            price=price,
            limit_amount=limit_amount,
            is_permanent=is_permanent,
            metadata=metadata,
            registered_at=datetime.now(timezone.utc),
        )
        if is_permanent:
            self._permanent_mode_ids.add(mode_id)
        self._bump_registry_version()
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.CYAN}Store mode {mode_id} registered for item {item_def.name} ({item_id}){ConsoleFrontColor.RESET}",
        )
        return mode_id

    def register_permanent_mode(
        self,
        item_id: str,
        price: int,
        *,
        limit_amount: int = -1,
        metadata: Optional[Dict[str, str]] = None,
    ) -> str:
        """Register a permanent mode; shorthand for ``register_mode(..., is_permanent=True)``."""
        return self.register_mode(
            item_id=item_id,
            price=price,
            limit_amount=limit_amount,
            is_permanent=True,
            metadata=metadata,
        )

    def unregister_mode(self, mode_id: str) -> bool:
        """Remove a mode from the registry; return False when the id is unknown."""
        mode = self._mode_registry.pop(mode_id, None)
        if not mode:
            return False
        # Only permanent modes are ever added to the set, so discard is a
        # no-op for every other mode.
        self._permanent_mode_ids.discard(mode_id)
        self._bump_registry_version()
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.YELLOW}Store mode {mode_id} unregistered{ConsoleFrontColor.RESET}",
        )
        return True

    def list_modes(self) -> List[StoreMode]:
        """Return a list copy of all registered modes."""
        return list(self._mode_registry.values())

    def _bump_registry_version(self) -> None:
        """Increment the registry version so the next snapshot rebuilds the hourly stock."""
        self._registry_version += 1

    def _get_permanent_modes(self) -> List[StoreMode]:
        """Return the registered modes flagged as permanent."""
        return [
            mode
            for mode in self._mode_registry.values()
            if mode.is_permanent
        ]

    def _get_candidate_modes(self) -> List[StoreMode]:
        """Return the non-permanent modes eligible for the hourly draw."""
        return [
            mode
            for mode in self._mode_registry.values()
            if not mode.is_permanent
        ]
    # endregion

    # region Refresh and data sync
    def _current_hour_key(self) -> str:
        """Return the current UTC hour formatted as ``YYYYMMDDHH``."""
        return datetime.now(timezone.utc).strftime("%Y%m%d%H")

    def _ensure_hour_snapshot(self) -> None:
        """Rebuild the hourly entries when stale, then sync the permanent entries."""
        hour_key = self._current_hour_key()
        if self._should_refresh(hour_key):
            self._rebuild_system_entries(hour_key)
        self._sync_permanent_entries()

    def _should_refresh(self, hour_key: str) -> bool:
        """Return True when the stored hourly entries do not match ``hour_key`` and the registry.

        A refresh is also requested when no hourly row exists or when its
        timestamp cannot be parsed.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT created_at, registry_version
            FROM {self.SYSTEM_TABLE}
            WHERE source = ?
            LIMIT 1
            """,
            (StoreItemSource.SYSTEM.value,),
        )
        row = cursor.fetchone()
        if not row:
            return True
        try:
            created_at = datetime.fromisoformat(row["created_at"])
        except ValueError:
            return True
        current_hour = created_at.strftime("%Y%m%d%H")
        registry_version = int(row["registry_version"])
        return current_hour != hour_key or registry_version != self._registry_version

    def _rebuild_system_entries(self, hour_key: str) -> None:
        """Replace all hourly entries with a fresh seeded selection for ``hour_key``."""
        candidates = self._get_candidate_modes()
        rows: List[tuple] = []
        if candidates:
            # Seeding with hour + registry version makes the draw reproducible
            # for a given hour and registry state.
            rng = random.Random(self._compute_seed(hour_key))
            rng.shuffle(candidates)
            now = datetime.now(timezone.utc).isoformat()
            backpack = Architecture.Get(WPSBackpackSystem)
            # Resolve every definition before touching the table so that a
            # failing lookup cannot leave a pending DELETE behind.
            for mode in candidates[: self._default_hourly_count]:
                definition = backpack._get_definition(mode.item_id)
                remaining = mode.limit_amount if mode.limit_amount >= 0 else -1
                rows.append(
                    (
                        mode.mode_id,
                        mode.item_id,
                        definition.name,
                        mode.price,
                        mode.limit_amount,
                        remaining,
                        StoreItemSource.SYSTEM.value,
                        now,
                        self._registry_version,
                    )
                )
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ?",
            (StoreItemSource.SYSTEM.value,),
        )
        for row in rows:
            cursor.execute(
                f"""
                INSERT INTO {self.SYSTEM_TABLE} (
                    mode_id, item_id, display_name, price,
                    limit_amount, remaining_amount, source,
                    created_at, registry_version
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                row,
            )
        get_db().conn.commit()

    def _sync_permanent_entries(self) -> None:
        """Make the PERMANENT rows mirror the currently registered permanent modes.

        Rows of unregistered permanent modes are deleted; every registered
        permanent mode is inserted or updated in place (keyed by ``mode_id``).
        """
        cursor = get_db().conn.cursor()
        permanent_modes = self._get_permanent_modes()
        permanent_ids = [mode.mode_id for mode in permanent_modes]
        if permanent_ids:
            placeholders = ", ".join("?" for _ in permanent_ids)
            cursor.execute(
                f"""
                DELETE FROM {self.SYSTEM_TABLE}
                WHERE source = ? AND mode_id NOT IN ({placeholders})
                """,
                (StoreItemSource.PERMANENT.value, *permanent_ids),
            )
        else:
            cursor.execute(
                f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ?",
                (StoreItemSource.PERMANENT.value,),
            )
        now = datetime.now(timezone.utc).isoformat()
        backpack = Architecture.Get(WPSBackpackSystem)
        for mode in permanent_modes:
            definition = backpack._get_definition(mode.item_id)
            # NOTE(review): the upsert overwrites remaining_amount, and this
            # method runs on every snapshot, so a finite permanent limit is
            # restored each time the store is viewed or used - confirm intended.
            cursor.execute(
                f"""
                INSERT INTO {self.SYSTEM_TABLE} (
                    mode_id, item_id, display_name, price,
                    limit_amount, remaining_amount, source,
                    created_at, registry_version
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(mode_id) DO UPDATE SET
                    item_id = excluded.item_id,
                    display_name = excluded.display_name,
                    price = excluded.price,
                    limit_amount = excluded.limit_amount,
                    remaining_amount = excluded.remaining_amount,
                    created_at = excluded.created_at,
                    registry_version = excluded.registry_version
                """,
                (
                    mode.mode_id,
                    mode.item_id,
                    definition.name,
                    mode.price,
                    mode.limit_amount,
                    # Initial stock equals the limit (-1 stays -1 = unlimited).
                    mode.limit_amount,
                    StoreItemSource.PERMANENT.value,
                    now,
                    self._registry_version,
                ),
            )
        get_db().conn.commit()

    def _compute_seed(self, hour_key: str) -> int:
        """Derive a 64-bit seed from the hour key and the registry version (SHA-256 prefix)."""
        payload = f"{hour_key}|{self._registry_version}".encode("utf-8")
        digest = hashlib.sha256(payload).digest()
        return int.from_bytes(digest[:8], "big")
    # endregion

    # region Command handling
    STORE_COMMANDS = {"store", "商店"}

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Handle the store command by sending the rendered store page."""
        # The parsed text is not used: the command takes no arguments.
        _ = self.parse_message_after_at(message).strip()
        response = await self._handle_store(chat_id, user_id)
        return await self.send_markdown_message(response, chat_id, user_id)

    async def _handle_store(self, chat_id: int, user_id: int) -> str:
        """Refresh the snapshot and return the store page as markdown."""
        self._ensure_hour_snapshot()
        system_entries, permanent_entries = self._fetch_system_entries()
        player_listings = self._fetch_player_listings()
        return self._format_store_markdown(
            chat_id=chat_id,
            user_id=user_id,
            system_entries=system_entries,
            permanent_entries=permanent_entries,
            player_listings=player_listings,
        )

    async def purchase_item(
        self,
        *,
        chat_id: int,
        user_id: int,
        identifier: str,
        quantity: int,
    ) -> str:
        """Buy ``quantity`` units of the item matching ``identifier`` and return a user message.

        Matching offers from player listings, hourly entries and permanent
        entries are tried from cheapest to most expensive; at equal price
        player listings come first, then hourly, then permanent entries.  An
        offer that reports insufficient stock is skipped; any other outcome
        (success or failure) ends the attempt and is returned.

        Args:
            chat_id: Chat the command came from (forwarded to the points API).
            user_id: Buyer id.
            identifier: Item id or item name, matched case-insensitively.
            quantity: Number of units to buy; must be positive.
        """
        identifier = identifier.strip()
        if not identifier:
            return "❌ 购买指令格式错误请提供物品名称或ID"
        if quantity <= 0:
            return "❌ 购买数量必须大于0"
        self._ensure_hour_snapshot()
        system_entries, permanent_entries = self._fetch_system_entries()
        player_listings = self._fetch_player_listings()
        matched_system_entries = self._resolve_system_entries(
            identifier=identifier,
            system_entries=system_entries,
            permanent_entries=permanent_entries,
        )
        matched_player_listings = await self._resolve_player_listings(
            identifier=identifier,
            listings=player_listings,
        )
        # Tuples are (price, source priority, position, offer).  The position
        # is a number so that ties keep listing order even beyond ten offers
        # (a string key would sort "…-10" before "…-2").
        candidates: list[tuple[int, int, int, StoreEntry | PlayerListing]] = []
        for idx, listing in enumerate(matched_player_listings):
            candidates.append((listing.price, 0, idx, listing))
        for idx, entry in enumerate(matched_system_entries["system"]):
            candidates.append((entry.price, 1, idx, entry))
        for idx, entry in enumerate(matched_system_entries["permanent"]):
            candidates.append((entry.price, 2, idx, entry))
        # The resolver always returns a two-key dict, so emptiness has to be
        # judged on the collected candidates rather than on the dict itself.
        if not candidates:
            return "❌ 未找到匹配的商品请确认名称或物品ID是否正确"
        candidates.sort(key=lambda item: item[:3])
        for _, source_priority, _, payload in candidates:
            if source_priority == 0:
                response = await self._purchase_player_listing(
                    listing=payload,  # type: ignore[arg-type]
                    quantity=quantity,
                    chat_id=chat_id,
                    user_id=user_id,
                )
            else:
                response = await self._purchase_system_entry(
                    entry=payload,  # type: ignore[arg-type]
                    quantity=quantity,
                    chat_id=chat_id,
                    user_id=user_id,
                )
            # Out-of-stock replies all contain this phrase; try the next offer.
            if "库存不足" in response:
                continue
            return response
        return "❌ 所有匹配的商品已售罄或库存不足"

    async def sell_item(
        self,
        *,
        chat_id: int,
        user_id: int,
        identifier: str,
        quantity: int,
        price: int,
    ) -> str:
        """Create, update or withdraw the caller's listing and return a user message.

        With an active listing for the same item, ``quantity == 0`` withdraws
        it (items are returned to the backpack) and any other quantity adjusts
        the listing, moving the difference between backpack and listing.
        Without an active listing a new one is created from backpack stock.

        Args:
            chat_id: Chat the command came from (currently unused here).
            user_id: Seller id.
            identifier: Item id or item name.
            quantity: Units to have on sale; must be >= 0.
            price: Unit price; must be >= 0.
        """
        identifier = identifier.strip()
        if not identifier:
            return "❌ 出售指令格式错误请提供物品名称或ID"
        if quantity < 0:
            return "❌ 出售数量必须大于或等于0"
        if price < 0:
            return "❌ 出售单价必须是非负整数"
        backpack = Architecture.Get(WPSBackpackSystem)
        item_id, definition = self._resolve_item(identifier)
        if item_id is None:
            return "❌ 未找到对应物品,请确认物品已注册且名称/ID 正确"
        existing_listing = self._get_user_listing(user_id)
        now_iso = datetime.now(timezone.utc).isoformat()
        if existing_listing and existing_listing.status == "active":
            if existing_listing.item_id != item_id:
                return (
                    "❌ 你已在出售其他物品,如需更换请先使用 `出售 <当前物品ID> 0` 撤下"
                )
            if quantity == 0:
                # Withdraw: close the listing and hand the unsold units back.
                cursor = get_db().conn.cursor()
                cursor.execute(
                    f"""
                    UPDATE {self.PLAYER_TABLE}
                    SET quantity = 0, status = ?, created_at = ?
                    WHERE user_id = ?
                    """,
                    ("closed", now_iso, user_id),
                )
                get_db().conn.commit()
                backpack.add_item(user_id, item_id, existing_listing.quantity)
                return f"✅ 已撤下出售的 {definition.name},返还数量 {existing_listing.quantity}"
            # Positive diff: more units must come out of the backpack;
            # negative diff: surplus units go back into it.
            diff = quantity - existing_listing.quantity
            if diff > 0:
                available_qty = self._get_user_inventory_quantity(user_id, item_id)
                if available_qty < diff:
                    return f"❌ 背包数量不足,需要额外 {diff}{definition.name}"
                backpack.set_item_quantity(user_id, item_id, available_qty - diff)
            elif diff < 0:
                backpack.add_item(user_id, item_id, -diff)
            cursor = get_db().conn.cursor()
            cursor.execute(
                f"""
                UPDATE {self.PLAYER_TABLE}
                SET quantity = ?, price = ?, created_at = ?, status = ?
                WHERE user_id = ?
                """,
                (quantity, price, now_iso, "active", user_id),
            )
            get_db().conn.commit()
            return f"✅ 已更新出售数量为 {quantity}{definition.name},售价 {price} 分/个"
        if quantity == 0:
            return " 当前没有在出售的物品"
        available_qty = self._get_user_inventory_quantity(user_id, item_id)
        if available_qty < quantity:
            return f"❌ 背包数量不足,当前拥有 {available_qty}{definition.name}"
        backpack.set_item_quantity(user_id, item_id, available_qty - quantity)
        cursor = get_db().conn.cursor()
        # Upsert: a closed row of this user may still exist and is reused.
        cursor.execute(
            f"""
            INSERT INTO {self.PLAYER_TABLE} (user_id, item_id, price, quantity, created_at, status)
            VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT(user_id) DO UPDATE SET
                item_id = excluded.item_id,
                price = excluded.price,
                quantity = excluded.quantity,
                created_at = excluded.created_at,
                status = excluded.status
            """,
            (user_id, item_id, price, quantity, now_iso, "active"),
        )
        get_db().conn.commit()
        return f"✅ 已上架 {definition.name} × {quantity},售价 {price} 分/个"

    def _help_message(self) -> str:
        """Return the markdown help text appended to the store page."""
        return (
            "# 🛒 商店指令帮助\n"
            "- `商店`:查看当前系统商品与玩家出售列表\n"
            "- `购买 <物品名称或ID> <数量>`:购买指定商品\n"
            "- `出售 <物品名称或ID> <数量> <单价>`:上架或更新自己的出售物品(限一种)\n"
        )

    def _fetch_system_entries(self) -> Tuple[List[StoreEntry], List[StoreEntry]]:
        """Load the system table and return ``(hourly_entries, permanent_entries)``."""
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT entry_id, mode_id, item_id, display_name, price,
                   limit_amount, remaining_amount, source,
                   created_at, registry_version
            FROM {self.SYSTEM_TABLE}
            ORDER BY source DESC, entry_id ASC
            """
        )
        rows = cursor.fetchall()
        system_entries: List[StoreEntry] = []
        permanent_entries: List[StoreEntry] = []
        for row in rows:
            created_at = self._parse_datetime(row["created_at"])
            entry = StoreEntry(
                entry_id=row["entry_id"],
                mode_id=row["mode_id"],
                item_id=row["item_id"],
                display_name=row["display_name"],
                price=row["price"],
                limit_amount=row["limit_amount"],
                remaining_amount=row["remaining_amount"],
                source=StoreItemSource(row["source"]),
                created_at=created_at,
                registry_version=row["registry_version"],
            )
            if entry.source == StoreItemSource.PERMANENT:
                permanent_entries.append(entry)
            elif entry.source == StoreItemSource.SYSTEM:
                system_entries.append(entry)
        return system_entries, permanent_entries

    def _fetch_player_listings(self) -> List[PlayerListing]:
        """Return all active player listings, oldest first."""
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT user_id, item_id, price, quantity, created_at, status
            FROM {self.PLAYER_TABLE}
            WHERE status = ?
            ORDER BY created_at ASC
            """,
            ("active",),
        )
        rows = cursor.fetchall()
        listings: List[PlayerListing] = []
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
        for row in rows:
            item_id = row["item_id"]
            try:
                definition = backpack._get_definition(item_id)
                item_name = definition.name
            except Exception:
                # Best effort: show the raw id when the item is no longer defined.
                item_name = item_id
            listings.append(
                PlayerListing(
                    user_id=row["user_id"],
                    item_id=item_id,
                    item_name=item_name,
                    price=row["price"],
                    quantity=row["quantity"],
                    created_at=self._parse_datetime(row["created_at"]),
                    status=row["status"],
                )
            )
        return listings

    def _format_store_markdown(
        self,
        *,
        chat_id: int,
        user_id: int,
        system_entries: List[StoreEntry],
        permanent_entries: List[StoreEntry],
        player_listings: List[PlayerListing],
    ) -> str:
        """Render the store page (permanent, hourly and player sections plus help) as markdown.

        ``user_id`` is the viewer: the seller name is omitted on the viewer's
        own listing.  ``chat_id`` is accepted but not used.
        """
        lines: List[str] = ["# 🏬 每小时商店"]
        config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
        if permanent_entries:
            lines.append("## ♾️ 常驻商品")
            for entry in permanent_entries:
                limit_text = (
                    "不限量"
                    if entry.limit_amount == -1
                    else f"限购 {entry.limit_amount}"
                )
                lines.append(
                    f"- {entry.display_name}|价格 {entry.price} 分|{limit_text}"
                )
        if system_entries:
            lines.append("## ⏱️ 本时段商品")
            for entry in system_entries:
                if entry.remaining_amount == -1:
                    stock_text = "剩余:不限量"
                else:
                    stock_text = f"剩余:{entry.remaining_amount}"
                lines.append(
                    f"- {entry.display_name}|价格 {entry.price} 分|{stock_text}"
                )
        if not permanent_entries and not system_entries:
            lines.append("> ⚠️ 当前没有可售的系统商品")
        lines.append("## 👥 玩家出售")
        if player_listings:
            for listing in player_listings:
                owner = ""
                if listing.user_id != user_id:
                    owner = config_api.get_user_name(listing.user_id)
                lines.append(
                    f"- {owner}{listing.item_name}|数量 {listing.quantity}|价格 {listing.price}"
                )
        else:
            lines.append("> 当前暂无玩家出售信息")
        lines.append("\n---\n" + self._help_message())
        return "\n".join(lines)

    def _parse_datetime(self, value: str) -> datetime:
        """Parse an ISO-8601 string; fall back to the current UTC time on a malformed value."""
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            return datetime.now(timezone.utc)
    # endregion

    # region Purchase logic
    async def _purchase_system_entry(
        self,
        *,
        entry: StoreEntry,
        quantity: int,
        chat_id: int,
        user_id: int,
    ) -> str:
        """Buy ``quantity`` units of a system/permanent entry and return a user message.

        Checks the buyer's points, decrements limited stock atomically,
        charges the points and adds the items to the buyer's backpack.
        """
        total_price = entry.price * quantity
        config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
        user_points = config_api.get_user_points(user_id)
        if user_points < total_price:
            return f"❌ 积分不足,需要 {total_price} 分,当前仅有 {user_points}"
        cursor = get_db().conn.cursor()
        if entry.remaining_amount != -1:
            # The WHERE clause guards against overselling: the row is only
            # updated while enough stock is left.
            cursor.execute(
                f"""
                UPDATE {self.SYSTEM_TABLE}
                SET remaining_amount = remaining_amount - ?
                WHERE entry_id = ? AND remaining_amount >= ?
                """,
                (quantity, entry.entry_id, quantity),
            )
            if cursor.rowcount == 0:
                get_db().conn.rollback()
                return "❌ 库存不足,无法完成购买"
        get_db().conn.commit()
        # NOTE(review): stock is committed before the points are charged; if
        # adjust_user_points raises, the stock decrement is not undone.
        new_points = await config_api.adjust_user_points(
            chat_id, user_id, -total_price, reason=f"购买 {entry.display_name}"
        )
        backpack = Architecture.Get(WPSBackpackSystem)
        backpack.add_item(user_id, entry.item_id, quantity)
        return (
            f"✅ 成功购买 {entry.display_name} × {quantity},花费 {total_price}\n"
            f"当前剩余积分:{new_points}"
        )

    async def _resolve_player_listings(
        self,
        identifier: str,
        listings: List[PlayerListing],
    ) -> List[PlayerListing]:
        """Return the in-stock listings whose item id or item name equals ``identifier`` (case-insensitive)."""
        if not listings:
            return []
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
        identifier_lower = identifier.lower()
        matches: List[PlayerListing] = []
        for listing in listings:
            if listing.quantity <= 0:
                continue
            if listing.item_id.lower() == identifier_lower:
                matches.append(listing)
                continue
            if listing.item_name.lower() == identifier_lower:
                matches.append(listing)
                continue
            # Last resort: compare against the current definition name, which
            # may differ from the name captured on the listing object.
            try:
                definition = backpack._get_definition(listing.item_id)
            except Exception:
                continue
            if definition.name.lower() == identifier_lower:
                matches.append(listing)
        return matches

    async def _purchase_player_listing(
        self,
        *,
        listing: PlayerListing,
        quantity: int,
        chat_id: int,
        user_id: int,
    ) -> str:
        """Buy ``quantity`` units from a player listing and return a user message.

        Decrements the listing atomically, closes it when it reaches zero,
        moves the points from buyer to seller and adds the items to the
        buyer's backpack.
        """
        if quantity <= 0:
            return "❌ 购买数量必须大于0"
        if listing.quantity < quantity:
            return "❌ 玩家出售库存不足"
        if listing.user_id == user_id:
            return "❌ 无法购买自己上架的商品"
        total_price = listing.price * quantity
        config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
        buyer_points = config_api.get_user_points(user_id)
        if buyer_points < total_price:
            return f"❌ 积分不足,需要 {total_price} 分,当前仅有 {buyer_points}"
        cursor = get_db().conn.cursor()
        # Re-check stock inside the UPDATE: `listing` is a snapshot and may be stale.
        cursor.execute(
            f"""
            UPDATE {self.PLAYER_TABLE}
            SET quantity = quantity - ?
            WHERE user_id = ? AND quantity >= ?
            """,
            (quantity, listing.user_id, quantity),
        )
        if cursor.rowcount == 0:
            get_db().conn.rollback()
            return "❌ 玩家出售库存不足或已被其他人购买"
        cursor.execute(
            f"""
            UPDATE {self.PLAYER_TABLE}
            SET status = ?
            WHERE user_id = ? AND quantity <= 0
            """,
            ("closed", listing.user_id),
        )
        get_db().conn.commit()
        # NOTE(review): the listing is committed before the points move; if
        # either adjust_user_points call raises, the sale is not undone.
        buyer_new_points = await config_api.adjust_user_points(
            chat_id, user_id, -total_price, reason="购买玩家商品"
        )
        await config_api.adjust_user_points(
            chat_id,
            listing.user_id,
            total_price,
            reason="玩家出售结算",
        )
        backpack = Architecture.Get(WPSBackpackSystem)
        backpack.add_item(user_id, listing.item_id, quantity)
        definition = backpack._get_definition(listing.item_id)
        return (
            f"✅ 成功购买玩家商品 {definition.name} × {quantity},花费 {total_price}\n"
            f"当前剩余积分:{buyer_new_points}"
        )

    def _resolve_system_entries(
        self,
        identifier: str,
        system_entries: List[StoreEntry],
        permanent_entries: List[StoreEntry],
    ) -> Dict[str, List[StoreEntry]]:
        """Return the entries matching ``identifier`` by item id or display name.

        The result always has the keys ``"system"`` and ``"permanent"``, each
        mapping to a (possibly empty) list in the input order.
        """
        identifier_lower = identifier.lower()
        matched_system: List[StoreEntry] = []
        matched_permanent: List[StoreEntry] = []
        for entry in permanent_entries:
            if entry.item_id.lower() == identifier_lower or entry.display_name.lower() == identifier_lower:
                matched_permanent.append(entry)
        for entry in system_entries:
            if entry.item_id.lower() == identifier_lower or entry.display_name.lower() == identifier_lower:
                matched_system.append(entry)
        return {"system": matched_system, "permanent": matched_permanent}

    def _get_user_listing(self, user_id: int) -> Optional[PlayerListing]:
        """Return the user's listing row regardless of status, or None when there is none."""
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT user_id, item_id, price, quantity, created_at, status
            FROM {self.PLAYER_TABLE}
            WHERE user_id = ?
            """,
            (user_id,),
        )
        row = cursor.fetchone()
        if not row:
            return None
        item_id = row["item_id"]
        backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
        try:
            definition = backpack._get_definition(item_id)
            item_name = definition.name
        except Exception:
            # Best effort: show the raw id when the item is no longer defined.
            item_name = item_id
        return PlayerListing(
            user_id=row["user_id"],
            item_id=item_id,
            item_name=item_name,
            price=row["price"],
            quantity=row["quantity"],
            created_at=self._parse_datetime(row["created_at"]),
            status=row["status"],
        )

    def _derive_player_price(self, item_id: str) -> int:
        """Return the lowest registered mode price for ``item_id``.

        Raises:
            ValueError: When no registered mode sells the item.
        """
        prices = [
            mode.price
            for mode in self._mode_registry.values()
            if mode.item_id == item_id
        ]
        if not prices:
            raise ValueError(
                f"Item {item_id} has no registered store mode to derive price"
            )
        return min(prices)

    def _resolve_item(
        self,
        identifier: str,
    ) -> Tuple[Optional[str], Optional["BackpackItemDefinition"]]:
        """Resolve an item id or name to ``(item_id, definition)``; ``(None, None)`` when unknown."""
        backpack = Architecture.Get(WPSBackpackSystem)
        identifier_lower = identifier.lower()
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            SELECT item_id
            FROM {WPSBackpackSystem.ITEMS_TABLE}
            WHERE lower(item_id) = ? OR lower(name) = ?
            LIMIT 1
            """,
            (identifier_lower, identifier_lower),
        )
        row = cursor.fetchone()
        # Without a table hit, try the identifier itself as an item id.
        item_id = row["item_id"] if row else identifier
        try:
            definition = backpack._get_definition(item_id)
            return definition.item_id, definition
        except Exception:
            return None, None

    def _get_user_inventory_quantity(self, user_id: int, item_id: str) -> int:
        """Return how many units of ``item_id`` the user holds (0 when absent)."""
        backpack = Architecture.Get(WPSBackpackSystem)
        for item in backpack.get_user_items(user_id):
            if item.item_id == item_id:
                return item.quantity
        return 0

    def get_store_snapshot(self) -> Dict[str, Any]:
        """Refresh the snapshot and return all store data as plain dictionaries."""
        self._ensure_hour_snapshot()
        system_entries, permanent_entries = self._fetch_system_entries()
        player_listings = self._fetch_player_listings()
        return {
            "system": [self._entry_to_dict(entry) for entry in system_entries],
            "permanent": [self._entry_to_dict(entry) for entry in permanent_entries],
            "players": [self._listing_to_dict(listing) for listing in player_listings],
        }

    def get_user_listing_snapshot(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Return the user's active listing as a dictionary, or None when there is none."""
        listing = self._get_user_listing(user_id)
        if not listing or listing.status != "active":
            return None
        return self._listing_to_dict(listing)

    def _entry_to_dict(self, entry: StoreEntry) -> Dict[str, Any]:
        """Convert a StoreEntry to a dictionary of plain values (enum and datetime as strings)."""
        return {
            "entry_id": entry.entry_id,
            "mode_id": entry.mode_id,
            "item_id": entry.item_id,
            "display_name": entry.display_name,
            "price": entry.price,
            "limit_amount": entry.limit_amount,
            "remaining_amount": entry.remaining_amount,
            "source": entry.source.value,
            "created_at": entry.created_at.isoformat(),
            "registry_version": entry.registry_version,
        }

    def _listing_to_dict(self, listing: PlayerListing) -> Dict[str, Any]:
        """Convert a PlayerListing to a dictionary of plain values (``item_name`` is not included)."""
        return {
            "user_id": listing.user_id,
            "item_id": listing.item_id,
            "price": listing.price,
            "quantity": listing.quantity,
            "created_at": listing.created_at.isoformat(),
            "status": listing.status,
        }
    # endregion
class WPSStoreBuyCommand(WPSAPI):
    """Chat command plugin (`buy` / `购买`) that forwards purchases to WPSStoreSystem."""

    def get_guide_subtitle(self) -> str:
        """Return the one-line subtitle shown for this plugin in the guide."""
        return "购买商店及玩家寄售物品"

    def collect_command_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entry describing the buy command."""
        buy_entry = {
            "title": "buy",
            "identifier": "buy",
            "description": "购买系统或玩家寄售商品,数量需为正整数。",
            "metadata": {"别名": "购买"},
        }
        return (buy_entry,)

    def collect_guide_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entries explaining argument format and checks."""
        format_entry = {
            "title": "参数格式",
            "description": "`购买 <物品名称或ID> <数量>`,内部支持模糊匹配模式名称。",
        }
        validation_entry = {
            "title": "权限校验",
            "description": "调用 `purchase_item` 时校验库存、积分并自动扣除商品库存。",
        }
        return (format_entry, validation_entry)

    @override
    def dependencies(self) -> list[type]:
        """Return the plugin types this command depends on."""
        return [WPSStoreSystem]

    @override
    def is_enable_plugin(self) -> bool:
        """Report this plugin as enabled."""
        return True

    @override
    def wake_up(self) -> None:
        """Log the load and register both command aliases."""
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSStoreBuyCommand 插件已加载{ConsoleFrontColor.RESET}",
        )
        self.register_plugin("buy")
        self.register_plugin("购买")

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Parse ``<item name or id> <quantity>`` and run the purchase.

        The last whitespace-separated token is the quantity; everything
        before it (re-joined with single spaces) is the item identifier.
        """
        usage_error = "❌ 购买指令格式错误,请使用:`购买 <物品名称或ID> <数量>`"
        # split() without arguments already trims and drops empty tokens, so
        # an empty message simply yields fewer than two parts.
        parts = self.parse_message_after_at(message).split()
        if len(parts) < 2:
            return await self._send_error(usage_error, chat_id, user_id)
        *name_parts, raw_quantity = parts
        try:
            quantity = int(raw_quantity)
        except ValueError:
            return await self._send_error("❌ 购买数量必须是整数", chat_id, user_id)
        store: WPSStoreSystem = Architecture.Get(WPSStoreSystem)
        reply = await store.purchase_item(
            chat_id=chat_id,
            user_id=user_id,
            identifier=" ".join(name_parts),
            quantity=quantity,
        )
        return await self.send_markdown_message(reply, chat_id, user_id)

    async def _send_error(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Send an error text to the user as a markdown message."""
        return await self.send_markdown_message(message, chat_id, user_id)
class WPSStoreSellCommand(WPSAPI):
    """Chat command plugin (`sell` / `出售`) that forwards listings to WPSStoreSystem."""

    def get_guide_subtitle(self) -> str:
        """Return the one-line subtitle shown for this plugin in the guide."""
        return "挂售物品至商店寄售区"

    def collect_command_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entry describing the sell command."""
        sell_entry = {
            "title": "sell",
            "identifier": "sell",
            "description": "将背包物品以指定数量和单价挂售。",
            "metadata": {"别名": "出售"},
        }
        return (sell_entry,)

    def collect_guide_entries(self) -> Sequence[GuideEntry]:
        """Return the guide entries explaining argument format and listing lifecycle."""
        format_entry = {
            "title": "参数格式",
            "description": "`出售 <物品名称或ID> <数量> <单价>`。",
        }
        lifecycle_entry = {
            "title": "寄售生命周期",
            "description": "寄售记录写入玩家表,状态变更后定期清理无效记录。",
        }
        return (format_entry, lifecycle_entry)

    @override
    def dependencies(self) -> list[type]:
        """Return the plugin types this command depends on."""
        return [WPSStoreSystem]

    @override
    def is_enable_plugin(self) -> bool:
        """Report this plugin as enabled."""
        return True

    @override
    def wake_up(self) -> None:
        """Log the load and register both command aliases."""
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}WPSStoreSellCommand 插件已加载{ConsoleFrontColor.RESET}",
        )
        self.register_plugin("sell")
        self.register_plugin("出售")

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Parse ``<item name or id> <quantity> <unit price>`` and run the listing.

        The last two whitespace-separated tokens are quantity and price;
        everything before them (re-joined with single spaces) is the item
        identifier.
        """
        usage_error = "❌ 出售指令格式错误,请使用:`出售 <物品名称或ID> <数量> <单价>`"
        # split() without arguments already trims and drops empty tokens, so
        # an empty message simply yields fewer than three parts.
        parts = self.parse_message_after_at(message).split()
        if len(parts) < 3:
            return await self._send_error(usage_error, chat_id, user_id)
        *name_parts, raw_quantity, raw_price = parts
        identifier = " ".join(name_parts)
        # Defensive guard: with at least three non-empty tokens the identifier
        # cannot be empty here.
        if not identifier:
            return await self._send_error(
                "❌ 出售指令格式错误请提供物品名称或ID",
                chat_id,
                user_id,
            )
        try:
            quantity = int(raw_quantity)
        except ValueError:
            return await self._send_error("❌ 出售数量必须是整数", chat_id, user_id)
        try:
            price = int(raw_price)
        except ValueError:
            return await self._send_error("❌ 出售单价必须是整数", chat_id, user_id)
        if price < 0:
            return await self._send_error("❌ 出售单价必须是非负整数", chat_id, user_id)
        store = Architecture.Get(WPSStoreSystem)
        reply = await store.sell_item(
            chat_id=chat_id,
            user_id=user_id,
            identifier=identifier,
            quantity=quantity,
            price=price,
        )
        return await self.send_markdown_message(reply, chat_id, user_id)

    async def _send_error(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Send an error text to the user as a markdown message."""
        return await self.send_markdown_message(message, chat_id, user_id)
# Names exported by `from <module> import *`.
# NOTE(review): WPSStoreBuyCommand and WPSStoreSellCommand are defined in this
# module but not listed here - confirm the omission is intentional.
__all__ = ["WPSStoreSystem"]