Files
NewWPSBot/Plugins/WPSAlchemyGame.py

1016 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import random
from collections import defaultdict, Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Sequence, Set, Tuple, override
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db, STATUS_COMPLETED
from PWF.CoreModules.plugin_interface import DatabaseModel
from PWF.CoreModules.flags import get_internal_debug
from .WPSAPI import WPSAPI
from .WPSBackpackSystem import (
BackpackItemDefinition,
BackpackItemTier,
WPSBackpackSystem,
)
from .WPSConfigSystem import WPSConfigAPI
from .WPSStoreSystem import WPSStoreSystem
from .WPSFortuneSystem import WPSFortuneSystem
# Project-wide configuration object fetched from the Architecture registry.
# It doubles as this module's logger (see the logger.Log calls in this file).
logger: ProjectConfig = Architecture.Get(ProjectConfig)
# Factor multiplied with a user's fortune value to shift alchemy odds.
# NOTE(review): the second FindItem argument looks like a fallback default — confirm.
FORTUNE_COEFF:float = logger.FindItem("alchemy_fortune_coeff", 0.03)
# Delay, in minutes, between starting an alchemy run and settling it.
COOLDOWN_MINUTES:int = logger.FindItem("alchemy_cooldown_minutes", 2)
# NOTE(review): presumably persists the items looked up above — confirm against ProjectConfig.
logger.SaveProperties()
def clamp01(value: float) -> float:
    """Clamp *value* into the closed interval [0.0, 1.0]."""
    # Cap at the upper bound first, then lift to the lower bound.
    capped = value if value < 1.0 else 1.0
    return capped if capped > 0.0 else 0.0
@dataclass(frozen=True)
class AlchemyRecipe:
    """Immutable description of one three-material alchemy recipe."""

    # Item IDs of the three materials (stored sorted by register_recipe).
    materials: Tuple[str, str, str]
    # Item ID granted when an attempt succeeds.
    success_item_id: str
    # Item ID granted when an attempt fails.
    fail_item_id: str
    # Success probability before any fortune adjustment, within 0..1.
    base_success_rate: float
class WPSAlchemyGame(WPSAPI):
    """Alchemy plugin: stake points or combine three items, settled after a cooldown."""

    # Ash: the fallback product of failed or unknown item alchemy.
    ASH_ITEM_ID = "alchemy_ash"
    ASH_ITEM_NAME = "炉灰"
    # Slag: produced by the built-in ash + ash + ash recipe.
    SLAG_ITEM_ID = "alchemy_slag"
    SLAG_ITEM_NAME = "炉渣"
    # Upper bound on the batch count accepted by one item-alchemy command.
    MAX_BATCH_TIMES = 99
    # Rows are (upper threshold, point multiplier, label, user-facing text).
    # _draw_point_multiplier picks the first row whose threshold is >= the
    # landing value; thresholds are roughly 4/27, 12/27, 20/27, 24/27, 26/27, 1.
    _PHASE_TABLE: List[Tuple[float, float, str, str]] = [
        (0.1481481481, 0.0, "爆炸", "💥 炼金反噬,积分化为飞灰……"),
        (0.4444444444, 0.5, "失败", "😖 炼金失败,仅保留半数积分。"),
        (0.7407407407, 2.0, "成功", "😊 炼金成功,积分翻倍!"),
        (0.8888888888, 5.0, "丰厚积分", "😁 运气加成,收获丰厚积分!"),
        (0.9629629630, 10.0, "巨额积分", "🤩 巨额积分入账,今日欧气爆棚!"),
        (1.0, 100.0, "传说积分", "🌈 传说级好运!积分暴涨一百倍!"),
    ]

    def __init__(self) -> None:
        """Create empty recipe storage and indexes, then derive the cooldown settings."""
        super().__init__()
        # Recipes keyed by their sorted material triple.
        self._recipes: Dict[Tuple[str, str, str], AlchemyRecipe] = {}
        # Reverse indexes: item ID -> keys of recipes that use / produce it.
        self._material_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
        self._success_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
        self._fail_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
        self._fortune_coeff = FORTUNE_COEFF
        # Cooldown (minutes) comes from configuration; internal debug mode forces it to 0.
        debug_mode = get_internal_debug()
        self._cooldown_minutes = 0 if debug_mode else COOLDOWN_MINUTES
        self._cooldown_ms = int(self._cooldown_minutes * 60 * 1000)
        logger.SaveProperties()
@override
def dependencies(self) -> List[type]:
    """Return the plugin classes this plugin declares as its dependencies."""
    required: List[type] = [
        WPSAPI,
        WPSBackpackSystem,
        WPSConfigAPI,
        WPSFortuneSystem,
        WPSStoreSystem,
    ]
    return required
@override
def register_db_model(self) -> DatabaseModel:
    """Describe the ``alchemy_records`` table that stores alchemy runs."""
    columns = {
        "alchemy_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
        "user_id": "INTEGER NOT NULL",
        "chat_id": "INTEGER NOT NULL",
        "alchemy_type": "TEXT NOT NULL",
        "input_data": "TEXT NOT NULL",
        "start_time": "TEXT NOT NULL",
        "expected_end_time": "TEXT NOT NULL",
        "status": "TEXT NOT NULL",
        "result_data": "TEXT",
        "scheduled_task_id": "INTEGER",
    }
    return DatabaseModel(table_name="alchemy_records", column_defs=columns)
