1089 lines
38 KiB
Python
1089 lines
38 KiB
Python
from __future__ import annotations
|
||
|
||
import hashlib
|
||
import random
|
||
from datetime import datetime, timezone
|
||
from enum import Enum
|
||
from typing import Any, Dict, List, Optional, Tuple, override
|
||
from uuid import uuid4
|
||
|
||
from PWF.Convention.Runtime.Architecture import Architecture
|
||
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
|
||
from PWF.CoreModules.database import get_db
|
||
from PWF.CoreModules.plugin_interface import DatabaseModel
|
||
from pydantic import BaseModel, Field
|
||
|
||
from .WPSAPI import WPSAPI
|
||
from .WPSBackpackSystem import BackpackItemDefinition, WPSBackpackSystem
|
||
from .WPSConfigSystem import WPSConfigAPI
|
||
|
||
|
||
# Shared ProjectConfig instance. This module uses it both for console logging
# (`logger.Log`) and for reading/persisting config items (`logger.FindItem`,
# `logger.SaveProperties`).
logger: ProjectConfig = Architecture.Get(ProjectConfig)
|
||
|
||
|
||
class StoreItemSource(Enum):
    """Origin of a store row, persisted in the ``source`` column as its string value."""

    # Hourly rotating entries rebuilt from the non-permanent registered modes.
    SYSTEM = "system"
    # Entries mirrored from modes registered as permanent.
    PERMANENT = "permanent"
    # Declared for player-originated items.
    PLAYER = "player"
|
||
|
||
|
||
class StoreMode(BaseModel):
    """In-memory description of one registered way of selling an item."""

    # Registry key generated at registration time (uuid4 hex).
    mode_id: str
    # Backpack item this mode sells.
    item_id: str
    # Unit price in points.
    price: int
    # Stock limit; -1 is treated as "unlimited" throughout this module.
    limit_amount: int
    # Permanent modes are always listed; the others compete for the hourly slots.
    is_permanent: bool = False
    # Free-form string metadata supplied by the registering caller.
    metadata: Dict[str, str] = Field(default_factory=dict)
    # Timestamp recorded when the mode was registered.
    registered_at: datetime

    class Config:
        # NOTE(review): `allow_mutation` is the pydantic v1 spelling; pydantic v2
        # renamed this setting to `frozen`. Confirm which major version is pinned.
        allow_mutation = False
|
||
|
||
|
||
class StoreEntry(BaseModel):
    """One row of the system store table, as read back from the database."""

    # Auto-increment primary key of the row.
    entry_id: int
    # Registry id of the mode this row was generated from.
    mode_id: str
    # Backpack item being sold.
    item_id: str
    # Item name captured when the row was written.
    display_name: str
    # Unit price in points.
    price: int
    # Configured limit; -1 means unlimited.
    limit_amount: int
    # Stock still available; -1 means unlimited.
    remaining_amount: int
    # Whether the row is an hourly or a permanent entry.
    source: StoreItemSource
    # Time the row was written.
    created_at: datetime
    # Registry version that was current when the row was written.
    registry_version: int

    class Config:
        # NOTE(review): `allow_mutation` is the pydantic v1 spelling; pydantic v2
        # renamed this setting to `frozen`. Confirm which major version is pinned.
        allow_mutation = False
|
||
|
||
|
||
class PlayerListing(BaseModel):
    """A player's sell listing, enriched with the resolved item name."""

    # Seller; also the primary key of the listings table (one listing per user).
    user_id: int
    # Backpack item being sold.
    item_id: str
    # Display name resolved from the backpack definition (falls back to item_id).
    item_name: str
    # Unit price in points.
    price: int
    # Units still for sale.
    quantity: int
    # Time the listing was created or last updated.
    created_at: datetime
    # Listing state; this module writes "active" and "closed".
    status: str

    class Config:
        # NOTE(review): `allow_mutation` is the pydantic v1 spelling; pydantic v2
        # renamed this setting to `frozen`. Confirm which major version is pinned.
        allow_mutation = False
|
||
|
||
|
||
class WPSStoreSystem(WPSAPI):
|
||
# Table that stores both the hourly ("system") and the permanent store entries.
SYSTEM_TABLE = "store_system_items"
|
||
# Table that stores player listings; user_id is its primary key, so one row per user.
PLAYER_TABLE = "store_user_listings"
|
||
|
||
def __init__(self) -> None:
    """Set up the empty mode registry and read the hourly slot count from config."""
    super().__init__()
    self._mode_registry: Dict[str, StoreMode] = {}
    self._registry_version: int = 0
    # Number of non-permanent modes drawn for each hourly rotation (default 5).
    configured_count = logger.FindItem("store_hourly_count", 5)
    self._default_hourly_count: int = int(configured_count)
    logger.SaveProperties()
    self._permanent_mode_ids: set[str] = set()
|
||
|
||
@override
def dependencies(self) -> List[type]:
    """Return the plugin types this store declares as its dependencies."""
    required: List[type] = [WPSAPI, WPSConfigAPI, WPSBackpackSystem]
    return required
