Files
NewWPSBot/Plugins/WPSAlchemyGame.py
2025-11-12 22:58:36 +08:00

1256 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import random
from collections import defaultdict, Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Sequence, Set, Tuple, override
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db, STATUS_COMPLETED
from PWF.CoreModules.plugin_interface import DatabaseModel
from PWF.CoreModules.flags import get_internal_debug
from .WPSAPI import GuideEntry, GuideSection, WPSAPI
from .WPSBackpackSystem import (
BackpackItemDefinition,
BackpackItemTier,
WPSBackpackSystem,
)
from .WPSConfigSystem import WPSConfigAPI
from .WPSStoreSystem import WPSStoreSystem
from .WPSFortuneSystem import WPSFortuneSystem
# Shared ProjectConfig instance. Despite the name it is used both to read
# configuration (FindItem) and to write log lines (logger.Log) in this module.
logger: ProjectConfig = Architecture.Get(ProjectConfig)
# Tunables looked up by key; the second argument is presumably the default
# used when the key is not configured yet -- TODO confirm against ProjectConfig.
# Scale applied to a user's fortune value when shifting alchemy odds.
FORTUNE_COEFF:float = logger.FindItem("alchemy_fortune_coeff", 0.03)
# Minutes between starting an alchemy and its settlement.
COOLDOWN_MINUTES:int = logger.FindItem("alchemy_cooldown_minutes", 2)
# Maximum points a user may invest in one point alchemy.
MAX_POINTS_PER_BATCH:int = logger.FindItem("alchemy_max_points_per_batch", 100)
# NOTE(review): looks like this persists the values read above -- confirm.
logger.SaveProperties()
def clamp01(value: float) -> float:
    """Clamp *value* into the closed interval [0.0, 1.0]."""
    capped = min(1.0, value)
    return max(0.0, capped)
@dataclass(frozen=True)
class AlchemyRecipe:
    """Immutable description of a three-material alchemy recipe."""

    # Item IDs of the three required materials.
    materials: Tuple[str, str, str]
    # Item granted when an attempt succeeds.
    success_item_id: str
    # Item granted when an attempt fails.
    fail_item_id: str
    # Success probability before any fortune adjustment.
    base_success_rate: float
class WPSAlchemyGame(WPSAPI):
    """Alchemy mini-game plugin: point alchemy and three-material item alchemy."""

    # Generic failure product and the product of the built-in ash recipe.
    ASH_ITEM_ID = "alchemy_ash"
    ASH_ITEM_NAME = "炉灰"
    SLAG_ITEM_ID = "alchemy_slag"
    SLAG_ITEM_NAME = "炉渣"
    # Largest batch count accepted by a single item-alchemy command.
    MAX_BATCH_TIMES = 99
    # Rows are (upper threshold, point multiplier, label, message); the first
    # row whose threshold covers the rolled value is used.
    _PHASE_TABLE: List[Tuple[float, float, str, str]] = [
        (0.1481481481, 0.0, "爆炸", "💥 炼金反噬，积分化为飞灰……"),
        (0.4444444444, 0.5, "失败", "😖 炼金失败，仅保留半数积分。"),
        (0.7407407407, 2.0, "成功", "😊 炼金成功，积分翻倍！"),
        (0.8888888888, 5.0, "丰厚积分", "😁 运气加成，收获丰厚积分！"),
        (0.9629629630, 10.0, "巨额积分", "🤩 巨额积分入账，今日欧气爆棚！"),
        (1.0, 100.0, "传说积分", "🌈 传说级好运！积分暴涨一百倍！"),
    ]

    def __init__(self) -> None:
        """Create empty recipe indexes and load the configured tunables."""
        super().__init__()
        # Recipes keyed by their sorted material triple.
        self._recipes: Dict[Tuple[str, str, str], AlchemyRecipe] = {}
        # Reverse indexes: item ID -> keys of recipes that use / yield it.
        self._material_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
        self._success_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
        self._fail_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
        self._fortune_coeff = FORTUNE_COEFF
        # Internal debug mode removes the cooldown entirely.
        if get_internal_debug():
            self._cooldown_minutes = 0
        else:
            self._cooldown_minutes = COOLDOWN_MINUTES
        self._cooldown_ms = int(self._cooldown_minutes * 60 * 1000)
        # Cap on the points invested in a single point alchemy.
        self._max_points_per_batch = MAX_POINTS_PER_BATCH
        logger.SaveProperties()
def get_guide_subtitle(self) -> str:
    """Return the subtitle shown for this plugin in the guide."""
    subtitle = "积分炼金与材料合成系统"
    return subtitle
def get_guide_metadata(self) -> Dict[str, str]:
    """Return guide metadata: recipe count, cooldown and point cap, as strings."""
    metadata: Dict[str, str] = {}
    metadata["配方数量"] = str(len(self._recipes))
    metadata["冷却时间（分钟）"] = str(self._cooldown_minutes)
    metadata["单次积分上限"] = str(self._max_points_per_batch)
    return metadata
def collect_command_entries(self) -> Sequence[GuideEntry]:
    """Return the guide entry describing the `炼金` command."""
    alchemy_command = {
        "title": "炼金",
        "identifier": "炼金",
        "description": "投入积分或三件材料，等待冷却后获取结果。",
        "metadata": {"别名": "alchemy"},
    }
    return (alchemy_command,)
