Files
NewWPSBot/Plugins/WPSAlchemyGame.py

1510 lines
57 KiB
Python
Raw Normal View History

2025-11-09 01:31:12 +08:00
from __future__ import annotations
2025-11-11 18:03:35 +08:00
import json
2025-11-09 01:31:12 +08:00
import random
2025-11-11 00:06:56 +08:00
from collections import defaultdict, Counter
2025-11-09 01:31:12 +08:00
from dataclasses import dataclass
2025-11-11 18:03:35 +08:00
from datetime import datetime, timedelta
2025-11-13 20:31:17 +08:00
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple, override
2025-11-09 01:31:12 +08:00
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
2025-11-11 18:03:35 +08:00
from PWF.CoreModules.database import get_db, STATUS_COMPLETED
from PWF.CoreModules.plugin_interface import DatabaseModel
from PWF.CoreModules.flags import get_internal_debug
2025-11-09 01:31:12 +08:00
2025-11-12 22:58:36 +08:00
from .WPSAPI import GuideEntry, GuideSection, WPSAPI
2025-11-09 01:31:12 +08:00
from .WPSBackpackSystem import (
BackpackItemDefinition,
BackpackItemTier,
WPSBackpackSystem,
)
from .WPSConfigSystem import WPSConfigAPI
from .WPSStoreSystem import WPSStoreSystem
2025-11-09 01:31:12 +08:00
from .WPSFortuneSystem import WPSFortuneSystem
logger: ProjectConfig = Architecture.Get(ProjectConfig)
2025-11-13 20:31:17 +08:00
FORTUNE_COEFF: float = logger.FindItem("alchemy_fortune_coeff", 0.03)
COOLDOWN_MINUTES: int = logger.FindItem("alchemy_cooldown_minutes", 2)
MAX_POINTS_PER_BATCH: int = logger.FindItem("alchemy_max_points_per_batch", 10000)
POINT_ALCHEMY_FEE: int = logger.FindItem("alchemy_point_fee", 100)
POOL_INITIAL_BALANCE: int = logger.FindItem("alchemy_pool_initial_balance", 10000)
REWARD_SLOT_CONFIG = logger.FindItem("alchemy_reward_slots", sorted([
(50,0.01),
(45,0.02),
(40,0.03),
(35,0.05),
(30,0.07),
(20,0.1),
(10,0.2)
], key=lambda x: x[0]))
COMMON_BONUS_RATE: float = logger.FindItem("alchemy_common_bonus_rate", 0.3333333)
COMMON_BONUS_BLACKLIST: Sequence[str] = logger.FindItem("alchemy_common_blacklist", [])
2025-11-09 01:31:12 +08:00
logger.SaveProperties()
def clamp01(value: float) -> float:
return max(0.0, min(1.0, value))
@dataclass(frozen=True)
class AlchemyRecipe:
materials: Tuple[str, str, str]
success_item_id: str
fail_item_id: str
base_success_rate: float
class WPSAlchemyGame(WPSAPI):
ASH_ITEM_ID = "alchemy_ash"
ASH_ITEM_NAME = "炉灰"
SLAG_ITEM_ID = "alchemy_slag"
SLAG_ITEM_NAME = "炉渣"
MAX_BATCH_TIMES = 99
2025-11-09 01:31:12 +08:00
_PHASE_TABLE: List[Tuple[float, float, str, str]] = [
(0.1481481481, 0.0, "爆炸", "💥 炼金反噬,积分化为飞灰……"),
(0.4444444444, 0.5, "失败", "😖 炼金失败,仅保留半数积分。"),
(0.7407407407, 2.0, "成功", "😊 炼金成功,积分翻倍!"),
(0.8888888888, 5.0, "丰厚积分", "😁 运气加成,收获丰厚积分!"),
(0.9629629630, 10.0, "巨额积分", "🤩 巨额积分入账,今日欧气爆棚!"),
(1.0, 100.0, "传说积分", "🌈 传说级好运!积分暴涨一百倍!"),
]
def __init__(self) -> None:
super().__init__()
self._recipes: Dict[Tuple[str, str, str], AlchemyRecipe] = {}
2025-11-11 00:06:56 +08:00
self._material_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
self._success_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
self._fail_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
2025-11-09 01:31:12 +08:00
self._fortune_coeff = FORTUNE_COEFF
2025-11-11 18:03:35 +08:00
# 从配置读取冷却时间(分钟)
self._cooldown_minutes = 0 if get_internal_debug() else COOLDOWN_MINUTES
self._cooldown_ms = int(self._cooldown_minutes * 60 * 1000)
2025-11-11 20:45:28 +08:00
# 从配置读取单次积分炼金上限
self._max_points_per_batch = MAX_POINTS_PER_BATCH
2025-11-13 20:31:17 +08:00
self._point_fee = max(1, int(POINT_ALCHEMY_FEE))
self._pool_initial_balance = max(0, int(POOL_INITIAL_BALANCE))
self._reward_slot_config = REWARD_SLOT_CONFIG
self._reward_total_probability = sum(
probability for _, probability in self._reward_slot_config
)
self._common_bonus_rate = clamp01(float(COMMON_BONUS_RATE))
self._common_bonus_blacklist = self._parse_identifier_collection(
COMMON_BONUS_BLACKLIST
)
self._pool_balance_cache: Optional[int] = None
2025-11-11 18:03:35 +08:00
logger.SaveProperties()
2025-11-09 01:31:12 +08:00
2025-11-12 22:58:36 +08:00
def get_guide_subtitle(self) -> str:
return "积分炼金与材料合成系统"
def get_guide_metadata(self) -> Dict[str, str]:
2025-11-13 20:31:17 +08:00
pool_balance = self._get_reward_pool_balance(refresh=True)
2025-11-12 22:58:36 +08:00
return {
"配方数量": str(len(self._recipes)),
"冷却时间(分钟)": str(self._cooldown_minutes),
2025-11-13 20:31:17 +08:00
"单次费用上限": str(self._max_points_per_batch),
"当前奖池": str(pool_balance),
"单次费用": str(self._point_fee),
2025-11-12 22:58:36 +08:00
}
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "炼金",
"identifier": "炼金",
"description": "投入积分或三件材料,等待冷却后获取结果。",
"metadata": {"别名": "alchemy"},
},
)
def collect_item_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": self.ASH_ITEM_NAME,
"identifier": self.ASH_ITEM_ID,
"description": "炼金失败后获得的基础材料,可再次参与配方或出售。",
},
{
"title": self.SLAG_ITEM_NAME,
"identifier": self.SLAG_ITEM_ID,
"description": "由 `炉灰` 合成,部分园艺/商店配方会引用该物品。",
},
)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "积分炼金",
"description": (
2025-11-13 20:31:17 +08:00
f"`炼金 <次数>` 按次扣除费用(每次 {self._point_fee} 分),"
"所有费用注入奖池,再依据挡位概率领取奖池百分比奖励。"
2025-11-12 22:58:36 +08:00
),
"icon": "💎",
},
{
"title": "材料炼金",
"description": (
"`炼金 <材料1> <材料2> <材料3> [次数]` 支持批量执行,配方信息可通过 `炼金配方` 查询。"
),
"icon": "🧪",
},
{
"title": "冷却与恢复",
"description": (
f"默认冷却 {self._cooldown_minutes} 分钟,任务调度用于结算完成;"
"重启后会自动恢复未结算记录。"
),
"icon": "⏲️",
},
)
def collect_additional_sections(self) -> Sequence[GuideSection]:
sections = list(super().collect_additional_sections())
2025-11-14 00:56:24 +08:00
core_recipes = self._build_core_recipes()
if core_recipes:
2025-11-12 22:58:36 +08:00
sections.append(
GuideSection(
2025-11-14 00:56:24 +08:00
title="基础配方",
entries=core_recipes,
2025-11-12 22:58:36 +08:00
layout="grid",
2025-11-14 00:56:24 +08:00
section_id="alchemy-core",
description="系统内置的基础配方,可在没有额外模块时直接使用。",
2025-11-12 22:58:36 +08:00
)
)
2025-11-14 00:56:24 +08:00
all_recipes = self._build_all_recipe_entries()
if all_recipes:
2025-11-12 22:58:36 +08:00
sections.append(
GuideSection(
2025-11-14 00:56:24 +08:00
title="所有炼金配方",
entries=all_recipes,
2025-11-12 22:58:36 +08:00
layout="list",
2025-11-14 00:56:24 +08:00
section_id="alchemy-all",
description="当前已注册的全部炼金配方列表。",
2025-11-12 22:58:36 +08:00
)
)
return tuple(sections)
def _build_core_recipes(self) -> Sequence[GuideEntry]:
entries: List[GuideEntry] = []
entries.append(
GuideEntry(
title="炉灰 → 炉渣",
identifier=f"{self.ASH_ITEM_ID} × 3",
description="循环利用基础产物,将多余炉灰转化为更稀有的炉渣。",
category="基础配方",
metadata={
"成功率": "100%",
"失败产物": self.ASH_ITEM_ID,
},
icon="🔥",
details=[
{
"type": "list",
"items": [
f"材料:{self.ASH_ITEM_ID} × 3",
f"成功产物:{self.SLAG_ITEM_ID}",
f"失败产物:{self.ASH_ITEM_ID}",
],
}
],
)
)
return entries
2025-11-14 00:56:24 +08:00
def _build_all_recipe_entries(self) -> Sequence[GuideEntry]:
if not self._recipes:
2025-11-12 22:58:36 +08:00
return ()
2025-11-14 00:56:24 +08:00
try:
backpack: Optional[WPSBackpackSystem] = Architecture.Get(WPSBackpackSystem)
except Exception: # pylint: disable=broad-except
backpack = None
name_cache: Dict[str, str] = {}
def resolve_name(item_id: str) -> str:
if not item_id:
return item_id
if item_id in name_cache:
return name_cache[item_id]
display = item_id
if backpack is not None:
try:
definition = backpack._get_definition(item_id) # noqa: SLF001
display = definition.name
except Exception: # pylint: disable=broad-except
display = item_id
name_cache[item_id] = display
return display
def format_materials(materials: Tuple[str, str, str]) -> str:
counter = Counter(materials)
parts: List[str] = []
for item_id in sorted(counter.keys()):
count = counter[item_id]
label = f"{resolve_name(item_id)}{item_id}"
if count > 1:
label += f" × {count}"
parts.append(label)
return " + ".join(parts)
2025-11-12 22:58:36 +08:00
entries: List[GuideEntry] = []
2025-11-14 00:56:24 +08:00
for _materials_key, recipe in sorted(
self._recipes.items(),
key=lambda item: (item[1].success_item_id, item[0]),
):
materials_text = format_materials(recipe.materials)
success_display = resolve_name(recipe.success_item_id)
fail_display = resolve_name(recipe.fail_item_id)
details = [
f"材料:{materials_text}",
f"成功产物:{success_display}{recipe.success_item_id}",
f"失败产物:{fail_display}{recipe.fail_item_id}",
2025-11-12 22:58:36 +08:00
]
2025-11-14 00:56:24 +08:00
2025-11-12 22:58:36 +08:00
entries.append(
GuideEntry(
2025-11-14 00:56:24 +08:00
title=success_display or recipe.success_item_id,
identifier=recipe.success_item_id,
description=f"{materials_text}{success_display}{recipe.success_item_id}",
2025-11-12 22:58:36 +08:00
metadata={
2025-11-14 00:56:24 +08:00
"成功率": f"{recipe.base_success_rate:.2%}",
"失败产物": f"{fail_display}{recipe.fail_item_id}",
2025-11-12 22:58:36 +08:00
},
2025-11-14 00:56:24 +08:00
details=[
{
"type": "list",
"items": [
*details,
f"基础成功率:{recipe.base_success_rate:.2%}",
],
}
],
2025-11-12 22:58:36 +08:00
)
)
2025-11-14 00:56:24 +08:00
return tuple(entries)
2025-11-09 01:31:12 +08:00
@override
def dependencies(self) -> List[type]:
return [WPSAPI, WPSBackpackSystem, WPSConfigAPI, WPSFortuneSystem, WPSStoreSystem]
2025-11-09 01:31:12 +08:00
2025-11-13 20:31:17 +08:00
_POOL_TABLE = "alchemy_reward_pool"
def register_db_model(self) -> DatabaseModel | List[DatabaseModel]:
"""注册炼金记录数据库表与奖池表"""
records_model = DatabaseModel(
2025-11-11 18:03:35 +08:00
table_name="alchemy_records",
column_defs={
"alchemy_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
"user_id": "INTEGER NOT NULL",
"chat_id": "INTEGER NOT NULL",
"alchemy_type": "TEXT NOT NULL",
"input_data": "TEXT NOT NULL",
"start_time": "TEXT NOT NULL",
"expected_end_time": "TEXT NOT NULL",
"status": "TEXT NOT NULL",
"result_data": "TEXT",
"scheduled_task_id": "INTEGER",
},
)
2025-11-13 20:31:17 +08:00
pool_model = DatabaseModel(
table_name=self._POOL_TABLE,
column_defs={
"pool_id": "INTEGER PRIMARY KEY",
"balance": "INTEGER NOT NULL",
"updated_at": "TEXT NOT NULL",
},
)
return [records_model, pool_model]
2025-11-11 18:03:35 +08:00
2025-11-09 01:31:12 +08:00
@override
def wake_up(self) -> None:
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSAlchemyGame 插件已加载{ConsoleFrontColor.RESET}",
)
self.register_plugin("alchemy")
self.register_plugin("炼金")
self._register_alchemy_items()
2025-11-13 20:31:17 +08:00
self._ensure_reward_pool()
2025-11-11 18:03:35 +08:00
# 恢复过期炼金
self.recover_overdue_alchemy()
2025-11-09 01:31:12 +08:00
def _register_alchemy_items(self) -> None:
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
try:
backpack.register_item(
self.ASH_ITEM_ID,
self.ASH_ITEM_NAME,
BackpackItemTier.COMMON,
2025-11-10 22:30:16 +08:00
"炼金失败时残留的炉灰,可作为低阶材料或出售。",
2025-11-09 01:31:12 +08:00
)
except Exception as exc:
logger.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}注册炉灰物品时出现问题: {exc}{ConsoleFrontColor.RESET}",
)
else:
try:
store_system = Architecture.Get(WPSStoreSystem)
store_system.register_mode(
item_id=self.ASH_ITEM_ID,
price=8,
limit_amount=999,
)
except Exception as exc:
logger.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}注册炉灰商店模式时出现问题: {exc}{ConsoleFrontColor.RESET}",
)
2025-11-09 01:31:12 +08:00
try:
backpack.register_item(
self.SLAG_ITEM_ID,
self.SLAG_ITEM_NAME,
BackpackItemTier.COMMON,
2025-11-10 22:30:16 +08:00
"经高温提炼后的炉渣,可在特殊任务中使用。",
2025-11-09 01:31:12 +08:00
)
except Exception as exc:
logger.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}注册炉渣物品时出现问题: {exc}{ConsoleFrontColor.RESET}",
)
try:
self.register_recipe(
(self.ASH_ITEM_ID, self.ASH_ITEM_ID, self.ASH_ITEM_ID),
self.SLAG_ITEM_ID,
self.ASH_ITEM_ID,
1.0,
)
except Exception as exc:
logger.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}注册炉渣配方时出现问题: {exc}{ConsoleFrontColor.RESET}",
)
def register_recipe(
self,
materials: Tuple[str, str, str]|Sequence[str],
success_item_id: str,
fail_item_id: str,
base_success_rate: float,
) -> None:
if len(materials) != 3:
raise ValueError("炼金配方必须提供三种材料")
sorted_materials = tuple(sorted(mat.strip() for mat in materials))
if any(not material for material in sorted_materials):
raise ValueError("炼金材料 ID 不能为空")
clamped_rate = clamp01(base_success_rate)
if clamped_rate != base_success_rate:
raise ValueError("配方成功率必须位于 0~1 之间")
2025-11-11 00:06:56 +08:00
sorted_key = tuple(sorted_materials)
existing = self._recipes.get(sorted_key)
if existing:
self._unindex_recipe(sorted_key, existing)
2025-11-09 01:31:12 +08:00
recipe = AlchemyRecipe(
2025-11-11 00:06:56 +08:00
materials=sorted_key,
2025-11-09 01:31:12 +08:00
success_item_id=success_item_id.strip(),
fail_item_id=fail_item_id.strip() or self.ASH_ITEM_ID,
base_success_rate=base_success_rate,
)
2025-11-11 00:06:56 +08:00
self._recipes[sorted_key] = recipe
self._index_recipe(sorted_key, recipe)
2025-11-09 01:31:12 +08:00
logger.Log(
"Info",
f"{ConsoleFrontColor.CYAN}已注册炼金配方 {sorted_materials} -> {success_item_id} ({base_success_rate:.2%}){ConsoleFrontColor.RESET}",
)
2025-11-11 00:06:56 +08:00
def _index_recipe(
self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe
) -> None:
for material in recipe.materials:
self._material_index[material].add(materials_key)
self._success_index[recipe.success_item_id].add(materials_key)
self._fail_index[recipe.fail_item_id].add(materials_key)
def _unindex_recipe(
self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe
) -> None:
for material in recipe.materials:
material_set = self._material_index.get(material)
if material_set and materials_key in material_set:
material_set.discard(materials_key)
if not material_set:
del self._material_index[material]
success_set = self._success_index.get(recipe.success_item_id)
if success_set and materials_key in success_set:
success_set.discard(materials_key)
if not success_set:
del self._success_index[recipe.success_item_id]
fail_set = self._fail_index.get(recipe.fail_item_id)
if fail_set and materials_key in fail_set:
fail_set.discard(materials_key)
if not fail_set:
del self._fail_index[recipe.fail_item_id]
def get_recipes_using_item(self, item_id: str) -> List[AlchemyRecipe]:
if not item_id:
return []
material_keys = sorted(self._material_index.get(item_id, set()))
return [self._recipes[key] for key in material_keys]
def get_recipes_producing_item(
self, item_id: str
) -> Dict[str, List[AlchemyRecipe]]:
if not item_id:
return {"success": [], "fail": []}
success_keys = sorted(
self._success_index.get(item_id, set()),
key=lambda key: (
-self._recipes[key].base_success_rate,
self._recipes[key].materials,
),
)
fail_keys = sorted(
self._fail_index.get(item_id, set()),
key=lambda key: (
-self._recipes[key].base_success_rate,
self._recipes[key].materials,
),
)
return {
"success": [self._recipes[key] for key in success_keys],
"fail": [self._recipes[key] for key in fail_keys],
}
2025-11-13 20:31:17 +08:00
@staticmethod
def _parse_identifier_collection(raw: object) -> Set[str]:
if raw is None:
return set()
items: Iterable[str]
if isinstance(raw, str):
stripped = raw.strip()
if not stripped:
return set()
try:
parsed = json.loads(stripped)
if isinstance(parsed, (list, tuple, set)):
items = [str(x) for x in parsed]
else:
items = [str(parsed)]
except json.JSONDecodeError:
items = [token.strip() for token in stripped.split(",")]
elif isinstance(raw, Iterable):
items = [str(item) for item in raw]
else:
items = [str(raw)]
normalized = {
item.strip().lower()
for item in items
if isinstance(item, str) and item.strip()
}
return normalized
def _ensure_reward_pool(self) -> None:
cursor = get_db().conn.cursor()
cursor.execute(
f"SELECT balance FROM {self._POOL_TABLE} WHERE pool_id = 1"
)
row = cursor.fetchone()
if row is None:
now = datetime.now().isoformat()
cursor.execute(
f"INSERT INTO {self._POOL_TABLE} (pool_id, balance, updated_at) VALUES (1, ?, ?)",
(self._pool_initial_balance, now),
)
get_db().conn.commit()
self._pool_balance_cache = self._pool_initial_balance
elif self._pool_balance_cache is None:
self._pool_balance_cache = int(row["balance"])
def _get_reward_pool_balance(self, *, refresh: bool = False) -> int:
if refresh or self._pool_balance_cache is None:
self._ensure_reward_pool()
cursor = get_db().conn.cursor()
cursor.execute(
f"SELECT balance FROM {self._POOL_TABLE} WHERE pool_id = 1"
)
row = cursor.fetchone()
balance = int(row["balance"]) if row else 0
self._pool_balance_cache = balance
return self._pool_balance_cache or 0
def _update_reward_pool(self, delta: int) -> int:
self._ensure_reward_pool()
cursor = get_db().conn.cursor()
cursor.execute(
f"UPDATE {self._POOL_TABLE} SET balance = balance + ?, updated_at = ? WHERE pool_id = 1",
(delta, datetime.now().isoformat()),
)
get_db().conn.commit()
return self._get_reward_pool_balance(refresh=True)
def _set_reward_pool(self, balance: int) -> int:
self._ensure_reward_pool()
cursor = get_db().conn.cursor()
cursor.execute(
f"UPDATE {self._POOL_TABLE} SET balance = ?, updated_at = ? WHERE pool_id = 1",
(balance, datetime.now().isoformat()),
)
get_db().conn.commit()
self._pool_balance_cache = balance
return balance
2025-11-09 01:31:12 +08:00
async def callback(
self, message: str, chat_id: int, user_id: int
) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
if not payload:
return await self.send_markdown_message(
self._help_message(), chat_id, user_id
)
tokens = [token.strip() for token in payload.split() if token.strip()]
if not tokens:
return await self.send_markdown_message(
self._help_message(), chat_id, user_id
)
2025-11-11 18:03:35 +08:00
# 处理状态查询命令
if len(tokens) == 1 and tokens[0] in ["状态", "status"]:
response = await self._handle_status_query(chat_id, user_id)
return await self.send_markdown_message(response, chat_id, user_id)
2025-11-09 01:31:12 +08:00
if len(tokens) == 1 and tokens[0].isdigit():
2025-11-13 20:31:17 +08:00
times = int(tokens[0])
2025-11-09 01:31:12 +08:00
response = await self._handle_point_alchemy(
2025-11-13 20:31:17 +08:00
chat_id, user_id, times
2025-11-09 01:31:12 +08:00
)
return await self.send_markdown_message(response, chat_id, user_id)
if len(tokens) >= 3:
materials = tokens[:3]
times = 1
if len(tokens) >= 4:
try:
times = int(tokens[3])
except ValueError:
return await self.send_markdown_message(
"❌ 炼金次数必须是整数", chat_id, user_id
)
response = await self._handle_item_alchemy(
chat_id, user_id, materials, times
)
return await self.send_markdown_message(response, chat_id, user_id)
return await self.send_markdown_message(
self._help_message(), chat_id, user_id
)
async def _handle_point_alchemy(
2025-11-13 20:31:17 +08:00
self, chat_id: int, user_id: int, times: int
2025-11-09 01:31:12 +08:00
) -> str:
2025-11-13 20:31:17 +08:00
if times <= 0:
return "❌ 炼金次数必须大于 0"
if times > self.MAX_BATCH_TIMES:
return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES}"
2025-11-11 18:03:35 +08:00
# 检查冷却
is_on_cooldown, cooldown_msg = self._check_cooldown(user_id)
if is_on_cooldown:
return cooldown_msg
2025-11-13 20:31:17 +08:00
total_cost = self._point_fee * times
2025-11-09 01:31:12 +08:00
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
current_points = config_api.get_user_points(user_id)
2025-11-13 20:31:17 +08:00
if total_cost > self._max_points_per_batch:
return f"❌ 单次炼金总费用不能超过 {self._max_points_per_batch}"
if current_points < total_cost:
return (
f"❌ 积分不足,需要 {total_cost} 分,当前仅有 {current_points}"
)
2025-11-09 01:31:12 +08:00
2025-11-11 18:03:35 +08:00
# 扣除积分
2025-11-09 01:31:12 +08:00
await config_api.adjust_user_points(
2025-11-13 20:31:17 +08:00
chat_id, user_id, -total_cost, "炼金消耗"
2025-11-09 01:31:12 +08:00
)
2025-11-13 20:31:17 +08:00
# 奖池增加投入
pool_balance = self._update_reward_pool(total_cost)
2025-11-11 18:03:35 +08:00
# 创建炼金记录
2025-11-13 20:31:17 +08:00
input_data = {
"type": "point",
"times": times,
"cost_per_attempt": self._point_fee,
"pool_deposit": total_cost,
}
2025-11-11 18:03:35 +08:00
alchemy_id = self._create_alchemy_record(
user_id, chat_id, "point", input_data
2025-11-09 01:31:12 +08:00
)
2025-11-11 18:03:35 +08:00
# 注册定时任务
task_id = self.register_clock(
2025-11-13 20:31:17 +08:00
self._settle_alchemy_callback,
self._cooldown_ms,
kwargs={
"alchemy_id": alchemy_id,
"user_id": user_id,
"chat_id": chat_id,
},
)
# 更新记录的任务ID
cursor = get_db().conn.cursor()
cursor.execute(
2025-11-13 20:31:17 +08:00
"UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?",
(task_id, alchemy_id),
)
get_db().conn.commit()
2025-11-11 18:03:35 +08:00
# 计算预计完成时间
cursor = get_db().conn.cursor()
cursor.execute(
"SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?",
(alchemy_id,),
)
record = cursor.fetchone()
expected_end_time = datetime.fromisoformat(record["expected_end_time"])
2025-11-13 20:31:17 +08:00
2025-11-11 18:03:35 +08:00
debug_hint = " **[DEBUG模式]**" if get_internal_debug() else ""
2025-11-13 20:31:17 +08:00
2025-11-11 18:03:35 +08:00
time_str = "立即结算" if self._cooldown_minutes == 0 else f"{self._cooldown_minutes} 分钟"
2025-11-13 20:31:17 +08:00
reward_probability_line = ""
slot_brief_line = ""
if self._reward_slot_config:
reward_probability_line = (
f"- 有奖励挡位总概率:{self._reward_total_probability:.0%}\n"
)
preview = ", ".join(
f"{percent:.0f}%{probability:.2%}"
for percent, probability in self._reward_slot_config[:5]
)
if preview:
slot_brief_line = f"- 挡位示例:{preview}{'' if len(self._reward_slot_config) > 5 else ''}\n"
2025-11-09 01:31:12 +08:00
return (
2025-11-11 18:03:35 +08:00
f"# ⚗️ 炼金开始{debug_hint}\n"
f"- 类型:积分炼金\n"
2025-11-13 20:31:17 +08:00
f"- 炼金次数:`{times}`(每次费用 {self._point_fee} 分)\n"
f"- 本次消耗:`{total_cost}` 分\n"
f"- 当前奖池:`{pool_balance}` 分\n"
2025-11-11 18:03:35 +08:00
f"- 预计耗时:{time_str}\n"
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
2025-11-13 20:31:17 +08:00
f"{reward_probability_line}"
f"{slot_brief_line}"
2025-11-11 18:03:35 +08:00
f"- 状态:炼金进行中..."
2025-11-09 01:31:12 +08:00
)
def _draw_point_multiplier(
self, fortune_value: float
) -> Tuple[float, str, str]:
offset = fortune_value * self._fortune_coeff
random_value = random.random()
landing = clamp01(offset + random_value)
for threshold, multiplier, label, text in self._PHASE_TABLE:
if landing <= threshold:
return multiplier, label, text
return self._PHASE_TABLE[-1][1:]
2025-11-13 20:31:17 +08:00
def _draw_reward_slot(
self, fortune_value: float
) -> Tuple[Optional[float], float]:
if not self._reward_slot_config:
return None, 0.0
offset = fortune_value * self._fortune_coeff
landing = clamp01(random.random() + offset)
cumulative = 0.0
for percent, probability in self._reward_slot_config:
cumulative += probability
if landing <= cumulative:
return percent, probability
return None, max(0.0, 1.0 - cumulative)
@staticmethod
def _calculate_slot_reward(pool_balance: int, percent: float) -> int:
if percent <= 0 or pool_balance <= 0:
return 0
reward = pool_balance * (percent / 100.0)
return int(reward)
def _should_drop_common_bonus(self) -> bool:
if self._common_bonus_rate <= 0:
return False
return random.random() < self._common_bonus_rate
def _draw_common_bonus_item(
self, backpack: WPSBackpackSystem
) -> Optional[BackpackItemDefinition]:
candidates = backpack.get_items_by_tier(
BackpackItemTier.COMMON,
blacklist=self._common_bonus_blacklist,
)
if not candidates:
return None
return random.choice(candidates)
2025-11-09 01:31:12 +08:00
async def _handle_item_alchemy(
self,
chat_id: int,
user_id: int,
materials: Sequence[str],
times: int,
) -> str:
if times <= 0:
return "❌ 炼金次数必须大于 0"
if times > self.MAX_BATCH_TIMES:
return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES}"
2025-11-11 18:03:35 +08:00
# 检查冷却
is_on_cooldown, cooldown_msg = self._check_cooldown(user_id)
if is_on_cooldown:
return cooldown_msg
2025-11-09 01:31:12 +08:00
resolved: List[BackpackItemDefinition] = []
for identifier in materials:
resolved_item = self._resolve_item(identifier)
if resolved_item is None:
return f"❌ 未找到材料 `{identifier}`,请确认已注册"
resolved.append(resolved_item)
material_ids = [item.item_id for item in resolved]
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
2025-11-12 17:31:34 +08:00
material_usage = Counter(item.item_id for item in resolved)
name_map = {item.item_id: item.name for item in resolved}
current_quantities: Dict[str, int] = {}
for item_id, single_batch_count in material_usage.items():
required_amount = single_batch_count * times
current = self._get_user_quantity(user_id, item_id)
current_quantities[item_id] = current
if current < required_amount:
item_name = name_map.get(item_id, item_id)
2025-11-09 01:31:12 +08:00
return (
2025-11-12 17:31:34 +08:00
f"❌ 材料 `{item_name}` 数量不足,需要 {required_amount} 个,当前仅有 {current}"
2025-11-09 01:31:12 +08:00
)
2025-11-11 18:03:35 +08:00
# 扣除材料
2025-11-12 17:31:34 +08:00
for item_id, single_batch_count in material_usage.items():
required_amount = single_batch_count * times
2025-11-09 01:31:12 +08:00
backpack.set_item_quantity(
2025-11-12 17:31:34 +08:00
user_id, item_id, current_quantities[item_id] - required_amount
2025-11-09 01:31:12 +08:00
)
2025-11-11 18:03:35 +08:00
# 创建炼金记录
input_data = {
"type": "item",
"materials": material_ids,
"times": times,
}
alchemy_id = self._create_alchemy_record(
user_id, chat_id, "item", input_data
2025-11-09 01:31:12 +08:00
)
2025-11-11 18:03:35 +08:00
# 注册定时任务
task_id = self.register_clock(
self._settle_alchemy_callback,
self._cooldown_ms,
kwargs={
"alchemy_id": alchemy_id,
"user_id": user_id,
"chat_id": chat_id,
},
)
# 更新记录的任务ID
cursor = get_db().conn.cursor()
cursor.execute(
2025-11-11 18:03:35 +08:00
"UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?",
(task_id, alchemy_id),
)
get_db().conn.commit()
2025-11-09 01:31:12 +08:00
2025-11-11 18:03:35 +08:00
# 计算预计完成时间
cursor = get_db().conn.cursor()
cursor.execute(
"SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?",
(alchemy_id,),
2025-11-09 01:31:12 +08:00
)
2025-11-11 18:03:35 +08:00
record = cursor.fetchone()
expected_end_time = datetime.fromisoformat(record["expected_end_time"])
debug_hint = " **[DEBUG模式]**" if get_internal_debug() else ""
time_str = "立即结算" if self._cooldown_minutes == 0 else f"{self._cooldown_minutes} 分钟"
material_names = "".join([item.name for item in resolved])
2025-11-09 01:31:12 +08:00
return (
2025-11-11 18:03:35 +08:00
f"# ⚗️ 炼金开始{debug_hint}\n"
f"- 类型:物品炼金\n"
f"- 投入材料:{material_names} × {times}\n"
f"- 预计耗时:{time_str}\n"
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
f"- 状态:炼金进行中..."
2025-11-09 01:31:12 +08:00
)
def _resolve_item(
self, identifier: str
) -> Optional[BackpackItemDefinition]:
identifier_lower = identifier.strip().lower()
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT item_id
FROM {WPSBackpackSystem.ITEMS_TABLE}
WHERE lower(item_id) = ? OR lower(name) = ?
LIMIT 1
""",
(identifier_lower, identifier_lower),
)
row = cursor.fetchone()
item_id = row["item_id"] if row else identifier.strip()
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
try:
2025-11-10 01:15:17 +08:00
return backpack._get_definition(item_id)
2025-11-09 01:31:12 +08:00
except Exception:
return None
def _get_user_quantity(self, user_id: int, item_id: str) -> int:
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
for item in backpack.get_user_items(user_id):
if item.item_id == item_id:
return item.quantity
return 0
2025-11-11 18:03:35 +08:00
def _check_cooldown(self, user_id: int) -> Tuple[bool, Optional[str]]:
"""检查用户是否在冷却中"""
cursor = get_db().conn.cursor()
cursor.execute(
"""
SELECT alchemy_id, expected_end_time, alchemy_type
FROM alchemy_records
WHERE user_id = ? AND status = 'in_progress'
ORDER BY start_time DESC
LIMIT 1
""",
(user_id,),
)
record = cursor.fetchone()
if not record:
return False, None
expected_end = datetime.fromisoformat(record["expected_end_time"])
now = datetime.now()
if now >= expected_end:
# 已过期,自动结算
try:
self.settle_alchemy(record["alchemy_id"])
except Exception as e:
logger.Log(
"Error",
f"{ConsoleFrontColor.RED}自动结算过期炼金失败: {e}{ConsoleFrontColor.RESET}",
)
return False, None
# 仍在冷却中
remaining = expected_end - now
remaining_minutes = int(remaining.total_seconds() / 60) + 1
alchemy_type_name = "积分炼金" if record["alchemy_type"] == "point" else "物品炼金"
return True, (
f"❌ 炼金冷却中\n"
f"- 上次炼金类型:{alchemy_type_name}\n"
f"- 预计完成:{expected_end.strftime('%Y-%m-%d %H:%M')}\n"
f"- 剩余时间:约 {remaining_minutes} 分钟\n"
f"- 请等待冷却结束后再试"
)
def _create_alchemy_record(
self, user_id: int, chat_id: int, alchemy_type: str, input_data: Dict
) -> int:
"""创建炼金记录"""
start_time = datetime.now()
expected_end_time = start_time + timedelta(minutes=self._cooldown_minutes)
cursor = get_db().conn.cursor()
cursor.execute(
"""
INSERT INTO alchemy_records
(user_id, chat_id, alchemy_type, input_data, start_time, expected_end_time, status)
VALUES (?, ?, ?, ?, ?, ?, 'in_progress')
""",
(
user_id,
chat_id,
alchemy_type,
json.dumps(input_data),
start_time.isoformat(),
expected_end_time.isoformat(),
),
)
alchemy_id = cursor.lastrowid
get_db().conn.commit()
return alchemy_id
def settle_alchemy(self, alchemy_id: int) -> Tuple[bool, str, Optional[Dict]]:
"""结算炼金"""
import sqlite3
2025-11-11 18:03:35 +08:00
cursor = get_db().conn.cursor()
cursor.execute(
"SELECT * FROM alchemy_records WHERE alchemy_id = ?",
(alchemy_id,),
)
record: sqlite3.Row = cursor.fetchone()
2025-11-11 18:03:35 +08:00
if not record:
return False, "❌ 炼金记录不存在", None
if record["status"] != "in_progress":
return False, f"❌ 炼金已结算(状态:{record['status']}", None
user_id = record["user_id"]
chat_id = record["chat_id"]
alchemy_type = record["alchemy_type"]
input_data = json.loads(record["input_data"])
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
fortune_value = fortune_system.get_fortune_value(user_id)
result_data: Dict = {}
message_lines = ["# 🔮 炼金结算\n"]
try:
if alchemy_type == "point":
2025-11-13 20:31:17 +08:00
# 积分炼金结算(奖池模式)
times = int(input_data.get("times", 1))
cost_per_attempt = int(input_data.get("cost_per_attempt", self._point_fee))
total_cost = cost_per_attempt * times
pool_balance = self._get_reward_pool_balance(refresh=True)
slot_counter: Dict[float, int] = defaultdict(int)
empty_count = 0
total_reward_points = 0
2025-11-11 18:03:35 +08:00
ash_reward = 0
2025-11-13 20:31:17 +08:00
bonus_items: Dict[str, int] = {}
attempt_records: List[Dict[str, Any]] = []
for _ in range(times):
slot_percent, _ = self._draw_reward_slot(fortune_value)
if slot_percent is None:
empty_count += 1
attempt_records.append({"slot": None, "reward": 0})
continue
slot_counter[slot_percent] += 1
reward_points = self._calculate_slot_reward(pool_balance, slot_percent)
if reward_points > 0:
pool_balance = self._update_reward_pool(-reward_points)
total_reward_points += reward_points
config_api.adjust_user_points_sync(
user_id,
reward_points,
f"炼金奖池奖励({slot_percent:.0f}%挡位)",
)
attempt_record: Dict[str, Any] = {
"slot": slot_percent,
"reward": reward_points,
}
if slot_percent <= 0:
backpack.add_item(user_id, self.ASH_ITEM_ID, 1)
ash_reward += 1
attempt_record["ash"] = True
if slot_percent < 50 and self._should_drop_common_bonus():
bonus_item = self._draw_common_bonus_item(backpack)
if bonus_item is not None:
backpack.add_item(user_id, bonus_item.item_id, 1)
bonus_items[bonus_item.item_id] = bonus_items.get(bonus_item.item_id, 0) + 1
attempt_record["bonus_item"] = bonus_item.item_id
attempt_records.append(attempt_record)
2025-11-11 18:03:35 +08:00
final_points = config_api.get_user_points(user_id)
2025-11-13 20:31:17 +08:00
slot_lines: List[str] = []
if slot_counter:
for percent in sorted(slot_counter.keys()):
slot_lines.append(
f" - {percent:.0f}% 挡位:{slot_counter[percent]}"
)
if empty_count > 0:
slot_lines.append(f" - 空白挡位:{empty_count}")
2025-11-11 18:03:35 +08:00
message_lines.extend([
2025-11-13 20:31:17 +08:00
f"- 炼金次数:`{times}`(单次扣除 {cost_per_attempt} 分)\n",
f"- 本次总消耗:`{total_cost}` 分\n",
f"- 奖池剩余:`{pool_balance}` 分\n",
f"- 总计返还:`+{total_reward_points}` 分\n",
f"- 当前积分:`{final_points}`\n",
2025-11-11 18:03:35 +08:00
])
2025-11-13 20:31:17 +08:00
if slot_lines:
message_lines.append("- 挡位统计:\n" + "\n".join(slot_lines))
if ash_reward > 0:
message_lines.append(
f"- 额外获得:{self.ASH_ITEM_NAME} × `{ash_reward}`"
)
if bonus_items:
bonus_lines: List[str] = []
for item_id, count in bonus_items.items():
try:
definition = backpack._get_definition(item_id)
item_name = definition.name
except Exception:
item_name = item_id
bonus_lines.append(f" - {item_name} × {count}")
message_lines.append("\n- 普通物品掉落:\n" + "\n".join(bonus_lines))
2025-11-11 18:03:35 +08:00
result_data = {
2025-11-13 20:31:17 +08:00
"times": times,
"total_cost": total_cost,
"total_reward": total_reward_points,
"slots": {str(percent): count for percent, count in slot_counter.items()},
"empty": empty_count,
2025-11-11 18:03:35 +08:00
"ash_reward": ash_reward,
2025-11-13 20:31:17 +08:00
"bonus_items": bonus_items,
"pool_balance": pool_balance,
"attempts": attempt_records,
2025-11-11 18:03:35 +08:00
}
2025-11-13 20:31:17 +08:00
2025-11-11 18:03:35 +08:00
elif alchemy_type == "item":
# 物品炼金结算
materials = input_data["materials"]
times = input_data["times"]
# 解析材料
resolved: List[BackpackItemDefinition] = []
for material_id in materials:
try:
definition = backpack._get_definition(material_id)
resolved.append(definition)
except Exception:
# 如果材料不存在使用ID
pass
recipe = self._recipes.get(tuple(sorted(materials)))
adjusted_rate = (
clamp01(recipe.base_success_rate + fortune_value * self._fortune_coeff)
if recipe
else 0.0
)
success_count = 0
fail_count = 0
rewards: Dict[str, int] = {}
for _ in range(times):
if recipe and random.random() < adjusted_rate:
reward_id = recipe.success_item_id
success_count += 1
else:
reward_id = (
recipe.fail_item_id if recipe else self.ASH_ITEM_ID
)
fail_count += 1
backpack.add_item(user_id, reward_id, 1)
rewards[reward_id] = rewards.get(reward_id, 0) + 1
details = []
for item_id, count in rewards.items():
try:
definition = backpack._get_definition(item_id)
item_name = definition.name
except Exception:
item_name = item_id
details.append(f"- {item_name} × **{count}**")
success_line = (
f"- 成功次数:`{success_count}`"
if recipe
else "- 成功次数:`0`(未知配方必定失败)"
)
fail_line = (
f"- 失败次数:`{fail_count}`"
if recipe
else f"- 失败次数:`{times}`"
)
rate_line = (
f"- 基础成功率:`{recipe.base_success_rate:.2%}`"
if recipe
else "- ✅ 未知配方仅产出炉灰"
)
rewards_block = "\n".join(details) if details else "- (无物品获得)"
material_names = "".join([item.name for item in resolved])
message_lines.extend([
f"- 投入材料:{material_names} × {times}\n",
f"{success_line}\n",
f"{fail_line}\n",
f"{rate_line}\n",
"- 获得物品:\n",
f"{rewards_block}",
])
result_data = {
"success_count": success_count,
"fail_count": fail_count,
"rewards": rewards,
}
else:
return False, f"❌ 未知的炼金类型:{alchemy_type}", None
# 更新记录状态
cursor.execute(
"""
UPDATE alchemy_records
SET status = 'completed', result_data = ?
WHERE alchemy_id = ?
""",
(json.dumps(result_data), alchemy_id),
)
# 更新定时任务状态
scheduled_task_id: int = record["scheduled_task_id"]
2025-11-11 18:03:35 +08:00
if scheduled_task_id:
get_db().update_task_status(scheduled_task_id, STATUS_COMPLETED)
2025-11-11 18:03:35 +08:00
return True, "".join(message_lines), result_data
except Exception as e:
logger.Log(
"Error",
f"{ConsoleFrontColor.RED}结算炼金失败: {e}{ConsoleFrontColor.RESET}",
)
# 标记为失败
cursor.execute(
"UPDATE alchemy_records SET status = 'failed' WHERE alchemy_id = ?",
(alchemy_id,),
)
get_db().conn.commit()
return False, f"❌ 结算失败:{str(e)}", None
async def _settle_alchemy_callback(
self, alchemy_id: int, user_id: int, chat_id: int
) -> None:
"""炼金结算回调(时钟任务)"""
success, msg, rewards = self.settle_alchemy(alchemy_id)
await self.send_markdown_message(msg, chat_id, user_id)
def _get_user_alchemy_status(self, user_id: int) -> Optional[Dict]:
"""获取用户当前炼金状态"""
cursor = get_db().conn.cursor()
cursor.execute(
"""
SELECT * FROM alchemy_records
WHERE user_id = ? AND status = 'in_progress'
ORDER BY start_time DESC
LIMIT 1
""",
(user_id,),
)
record = cursor.fetchone()
return dict(record) if record else None
async def _handle_status_query(self, chat_id: int, user_id: int) -> str:
"""处理状态查询"""
record = self._get_user_alchemy_status(user_id)
if not record:
return (
"# ⚗️ 炼金状态\n"
"- 状态:无进行中的炼金\n"
2025-11-13 20:31:17 +08:00
f"- 当前奖池:`{self._get_reward_pool_balance(refresh=True)}` 分\n"
2025-11-11 18:03:35 +08:00
"- 可以开始新的炼金"
)
alchemy_type = record["alchemy_type"]
alchemy_type_name = "积分炼金" if alchemy_type == "point" else "物品炼金"
start_time = datetime.fromisoformat(record["start_time"])
expected_end_time = datetime.fromisoformat(record["expected_end_time"])
now = datetime.now()
if now >= expected_end_time:
remaining_str = "已完成,等待结算"
else:
remaining = expected_end_time - now
remaining_minutes = int(remaining.total_seconds() / 60) + 1
remaining_str = f"{remaining_minutes} 分钟"
2025-11-13 20:31:17 +08:00
lines = [
"# ⚗️ 炼金状态",
"- 状态:进行中",
f"- 类型:{alchemy_type_name}",
f"- 开始时间:{start_time.strftime('%Y-%m-%d %H:%M')}",
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}",
f"- 剩余时间:{remaining_str}",
]
if alchemy_type == "point":
try:
input_data = json.loads(record["input_data"])
except Exception:
input_data = {}
times = int(input_data.get("times", 1))
cost_per_attempt = int(input_data.get("cost_per_attempt", self._point_fee))
total_cost = cost_per_attempt * times
lines.extend(
[
f"- 计划次数:{times}",
f"- 单次费用:{cost_per_attempt}",
f"- 总投入:{total_cost}",
]
)
lines.append(
f"- 当前奖池:`{self._get_reward_pool_balance(refresh=True)}` 分"
2025-11-11 18:03:35 +08:00
)
2025-11-13 20:31:17 +08:00
return "\n".join(lines)
2025-11-11 18:03:35 +08:00
def recover_overdue_alchemy(self) -> None:
"""恢复过期但未结算的炼金"""
cursor = get_db().conn.cursor()
cursor.execute(
"""
SELECT alchemy_id FROM alchemy_records
WHERE status = 'in_progress' AND expected_end_time < ?
""",
(datetime.now().isoformat(),),
)
overdue_records = cursor.fetchall()
for record in overdue_records:
logger.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}发现过期炼金 {record['alchemy_id']},执行恢复结算{ConsoleFrontColor.RESET}",
)
try:
self.settle_alchemy(record["alchemy_id"])
except Exception as e:
logger.Log(
"Error",
f"{ConsoleFrontColor.RED}恢复炼金 {record['alchemy_id']} 失败: {e}{ConsoleFrontColor.RESET}",
)
if overdue_records:
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}恢复了 {len(overdue_records)} 个过期炼金{ConsoleFrontColor.RESET}",
)
2025-11-09 01:31:12 +08:00
def _help_message(self) -> str:
return (
"# ⚗️ 炼金指令帮助\n"
2025-11-13 20:31:17 +08:00
f"- `炼金 <次数>`:按次数扣费参与积分炼金(每次 {self._point_fee} 分,单次总费用不超过 {self._max_points_per_batch} 分)\n"
2025-11-09 01:31:12 +08:00
"- `炼金 <材料1> <材料2> <材料3> [次数]`:使用三件材料进行炼金(可选次数,默认 1\n"
2025-11-11 18:03:35 +08:00
"- `炼金 状态`:查询当前炼金状态\n"
2025-11-13 20:31:17 +08:00
"> 积分炼金费用将进入奖池,根据挡位概率抽取奖池奖励;部分挡位会额外掉落普通物品。"
2025-11-11 00:06:56 +08:00
)
class WPSAlchemyRecipeLookup(WPSAPI):
def __init__(self) -> None:
super().__init__()
self._alchemy: Optional[WPSAlchemyGame] = None
self._backpack: Optional[WPSBackpackSystem] = None
def get_guide_subtitle(self) -> str:
return "查询指定物品涉及的炼金配方"
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "炼金配方",
"identifier": "炼金配方",
"description": "展示物品作为材料、成功产物或失败产物的所有配方。",
"metadata": {"别名": "alchemy_recipe"},
},
)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "查询格式",
"description": "`炼金配方 <物品ID>` 或 `炼金配方 <物品名称>`,忽略大小写。",
},
{
"title": "输出结构",
"description": "结果按材料/成功/失败三类分组,列出配方材料与成功率。",
},
)
def dependencies(self) -> List[type]:
return [WPSAlchemyGame, WPSBackpackSystem]
def is_enable_plugin(self) -> bool:
return True
def wake_up(self) -> None:
self._alchemy = Architecture.Get(WPSAlchemyGame)
self._backpack = Architecture.Get(WPSBackpackSystem)
self.register_plugin("炼金配方")
self.register_plugin("alchemy_recipe")
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSAlchemyRecipeLookup 插件已加载{ConsoleFrontColor.RESET}",
)
async def callback(
self, message: str, chat_id: int, user_id: int
) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
if not payload:
return await self.send_markdown_message(self._help_text(), chat_id, user_id)
backpack = self._backpack or Architecture.Get(WPSBackpackSystem)
definition = self._resolve_definition(payload, backpack)
if definition is None:
return await self.send_markdown_message(
f"❌ 未找到物品 `{payload}`,请确认输入的物品 ID 或名称。",
chat_id,
user_id,
)
alchemy = self._alchemy or Architecture.Get(WPSAlchemyGame)
material_recipes = alchemy.get_recipes_using_item(definition.item_id)
produce_map = alchemy.get_recipes_producing_item(definition.item_id)
success_recipes = produce_map["success"]
fail_recipes = produce_map["fail"]
message_text = self._format_markdown(
definition,
material_recipes,
success_recipes,
fail_recipes,
backpack,
)
return await self.send_markdown_message(message_text, chat_id, user_id)
def _resolve_definition(
self, identifier: str, backpack: WPSBackpackSystem
) -> Optional[BackpackItemDefinition]:
lowered = identifier.strip().lower()
cursor = get_db().conn.cursor()
cursor.execute(
f"""
SELECT item_id
FROM {WPSBackpackSystem.ITEMS_TABLE}
WHERE lower(item_id) = ? OR lower(name) = ?
LIMIT 1
""",
(lowered, lowered),
)
row = cursor.fetchone()
item_id = row["item_id"] if row else identifier.strip()
try:
return backpack._get_definition(item_id) # noqa: SLF001
except Exception:
return None
def _format_markdown(
self,
target: BackpackItemDefinition,
material_recipes: List[AlchemyRecipe],
success_recipes: List[AlchemyRecipe],
fail_recipes: List[AlchemyRecipe],
backpack: WPSBackpackSystem,
) -> str:
lines = [
f"# 🔍 炼金配方查询|{target.name}",
f"- 物品 ID`{target.item_id}`",
"---",
]
lines.append("## 作为配方材料")
lines.extend(
self._format_recipe_entries(material_recipes, backpack)
or ["- 暂无记录"]
)
lines.append("\n## 作为成功产物")
lines.extend(
self._format_recipe_entries(success_recipes, backpack, role="success")
or ["- 暂无记录"]
)
lines.append("\n## 作为失败产物")
lines.extend(
self._format_recipe_entries(fail_recipes, backpack, role="fail")
or ["- 暂无记录"]
)
return "\n".join(lines)
def _format_recipe_entries(
self,
recipes: List[AlchemyRecipe],
backpack: WPSBackpackSystem,
*,
role: str = "material",
) -> List[str]:
if not recipes:
return []
entries: List[str] = []
for recipe in recipes:
materials = self._summarize_materials(recipe, backpack)
success_name = self._resolve_item_name(recipe.success_item_id, backpack)
fail_name = self._resolve_item_name(recipe.fail_item_id, backpack)
rate = f"{recipe.base_success_rate:.0%}"
if role == "material":
entry = (
f"- 材料:{materials}|成功产物:`{success_name}`"
f"失败产物:`{fail_name}`|成功率:{rate}"
)
elif role == "success":
entry = (
f"- 材料:{materials}|成功率:{rate}"
f"失败产物:`{fail_name}`"
)
else:
entry = (
f"- 材料:{materials}|成功率:{rate}"
f"成功产物:`{success_name}`"
)
entries.append(entry)
return entries
def _summarize_materials(
self, recipe: AlchemyRecipe, backpack: WPSBackpackSystem
) -> str:
counter = Counter(recipe.materials)
parts: List[str] = []
for item_id, count in sorted(counter.items()):
name = self._resolve_item_name(item_id, backpack)
if count == 1:
parts.append(f"`{name}`")
else:
parts.append(f"`{name}` × {count}")
return " + ".join(parts)
def _resolve_item_name(
self, item_id: str, backpack: WPSBackpackSystem
) -> str:
try:
definition = backpack._get_definition(item_id) # noqa: SLF001
return definition.name
except Exception:
return item_id
def _help_text(self) -> str:
return (
"# ⚗️ 炼金配方查询帮助\n"
"- `炼金配方 <物品ID>`\n"
"- `炼金配方 <物品名称>`\n"
"> 输入需要精确匹配注册物品,名称不区分大小写。\n"
"> 积分炼金费用将进入奖池,根据挡位概率抽取奖池奖励;部分挡位会额外掉落普通物品。"
)
2025-11-11 00:06:56 +08:00
__all__ = ["WPSAlchemyGame", "WPSAlchemyRecipeLookup"]
2025-11-09 01:31:12 +08:00