|
||
|
||
@override
|
||
def is_enable_plugin(self) -> bool:
|
||
return True
|
||
|
||
@override
def register_db_model(self):
    """Describe the two tables used by the store.

    Returns:
        A list with the system/permanent entry table followed by the
        player listing table.
    """
    system_columns = {
        "entry_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
        # UNIQUE so permanent entries can be upserted by mode id.
        "mode_id": "TEXT UNIQUE",
        "item_id": "TEXT NOT NULL",
        "display_name": "TEXT NOT NULL",
        "price": "INTEGER NOT NULL",
        "limit_amount": "INTEGER",
        "remaining_amount": "INTEGER",
        "source": "TEXT NOT NULL",
        "created_at": "TEXT NOT NULL",
        "registry_version": "INTEGER NOT NULL",
    }
    player_columns = {
        # Primary key: each user owns at most one listing row.
        "user_id": "INTEGER PRIMARY KEY",
        "item_id": "TEXT NOT NULL",
        "price": "INTEGER NOT NULL",
        "quantity": "INTEGER NOT NULL",
        "created_at": "TEXT NOT NULL",
        "status": "TEXT NOT NULL",
    }
    return [
        DatabaseModel(table_name=self.SYSTEM_TABLE, column_defs=system_columns),
        DatabaseModel(table_name=self.PLAYER_TABLE, column_defs=player_columns),
    ]
|
||
|
||
@override
def wake_up(self) -> None:
    """Log that the plugin loaded, register its command aliases and purge stale rows."""
    banner = f"{ConsoleFrontColor.GREEN}WPSStoreSystem 插件已加载{ConsoleFrontColor.RESET}"
    logger.Log("Info", banner)
    for command_alias in ("store", "商店"):
        self.register_plugin(command_alias)
    self._cleanup_state()
|
||
|
||
def _cleanup_state(self) -> None:
    """Delete sold-out hourly entries and every non-active player listing."""
    cursor = get_db().conn.cursor()
    purge_statements = (
        # Remove hourly system entries whose stock has reached zero.
        (
            f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ? AND remaining_amount = 0",
            (StoreItemSource.SYSTEM.value,),
        ),
        # Remove player listings that are no longer active.
        (
            f"DELETE FROM {self.PLAYER_TABLE} WHERE status != ?",
            ("active",),
        ),
    )
    for statement, parameters in purge_statements:
        cursor.execute(statement, parameters)
    get_db().conn.commit()
|
||
|
||
# region 模式注册接口
|
||
def register_mode(
    self,
    item_id: str,
    price: int,
    limit_amount: int,
    *,
    is_permanent: bool = False,
    metadata: Optional[Dict[str, str]] = None,
) -> str:
    """Register a new way of selling ``item_id`` and return its generated mode id.

    Args:
        item_id: Backpack item to sell; must be known to the backpack system.
        price: Unit price in points; must be positive.
        limit_amount: Stock limit; -1 for unlimited, otherwise positive.
        is_permanent: Whether the mode is always listed rather than rotated hourly.
        metadata: Optional string metadata; copied before being stored.

    Raises:
        ValueError: If an argument is invalid or the item definition lookup fails.
    """
    if not item_id:
        raise ValueError("item_id must be provided")
    if price <= 0:
        raise ValueError("price must be positive")
    if limit_amount == 0:
        raise ValueError("limit_amount must not be zero")
    if limit_amount < -1:
        raise ValueError("limit_amount must be -1 or positive")

    backpack_system = Architecture.Get(WPSBackpackSystem)
    try:
        item_def = backpack_system._get_definition(item_id)  # type: ignore[attr-defined]
    except Exception as exc:
        raise ValueError(f"Item {item_id} not registered in backpack system") from exc

    mode_id = uuid4().hex
    new_mode = StoreMode(
        mode_id=mode_id,
        item_id=item_id,
        price=price,
        limit_amount=limit_amount,
        is_permanent=is_permanent,
        # Copy so later changes to the caller's dict do not leak into the registry.
        metadata=dict(metadata or {}),
        registered_at=datetime.now(timezone.utc),
    )
    self._mode_registry[mode_id] = new_mode
    if is_permanent:
        self._permanent_mode_ids.add(mode_id)
    # A registry change invalidates the current hourly snapshot.
    self._bump_registry_version()
    logger.Log(
        "Info",
        f"{ConsoleFrontColor.CYAN}Store mode {mode_id} registered for item {item_def.name} ({item_id}){ConsoleFrontColor.RESET}",
    )
    return mode_id
|
||
|
||
def register_permanent_mode(
    self,
    item_id: str,
    price: int,
    *,
    limit_amount: int = -1,
    metadata: Optional[Dict[str, str]] = None,
) -> str:
    """Register an always-listed mode; shorthand for ``register_mode(..., is_permanent=True)``.

    ``limit_amount`` defaults to -1 (unlimited). Returns the generated mode id.
    """
    forwarded = {
        "item_id": item_id,
        "price": price,
        "limit_amount": limit_amount,
        "is_permanent": True,
        "metadata": metadata,
    }
    return self.register_mode(**forwarded)
|
||
|
||
def unregister_mode(self, mode_id: str) -> bool:
    """Remove a registered mode.

    Returns:
        True if the mode existed and was removed, False if the id was unknown.
    """
    removed = self._mode_registry.pop(mode_id, None)
    if removed is None:
        return False
    if removed.is_permanent:
        self._permanent_mode_ids.discard(mode_id)
    # A registry change invalidates the current hourly snapshot.
    self._bump_registry_version()
    logger.Log(
        "Info",
        f"{ConsoleFrontColor.YELLOW}Store mode {mode_id} unregistered{ConsoleFrontColor.RESET}",
    )
    return True
|
||
|
||
def list_modes(self) -> List[StoreMode]:
    """Return a new list with every registered mode, in registration order."""
    return [*self._mode_registry.values()]