def collect_item_entries(self) -> Sequence[GuideEntry]:
    """Return guide entries for the two items this plugin registers."""
    ash_entry = {
        "title": self.ASH_ITEM_NAME,
        "identifier": self.ASH_ITEM_ID,
        "description": "炼金失败后获得的基础材料，可再次参与配方或出售。",
    }
    slag_entry = {
        "title": self.SLAG_ITEM_NAME,
        "identifier": self.SLAG_ITEM_ID,
        "description": "由 `炉灰` 合成，部分园艺/商店配方会引用该物品。",
    }
    return (ash_entry, slag_entry)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
    """Return the guide entries explaining both alchemy modes and the cooldown.

    The point-alchemy description is assembled from two adjacent literals;
    a comma is inserted between them so the configured cap and the following
    sentence no longer run together.
    """
    point_entry = {
        "title": "积分炼金",
        "description": (
            f"`炼金 <积分>` 消耗积分尝试炼金，单次上限 {self._max_points_per_batch}，"
            "结果与运势、阶段概率表 `_PHASE_TABLE` 相关。"
        ),
        "icon": "💎",
    }
    item_entry = {
        "title": "材料炼金",
        "description": (
            "`炼金 <材料1> <材料2> <材料3> [次数]` 支持批量执行，配方信息可通过 `炼金配方` 查询。"
        ),
        "icon": "🧪",
    }
    cooldown_entry = {
        "title": "冷却与恢复",
        "description": (
            f"默认冷却 {self._cooldown_minutes} 分钟，任务调度用于结算完成；"
            "重启后会自动恢复未结算记录。"
        ),
        "icon": "⏲️",
    }
    return (point_entry, item_entry, cooldown_entry)
def collect_additional_sections(self) -> Sequence[GuideSection]:
    """Return the inherited guide sections plus the recipe sections.

    The core section is always added; the garden and crystal sections are
    added only when their builders return at least one entry.
    """
    result = [*super().collect_additional_sections()]
    result.append(
        GuideSection(
            title="基础配方",
            entries=self._build_core_recipes(),
            layout="grid",
            section_id="alchemy-core",
            description="系统内置的基础配方，可在没有额外模块时直接使用。",
        )
    )
    # Optional sections, built in this order and skipped when empty.
    optional_sections = (
        (
            self._build_garden_wine_recipes,
            {
                "title": "果酒配方",
                "layout": "grid",
                "section_id": "alchemy-garden",
                "description": "菜园系统提供的果酒炼金配方，使用三份果实即可酿造果酒。",
            },
        ),
        (
            self._build_crystal_chain_recipes,
            {
                "title": "水晶链路",
                "layout": "list",
                "section_id": "alchemy-crystal",
                "description": "水晶系统的多阶段炼金链路，最终可获得黑水晶核心。",
            },
        ),
    )
    for build_entries, options in optional_sections:
        built = build_entries()
        if built:
            result.append(GuideSection(entries=built, **options))
    return tuple(result)
def _build_core_recipes(self) -> Sequence[GuideEntry]:
    """Return the guide entry for the built-in ash -> slag recipe."""
    ash_id = self.ASH_ITEM_ID
    slag_id = self.SLAG_ITEM_ID
    recipe_lines = [
        f"材料：{ash_id} × 3",
        f"成功产物：{slag_id}",
        f"失败产物：{ash_id}",
    ]
    ash_to_slag = GuideEntry(
        title="炉灰 → 炉渣",
        identifier=f"{ash_id} × 3",
        description="循环利用基础产物，将多余炉灰转化为更稀有的炉渣。",
        category="基础配方",
        metadata={
            "成功率": "100%",
            "失败产物": ash_id,
        },
        icon="🔥",
        details=[{"type": "list", "items": recipe_lines}],
    )
    return [ash_to_slag]
def _build_garden_wine_recipes(self) -> Sequence[GuideEntry]:
try:
from Plugins.WPSGardenSystem.garden_models import GARDEN_CROPS
except ImportError:
return ()
entries: List[GuideEntry] = []
for crop in GARDEN_CROPS.values():
if not crop.wine_item_id:
continue
items = [
f"材料:{crop.fruit_id} × 3",
f"成功产物:{crop.wine_item_id}",
"失败产物garden_item_rot_fruit",
"基础成功率75%",
]
entries.append(
GuideEntry(
title=f"{crop.display_name}果酒",
identifier=f"{crop.fruit_id} ×3",
description=f"使用 {crop.display_name} 的果实炼制同名果酒。",
category="果酒配方",
metadata={
"果酒稀有度": crop.wine_tier or "rare",
},
icon="🍷",
tags=(crop.tier.title(),),
details=[{"type": "list", "items": items}],
)
)
return entries
def _build_crystal_chain_recipes(self) -> Sequence[GuideEntry]:
try:
from Plugins.WPSCrystalSystem.crystal_models import (
DEFAULT_CRYSTAL_COLOR_MAP,
DEFAULT_CRYSTAL_EXCHANGE_ENTRIES,
)
except ImportError:
return ()
entries: List[GuideEntry] = []
for color_def in DEFAULT_CRYSTAL_COLOR_MAP.values():
stage_details: List[str] = []
for stage in color_def.chain_stages:
mats = " + ".join(stage.materials)
stage_details.append(
f"{stage.identifier}{mats}{stage.result_item}(成功率 {stage.base_success_rate*100:.0f}%"
)
stage_details.append(
f"等待阶段:消耗 {', '.join(f'{k}×{v}' for k, v in color_def.wait_stage.consumed_items.items())}"
f"耗时 {color_def.wait_stage.delay_minutes} 分钟"
)
fusion = color_def.final_fusion
stage_details.append(
f"最终融合:{', '.join(fusion.materials)}{fusion.result_item}(成功率 {fusion.base_success_rate*100:.0f}%"
)
entries.append(
GuideEntry(
title=color_def.display_name,
identifier=color_def.color_key,
description="水晶染色与融合的完整链路。",
category="水晶链路",
icon="💠",
details=[{"type": "list", "items": stage_details}],
)
)
for exchange in DEFAULT_CRYSTAL_EXCHANGE_ENTRIES.values():
items = ", ".join(f"{item_id}×{qty}" for item_id, qty in exchange.required_items.items())
entries.append(
GuideEntry(
title=exchange.metadata.get("display_name", exchange.identifier),
identifier=exchange.identifier,
description=f"兑换奖励:{exchange.reward_item}",
category="水晶兑换",
icon="🔁",
details=[f"需求:{items}"],
)
)
return entries
@override
def dependencies(self) -> List[type]:
    """Return the plugin classes this plugin declares as dependencies."""
    required: List[type] = [
        WPSAPI,
        WPSBackpackSystem,
        WPSConfigAPI,
        WPSFortuneSystem,
        WPSStoreSystem,
    ]
    return required