@override
def wake_up(self) -> None:
    """Log the load message, register command names and items, then settle overdue runs."""
    banner = f"{ConsoleFrontColor.GREEN}WPSAlchemyGame 插件已加载{ConsoleFrontColor.RESET}"
    logger.Log("Info", banner)
    for command_name in ("alchemy", "炼金"):
        self.register_plugin(command_name)
    self._register_alchemy_items()
    # Settle any in-progress runs whose expected end time has already passed.
    self.recover_overdue_alchemy()
def _register_alchemy_items(self) -> None:
    """Register the ash and slag items, the ash store listing and the ash-to-slag recipe.

    Every step is best-effort: a failure is logged as a warning and the
    following steps still run. The store listing is attempted only when the
    ash item registered without raising.
    """
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)

    # Step 1: the ash item itself.
    ash_registered = False
    try:
        backpack.register_item(
            self.ASH_ITEM_ID,
            self.ASH_ITEM_NAME,
            BackpackItemTier.COMMON,
            "炼金失败时残留的炉灰,可作为低阶材料或出售。",
        )
        ash_registered = True
    except Exception as exc:
        logger.Log(
            "Warning",
            f"{ConsoleFrontColor.YELLOW}注册炉灰物品时出现问题: {exc}{ConsoleFrontColor.RESET}",
        )

    # Step 2: list ash in the store, but only if step 1 succeeded.
    if ash_registered:
        try:
            store_system = Architecture.Get(WPSStoreSystem)
            store_system.register_mode(
                item_id=self.ASH_ITEM_ID,
                price=8,
                limit_amount=999,
            )
        except Exception as exc:
            logger.Log(
                "Warning",
                f"{ConsoleFrontColor.YELLOW}注册炉灰商店模式时出现问题: {exc}{ConsoleFrontColor.RESET}",
            )

    # Step 3: the slag item.
    try:
        backpack.register_item(
            self.SLAG_ITEM_ID,
            self.SLAG_ITEM_NAME,
            BackpackItemTier.COMMON,
            "经高温提炼后的炉渣,可在特殊任务中使用。",
        )
    except Exception as exc:
        logger.Log(
            "Warning",
            f"{ConsoleFrontColor.YELLOW}注册炉渣物品时出现问题: {exc}{ConsoleFrontColor.RESET}",
        )

    # Step 4: three ash always refine into one slag (success rate 1.0).
    ash_triple = (self.ASH_ITEM_ID,) * 3
    try:
        self.register_recipe(ash_triple, self.SLAG_ITEM_ID, self.ASH_ITEM_ID, 1.0)
    except Exception as exc:
        logger.Log(
            "Warning",
            f"{ConsoleFrontColor.YELLOW}注册炉渣配方时出现问题: {exc}{ConsoleFrontColor.RESET}",
        )
def register_recipe(
    self,
    materials: Tuple[str, str, str]|Sequence[str],
    success_item_id: str,
    fail_item_id: str,
    base_success_rate: float,
) -> None:
    """Register (or replace) the recipe for an unordered triple of materials.

    Material IDs are stripped and sorted, so the same three materials always
    map to one recipe regardless of the order given. An empty ``fail_item_id``
    falls back to the ash item.

    Raises:
        ValueError: if there are not exactly three materials, if any material
            ID is blank, or if ``base_success_rate`` is outside 0..1.
    """
    if len(materials) != 3:
        raise ValueError("炼金配方必须提供三种材料")
    cleaned = [mat.strip() for mat in materials]
    cleaned.sort()
    sorted_materials = tuple(cleaned)
    if not all(sorted_materials):
        raise ValueError("炼金材料 ID 不能为空")
    # A rate that clamping would alter lies outside the valid range.
    if clamp01(base_success_rate) != base_success_rate:
        raise ValueError("配方成功率必须位于 0~1 之间")

    # Re-registering the same triple replaces the old recipe, so drop its index entries first.
    previous = self._recipes.get(sorted_materials)
    if previous is not None:
        self._unindex_recipe(sorted_materials, previous)

    fallback_fail_id = fail_item_id.strip() or self.ASH_ITEM_ID
    recipe = AlchemyRecipe(
        materials=sorted_materials,
        success_item_id=success_item_id.strip(),
        fail_item_id=fallback_fail_id,
        base_success_rate=base_success_rate,
    )
    self._recipes[sorted_materials] = recipe
    self._index_recipe(sorted_materials, recipe)
    logger.Log(
        "Info",
        f"{ConsoleFrontColor.CYAN}已注册炼金配方 {sorted_materials} -> {success_item_id} ({base_success_rate:.2%}){ConsoleFrontColor.RESET}",
    )
def _index_recipe(
    self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe
) -> None:
    """Add ``materials_key`` to the material, success and fail lookup indexes."""
    for item_id in recipe.materials:
        self._material_index[item_id].add(materials_key)
    produced = (
        (self._success_index, recipe.success_item_id),
        (self._fail_index, recipe.fail_item_id),
    )
    for index, produced_id in produced:
        index[produced_id].add(materials_key)