|
||
|
||
def _bump_registry_version(self) -> None:
|
||
self._registry_version += 1
|
||
|
||
def _get_permanent_modes(self) -> List[StoreMode]:
|
||
return [
|
||
mode
|
||
for mode in self._mode_registry.values()
|
||
if mode.is_permanent
|
||
]
|
||
|
||
def _get_candidate_modes(self) -> List[StoreMode]:
|
||
return [
|
||
mode
|
||
for mode in self._mode_registry.values()
|
||
if not mode.is_permanent
|
||
]
|
||
|
||
# endregion
|
||
|
||
# region 刷新与数据同步
|
||
def _current_hour_key(self) -> str:
|
||
return datetime.now(timezone.utc).strftime("%Y%m%d%H")
|
||
|
||
def _ensure_hour_snapshot(self) -> None:
|
||
hour_key = self._current_hour_key()
|
||
if self._should_refresh(hour_key):
|
||
self._rebuild_system_entries(hour_key)
|
||
self._sync_permanent_entries()
|
||
|
||
def _should_refresh(self, hour_key: str) -> bool:
    """Decide whether the stored hourly entries must be rebuilt.

    Looks at one stored hourly row. A rebuild is needed when there is no such
    row, its timestamp cannot be parsed, it was written in a different hour
    than ``hour_key``, or it carries a different registry version.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        f"""
        SELECT created_at, registry_version
        FROM {self.SYSTEM_TABLE}
        WHERE source = ?
        LIMIT 1
        """,
        (StoreItemSource.SYSTEM.value,),
    )
    sample = cursor.fetchone()
    if not sample:
        return True
    try:
        written_at = datetime.fromisoformat(sample["created_at"])
    except ValueError:
        return True
    stored_hour = written_at.strftime("%Y%m%d%H")
    stored_version = int(sample["registry_version"])
    if stored_hour != hour_key:
        return True
    return stored_version != self._registry_version
|
||
|
||
def _rebuild_system_entries(self, hour_key: str) -> None:
    """Replace the hourly entries with a fresh, deterministically shuffled selection.

    All existing hourly rows are deleted. The non-permanent modes are shuffled
    with a random generator seeded from ``hour_key`` and the registry version,
    and the first ``_default_hourly_count`` of them are inserted.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ?",
        (StoreItemSource.SYSTEM.value,),
    )
    pool = self._get_candidate_modes()
    if not pool:
        # Nothing to offer: persist the deletion and stop.
        get_db().conn.commit()
        return

    random.Random(self._compute_seed(hour_key)).shuffle(pool)
    chosen = pool[: self._default_hourly_count]
    written_at = datetime.now(timezone.utc).isoformat()
    backpack_system = Architecture.Get(WPSBackpackSystem)
    insert_sql = f"""
        INSERT INTO {self.SYSTEM_TABLE} (
            mode_id, item_id, display_name, price,
            limit_amount, remaining_amount, source,
            created_at, registry_version
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    for mode in chosen:
        item_def = backpack_system._get_definition(mode.item_id)  # type: ignore[attr-defined]
        # Any negative limit is stored as -1 (unlimited stock).
        starting_stock = max(mode.limit_amount, -1)
        cursor.execute(
            insert_sql,
            (
                mode.mode_id,
                mode.item_id,
                item_def.name,
                mode.price,
                mode.limit_amount,
                starting_stock,
                StoreItemSource.SYSTEM.value,
                written_at,
                self._registry_version,
            ),
        )
    get_db().conn.commit()
|
||
|
||
def _sync_permanent_entries(self) -> None:
    """Make the permanent rows in the table mirror the registered permanent modes.

    Permanent rows whose mode is no longer registered are deleted, then every
    registered permanent mode is upserted by ``mode_id``.

    NOTE(review): the upsert overwrites ``remaining_amount`` with the mode's
    ``limit_amount``, so a permanent entry's stock is reset each time this
    method runs. Confirm that this is the intended "limit per purchase" behaviour.
    """
    cursor = get_db().conn.cursor()
    permanent_modes = self._get_permanent_modes()
    keep_ids = [mode.mode_id for mode in permanent_modes]

    if not keep_ids:
        cursor.execute(
            f"DELETE FROM {self.SYSTEM_TABLE} WHERE source = ?",
            (StoreItemSource.PERMANENT.value,),
        )
    else:
        marks = ", ".join("?" for _ in keep_ids)
        cursor.execute(
            f"""
            DELETE FROM {self.SYSTEM_TABLE}
            WHERE source = ? AND mode_id NOT IN ({marks})
            """,
            (StoreItemSource.PERMANENT.value, *keep_ids),
        )

    written_at = datetime.now(timezone.utc).isoformat()
    backpack_system = Architecture.Get(WPSBackpackSystem)
    upsert_sql = f"""
        INSERT INTO {self.SYSTEM_TABLE} (
            mode_id, item_id, display_name, price,
            limit_amount, remaining_amount, source,
            created_at, registry_version
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(mode_id) DO UPDATE SET
            item_id = excluded.item_id,
            display_name = excluded.display_name,
            price = excluded.price,
            limit_amount = excluded.limit_amount,
            remaining_amount = excluded.remaining_amount,
            created_at = excluded.created_at,
            registry_version = excluded.registry_version
    """
    for mode in permanent_modes:
        item_def = backpack_system._get_definition(mode.item_id)  # type: ignore[attr-defined]
        cursor.execute(
            upsert_sql,
            (
                mode.mode_id,
                mode.item_id,
                item_def.name,
                mode.price,
                mode.limit_amount,
                # Remaining stock starts at the limit (-1 stays -1, i.e. unlimited).
                mode.limit_amount,
                StoreItemSource.PERMANENT.value,
                written_at,
                self._registry_version,
            ),
        )
    get_db().conn.commit()
|
||
|
||
def _compute_seed(self, hour_key: str) -> int:
|
||
payload = f"{hour_key}|{self._registry_version}".encode("utf-8")
|
||
digest = hashlib.sha256(payload).digest()
|
||
return int.from_bytes(digest[:8], "big")
|
||
|
||
# endregion
|
||
|
||
# region 指令处理
|
||
# Command aliases of the store view.
# NOTE(review): not referenced elsewhere in the visible file; wake_up registers
# the same two aliases from its own literal.
STORE_COMMANDS = {"store", "商店"}
|
||
|
||
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
    """Handle the store command: render the store view and send it as markdown.

    The message is still passed through ``parse_message_after_at`` but its
    content is not used; the command takes no arguments.
    """
    self.parse_message_after_at(message).strip()
    store_markdown = await self._handle_store(chat_id, user_id)
    return await self.send_markdown_message(store_markdown, chat_id, user_id)
|
||
|
||
async def _handle_store(self, chat_id: int, user_id: int) -> str:
|
||
self._ensure_hour_snapshot()
|
||
system_entries, permanent_entries = self._fetch_system_entries()
|
||
player_listings = self._fetch_player_listings()
|
||
return self._format_store_markdown(
|
||
chat_id=chat_id,
|
||
user_id=user_id,
|
||
system_entries=system_entries,
|
||
permanent_entries=permanent_entries,
|
||
player_listings=player_listings,
|
||
)
|
||
|
||
async def purchase_item(
    self,
    *,
    chat_id: int,
    user_id: int,
    identifier: str,
    quantity: int,
) -> str:
    """Buy ``quantity`` units of the item matching ``identifier`` from the cheapest source.

    Matching offers are tried in ascending price order; on equal price,
    player listings come before hourly entries, which come before permanent
    entries. An offer that fails for lack of stock is skipped and the next
    one is tried; any other failure is returned immediately.

    The buyer's own listing is never a purchase candidate, so it cannot block
    buying the same item from another seller or from the system store.

    Args:
        chat_id: Chat the command came from; forwarded to the purchase helpers.
        user_id: Buyer.
        identifier: Item id or display name (matched case-insensitively).
        quantity: Number of units to buy; must be positive.

    Returns:
        A user-facing message; failures start with "❌".
    """
    identifier = identifier.strip()
    if not identifier:
        return "❌ 购买指令格式错误,请提供物品名称或ID"
    if quantity <= 0:
        return "❌ 购买数量必须大于0"

    self._ensure_hour_snapshot()
    system_entries, permanent_entries = self._fetch_system_entries()
    player_listings = self._fetch_player_listings()

    matched_entries = self._resolve_system_entries(
        identifier=identifier,
        system_entries=system_entries,
        permanent_entries=permanent_entries,
    )
    matched_listings = await self._resolve_player_listings(
        identifier=identifier,
        listings=player_listings,
    )

    # Candidate tuple: (price, source priority, position within source, offer).
    # The position is an int so that ties keep the fetch order even past ten
    # offers, and so that the sort never needs to compare the offers themselves.
    candidates: list[tuple[int, int, int, StoreEntry | PlayerListing]] = []
    own_listing_matched = False
    for position, listing in enumerate(matched_listings):
        if listing.user_id == user_id:
            own_listing_matched = True
            continue
        candidates.append((listing.price, 0, position, listing))
    for position, entry in enumerate(matched_entries["system"]):
        candidates.append((entry.price, 1, position, entry))
    for position, entry in enumerate(matched_entries["permanent"]):
        candidates.append((entry.price, 2, position, entry))

    if not candidates:
        if own_listing_matched:
            return "❌ 无法购买自己上架的商品"
        return "❌ 未找到匹配的商品,请确认名称或物品ID是否正确"

    candidates.sort(key=lambda candidate: candidate[:3])

    for _price, source_priority, _position, offer in candidates:
        if source_priority == 0:
            response = await self._purchase_player_listing(
                listing=offer,  # type: ignore[arg-type]
                quantity=quantity,
                chat_id=chat_id,
                user_id=user_id,
            )
        else:
            response = await self._purchase_system_entry(
                entry=offer,  # type: ignore[arg-type]
                quantity=quantity,
                chat_id=chat_id,
                user_id=user_id,
            )
        # Only an out-of-stock failure moves on to the next offer.
        if response.startswith("❌") and "库存不足" in response:
            continue
        return response

    return "❌ 所有匹配的商品已售罄或库存不足"
|
||
|
||
async def sell_item(
    self,
    *,
    chat_id: int,
    user_id: int,
    identifier: str,
    quantity: int,
    price: int,
) -> str:
    """Create, update or withdraw the caller's single sell listing.

    Without an active listing, ``quantity`` units are moved from the backpack
    into a new listing. With an active listing for the same item, the listed
    quantity and price are updated and the difference is moved to or from the
    backpack; a quantity of 0 withdraws the listing and returns the items.
    A different item cannot be listed while one is active.

    Args:
        chat_id: Chat the command came from (not used by this method).
        user_id: Seller.
        identifier: Item id or name.
        quantity: Target listed quantity; 0 withdraws an active listing.
        price: Unit price in points; must not be negative.

    Returns:
        A user-facing message; failures start with "❌".
    """
    identifier = identifier.strip()
    if not identifier:
        return "❌ 出售指令格式错误,请提供物品名称或ID"
    if quantity < 0:
        return "❌ 出售数量必须大于或等于0"
    if price < 0:
        return "❌ 出售单价必须是非负整数"

    backpack = Architecture.Get(WPSBackpackSystem)
    item_id, definition = self._resolve_item(identifier)
    if item_id is None:
        return "❌ 未找到对应物品,请确认物品已注册且名称/ID 正确"

    current = self._get_user_listing(user_id)
    stamp = datetime.now(timezone.utc).isoformat()

    if not (current and current.status == "active"):
        # No active listing: this is either a no-op or a brand-new listing.
        if quantity == 0:
            return "ℹ️ 当前没有在出售的物品"

        owned = self._get_user_inventory_quantity(user_id, item_id)
        if owned < quantity:
            return f"❌ 背包数量不足,当前拥有 {owned} 个 {definition.name}"

        # Listed units leave the backpack while they are for sale.
        backpack.set_item_quantity(user_id, item_id, owned - quantity)

        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            INSERT INTO {self.PLAYER_TABLE} (user_id, item_id, price, quantity, created_at, status)
            VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT(user_id) DO UPDATE SET
                item_id = excluded.item_id,
                price = excluded.price,
                quantity = excluded.quantity,
                created_at = excluded.created_at,
                status = excluded.status
            """,
            (user_id, item_id, price, quantity, stamp, "active"),
        )
        get_db().conn.commit()
        return f"✅ 已上架 {definition.name} × {quantity},售价 {price} 分/个"

    if current.item_id != item_id:
        return (
            "❌ 你已在出售其他物品,如需更换请先使用 `出售 <当前物品ID> 0` 撤下"
        )

    if quantity == 0:
        # Withdraw: close the listing and hand the unsold units back.
        cursor = get_db().conn.cursor()
        cursor.execute(
            f"""
            UPDATE {self.PLAYER_TABLE}
            SET quantity = 0, status = ?, created_at = ?
            WHERE user_id = ?
            """,
            ("closed", stamp, user_id),
        )
        get_db().conn.commit()
        backpack.add_item(user_id, item_id, current.quantity)
        return f"✅ 已撤下出售的 {definition.name},返还数量 {current.quantity}"

    delta = quantity - current.quantity
    if delta > 0:
        # Listing grows: the extra units must come out of the backpack.
        owned = self._get_user_inventory_quantity(user_id, item_id)
        if owned < delta:
            return f"❌ 背包数量不足,需要额外 {delta} 个 {definition.name}"
        backpack.set_item_quantity(user_id, item_id, owned - delta)
    elif delta < 0:
        # Listing shrinks: the surplus goes back into the backpack.
        backpack.add_item(user_id, item_id, -delta)

    cursor = get_db().conn.cursor()
    cursor.execute(
        f"""
        UPDATE {self.PLAYER_TABLE}
        SET quantity = ?, price = ?, created_at = ?, status = ?
        WHERE user_id = ?
        """,
        (quantity, price, stamp, "active", user_id),
    )
    get_db().conn.commit()
    return f"✅ 已更新出售数量为 {quantity} 个 {definition.name},售价 {price} 分/个"