@override
def register_db_model(self) -> DatabaseModel:
    """Describe the `alchemy_records` table used to track alchemy runs."""
    columns = {
        "alchemy_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
        "user_id": "INTEGER NOT NULL",
        "chat_id": "INTEGER NOT NULL",
        "alchemy_type": "TEXT NOT NULL",
        "input_data": "TEXT NOT NULL",
        "start_time": "TEXT NOT NULL",
        "expected_end_time": "TEXT NOT NULL",
        "status": "TEXT NOT NULL",
        "result_data": "TEXT",
        "scheduled_task_id": "INTEGER",
    }
    return DatabaseModel(table_name="alchemy_records", column_defs=columns)
@override
def wake_up(self) -> None:
    """Log start-up, register the commands and items, then settle overdue runs."""
    banner = f"{ConsoleFrontColor.GREEN}WPSAlchemyGame 插件已加载{ConsoleFrontColor.RESET}"
    logger.Log("Info", banner)
    for command in ("alchemy", "炼金"):
        self.register_plugin(command)
    self._register_alchemy_items()
    # Settle records whose expected end time has already passed.
    self.recover_overdue_alchemy()
def _register_alchemy_items(self) -> None:
    """Register the ash and slag items, the ash store listing and the ash recipe.

    Every step is best-effort: a failure is logged as a warning and the
    remaining steps still run. The store listing is only attempted when the
    ash item itself registered without error.
    """
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)

    def warn(action: str, error: Exception) -> None:
        logger.Log(
            "Warning",
            f"{ConsoleFrontColor.YELLOW}{action}时出现问题: {error}{ConsoleFrontColor.RESET}",
        )

    ash_registered = False
    try:
        backpack.register_item(
            self.ASH_ITEM_ID,
            self.ASH_ITEM_NAME,
            BackpackItemTier.COMMON,
            "炼金失败时残留的炉灰，可作为低阶材料或出售。",
        )
        ash_registered = True
    except Exception as error:
        warn("注册炉灰物品", error)

    if ash_registered:
        try:
            store_system = Architecture.Get(WPSStoreSystem)
            store_system.register_mode(
                item_id=self.ASH_ITEM_ID,
                price=8,
                limit_amount=999,
            )
        except Exception as error:
            warn("注册炉灰商店模式", error)

    try:
        backpack.register_item(
            self.SLAG_ITEM_ID,
            self.SLAG_ITEM_NAME,
            BackpackItemTier.COMMON,
            "经高温提炼后的炉渣，可在特殊任务中使用。",
        )
    except Exception as error:
        warn("注册炉渣物品", error)

    try:
        ash_triple = (self.ASH_ITEM_ID,) * 3
        self.register_recipe(ash_triple, self.SLAG_ITEM_ID, self.ASH_ITEM_ID, 1.0)
    except Exception as error:
        warn("注册炉渣配方", error)
def register_recipe(
    self,
    materials: Tuple[str, str, str]|Sequence[str],
    success_item_id: str,
    fail_item_id: str,
    base_success_rate: float,
) -> None:
    """Register (or replace) the recipe for a material triple.

    Materials are stripped and sorted, so their order does not matter. If a
    recipe already exists for the same triple it is replaced and the reverse
    indexes are updated. An empty ``fail_item_id`` falls back to the ash item.

    Raises:
        ValueError: if there are not exactly three materials, a material ID
            or the success item ID is blank, or the rate is outside 0..1.
    """
    if len(materials) != 3:
        raise ValueError("炼金配方必须提供三种材料")
    sorted_key = tuple(sorted(mat.strip() for mat in materials))
    if not all(sorted_key):
        raise ValueError("炼金材料 ID 不能为空")
    # Written as a chained comparison so NaN is rejected as well.
    if not 0.0 <= base_success_rate <= 1.0:
        raise ValueError("配方成功率必须位于 0~1 之间")
    success_id = success_item_id.strip()
    if not success_id:
        # A blank product ID could never be granted at settlement time.
        raise ValueError("炼金成功产物 ID 不能为空")

    existing = self._recipes.get(sorted_key)
    if existing is not None:
        # Drop the stale index entries before the recipe is overwritten.
        self._unindex_recipe(sorted_key, existing)
    recipe = AlchemyRecipe(
        materials=sorted_key,
        success_item_id=success_id,
        fail_item_id=fail_item_id.strip() or self.ASH_ITEM_ID,
        base_success_rate=base_success_rate,
    )
    self._recipes[sorted_key] = recipe
    self._index_recipe(sorted_key, recipe)
    logger.Log(
        "Info",
        f"{ConsoleFrontColor.CYAN}已注册炼金配方 {sorted_key} -> {success_id} ({base_success_rate:.2%}){ConsoleFrontColor.RESET}",
    )
def _index_recipe(
self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe
) -> None:
for material in recipe.materials:
self._material_index[material].add(materials_key)
self._success_index[recipe.success_item_id].add(materials_key)
self._fail_index[recipe.fail_item_id].add(materials_key)
def _unindex_recipe(
self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe
) -> None:
for material in recipe.materials:
material_set = self._material_index.get(material)
if material_set and materials_key in material_set:
material_set.discard(materials_key)
if not material_set:
del self._material_index[material]
success_set = self._success_index.get(recipe.success_item_id)
if success_set and materials_key in success_set:
success_set.discard(materials_key)
if not success_set:
del self._success_index[recipe.success_item_id]
fail_set = self._fail_index.get(recipe.fail_item_id)
if fail_set and materials_key in fail_set:
fail_set.discard(materials_key)
if not fail_set:
del self._fail_index[recipe.fail_item_id]
def get_recipes_using_item(self, item_id: str) -> List[AlchemyRecipe]:
    """Return the recipes that consume *item_id*, ordered by material key."""
    if not item_id:
        return []
    keys = list(self._material_index.get(item_id, set()))
    keys.sort()
    return [self._recipes[key] for key in keys]