def _unindex_recipe(
    self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe
) -> None:
    """Remove ``materials_key`` from every lookup index, pruning entries left empty."""

    def forget(index, item_id) -> None:
        bucket = index.get(item_id)
        if not bucket or materials_key not in bucket:
            return
        bucket.discard(materials_key)
        if not bucket:
            del index[item_id]

    # Repeated materials are harmless: after the first removal the bucket is gone.
    for material in recipe.materials:
        forget(self._material_index, material)
    forget(self._success_index, recipe.success_item_id)
    forget(self._fail_index, recipe.fail_item_id)
def get_recipes_using_item(self, item_id: str) -> List[AlchemyRecipe]:
    """Return the recipes that use ``item_id`` as a material, ordered by material key."""
    if not item_id:
        return []
    keys = self._material_index.get(item_id) or ()
    found: List[AlchemyRecipe] = []
    for key in sorted(keys):
        found.append(self._recipes[key])
    return found
def get_recipes_producing_item(
    self, item_id: str
) -> Dict[str, List[AlchemyRecipe]]:
    """Return recipes that yield ``item_id``, split by success and fail outcome.

    Each list is ordered by descending base success rate, with ties broken by
    the material tuple.
    """
    if not item_id:
        return {"success": [], "fail": []}

    def ranked(index) -> List[AlchemyRecipe]:
        candidates = [self._recipes[key] for key in index.get(item_id, set())]
        candidates.sort(
            key=lambda recipe: (-recipe.base_success_rate, recipe.materials)
        )
        return candidates

    return {
        "success": ranked(self._success_index),
        "fail": ranked(self._fail_index),
    }
async def callback(
    self, message: str, chat_id: int, user_id: int
) -> Optional[str]:
    """Dispatch an alchemy command and send the reply as a markdown message.

    Recognised forms (text after the mention):
      * ``状态`` / ``status``             - query the current run
      * ``<points>``                      - point alchemy
      * ``<m1> <m2> <m3> [times]``        - item alchemy (extra tokens ignored)
    Anything else gets the help message.

    Returns:
        Whatever ``send_markdown_message`` returns for the reply.
    """
    payload = self.parse_message_after_at(message).strip()
    # split() without arguments already trims and drops empty tokens.
    tokens = payload.split()
    if not tokens:
        return await self.send_markdown_message(
            self._help_message(), chat_id, user_id
        )

    if len(tokens) == 1:
        command = tokens[0]
        if command in ("状态", "status"):
            response = await self._handle_status_query(chat_id, user_id)
            return await self.send_markdown_message(response, chat_id, user_id)
        # isdecimal(), unlike isdigit(), only accepts characters int() can parse;
        # "²" or "①" previously passed isdigit() and crashed in int().
        if command.isdecimal():
            try:
                points = int(command)
            except ValueError:
                # e.g. a digit string longer than the interpreter's int conversion limit.
                return await self.send_markdown_message(
                    "❌ 投入积分数值无效", chat_id, user_id
                )
            response = await self._handle_point_alchemy(chat_id, user_id, points)
            return await self.send_markdown_message(response, chat_id, user_id)

    if len(tokens) >= 3:
        materials = tokens[:3]
        times = 1
        if len(tokens) >= 4:
            try:
                times = int(tokens[3])
            except ValueError:
                return await self.send_markdown_message(
                    "❌ 炼金次数必须是整数", chat_id, user_id
                )
        response = await self._handle_item_alchemy(
            chat_id, user_id, materials, times
        )
        return await self.send_markdown_message(response, chat_id, user_id)

    return await self.send_markdown_message(
        self._help_message(), chat_id, user_id
    )
async def _handle_point_alchemy(
    self, chat_id: int, user_id: int, points: int
) -> str:
    """Start a point alchemy run and return the confirmation (or error) text.

    The stake is deducted immediately, an ``in_progress`` record is created
    and a clock task is registered to settle it after the cooldown.
    """
    if points <= 0:
        return "❌ 投入积分必须大于 0"

    # A user may only have one run in flight.
    on_cooldown, cooldown_text = self._check_cooldown(user_id)
    if on_cooldown:
        return cooldown_text

    config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
    current_points = config_api.get_user_points(user_id)
    if current_points < points:
        return f"❌ 积分不足,需要 {points} 分,当前仅有 {current_points}"

    # Take the stake up front.
    await config_api.adjust_user_points(chat_id, user_id, -points, "炼金消耗")

    # Persist the run, then schedule its settlement.
    alchemy_id = self._create_alchemy_record(
        user_id, chat_id, "point", {"type": "point", "points": points}
    )
    settle_kwargs = {
        "alchemy_id": alchemy_id,
        "user_id": user_id,
        "chat_id": chat_id,
    }
    task_id = self.register_clock(
        self._settle_alchemy_callback, self._cooldown_ms, kwargs=settle_kwargs
    )

    # Link the scheduled task to the record.
    link_cursor = get_db().conn.cursor()
    link_cursor.execute(
        "UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?",
        (task_id, alchemy_id),
    )
    get_db().conn.commit()

    # Read back the stored completion time for the reply.
    read_cursor = get_db().conn.cursor()
    read_cursor.execute(
        "SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?",
        (alchemy_id,),
    )
    stored = read_cursor.fetchone()
    expected_end_time = datetime.fromisoformat(stored["expected_end_time"])

    debug_hint = " **[DEBUG模式]**" if get_internal_debug() else ""
    if self._cooldown_minutes == 0:
        time_str = "立即结算"
    else:
        time_str = f"{self._cooldown_minutes} 分钟"
    reply_parts = (
        f"# ⚗️ 炼金开始{debug_hint}\n",
        f"- 类型:积分炼金\n",
        f"- 投入积分:`{points}`\n",
        f"- 预计耗时:{time_str}\n",
        f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n",
        f"- 状态:炼金进行中...",
    )
    return "".join(reply_parts)