|
||
|
||
def _help_message(self) -> str:
|
||
return """# 🛒 商店指令帮助
|
||
- `商店`:查看当前系统商品与玩家出售列表
|
||
- `购买 <物品名称或ID> <数量>`:购买指定商品
|
||
- `出售 <物品名称或ID> <数量> <单价>`:上架或更新自己的出售物品(限一种)
|
||
"""
|
||
|
||
def _fetch_system_entries(self) -> Tuple[List[StoreEntry], List[StoreEntry]]:
    """Load every stored entry and split the rows by source.

    Returns:
        ``(hourly_entries, permanent_entries)``. Rows with any other source
        are ignored.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        f"""
        SELECT entry_id, mode_id, item_id, display_name, price,
               limit_amount, remaining_amount, source,
               created_at, registry_version
        FROM {self.SYSTEM_TABLE}
        ORDER BY source DESC, entry_id ASC
        """
    )
    by_source: Dict[StoreItemSource, List[StoreEntry]] = {
        StoreItemSource.SYSTEM: [],
        StoreItemSource.PERMANENT: [],
    }
    for record in cursor.fetchall():
        written_at = self._parse_datetime(record["created_at"])
        entry = StoreEntry(
            entry_id=record["entry_id"],
            mode_id=record["mode_id"],
            item_id=record["item_id"],
            display_name=record["display_name"],
            price=record["price"],
            limit_amount=record["limit_amount"],
            remaining_amount=record["remaining_amount"],
            source=StoreItemSource(record["source"]),
            created_at=written_at,
            registry_version=record["registry_version"],
        )
        bucket = by_source.get(entry.source)
        if bucket is not None:
            bucket.append(entry)
    return by_source[StoreItemSource.SYSTEM], by_source[StoreItemSource.PERMANENT]
|
||
|
||
def _fetch_player_listings(self) -> List[PlayerListing]:
    """Load all active player listings, oldest first.

    The display name comes from the backpack definition; if that lookup
    fails, the item id is used as the name instead.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        f"""
        SELECT user_id, item_id, price, quantity, created_at, status
        FROM {self.PLAYER_TABLE}
        WHERE status = ?
        ORDER BY created_at ASC
        """,
        ("active",),
    )
    records = cursor.fetchall()
    result: List[PlayerListing] = []
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    for record in records:
        item_id = record["item_id"]
        try:
            item_name = backpack._get_definition(item_id).name  # type: ignore[attr-defined]
        except Exception:
            # Best effort: keep showing the listing even if the item is unknown.
            item_name = item_id
        listing = PlayerListing(
            user_id=record["user_id"],
            item_id=item_id,
            item_name=item_name,
            price=record["price"],
            quantity=record["quantity"],
            created_at=self._parse_datetime(record["created_at"]),
            status=record["status"],
        )
        result.append(listing)
    return result