def get_recipes_producing_item(
    self, item_id: str
) -> Dict[str, List[AlchemyRecipe]]:
    """Return the recipes yielding *item_id*, grouped by outcome.

    The result has the keys ``"success"`` and ``"fail"``. Each list is
    ordered by descending base success rate, then by material triple.
    """
    if not item_id:
        return {"success": [], "fail": []}

    def rank(key):
        recipe = self._recipes[key]
        return (-recipe.base_success_rate, recipe.materials)

    def ordered(index) -> List[AlchemyRecipe]:
        keys = sorted(index.get(item_id, set()), key=rank)
        return [self._recipes[key] for key in keys]

    success_recipes = ordered(self._success_index)
    fail_recipes = ordered(self._fail_index)
    return {"success": success_recipes, "fail": fail_recipes}
async def callback(
    self, message: str, chat_id: int, user_id: int
) -> Optional[str]:
    """Dispatch an alchemy command and send the reply.

    Accepted forms after the mention prefix:
      * ``状态`` / ``status``               -> status query
      * ``<points>``                        -> point alchemy
      * ``<m1> <m2> <m3> [times]``          -> item alchemy (times defaults to 1)
    Anything else is answered with the help text.
    """
    payload = self.parse_message_after_at(message).strip()
    # str.split() with no argument already drops blank tokens.
    tokens = payload.split()

    if len(tokens) == 1:
        command = tokens[0]
        if command in ("状态", "status"):
            response = await self._handle_status_query(chat_id, user_id)
            return await self.send_markdown_message(response, chat_id, user_id)
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() rejects, which used to raise ValueError.
        if command.isdecimal():
            response = await self._handle_point_alchemy(
                chat_id, user_id, int(command)
            )
            return await self.send_markdown_message(response, chat_id, user_id)
    elif len(tokens) >= 3:
        materials = tokens[:3]
        times = 1
        if len(tokens) >= 4:
            try:
                times = int(tokens[3])
            except ValueError:
                return await self.send_markdown_message(
                    "❌ 炼金次数必须是整数", chat_id, user_id
                )
        response = await self._handle_item_alchemy(
            chat_id, user_id, materials, times
        )
        return await self.send_markdown_message(response, chat_id, user_id)

    # Empty input, two tokens, or an unrecognised single token.
    return await self.send_markdown_message(
        self._help_message(), chat_id, user_id
    )
async def _handle_point_alchemy(
    self, chat_id: int, user_id: int, points: int
) -> str:
    """Start a point alchemy and return the confirmation (or error) text.

    Validates the amount, the cooldown and the balance, deducts the points,
    stores an in-progress record and schedules its settlement.
    """
    if points <= 0:
        return "❌ 投入积分必须大于 0"

    # Refuse while a previous alchemy is still running.
    blocked, blocked_text = self._check_cooldown(user_id)
    if blocked:
        return blocked_text

    config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
    balance = config_api.get_user_points(user_id)
    if points > self._max_points_per_batch:
        return f"❌ 单次炼金积分不能超过 {self._max_points_per_batch}"
    if balance < points:
        return f"❌ 积分不足，需要 {points} 分，当前仅有 {balance}"

    # Take the stake up front; the payout happens at settlement.
    await config_api.adjust_user_points(chat_id, user_id, -points, "炼金消耗")

    alchemy_id = self._create_alchemy_record(
        user_id, chat_id, "point", {"type": "point", "points": points}
    )
    settle_kwargs = {
        "alchemy_id": alchemy_id,
        "user_id": user_id,
        "chat_id": chat_id,
    }
    task_id = self.register_clock(
        self._settle_alchemy_callback, self._cooldown_ms, kwargs=settle_kwargs
    )

    # Remember which scheduled task belongs to this record.
    update_cursor = get_db().conn.cursor()
    update_cursor.execute(
        "UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?",
        (task_id, alchemy_id),
    )
    get_db().conn.commit()

    # Read back the stored end time for the confirmation message.
    read_cursor = get_db().conn.cursor()
    read_cursor.execute(
        "SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?",
        (alchemy_id,),
    )
    stored = read_cursor.fetchone()
    finish_at = datetime.fromisoformat(stored["expected_end_time"])

    debug_hint = ""
    if get_internal_debug():
        debug_hint = " **[DEBUG模式]**"
    if self._cooldown_minutes == 0:
        time_str = "立即结算"
    else:
        time_str = f"{self._cooldown_minutes} 分钟"

    lines = [
        f"# ⚗️ 炼金开始{debug_hint}",
        "- 类型：积分炼金",
        f"- 投入积分：`{points}`",
        f"- 预计耗时：{time_str}",
        f"- 预计完成：{finish_at.strftime('%Y-%m-%d %H:%M')}",
        "- 状态：炼金进行中...",
    ]
    return "\n".join(lines)