def _draw_point_multiplier(
    self, fortune_value: float
) -> Tuple[float, str, str]:
    """Roll a point-alchemy outcome and return ``(multiplier, label, text)``.

    A uniform random number is shifted by ``fortune_value * fortune_coeff``
    and clamped to 0..1; the first phase-table row whose threshold is not
    below that landing value wins.
    """
    offset = fortune_value * self._fortune_coeff
    roll = random.random()
    landing = clamp01(offset + roll)
    last_row = self._PHASE_TABLE[-1]
    chosen = next(
        (row for row in self._PHASE_TABLE if landing <= row[0]),
        last_row,  # fallback kept from the original; the final threshold is 1.0
    )
    return chosen[1:]
async def _handle_item_alchemy(
    self,
    chat_id: int,
    user_id: int,
    materials: Sequence[str],
    times: int,
) -> str:
    """Start an item alchemy run and return the confirmation (or error) text.

    Materials are resolved by ID or name, the required quantities are checked
    and deducted, an ``in_progress`` record is created and a clock task is
    registered to settle it after the cooldown.

    The same item may fill several of the three slots (the built-in recipe is
    ash + ash + ash), so requirements are aggregated per item before checking
    and deducting. Checking each slot against ``times`` on its own let a user
    holding a single ash start the recipe and end with a negative stock.
    """
    if times <= 0:
        return "❌ 炼金次数必须大于 0"
    if times > self.MAX_BATCH_TIMES:
        return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES}"

    # A user may only have one run in flight.
    is_on_cooldown, cooldown_msg = self._check_cooldown(user_id)
    if is_on_cooldown:
        return cooldown_msg

    resolved: List[BackpackItemDefinition] = []
    for identifier in materials:
        resolved_item = self._resolve_item(identifier)
        if resolved_item is None:
            return f"❌ 未找到材料 `{identifier}`,请确认已注册"
        resolved.append(resolved_item)
    material_ids = [item.item_id for item in resolved]

    # item_id -> number of slots it fills in one attempt.
    slots_per_item = Counter(material_ids)
    names_by_id = {item.item_id: item.name for item in resolved}

    # Verify everything first so nothing is deducted on a partial shortfall.
    owned_by_id: Dict[str, int] = {}
    for item_id, slots in slots_per_item.items():
        needed = slots * times
        owned = self._get_user_quantity(user_id, item_id)
        if owned < needed:
            return (
                f"❌ 材料 `{names_by_id[item_id]}` 数量不足,需要 {needed} 个,当前仅有 {owned}"
            )
        owned_by_id[item_id] = owned

    # Deduct each distinct item once, by its total requirement.
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    for item_id, slots in slots_per_item.items():
        backpack.set_item_quantity(
            user_id, item_id, owned_by_id[item_id] - slots * times
        )

    # Persist the run; the material list keeps duplicates because settlement
    # uses the sorted triple as the recipe key.
    input_data = {
        "type": "item",
        "materials": material_ids,
        "times": times,
    }
    alchemy_id = self._create_alchemy_record(
        user_id, chat_id, "item", input_data
    )

    # Schedule settlement after the cooldown.
    task_id = self.register_clock(
        self._settle_alchemy_callback,
        self._cooldown_ms,
        kwargs={
            "alchemy_id": alchemy_id,
            "user_id": user_id,
            "chat_id": chat_id,
        },
    )

    # Link the scheduled task to the record.
    cursor = get_db().conn.cursor()
    cursor.execute(
        "UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?",
        (task_id, alchemy_id),
    )
    get_db().conn.commit()

    # Read back the stored completion time for the reply.
    cursor = get_db().conn.cursor()
    cursor.execute(
        "SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?",
        (alchemy_id,),
    )
    record = cursor.fetchone()
    expected_end_time = datetime.fromisoformat(record["expected_end_time"])

    debug_hint = " **[DEBUG模式]**" if get_internal_debug() else ""
    time_str = "立即结算" if self._cooldown_minutes == 0 else f"{self._cooldown_minutes} 分钟"
    # Separate the names; joining on "" ran them together.
    material_names = "、".join(item.name for item in resolved)
    return (
        f"# ⚗️ 炼金开始{debug_hint}\n"
        f"- 类型:物品炼金\n"
        f"- 投入材料:{material_names} × {times}\n"
        f"- 预计耗时:{time_str}\n"
        f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
        f"- 状态:炼金进行中..."
    )
def _resolve_item(
    self, identifier: str
) -> Optional[BackpackItemDefinition]:
    """Resolve an item ID or name (case-insensitive) to its definition, or ``None``.

    When no row matches, the stripped identifier itself is tried as an item ID.
    """
    trimmed = identifier.strip()
    needle = trimmed.lower()
    query = f"""
SELECT item_id
FROM {WPSBackpackSystem.ITEMS_TABLE}
WHERE lower(item_id) = ? OR lower(name) = ?
LIMIT 1
"""
    cursor = get_db().conn.cursor()
    cursor.execute(query, (needle, needle))
    match = cursor.fetchone()
    if match:
        item_id = match["item_id"]
    else:
        item_id = trimmed
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    try:
        definition = backpack._get_definition(item_id)
    except Exception:
        # Any lookup failure is reported to the caller as "not found".
        return None
    return definition