|
||
|
||
def _format_store_markdown(
    self,
    *,
    chat_id: int,
    user_id: int,
    system_entries: List[StoreEntry],
    permanent_entries: List[StoreEntry],
    player_listings: List[PlayerListing],
) -> str:
    """Render the store view as markdown.

    Sections, in order: permanent items, hourly items (or a notice when both
    are empty), player listings, then the command help. Listings owned by
    ``user_id`` are labelled as the viewer's own. ``chat_id`` is accepted but
    not used.
    """
    out: List[str] = ["# 🏬 每小时商店"]
    config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)

    if permanent_entries:
        out.append("## ♾️ 常驻商品")
        for item in permanent_entries:
            if item.limit_amount == -1:
                cap_text = "不限量"
            else:
                cap_text = f"限购 {item.limit_amount}"
            out.append(f"- {item.display_name}|价格 {item.price} 分|{cap_text}")

    if system_entries:
        out.append("## ⏱️ 本时段商品")
        for item in system_entries:
            stock_text = (
                "剩余:不限量"
                if item.remaining_amount == -1
                else f"剩余:{item.remaining_amount}"
            )
            out.append(f"- {item.display_name}|价格 {item.price} 分|{stock_text}")

    if not (permanent_entries or system_entries):
        out.append("> ⚠️ 当前没有可售的系统商品")

    out.append("## 👥 玩家出售")
    if not player_listings:
        out.append("> 当前暂无玩家出售信息")
    else:
        for offer in player_listings:
            if offer.user_id == user_id:
                seller = "你"
            else:
                seller = config_api.get_user_name(offer.user_id)
            out.append(
                f"- {seller}|{offer.item_name}|数量 {offer.quantity}|价格 {offer.price} 分"
            )

    out.append("\n---\n" + self._help_message())
    return "\n".join(out)