def _draw_point_multiplier(
self, fortune_value: float
) -> Tuple[float, str, str]:
offset = fortune_value * self._fortune_coeff
random_value = random.random()
landing = clamp01(offset + random_value)
for threshold, multiplier, label, text in self._PHASE_TABLE:
if landing <= threshold:
return multiplier, label, text
return self._PHASE_TABLE[-1][1:]
async def _handle_item_alchemy(
    self,
    chat_id: int,
    user_id: int,
    materials: Sequence[str],
    times: int,
) -> str:
    """Start an item alchemy and return the confirmation (or error) text.

    Validates the batch count, the cooldown, that every material resolves to
    a known item and that the user owns enough of each. It then deducts the
    materials, stores an in-progress record and schedules its settlement.
    """
    if times <= 0:
        return "❌ 炼金次数必须大于 0"
    if times > self.MAX_BATCH_TIMES:
        return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES}"

    # Refuse while a previous alchemy is still running.
    is_on_cooldown, cooldown_msg = self._check_cooldown(user_id)
    if is_on_cooldown:
        return cooldown_msg

    resolved: List[BackpackItemDefinition] = []
    for identifier in materials:
        resolved_item = self._resolve_item(identifier)
        if resolved_item is None:
            return f"❌ 未找到材料 `{identifier}`，请确认已注册"
        resolved.append(resolved_item)

    material_ids = [item.item_id for item in resolved]
    name_map = {item.item_id: item.name for item in resolved}
    # The same item may fill several slots, so count per-batch usage first.
    material_usage = Counter(material_ids)
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)

    # Check every material before deducting anything.
    current_quantities: Dict[str, int] = {}
    for item_id, single_batch_count in material_usage.items():
        required_amount = single_batch_count * times
        current = self._get_user_quantity(user_id, item_id)
        current_quantities[item_id] = current
        if current < required_amount:
            item_name = name_map.get(item_id, item_id)
            return (
                f"❌ 材料 `{item_name}` 数量不足，需要 {required_amount} 个，当前仅有 {current}"
            )

    for item_id, single_batch_count in material_usage.items():
        required_amount = single_batch_count * times
        backpack.set_item_quantity(
            user_id, item_id, current_quantities[item_id] - required_amount
        )

    input_data = {
        "type": "item",
        "materials": material_ids,
        "times": times,
    }
    alchemy_id = self._create_alchemy_record(
        user_id, chat_id, "item", input_data
    )
    task_id = self.register_clock(
        self._settle_alchemy_callback,
        self._cooldown_ms,
        kwargs={
            "alchemy_id": alchemy_id,
            "user_id": user_id,
            "chat_id": chat_id,
        },
    )

    # Remember which scheduled task belongs to this record.
    cursor = get_db().conn.cursor()
    cursor.execute(
        "UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?",
        (task_id, alchemy_id),
    )
    get_db().conn.commit()

    # Read back the stored end time for the confirmation message.
    cursor = get_db().conn.cursor()
    cursor.execute(
        "SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?",
        (alchemy_id,),
    )
    record = cursor.fetchone()
    expected_end_time = datetime.fromisoformat(record["expected_end_time"])

    debug_hint = " **[DEBUG模式]**" if get_internal_debug() else ""
    time_str = "立即结算" if self._cooldown_minutes == 0 else f"{self._cooldown_minutes} 分钟"
    # Separate the names; an empty separator made them run together.
    material_names = "、".join(item.name for item in resolved)
    return (
        f"# ⚗️ 炼金开始{debug_hint}\n"
        f"- 类型：物品炼金\n"
        f"- 投入材料：{material_names} × {times}\n"
        f"- 预计耗时：{time_str}\n"
        f"- 预计完成：{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
        f"- 状态：炼金进行中..."
    )
def _resolve_item(
    self, identifier: str
) -> Optional[BackpackItemDefinition]:
    """Look up an item definition by ID or display name, ignoring case.

    Returns ``None`` when the backpack system cannot provide a definition.
    """
    raw_id = identifier.strip()
    needle = raw_id.lower()
    cursor = get_db().conn.cursor()
    cursor.execute(
        f"""
        SELECT item_id
        FROM {WPSBackpackSystem.ITEMS_TABLE}
        WHERE lower(item_id) = ? OR lower(name) = ?
        LIMIT 1
        """,
        (needle, needle),
    )
    match = cursor.fetchone()
    # Without a table hit, try the raw identifier as an item ID.
    item_id = raw_id if not match else match["item_id"]
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    try:
        definition = backpack._get_definition(item_id)
    except Exception:
        return None
    return definition
def _get_user_quantity(self, user_id: int, item_id: str) -> int:
    """Return how many of *item_id* the user holds, or 0 if none are listed."""
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    owned = (
        entry.quantity
        for entry in backpack.get_user_items(user_id)
        if entry.item_id == item_id
    )
    return next(owned, 0)