def _get_user_quantity(self, user_id: int, item_id: str) -> int:
    """Return how many of ``item_id`` the user holds, or 0 when it is not listed."""
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    quantities = (
        entry.quantity
        for entry in backpack.get_user_items(user_id)
        if entry.item_id == item_id
    )
    # The first matching entry wins, as in a linear scan.
    return next(quantities, 0)
def _check_cooldown(self, user_id: int) -> Tuple[bool, Optional[str]]:
    """Report whether the user still has an unfinished alchemy run.

    Returns:
        ``(True, message)`` while the latest in-progress run has not reached
        its expected end time, otherwise ``(False, None)``. A run that is
        already past its end time is settled here before returning.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        """
SELECT alchemy_id, expected_end_time, alchemy_type
FROM alchemy_records
WHERE user_id = ? AND status = 'in_progress'
ORDER BY start_time DESC
LIMIT 1
""",
        (user_id,),
    )
    record = cursor.fetchone()
    if not record:
        return False, None

    expected_end = datetime.fromisoformat(record["expected_end_time"])
    now = datetime.now()
    if now < expected_end:
        # Still cooling down: report the remaining time, rounded up to minutes.
        remaining = expected_end - now
        remaining_minutes = int(remaining.total_seconds() / 60) + 1
        if record["alchemy_type"] == "point":
            alchemy_type_name = "积分炼金"
        else:
            alchemy_type_name = "物品炼金"
        notice = (
            f"❌ 炼金冷却中\n"
            f"- 上次炼金类型:{alchemy_type_name}\n"
            f"- 预计完成:{expected_end.strftime('%Y-%m-%d %H:%M')}\n"
            f"- 剩余时间:约 {remaining_minutes} 分钟\n"
            f"- 请等待冷却结束后再试"
        )
        return True, notice

    # Past due: settle now; a settlement error is logged and does not block the user.
    try:
        self.settle_alchemy(record["alchemy_id"])
    except Exception as e:
        logger.Log(
            "Error",
            f"{ConsoleFrontColor.RED}自动结算过期炼金失败: {e}{ConsoleFrontColor.RESET}",
        )
    return False, None
def _create_alchemy_record(
    self, user_id: int, chat_id: int, alchemy_type: str, input_data: Dict
) -> int:
    """Insert an ``in_progress`` alchemy record and return its row ID.

    The expected end time is the current time plus the configured cooldown.
    """
    start_time = datetime.now()
    expected_end_time = start_time + timedelta(minutes=self._cooldown_minutes)
    row_values = (
        user_id,
        chat_id,
        alchemy_type,
        json.dumps(input_data),
        start_time.isoformat(),
        expected_end_time.isoformat(),
    )
    cursor = get_db().conn.cursor()
    cursor.execute(
        """
INSERT INTO alchemy_records
(user_id, chat_id, alchemy_type, input_data, start_time, expected_end_time, status)
VALUES (?, ?, ?, ?, ?, ?, 'in_progress')
""",
        row_values,
    )
    new_id = cursor.lastrowid
    get_db().conn.commit()
    return new_id
def settle_alchemy(self, alchemy_id: int) -> Tuple[bool, str, Optional[Dict]]:
    """Settle one alchemy record: grant rewards and mark it completed.

    Args:
        alchemy_id: Primary key of the ``alchemy_records`` row to settle.

    Returns:
        ``(True, markdown_message, result_data)`` on success, otherwise
        ``(False, error_message, None)``. A record that is missing, already
        settled or of an unknown type is left untouched; an exception while
        granting rewards marks the record ``failed``.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        "SELECT * FROM alchemy_records WHERE alchemy_id = ?",
        (alchemy_id,),
    )
    record = cursor.fetchone()
    if not record:
        return False, "❌ 炼金记录不存在", None
    if record["status"] != "in_progress":
        # Closing parenthesis added; the message was left unbalanced.
        return False, f"❌ 炼金已结算(状态:{record['status']})", None

    user_id = record["user_id"]
    alchemy_type = record["alchemy_type"]
    input_data = json.loads(record["input_data"])
    fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
    fortune_value = fortune_system.get_fortune_value(user_id)

    try:
        if alchemy_type == "point":
            body_lines, result_data = self._settle_point_alchemy(
                user_id, input_data, fortune_value
            )
        elif alchemy_type == "item":
            body_lines, result_data = self._settle_item_alchemy(
                user_id, input_data, fortune_value
            )
        else:
            return False, f"❌ 未知的炼金类型:{alchemy_type}", None

        cursor.execute(
            """
UPDATE alchemy_records
SET status = 'completed', result_data = ?
WHERE alchemy_id = ?
""",
            (json.dumps(result_data), alchemy_id),
        )
        # Persist the status right away so a restart cannot settle (and
        # reward) the same record twice.
        get_db().conn.commit()
    except Exception as e:
        logger.Log(
            "Error",
            f"{ConsoleFrontColor.RED}结算炼金失败: {e}{ConsoleFrontColor.RESET}",
        )
        # Mark as failed so the record no longer blocks the user.
        cursor.execute(
            "UPDATE alchemy_records SET status = 'failed' WHERE alchemy_id = ?",
            (alchemy_id,),
        )
        get_db().conn.commit()
        return False, f"❌ 结算失败:{str(e)}", None

    # Task bookkeeping happens after the commit: rewards are already granted,
    # so an error here must not flip the record to 'failed'.
    scheduled_task_id = record["scheduled_task_id"]
    if scheduled_task_id:
        try:
            get_db().update_task_status(scheduled_task_id, STATUS_COMPLETED)
        except Exception as e:
            logger.Log(
                "Warning",
                f"{ConsoleFrontColor.YELLOW}更新炼金定时任务状态失败: {e}{ConsoleFrontColor.RESET}",
            )
    return True, "".join(["# 🔮 炼金结算\n", *body_lines]), result_data