|
||
|
||
def _parse_datetime(self, value: str) -> datetime:
|
||
try:
|
||
return datetime.fromisoformat(value)
|
||
except ValueError:
|
||
return datetime.now(timezone.utc)
|
||
|
||
# endregion
|
||
|
||
# region 购买逻辑
|
||
async def _purchase_system_entry(
|
||
self,
|
||
*,
|
||
entry: StoreEntry,
|
||
quantity: int,
|
||
chat_id: int,
|
||
user_id: int,
|
||
) -> str:
|
||
total_price = entry.price * quantity
|
||
config_api = Architecture.Get(WPSConfigAPI)
|
||
user_points = Architecture.Get(WPSConfigAPI).get_user_points(chat_id, user_id)
|
||
if user_points < total_price:
|
||
return f"❌ 积分不足,需要 {total_price} 分,当前仅有 {user_points} 分"
|
||
|
||
cursor = get_db().conn.cursor()
|
||
if entry.remaining_amount != -1:
|
||
cursor.execute(
|
||
f"""
|
||
UPDATE {self.SYSTEM_TABLE}
|
||
SET remaining_amount = remaining_amount - ?
|
||
WHERE entry_id = ? AND remaining_amount >= ?
|
||
""",
|
||
(quantity, entry.entry_id, quantity),
|
||
)
|
||
if cursor.rowcount == 0:
|
||
get_db().conn.rollback()
|
||
return "❌ 库存不足,无法完成购买"
|
||
get_db().conn.commit()
|
||
|
||
new_points = await config_api.adjust_user_points(
|
||
chat_id, user_id, -total_price, reason=f"购买 {entry.display_name}"
|
||
)
|
||
backpack = Architecture.Get(WPSBackpackSystem)
|
||
backpack.add_item(user_id, entry.item_id, quantity)
|
||
return (
|
||
f"✅ 成功购买 {entry.display_name} × {quantity},花费 {total_price} 分\n"
|
||
f"当前剩余积分:{new_points}"
|
||
)
|
||
|
||
async def _resolve_player_listings(
|
||
self,
|
||
identifier: str,
|
||
listings: List[PlayerListing],
|
||
) -> List[PlayerListing]:
|
||
if not listings:
|
||
return []
|
||
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
|
||
identifier_lower = identifier.lower()
|
||
matches: List[PlayerListing] = []
|
||
for listing in listings:
|
||
if listing.quantity <= 0:
|
||
continue
|
||
if listing.item_id.lower() == identifier_lower:
|
||
matches.append(listing)
|
||
continue
|
||
if listing.item_name.lower() == identifier_lower:
|
||
matches.append(listing)
|
||
continue
|
||
try:
|
||
definition = backpack._get_definition(listing.item_id) # type: ignore[attr-defined]
|
||
except Exception:
|
||
continue
|
||
if definition.name.lower() == identifier_lower:
|
||
matches.append(listing)
|
||
return matches
|
||
|
||
async def _purchase_player_listing(
|
||
self,
|
||
*,
|
||
listing: PlayerListing,
|
||
quantity: int,
|
||
chat_id: int,
|
||
user_id: int,
|
||
) -> str:
|
||
if quantity <= 0:
|
||
return "❌ 购买数量必须大于0"
|
||
if listing.quantity < quantity:
|
||
return "❌ 玩家出售库存不足"
|
||
if listing.user_id == user_id:
|
||
return "❌ 无法购买自己上架的商品"
|
||
|
||
total_price = listing.price * quantity
|
||
config_api = Architecture.Get(WPSConfigAPI)
|
||
buyer_points = config_api.get_user_points(chat_id, user_id)
|
||
if buyer_points < total_price:
|
||
return f"❌ 积分不足,需要 {total_price} 分,当前仅有 {buyer_points} 分"
|
||
|
||
cursor = get_db().conn.cursor()
|
||
cursor.execute(
|
||
f"""
|
||
UPDATE {self.PLAYER_TABLE}
|
||
SET quantity = quantity - ?
|
||
WHERE user_id = ? AND quantity >= ?
|
||
""",
|
||
(quantity, listing.user_id, quantity),
|
||
)
|
||
if cursor.rowcount == 0:
|
||
get_db().conn.rollback()
|
||
return "❌ 玩家出售库存不足或已被其他人购买"
|
||
cursor.execute(
|
||
f"""
|
||
UPDATE {self.PLAYER_TABLE}
|
||
SET status = ?
|
||
WHERE user_id = ? AND quantity <= 0
|
||
""",
|
||
("closed", listing.user_id),
|
||
)
|
||
get_db().conn.commit()
|
||
|
||
buyer_new_points = await config_api.adjust_user_points(
|
||
chat_id, user_id, -total_price, reason="购买玩家商品"
|
||
)
|
||
await config_api.adjust_user_points(
|
||
chat_id,
|
||
listing.user_id,
|
||
total_price,
|
||
reason="玩家出售结算",
|
||
)
|
||
backpack = Architecture.Get(WPSBackpackSystem)
|
||
backpack.add_item(user_id, listing.item_id, quantity)
|
||
|
||
definition = backpack._get_definition(listing.item_id) # type: ignore[attr-defined]
|
||
return (
|
||
f"✅ 成功购买玩家商品 {definition.name} × {quantity},花费 {total_price} 分\n"
|
||
f"当前剩余积分:{buyer_new_points}"
|
||
)
|
||
|
||
def _resolve_system_entries(
|
||
self,
|
||
identifier: str,
|
||
system_entries: List[StoreEntry],
|
||
permanent_entries: List[StoreEntry],
|
||
) -> Dict[str, List[StoreEntry]]:
|
||
identifier_lower = identifier.lower()
|
||
matched_system: List[StoreEntry] = []
|
||
matched_permanent: List[StoreEntry] = []
|
||
for entry in permanent_entries:
|
||
if entry.item_id.lower() == identifier_lower or entry.display_name.lower() == identifier_lower:
|
||
matched_permanent.append(entry)
|
||
for entry in system_entries:
|
||
if entry.item_id.lower() == identifier_lower or entry.display_name.lower() == identifier_lower:
|
||
matched_system.append(entry)
|
||
return {"system": matched_system, "permanent": matched_permanent}
|
||
|
||
def _get_user_listing(self, user_id: int) -> Optional[PlayerListing]:
    """Load the listing row of ``user_id`` regardless of its status.

    Returns:
        The listing, or None if the user has no row. The display name comes
        from the backpack definition and falls back to the item id when
        that lookup fails.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        f"""
        SELECT user_id, item_id, price, quantity, created_at, status
        FROM {self.PLAYER_TABLE}
        WHERE user_id = ?
        """,
        (user_id,),
    )
    record = cursor.fetchone()
    if not record:
        return None

    item_id = record["item_id"]
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    try:
        item_name = backpack._get_definition(item_id).name  # type: ignore[attr-defined]
    except Exception:
        # Best effort: an unknown item is shown by its id.
        item_name = item_id

    return PlayerListing(
        user_id=record["user_id"],
        item_id=item_id,
        item_name=item_name,
        price=record["price"],
        quantity=record["quantity"],
        created_at=self._parse_datetime(record["created_at"]),
        status=record["status"],
    )
|
||
|
||
def _derive_player_price(self, item_id: str) -> int:
|
||
prices = [
|
||
mode.price
|
||
for mode in self._mode_registry.values()
|
||
if mode.item_id == item_id
|
||
]
|
||
if not prices:
|
||
raise ValueError(
|
||
f"Item {item_id} has no registered store mode to derive price"
|
||
)
|
||
return min(prices)
|
||
|
||
def _resolve_item(
    self,
    identifier: str,
) -> Tuple[Optional[str], Optional["BackpackItemDefinition"]]:
    """Resolve an item id or name to its backpack definition.

    The backpack items table is searched case-insensitively by id or name.
    If no row matches, ``identifier`` itself is tried as the item id.

    Returns:
        ``(item_id, definition)``, or ``(None, None)`` if the definition
        lookup fails.
    """
    backpack = Architecture.Get(WPSBackpackSystem)
    needle = identifier.lower()
    cursor = get_db().conn.cursor()
    cursor.execute(
        f"""
        SELECT item_id
        FROM {WPSBackpackSystem.ITEMS_TABLE}
        WHERE lower(item_id) = ? OR lower(name) = ?
        LIMIT 1
        """,
        (needle, needle),
    )
    record = cursor.fetchone()
    if record:
        candidate_id = record["item_id"]
    else:
        candidate_id = identifier
    try:
        definition = backpack._get_definition(candidate_id)  # type: ignore[attr-defined]
        return definition.item_id, definition
    except Exception:
        return None, None
|
||
|
||
def _get_user_inventory_quantity(self, user_id: int, item_id: str) -> int:
    """Return how many units of ``item_id`` the user holds, or 0 if none."""
    backpack = Architecture.Get(WPSBackpackSystem)
    held = (
        stack.quantity
        for stack in backpack.get_user_items(user_id)
        if stack.item_id == item_id
    )
    # Only the first matching stack counts.
    return next(held, 0)
|
||
|
||
def get_store_snapshot(self) -> Dict[str, Any]:
    """Return the whole store state as plain dicts.

    The hourly snapshot is refreshed first if needed. The result has the
    keys ``"system"``, ``"permanent"`` and ``"players"``.
    """
    self._ensure_hour_snapshot()
    hourly, permanent = self._fetch_system_entries()
    listings = self._fetch_player_listings()
    snapshot: Dict[str, Any] = {}
    snapshot["system"] = [self._entry_to_dict(item) for item in hourly]
    snapshot["permanent"] = [self._entry_to_dict(item) for item in permanent]
    snapshot["players"] = [self._listing_to_dict(item) for item in listings]
    return snapshot
|
||
|
||
def get_user_listing_snapshot(self, user_id: int) -> Optional[Dict[str, Any]]:
    """Return the user's active listing as a plain dict, or None if there is none."""
    current = self._get_user_listing(user_id)
    if current and current.status == "active":
        return self._listing_to_dict(current)
    return None