def _check_cooldown(self, user_id: int) -> Tuple[bool, Optional[str]]:
    """Report whether the user still has an unfinished alchemy.

    Returns ``(True, message)`` while the newest in-progress record has not
    reached its expected end time, otherwise ``(False, None)``. A record that
    is already overdue is settled on the spot; settlement errors are logged
    and do not block the user.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        """
        SELECT alchemy_id, expected_end_time, alchemy_type
        FROM alchemy_records
        WHERE user_id = ? AND status = 'in_progress'
        ORDER BY start_time DESC
        LIMIT 1
        """,
        (user_id,),
    )
    pending = cursor.fetchone()
    if not pending:
        return False, None

    finish_at = datetime.fromisoformat(pending["expected_end_time"])
    now = datetime.now()
    if now >= finish_at:
        # Overdue: settle immediately instead of blocking the new request.
        try:
            self.settle_alchemy(pending["alchemy_id"])
        except Exception as e:
            logger.Log(
                "Error",
                f"{ConsoleFrontColor.RED}自动结算过期炼金失败: {e}{ConsoleFrontColor.RESET}",
            )
        return False, None

    # Still running: report the remaining time, rounded up to whole minutes.
    seconds_left = (finish_at - now).total_seconds()
    remaining_minutes = int(seconds_left / 60) + 1
    if pending["alchemy_type"] == "point":
        alchemy_type_name = "积分炼金"
    else:
        alchemy_type_name = "物品炼金"
    lines = [
        "❌ 炼金冷却中",
        f"- 上次炼金类型：{alchemy_type_name}",
        f"- 预计完成：{finish_at.strftime('%Y-%m-%d %H:%M')}",
        f"- 剩余时间：约 {remaining_minutes} 分钟",
        "- 请等待冷却结束后再试",
    ]
    return True, "\n".join(lines)
def _create_alchemy_record(
    self, user_id: int, chat_id: int, alchemy_type: str, input_data: Dict
) -> int:
    """Insert an in-progress alchemy record and return its row ID.

    The expected end time is the current time plus the configured cooldown.
    """
    started_at = datetime.now()
    finish_at = started_at + timedelta(minutes=self._cooldown_minutes)
    row = (
        user_id,
        chat_id,
        alchemy_type,
        json.dumps(input_data),
        started_at.isoformat(),
        finish_at.isoformat(),
    )
    cursor = get_db().conn.cursor()
    cursor.execute(
        """
        INSERT INTO alchemy_records
        (user_id, chat_id, alchemy_type, input_data, start_time, expected_end_time, status)
        VALUES (?, ?, ?, ?, ?, ?, 'in_progress')
        """,
        row,
    )
    new_id = cursor.lastrowid
    get_db().conn.commit()
    return new_id
def settle_alchemy(self, alchemy_id: int) -> Tuple[bool, str, Optional[Dict]]:
    """Settle one alchemy record and grant its rewards.

    Returns ``(ok, message, result_data)``. ``ok`` is False, and
    ``result_data`` is None, when the record does not exist, is no longer
    in progress, has an unknown type, or when settlement raised. In the last
    case the record is marked ``failed``.

    On success the record is marked ``completed``, the scheduled task (if
    any) is marked completed, and the transaction is committed.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        "SELECT * FROM alchemy_records WHERE alchemy_id = ?",
        (alchemy_id,),
    )
    record = cursor.fetchone()
    if not record:
        return False, "❌ 炼金记录不存在", None
    if record["status"] != "in_progress":
        # Guards against double settlement (clock task vs. cooldown check).
        return False, f"❌ 炼金已结算（状态：{record['status']}）", None

    user_id = record["user_id"]
    alchemy_type = record["alchemy_type"]
    input_data = json.loads(record["input_data"])
    config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
    backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
    fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
    fortune_value = fortune_system.get_fortune_value(user_id)

    try:
        if alchemy_type == "point":
            body_lines, result_data = self._settle_point_alchemy(
                config_api, backpack, user_id, input_data, fortune_value
            )
        elif alchemy_type == "item":
            body_lines, result_data = self._settle_item_alchemy(
                backpack, user_id, input_data, fortune_value
            )
        else:
            return False, f"❌ 未知的炼金类型：{alchemy_type}", None

        cursor.execute(
            """
            UPDATE alchemy_records
            SET status = 'completed', result_data = ?
            WHERE alchemy_id = ?
            """,
            (json.dumps(result_data), alchemy_id),
        )
        scheduled_task_id = record["scheduled_task_id"]
        if scheduled_task_id:
            get_db().update_task_status(scheduled_task_id, STATUS_COMPLETED)
        # Persist the completed status explicitly; without this commit the
        # record could stay in_progress when no scheduled task is attached.
        get_db().conn.commit()
        return True, "".join(["# 🔮 炼金结算\n", *body_lines]), result_data
    except Exception as e:
        logger.Log(
            "Error",
            f"{ConsoleFrontColor.RED}结算炼金失败: {e}{ConsoleFrontColor.RESET}",
        )
        # Mark the record as failed so it is not retried.
        cursor.execute(
            "UPDATE alchemy_records SET status = 'failed' WHERE alchemy_id = ?",
            (alchemy_id,),
        )
        get_db().conn.commit()
        return False, f"❌ 结算失败：{str(e)}", None