def _settle_point_alchemy(
    self, user_id: int, input_data: Dict, fortune_value: float
) -> Tuple[List[str], Dict]:
    """Pay out a point alchemy run; return its message lines and result data."""
    config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    points = input_data["points"]
    multiplier, phase_label, phase_text = self._draw_point_multiplier(fortune_value)
    reward = int(points * multiplier)
    if reward:
        config_api.adjust_user_points_sync(
            user_id, reward, f"炼金收益({phase_label})"
        )
    # An explosion (multiplier 0) leaves ash: one per 10 staked points, capped at 99.
    ash_reward = 0
    if multiplier == 0.0:
        ash_reward = min(points // 10, 99)
        if ash_reward > 0:
            backpack.add_item(user_id, self.ASH_ITEM_ID, ash_reward)
    final_points = config_api.get_user_points(user_id)

    lines = [
        f"- 投入积分:`{points}`\n",
        f"- 结果:{phase_text}\n",
        f"- 获得倍率:×{multiplier:.1f},返还 `+{reward}` 积分\n",
    ]
    if ash_reward > 0:
        lines.append(f"- 额外获得:{self.ASH_ITEM_NAME} × `{ash_reward}`\n")
    lines.append(f"- 当前积分:`{final_points}`")
    result_data = {
        "multiplier": multiplier,
        "reward_points": reward,
        "ash_reward": ash_reward,
    }
    return lines, result_data


def _settle_item_alchemy(
    self, user_id: int, input_data: Dict, fortune_value: float
) -> Tuple[List[str], Dict]:
    """Roll an item alchemy run; return its message lines and result data."""
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    materials = input_data["materials"]
    times = input_data["times"]

    def display_name(item_id: str) -> str:
        # Fall back to the raw ID when the item definition cannot be loaded.
        try:
            return backpack._get_definition(item_id).name
        except Exception:
            return item_id

    recipe = self._recipes.get(tuple(sorted(materials)))
    adjusted_rate = (
        clamp01(recipe.base_success_rate + fortune_value * self._fortune_coeff)
        if recipe
        else 0.0
    )

    # Roll every attempt first, then grant each distinct reward in one call.
    success_count = 0
    fail_count = 0
    rewards: Dict[str, int] = {}
    for _ in range(times):
        if recipe and random.random() < adjusted_rate:
            reward_id = recipe.success_item_id
            success_count += 1
        else:
            # Unknown recipes always fail and yield ash.
            reward_id = recipe.fail_item_id if recipe else self.ASH_ITEM_ID
            fail_count += 1
        rewards[reward_id] = rewards.get(reward_id, 0) + 1
    for reward_id, count in rewards.items():
        backpack.add_item(user_id, reward_id, count)

    details = [
        f"- {display_name(item_id)} × **{count}**"
        for item_id, count in rewards.items()
    ]
    success_line = (
        f"- 成功次数:`{success_count}`"
        if recipe
        else "- 成功次数:`0`(未知配方必定失败)"
    )
    fail_line = (
        f"- 失败次数:`{fail_count}`"
        if recipe
        else f"- 失败次数:`{times}`"
    )
    rate_line = (
        f"- 基础成功率:`{recipe.base_success_rate:.2%}`"
        if recipe
        else "- ✅ 未知配方仅产出炉灰"
    )
    rewards_block = "\n".join(details) if details else "- (无物品获得)"
    # Every material is listed, by name or by ID, with a separator.
    material_names = "、".join(display_name(item_id) for item_id in materials)
    lines = [
        f"- 投入材料:{material_names} × {times}\n",
        f"{success_line}\n",
        f"{fail_line}\n",
        f"{rate_line}\n",
        "- 获得物品:\n",
        f"{rewards_block}",
    ]
    result_data = {
        "success_count": success_count,
        "fail_count": fail_count,
        "rewards": rewards,
    }
    return lines, result_data
async def _settle_alchemy_callback(
    self, alchemy_id: int, user_id: int, chat_id: int
) -> None:
    """Clock-task callback: settle the run and send its result message to the chat."""
    outcome = self.settle_alchemy(alchemy_id)
    # Only the message (second element) is sent, whether or not settlement succeeded.
    await self.send_markdown_message(outcome[1], chat_id, user_id)
def _get_user_alchemy_status(self, user_id: int) -> Optional[Dict]:
    """Return the user's most recently started in-progress record as a dict, or ``None``."""
    cursor = get_db().conn.cursor()
    cursor.execute(
        """
SELECT * FROM alchemy_records
WHERE user_id = ? AND status = 'in_progress'
ORDER BY start_time DESC
LIMIT 1
""",
        (user_id,),
    )
    row = cursor.fetchone()
    if not row:
        return None
    return dict(row)
async def _handle_status_query(self, chat_id: int, user_id: int) -> str:
    """Build the markdown status report for the user's current alchemy run."""
    record = self._get_user_alchemy_status(user_id)
    if not record:
        return (
            "# ⚗️ 炼金状态\n"
            "- 状态:无进行中的炼金\n"
            "- 可以开始新的炼金"
        )

    if record["alchemy_type"] == "point":
        alchemy_type_name = "积分炼金"
    else:
        alchemy_type_name = "物品炼金"
    start_time = datetime.fromisoformat(record["start_time"])
    expected_end_time = datetime.fromisoformat(record["expected_end_time"])

    now = datetime.now()
    if now < expected_end_time:
        # Round the remaining time up to whole minutes.
        remaining = expected_end_time - now
        remaining_minutes = int(remaining.total_seconds() / 60) + 1
        remaining_str = f"{remaining_minutes} 分钟"
    else:
        remaining_str = "已完成,等待结算"

    report = [
        "# ⚗️ 炼金状态\n",
        f"- 状态:进行中\n",
        f"- 类型:{alchemy_type_name}\n",
        f"- 开始时间:{start_time.strftime('%Y-%m-%d %H:%M')}\n",
        f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n",
        f"- 剩余时间:{remaining_str}",
    ]
    return "".join(report)
def recover_overdue_alchemy(self) -> None:
    """Settle every in-progress record whose expected end time has already passed.

    Each record is settled independently; a failure is logged and the loop
    continues with the next record.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        """
SELECT alchemy_id FROM alchemy_records
WHERE status = 'in_progress' AND expected_end_time < ?
""",
        (datetime.now().isoformat(),),
    )
    overdue_records = cursor.fetchall()
    if not overdue_records:
        return

    for record in overdue_records:
        logger.Log(
            "Warning",
            f"{ConsoleFrontColor.YELLOW}发现过期炼金 {record['alchemy_id']},执行恢复结算{ConsoleFrontColor.RESET}",
        )
        try:
            self.settle_alchemy(record["alchemy_id"])
        except Exception as e:
            logger.Log(
                "Error",
                f"{ConsoleFrontColor.RED}恢复炼金 {record['alchemy_id']} 失败: {e}{ConsoleFrontColor.RESET}",
            )

    # The summary counts the records found, including any that failed above.
    logger.Log(
        "Info",
        f"{ConsoleFrontColor.GREEN}恢复了 {len(overdue_records)} 个过期炼金{ConsoleFrontColor.RESET}",
    )
def _help_message(self) -> str:
    """Return the markdown help text listing the alchemy commands."""
    return (
        "# ⚗️ 炼金指令帮助\n"
        "- `炼金 <积分>`:投入积分尝试炼金\n"
        # Closing parenthesis restored; the hint was left unbalanced.
        "- `炼金 <材料1> <材料2> <材料3> [次数]`:使用三件材料进行炼金(可选次数,默认 1)\n"
        "- `炼金 状态`:查询当前炼金状态\n"
        "> 建议提前备足材料及积分,谨慎开启炼金流程。炼金需要等待一定时间后才会获得结果。"
    )
class WPSAlchemyRecipeLookup(WPSAPI):
    """Plugin that answers recipe lookups for a given item."""

    def __init__(self) -> None:
        """Start with no cached plugin references; ``wake_up`` fills them in."""
        super().__init__()
        # Both stay None until wake_up runs; callback falls back to Architecture.Get.
        self._alchemy: Optional[WPSAlchemyGame] = None
        self._backpack: Optional[WPSBackpackSystem] = None
def dependencies(self) -> List[type]:
    """Return the plugin classes this plugin declares as its dependencies."""
    required: List[type] = [WPSAlchemyGame, WPSBackpackSystem]
    return required
def is_enable_plugin(self) -> bool:
    """Report this plugin as enabled; always ``True``."""
    enabled = True
    return enabled
def wake_up(self) -> None:
    """Cache the alchemy and backpack plugins, register command names and log the load."""
    self._alchemy = Architecture.Get(WPSAlchemyGame)
    self._backpack = Architecture.Get(WPSBackpackSystem)
    for command_name in ("炼金配方", "alchemy_recipe"):
        self.register_plugin(command_name)
    banner = f"{ConsoleFrontColor.GREEN}WPSAlchemyRecipeLookup 插件已加载{ConsoleFrontColor.RESET}"
    logger.Log("Info", banner)
async def callback(
    self, message: str, chat_id: int, user_id: int
) -> Optional[str]:
    """Look up the recipes that involve the named item and send them as markdown.

    An empty query gets the help text; an unknown item gets an error message.
    """
    payload = self.parse_message_after_at(message).strip()
    if not payload:
        return await self.send_markdown_message(self._help_text(), chat_id, user_id)

    # Prefer the references cached by wake_up; fetch them on demand otherwise.
    backpack = self._backpack or Architecture.Get(WPSBackpackSystem)
    definition = self._resolve_definition(payload, backpack)
    if definition is None:
        not_found = f"❌ 未找到物品 `{payload}`,请确认输入的物品 ID 或名称。"
        return await self.send_markdown_message(not_found, chat_id, user_id)

    alchemy = self._alchemy or Architecture.Get(WPSAlchemyGame)
    material_recipes = alchemy.get_recipes_using_item(definition.item_id)
    produce_map = alchemy.get_recipes_producing_item(definition.item_id)
    message_text = self._format_markdown(
        definition,
        material_recipes,
        produce_map["success"],
        produce_map["fail"],
        backpack,
    )
    return await self.send_markdown_message(message_text, chat_id, user_id)
def _resolve_definition(
    self, identifier: str, backpack: WPSBackpackSystem
) -> Optional[BackpackItemDefinition]:
    """Resolve an item ID or name (case-insensitive) to its definition, or ``None``.

    When no row matches, the stripped identifier itself is tried as an item ID.
    """
    trimmed = identifier.strip()
    lowered = trimmed.lower()
    query = f"""
SELECT item_id
FROM {WPSBackpackSystem.ITEMS_TABLE}
WHERE lower(item_id) = ? OR lower(name) = ?
LIMIT 1
"""
    cursor = get_db().conn.cursor()
    cursor.execute(query, (lowered, lowered))
    match = cursor.fetchone()
    if match:
        item_id = match["item_id"]
    else:
        item_id = trimmed
    try:
        definition = backpack._get_definition(item_id)  # noqa: SLF001
    except Exception:
        # Any lookup failure is reported to the caller as "not found".
        return None
    return definition
def _format_markdown(
    self,
    target: BackpackItemDefinition,
    material_recipes: List[AlchemyRecipe],
    success_recipes: List[AlchemyRecipe],
    fail_recipes: List[AlchemyRecipe],
    backpack: WPSBackpackSystem,
) -> str:
    """Render the lookup result: a header plus one section per role of the item."""
    empty_section = ["- 暂无记录"]
    as_material = self._format_recipe_entries(material_recipes, backpack)
    as_success = self._format_recipe_entries(success_recipes, backpack, role="success")
    as_fail = self._format_recipe_entries(fail_recipes, backpack, role="fail")

    # Sections without recipes show a placeholder line.
    lines = [
        f"# 🔍 炼金配方查询|{target.name}",
        f"- 物品 ID`{target.item_id}`",
        "---",
        "## 作为配方材料",
        *(as_material or empty_section),
        "\n## 作为成功产物",
        *(as_success or empty_section),
        "\n## 作为失败产物",
        *(as_fail or empty_section),
    ]
    return "\n".join(lines)
def _format_recipe_entries(
    self,
    recipes: List[AlchemyRecipe],
    backpack: WPSBackpackSystem,
    *,
    role: str = "material",
) -> List[str]:
    """Format each recipe as one markdown bullet, tailored to the queried item's role.

    Args:
        recipes: Recipes to render.
        backpack: Backpack system used to turn item IDs into display names.
        role: ``"material"`` shows both products; ``"success"`` omits the
            success product and ``"fail"`` omits the fail product (in each
            case the omitted product is the queried item). Any other value
            is treated like ``"fail"``.

    Returns:
        One line per recipe; an empty list when ``recipes`` is empty.
    """
    entries: List[str] = []
    for recipe in recipes:
        materials = self._summarize_materials(recipe, backpack)
        success_name = self._resolve_item_name(recipe.success_item_id, backpack)
        fail_name = self._resolve_item_name(recipe.fail_item_id, backpack)
        rate = f"{recipe.base_success_rate:.0%}"
        # Every field is separated by "|". The adjacent f-string pieces used
        # to be concatenated without it, fusing two fields together.
        if role == "material":
            fields = [
                f"材料:{materials}",
                f"成功产物:`{success_name}`",
                f"失败产物:`{fail_name}`",
                f"成功率:{rate}",
            ]
        elif role == "success":
            fields = [
                f"材料:{materials}",
                f"成功率:{rate}",
                f"失败产物:`{fail_name}`",
            ]
        else:
            fields = [
                f"材料:{materials}",
                f"成功率:{rate}",
                f"成功产物:`{success_name}`",
            ]
        entries.append("- " + "|".join(fields))
    return entries
def _summarize_materials(
    self, recipe: AlchemyRecipe, backpack: WPSBackpackSystem
) -> str:
    """Describe a recipe's materials as ``name × count`` terms joined by `` + ``.

    Repeated materials are collapsed into one term; a count of 1 is not shown.
    """
    tally = Counter(recipe.materials)

    def term(item_id: str, count: int) -> str:
        name = self._resolve_item_name(item_id, backpack)
        return f"`{name}`" if count == 1 else f"`{name}` × {count}"

    return " + ".join(term(item_id, count) for item_id, count in sorted(tally.items()))
def _resolve_item_name(
    self, item_id: str, backpack: WPSBackpackSystem
) -> str:
    """Return the item's display name, or the raw ``item_id`` if the lookup fails."""
    try:
        name = backpack._get_definition(item_id).name  # noqa: SLF001
    except Exception:
        name = item_id
    return name
def _help_text(self) -> str:
    """Return the markdown help text for the recipe lookup command."""
    help_lines = (
        "# ⚗️ 炼金配方查询帮助\n",
        "- `炼金配方 <物品ID>`\n",
        "- `炼金配方 <物品名称>`\n",
        "> 输入需要精确匹配注册物品,名称不区分大小写。",
    )
    return "".join(help_lines)
# Names exported by `from <module> import *`: the two plugin classes of this file.
__all__ = ["WPSAlchemyGame", "WPSAlchemyRecipeLookup"]