|
||
|
||
def _entry_to_dict(self, entry: StoreEntry) -> Dict[str, Any]:
|
||
return {
|
||
"entry_id": entry.entry_id,
|
||
"mode_id": entry.mode_id,
|
||
"item_id": entry.item_id,
|
||
"display_name": entry.display_name,
|
||
"price": entry.price,
|
||
"limit_amount": entry.limit_amount,
|
||
"remaining_amount": entry.remaining_amount,
|
||
"source": entry.source.value,
|
||
"created_at": entry.created_at.isoformat(),
|
||
"registry_version": entry.registry_version,
|
||
}
|
||
|
||
def _listing_to_dict(self, listing: PlayerListing) -> Dict[str, Any]:
|
||
return {
|
||
"user_id": listing.user_id,
|
||
"item_id": listing.item_id,
|
||
"price": listing.price,
|
||
"quantity": listing.quantity,
|
||
"created_at": listing.created_at.isoformat(),
|
||
"status": listing.status,
|
||
}
|
||
|
||
# endregion
|
||
|
||
# endregion
|
||
|
||
|
||
class WPSStoreBuyCommand(WPSAPI):
    """Chat command plugin that parses a buy command and delegates to the store."""

    @override
    def dependencies(self) -> list[type]:
        """Return the plugin types this command declares as its dependencies."""
        return [WPSStoreSystem]

    @override
    def is_enable_plugin(self) -> bool:
        """Report this plugin as enabled."""
        return True

    @override
    def wake_up(self) -> None:
        """Log that the plugin loaded and register its command aliases."""
        banner = f"{ConsoleFrontColor.GREEN}WPSStoreBuyCommand 插件已加载{ConsoleFrontColor.RESET}"
        logger.Log("Info", banner)
        for command_alias in ("buy", "购买"):
            self.register_plugin(command_alias)

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Parse ``<item name or id> <quantity>`` and run the purchase.

        The last whitespace-separated token is the quantity; everything before
        it, joined with single spaces, is the item identifier.
        """
        usage_error = "❌ 购买指令格式错误,请使用:`购买 <物品名称或ID> <数量>`"
        message = self.parse_message_after_at(message).strip()
        # An empty message yields no tokens, so one check covers both cases.
        tokens = message.split()
        if len(tokens) < 2:
            return await self._send_error(usage_error, chat_id, user_id)

        *name_parts, quantity_token = tokens
        identifier = " ".join(name_parts).strip()
        try:
            quantity = int(quantity_token)
        except ValueError:
            return await self._send_error("❌ 购买数量必须是整数", chat_id, user_id)

        store_api = Architecture.Get(WPSStoreSystem)
        outcome = await store_api.purchase_item(
            chat_id=chat_id,
            user_id=user_id,
            identifier=identifier,
            quantity=quantity,
        )
        return await self.send_markdown_message(outcome, chat_id, user_id)

    async def _send_error(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Send an error text to the user as a markdown message."""
        return await self.send_markdown_message(message, chat_id, user_id)