def _settle_point_alchemy(
    self, config_api, backpack, user_id: int, input_data: Dict, fortune_value: float
) -> Tuple[List[str], Dict]:
    """Pay out a point alchemy; return (message lines, result data)."""
    points = input_data["points"]
    multiplier, phase_label, phase_text = self._draw_point_multiplier(fortune_value)
    reward = int(points * multiplier)
    if reward:
        config_api.adjust_user_points_sync(
            user_id, reward, f"炼金收益（{phase_label}）"
        )

    # A total loss is softened with some ash: one per 10 points, at most 99.
    ash_reward = 0
    if multiplier == 0.0:
        ash_reward = min(points // 10, 99)
        if ash_reward > 0:
            backpack.add_item(user_id, self.ASH_ITEM_ID, ash_reward)

    final_points = config_api.get_user_points(user_id)
    extra_line = ""
    if ash_reward > 0:
        extra_line = f"- 额外获得：{self.ASH_ITEM_NAME} × `{ash_reward}`\n"
    lines = [
        f"- 投入积分：`{points}`\n",
        f"- 结果：{phase_text}\n",
        f"- 获得倍率：×{multiplier:.1f}，返还 `+{reward}` 积分\n",
        extra_line,
        f"- 当前积分：`{final_points}`",
    ]
    result_data = {
        "multiplier": multiplier,
        "reward_points": reward,
        "ash_reward": ash_reward,
    }
    return lines, result_data


def _settle_item_alchemy(
    self, backpack, user_id: int, input_data: Dict, fortune_value: float
) -> Tuple[List[str], Dict]:
    """Roll every batch of an item alchemy; return (message lines, result data)."""
    materials = input_data["materials"]
    times = input_data["times"]

    def display_name(item_id: str) -> str:
        # Fall back to the raw ID when the item has no definition.
        try:
            return backpack._get_definition(item_id).name
        except Exception:
            return item_id

    recipe = self._recipes.get(tuple(sorted(materials)))
    adjusted_rate = (
        clamp01(recipe.base_success_rate + fortune_value * self._fortune_coeff)
        if recipe
        else 0.0
    )

    success_count = 0
    fail_count = 0
    rewards: Dict[str, int] = {}
    for _ in range(times):
        if recipe and random.random() < adjusted_rate:
            reward_id = recipe.success_item_id
            success_count += 1
        else:
            # Unknown recipes always fail and only yield ash.
            reward_id = recipe.fail_item_id if recipe else self.ASH_ITEM_ID
            fail_count += 1
        backpack.add_item(user_id, reward_id, 1)
        rewards[reward_id] = rewards.get(reward_id, 0) + 1

    details = [
        f"- {display_name(item_id)} × **{count}**"
        for item_id, count in rewards.items()
    ]
    if recipe:
        success_line = f"- 成功次数：`{success_count}`"
        rate_line = f"- 基础成功率：`{recipe.base_success_rate:.2%}`"
    else:
        success_line = "- 成功次数：`0`（未知配方必定失败）"
        rate_line = "- ⚠️ 未知配方仅产出炉灰"
    # Without a recipe every batch fails, so fail_count equals times.
    fail_line = f"- 失败次数：`{fail_count}`"
    rewards_block = "\n".join(details) if details else "- （无物品获得）"
    # Every material is listed, by name when defined and by ID otherwise.
    material_names = "、".join(display_name(item_id) for item_id in materials)

    lines = [
        f"- 投入材料：{material_names} × {times}\n",
        f"{success_line}\n",
        f"{fail_line}\n",
        f"{rate_line}\n",
        "- 获得物品：\n",
        f"{rewards_block}",
    ]
    result_data = {
        "success_count": success_count,
        "fail_count": fail_count,
        "rewards": rewards,
    }
    return lines, result_data
async def _settle_alchemy_callback(
self, alchemy_id: int, user_id: int, chat_id: int
) -> None:
"""炼金结算回调(时钟任务)"""
success, msg, rewards = self.settle_alchemy(alchemy_id)
await self.send_markdown_message(msg, chat_id, user_id)
def _get_user_alchemy_status(self, user_id: int) -> Optional[Dict]:
    """Return the user's newest in-progress record as a dict, or ``None``."""
    query = """
        SELECT * FROM alchemy_records
        WHERE user_id = ? AND status = 'in_progress'
        ORDER BY start_time DESC
        LIMIT 1
        """
    cursor = get_db().conn.cursor()
    cursor.execute(query, (user_id,))
    row = cursor.fetchone()
    if not row:
        return None
    return dict(row)
async def _handle_status_query(self, chat_id: int, user_id: int) -> str:
"""处理状态查询"""
record = self._get_user_alchemy_status(user_id)
if not record:
return (
"# ⚗️ 炼金状态\n"
"- 状态:无进行中的炼金\n"
"- 可以开始新的炼金"
)
alchemy_type = record["alchemy_type"]
alchemy_type_name = "积分炼金" if alchemy_type == "point" else "物品炼金"
start_time = datetime.fromisoformat(record["start_time"])
expected_end_time = datetime.fromisoformat(record["expected_end_time"])
now = datetime.now()
if now >= expected_end_time:
remaining_str = "已完成,等待结算"
else:
remaining = expected_end_time - now
remaining_minutes = int(remaining.total_seconds() / 60) + 1
remaining_str = f"{remaining_minutes} 分钟"
return (
"# ⚗️ 炼金状态\n"
f"- 状态:进行中\n"
f"- 类型:{alchemy_type_name}\n"
f"- 开始时间:{start_time.strftime('%Y-%m-%d %H:%M')}\n"
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
f"- 剩余时间:{remaining_str}"
)
def recover_overdue_alchemy(self) -> None:
    """Settle every in-progress record whose expected end time has passed.

    ``settle_alchemy`` reports most failures through its return flag rather
    than by raising, so that flag is checked. The summary log states how
    many records were actually settled out of those found.
    """
    cursor = get_db().conn.cursor()
    cursor.execute(
        """
        SELECT alchemy_id FROM alchemy_records
        WHERE status = 'in_progress' AND expected_end_time < ?
        """,
        (datetime.now().isoformat(),),
    )
    overdue_records = cursor.fetchall()

    recovered = 0
    for record in overdue_records:
        alchemy_id = record["alchemy_id"]
        logger.Log(
            "Warning",
            f"{ConsoleFrontColor.YELLOW}发现过期炼金 {alchemy_id}，执行恢复结算{ConsoleFrontColor.RESET}",
        )
        try:
            settled, detail, _ = self.settle_alchemy(alchemy_id)
        except Exception as e:
            logger.Log(
                "Error",
                f"{ConsoleFrontColor.RED}恢复炼金 {alchemy_id} 失败: {e}{ConsoleFrontColor.RESET}",
            )
            continue
        if settled:
            recovered += 1
        else:
            logger.Log(
                "Error",
                f"{ConsoleFrontColor.RED}恢复炼金 {alchemy_id} 失败: {detail}{ConsoleFrontColor.RESET}",
            )

    if overdue_records:
        logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}恢复了 {recovered}/{len(overdue_records)} 个过期炼金{ConsoleFrontColor.RESET}",
        )
def _help_message(self) -> str:
return (
"# ⚗️ 炼金指令帮助\n"
f"- `炼金 <积分>`:投入积分尝试炼金(单次最多 {self._max_points_per_batch} 分)\n"
"- `炼金 <材料1> <材料2> <材料3> [次数]`:使用三件材料进行炼金(可选次数,默认 1\n"
"- `炼金 状态`:查询当前炼金状态\n"
"> 建议提前备足材料及积分,谨慎开启炼金流程。炼金需要等待一定时间后才会获得结果。"
)
class WPSAlchemyRecipeLookup(WPSAPI):
    """Plugin that lists the alchemy recipes involving a given item."""

    def __init__(self) -> None:
        """Start with no cached plugin references; wake_up() fills them in."""
        super().__init__()
        self._backpack: Optional[WPSBackpackSystem] = None
        self._alchemy: Optional[WPSAlchemyGame] = None
def get_guide_subtitle(self) -> str:
    """Return the subtitle shown for this plugin in the guide."""
    subtitle = "查询指定物品涉及的炼金配方"
    return subtitle
def collect_command_entries(self) -> Sequence[GuideEntry]:
    """Return the guide entry describing the `炼金配方` command."""
    lookup_command = {
        "title": "炼金配方",
        "identifier": "炼金配方",
        "description": "展示物品作为材料、成功产物或失败产物的所有配方。",
        "metadata": {"别名": "alchemy_recipe"},
    }
    return (lookup_command,)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
    """Return guide entries explaining the query syntax and the output layout."""
    query_format = {
        "title": "查询格式",
        "description": "`炼金配方 <物品ID>` 或 `炼金配方 <物品名称>`，忽略大小写。",
    }
    output_layout = {
        "title": "输出结构",
        "description": "结果按材料/成功/失败三类分组，列出配方材料与成功率。",
    }
    return (query_format, output_layout)