|
||
|
||
|
||
class WPSStoreSellCommand(WPSAPI):
    """Chat command plugin that parses a sell command and delegates to the store."""

    @override
    def dependencies(self) -> list[type]:
        """Return the plugin types this command declares as its dependencies."""
        return [WPSStoreSystem]

    @override
    def is_enable_plugin(self) -> bool:
        """Report this plugin as enabled."""
        return True

    @override
    def wake_up(self) -> None:
        """Log that the plugin loaded and register its command aliases."""
        banner = f"{ConsoleFrontColor.GREEN}WPSStoreSellCommand 插件已加载{ConsoleFrontColor.RESET}"
        logger.Log("Info", banner)
        for command_alias in ("sell", "出售"):
            self.register_plugin(command_alias)

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Parse ``<item name or id> <quantity> <unit price>`` and run the sale.

        The last two whitespace-separated tokens are quantity and price;
        everything before them, joined with single spaces, is the identifier.
        """
        usage_error = "❌ 出售指令格式错误,请使用:`出售 <物品名称或ID> <数量> <单价>`"
        message = self.parse_message_after_at(message).strip()
        # An empty message yields no tokens, so one check covers both cases.
        tokens = message.split()
        if len(tokens) < 3:
            return await self._send_error(usage_error, chat_id, user_id)

        *name_parts, quantity_token, price_token = tokens
        identifier = " ".join(name_parts).strip()
        if not identifier:
            return await self._send_error(
                "❌ 出售指令格式错误,请提供物品名称或ID",
                chat_id,
                user_id,
            )

        try:
            quantity = int(quantity_token)
        except ValueError:
            return await self._send_error("❌ 出售数量必须是整数", chat_id, user_id)
        try:
            price = int(price_token)
        except ValueError:
            return await self._send_error("❌ 出售单价必须是整数", chat_id, user_id)
        if price < 0:
            return await self._send_error("❌ 出售单价必须是非负整数", chat_id, user_id)

        store_api = Architecture.Get(WPSStoreSystem)
        outcome = await store_api.sell_item(
            chat_id=chat_id,
            user_id=user_id,
            identifier=identifier,
            quantity=quantity,
            price=price,
        )
        return await self.send_markdown_message(outcome, chat_id, user_id)

    async def _send_error(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Send an error text to the user as a markdown message."""
        return await self.send_markdown_message(message, chat_id, user_id)
|
||
|
||
|
||
# Names exported by `from <module> import *`. Only the store system is listed;
# WPSStoreBuyCommand and WPSStoreSellCommand are not part of the star-import surface.
__all__ = ["WPSStoreSystem"]
|