def dependencies(self) -> List[type]:
    """Return the plugin classes this plugin declares as dependencies."""
    required: List[type] = [WPSAlchemyGame, WPSBackpackSystem]
    return required
def is_enable_plugin(self) -> bool:
    """Report that this plugin is always enabled."""
    enabled = True
    return enabled
def wake_up(self) -> None:
    """Cache the alchemy and backpack plugins, register commands, log start-up."""
    self._alchemy = Architecture.Get(WPSAlchemyGame)
    self._backpack = Architecture.Get(WPSBackpackSystem)
    for command in ("炼金配方", "alchemy_recipe"):
        self.register_plugin(command)
    banner = (
        f"{ConsoleFrontColor.GREEN}WPSAlchemyRecipeLookup 插件已加载{ConsoleFrontColor.RESET}"
    )
    logger.Log("Info", banner)
async def callback(
    self, message: str, chat_id: int, user_id: int
) -> Optional[str]:
    """Answer a recipe lookup: help text, a not-found notice, or the recipe list."""
    payload = self.parse_message_after_at(message).strip()
    if not payload:
        return await self.send_markdown_message(self._help_text(), chat_id, user_id)

    # Prefer the references cached by wake_up(); resolve lazily otherwise.
    backpack = self._backpack or Architecture.Get(WPSBackpackSystem)
    definition = self._resolve_definition(payload, backpack)
    if definition is None:
        not_found = f"❌ 未找到物品 `{payload}`，请确认输入的物品 ID 或名称。"
        return await self.send_markdown_message(not_found, chat_id, user_id)

    alchemy = self._alchemy or Architecture.Get(WPSAlchemyGame)
    target_id = definition.item_id
    as_material = alchemy.get_recipes_using_item(target_id)
    produced_by = alchemy.get_recipes_producing_item(target_id)
    message_text = self._format_markdown(
        definition,
        as_material,
        produced_by["success"],
        produced_by["fail"],
        backpack,
    )
    return await self.send_markdown_message(message_text, chat_id, user_id)
def _resolve_definition(
    self, identifier: str, backpack: WPSBackpackSystem
) -> Optional[BackpackItemDefinition]:
    """Look up an item definition by ID or display name, ignoring case.

    Returns ``None`` when *backpack* cannot provide a definition.
    """
    raw_id = identifier.strip()
    needle = raw_id.lower()
    cursor = get_db().conn.cursor()
    cursor.execute(
        f"""
        SELECT item_id
        FROM {WPSBackpackSystem.ITEMS_TABLE}
        WHERE lower(item_id) = ? OR lower(name) = ?
        LIMIT 1
        """,
        (needle, needle),
    )
    match = cursor.fetchone()
    # Without a table hit, try the raw identifier as an item ID.
    item_id = raw_id if not match else match["item_id"]
    try:
        definition = backpack._get_definition(item_id)  # noqa: SLF001
    except Exception:
        return None
    return definition
def _format_markdown(
self,
target: BackpackItemDefinition,
material_recipes: List[AlchemyRecipe],
success_recipes: List[AlchemyRecipe],
fail_recipes: List[AlchemyRecipe],
backpack: WPSBackpackSystem,
) -> str:
lines = [
f"# 🔍 炼金配方查询|{target.name}",
f"- 物品 ID`{target.item_id}`",
"---",
]
lines.append("## 作为配方材料")
lines.extend(
self._format_recipe_entries(material_recipes, backpack)
or ["- 暂无记录"]
)
lines.append("\n## 作为成功产物")
lines.extend(
self._format_recipe_entries(success_recipes, backpack, role="success")
or ["- 暂无记录"]
)
lines.append("\n## 作为失败产物")
lines.extend(
self._format_recipe_entries(fail_recipes, backpack, role="fail")
or ["- 暂无记录"]
)
return "\n".join(lines)
def _format_recipe_entries(
self,
recipes: List[AlchemyRecipe],
backpack: WPSBackpackSystem,
*,
role: str = "material",
) -> List[str]:
if not recipes:
return []
entries: List[str] = []
for recipe in recipes:
materials = self._summarize_materials(recipe, backpack)
success_name = self._resolve_item_name(recipe.success_item_id, backpack)
fail_name = self._resolve_item_name(recipe.fail_item_id, backpack)
rate = f"{recipe.base_success_rate:.0%}"
if role == "material":
entry = (
f"- 材料:{materials}|成功产物:`{success_name}`"
f"失败产物:`{fail_name}`|成功率:{rate}"
)
elif role == "success":
entry = (
f"- 材料:{materials}|成功率:{rate}"
f"失败产物:`{fail_name}`"
)
else:
entry = (
f"- 材料:{materials}|成功率:{rate}"
f"成功产物:`{success_name}`"
)
entries.append(entry)
return entries
def _summarize_materials(
self, recipe: AlchemyRecipe, backpack: WPSBackpackSystem
) -> str:
counter = Counter(recipe.materials)
parts: List[str] = []
for item_id, count in sorted(counter.items()):
name = self._resolve_item_name(item_id, backpack)
if count == 1:
parts.append(f"`{name}`")
else:
parts.append(f"`{name}` × {count}")
return " + ".join(parts)
def _resolve_item_name(
self, item_id: str, backpack: WPSBackpackSystem
) -> str:
try:
definition = backpack._get_definition(item_id) # noqa: SLF001
return definition.name
except Exception:
return item_id
def _help_text(self) -> str:
return (
"# ⚗️ 炼金配方查询帮助\n"
"- `炼金配方 <物品ID>`\n"
"- `炼金配方 <物品名称>`\n"
"> 输入需要精确匹配注册物品,名称不区分大小写。"
)
# Names exported by `from <module> import *`: the two plugin classes.
__all__ = ["WPSAlchemyGame", "WPSAlchemyRecipeLookup"]