开始添加炼金冷却

This commit is contained in:
2025-11-11 18:03:35 +08:00
parent 4a546e9e4b
commit fae54c966d

View File

@@ -1,13 +1,16 @@
from __future__ import annotations
import json
import random
from collections import defaultdict, Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Sequence, Set, Tuple, override
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from PWF.CoreModules.database import get_db, STATUS_COMPLETED
from PWF.CoreModules.plugin_interface import DatabaseModel
from .WPSAPI import WPSAPI
from .WPSBackpackSystem import (
@@ -60,11 +63,38 @@ class WPSAlchemyGame(WPSAPI):
self._success_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
self._fail_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set)
self._fortune_coeff = FORTUNE_COEFF
# 从配置读取冷却时间(分钟)
from PWF.CoreModules.flags import get_internal_debug
cooldown_minutes = logger.FindItem("alchemy_cooldown_minutes", 2)
if get_internal_debug():
cooldown_minutes = 0
self._cooldown_minutes = cooldown_minutes
self._cooldown_ms = int(cooldown_minutes * 60 * 1000)
logger.SaveProperties()
@override
def dependencies(self) -> List[type]:
return [WPSAPI, WPSBackpackSystem, WPSConfigAPI, WPSFortuneSystem, WPSStoreSystem]
@override
def register_db_model(self) -> DatabaseModel:
"""注册炼金记录数据库表"""
return DatabaseModel(
table_name="alchemy_records",
column_defs={
"alchemy_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
"user_id": "INTEGER NOT NULL",
"chat_id": "INTEGER NOT NULL",
"alchemy_type": "TEXT NOT NULL",
"input_data": "TEXT NOT NULL",
"start_time": "TEXT NOT NULL",
"expected_end_time": "TEXT NOT NULL",
"status": "TEXT NOT NULL",
"result_data": "TEXT",
"scheduled_task_id": "INTEGER",
},
)
@override
def wake_up(self) -> None:
logger.Log(
@@ -74,6 +104,8 @@ class WPSAlchemyGame(WPSAPI):
self.register_plugin("alchemy")
self.register_plugin("炼金")
self._register_alchemy_items()
# 恢复过期炼金
self.recover_overdue_alchemy()
def _register_alchemy_items(self) -> None:
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
@@ -231,6 +263,11 @@ class WPSAlchemyGame(WPSAPI):
self._help_message(), chat_id, user_id
)
# 处理状态查询命令
if len(tokens) == 1 and tokens[0] in ["状态", "status"]:
response = await self._handle_status_query(chat_id, user_id)
return await self.send_markdown_message(response, chat_id, user_id)
if len(tokens) == 1 and tokens[0].isdigit():
points = int(tokens[0])
response = await self._handle_point_alchemy(
@@ -262,45 +299,73 @@ class WPSAlchemyGame(WPSAPI):
) -> str:
if points <= 0:
return "❌ 投入积分必须大于 0"
# 检查冷却
is_on_cooldown, cooldown_msg = self._check_cooldown(user_id)
if is_on_cooldown:
return cooldown_msg
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
current_points = config_api.get_user_points(user_id)
if current_points < points:
return f"❌ 积分不足,需要 {points} 分,当前仅有 {current_points}"
# 扣除积分
await config_api.adjust_user_points(
chat_id, user_id, -points, "炼金消耗"
)
fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
fortune_value = fortune_system.get_fortune_value(user_id)
multiplier, phase_label, phase_text = self._draw_point_multiplier(
fortune_value
# 创建炼金记录
input_data = {"type": "point", "points": points}
alchemy_id = self._create_alchemy_record(
user_id, chat_id, "point", input_data
)
reward = int(points * multiplier)
if reward:
await config_api.adjust_user_points(
chat_id, user_id, reward, f"炼金收益({phase_label}"
)
ash_reward = 0
if multiplier == 0.0:
ash_reward = min(points // 10, 99)
if ash_reward > 0:
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
backpack.add_item(user_id, self.ASH_ITEM_ID, ash_reward)
final_points = config_api.get_user_points(user_id)
extra_line = ""
if ash_reward > 0:
extra_line = (
f"- 额外获得:{self.ASH_ITEM_NAME} × `{ash_reward}`\n"
# 注册定时任务
task_id = None
if self._cooldown_ms > 0:
task_id = self.register_clock(
self._settle_alchemy_callback,
self._cooldown_ms,
kwargs={
"alchemy_id": alchemy_id,
"user_id": user_id,
"chat_id": chat_id,
},
)
# 更新记录的任务ID
cursor = get_db().conn.cursor()
cursor.execute(
"UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?",
(task_id, alchemy_id),
)
get_db().conn.commit()
else:
# Debug模式立即结算
success, msg, rewards = self.settle_alchemy(alchemy_id)
return msg
# 计算预计完成时间
cursor = get_db().conn.cursor()
cursor.execute(
"SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?",
(alchemy_id,),
)
record = cursor.fetchone()
expected_end_time = datetime.fromisoformat(record["expected_end_time"])
from PWF.CoreModules.flags import get_internal_debug
debug_hint = " **[DEBUG模式]**" if get_internal_debug() else ""
time_str = "立即结算" if self._cooldown_minutes == 0 else f"{self._cooldown_minutes} 分钟"
return (
"# 🔮 炼金结算\n"
f"# ⚗️ 炼金开始{debug_hint}\n"
f"- 类型:积分炼金\n"
f"- 投入积分:`{points}`\n"
f"- 结果{phase_text}\n"
f"- 获得倍率:×{multiplier:.1f},返还 `+{reward}` 积分\n"
f"{extra_line}"
f"- 当前积分:`{final_points}`"
f"- 预计耗时{time_str}\n"
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
f"- 状态:炼金进行中..."
)
def _draw_point_multiplier(
@@ -326,6 +391,11 @@ class WPSAlchemyGame(WPSAPI):
if times > self.MAX_BATCH_TIMES:
return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES}"
# 检查冷却
is_on_cooldown, cooldown_msg = self._check_cooldown(user_id)
if is_on_cooldown:
return cooldown_msg
resolved: List[BackpackItemDefinition] = []
for identifier in materials:
resolved_item = self._resolve_item(identifier)
@@ -342,70 +412,69 @@ class WPSAlchemyGame(WPSAPI):
f"❌ 材料 `{item.name}` 数量不足,需要 {times} 个,当前仅有 {owned}"
)
# 扣除材料
for item in resolved:
current = self._get_user_quantity(user_id, item.item_id)
backpack.set_item_quantity(
user_id, item.item_id, current - times
)
recipe = self._recipes.get(tuple(sorted(material_ids)))
fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
fortune_value = fortune_system.get_fortune_value(user_id)
adjusted_rate = (
clamp01(recipe.base_success_rate + fortune_value * self._fortune_coeff)
if recipe
else 0.0
# 创建炼金记录
input_data = {
"type": "item",
"materials": material_ids,
"times": times,
}
alchemy_id = self._create_alchemy_record(
user_id, chat_id, "item", input_data
)
success_count = 0
fail_count = 0
rewards: Dict[str, int] = {}
for _ in range(times):
if recipe and random.random() < adjusted_rate:
reward_id = recipe.success_item_id
success_count += 1
else:
reward_id = (
recipe.fail_item_id if recipe else self.ASH_ITEM_ID
)
fail_count += 1
backpack.add_item(user_id, reward_id, 1)
rewards[reward_id] = rewards.get(reward_id, 0) + 1
# 注册定时任务
task_id = None
if self._cooldown_ms > 0:
task_id = self.register_clock(
self._settle_alchemy_callback,
self._cooldown_ms,
kwargs={
"alchemy_id": alchemy_id,
"user_id": user_id,
"chat_id": chat_id,
},
)
# 更新记录的任务ID
cursor = get_db().conn.cursor()
cursor.execute(
"UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?",
(task_id, alchemy_id),
)
get_db().conn.commit()
else:
# Debug模式立即结算
success, msg, rewards = self.settle_alchemy(alchemy_id)
return msg
details = []
for item_id, count in rewards.items():
try:
definition = backpack._get_definition(item_id)
item_name = definition.name
except Exception:
item_name = item_id
details.append(f"- {item_name} × **{count}**")
success_line = (
f"- 成功次数:`{success_count}`"
if recipe
else "- 成功次数:`0`(未知配方必定失败)"
# 计算预计完成时间
cursor = get_db().conn.cursor()
cursor.execute(
"SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?",
(alchemy_id,),
)
fail_line = (
f"- 失败次数:`{fail_count}`"
if recipe
else f"- 失败次数:`{times}`"
)
rate_line = (
f"- 基础成功率:`{recipe.base_success_rate:.2%}`"
if recipe
else "- ✅ 未知配方仅产出炉灰"
)
rewards_block = "\n".join(details) if details else "- (无物品获得)"
record = cursor.fetchone()
expected_end_time = datetime.fromisoformat(record["expected_end_time"])
from PWF.CoreModules.flags import get_internal_debug
debug_hint = " **[DEBUG模式]**" if get_internal_debug() else ""
time_str = "立即结算" if self._cooldown_minutes == 0 else f"{self._cooldown_minutes} 分钟"
material_names = "".join([item.name for item in resolved])
return (
"# ⚗️ 物品炼金结果\n"
f"- 投入材料:{''.join([item.name for item in resolved])} × {times}\n"
f"{success_line}\n"
f"{fail_line}\n"
f"{rate_line}\n"
"- 获得物品:\n"
f"{rewards_block}"
f"# ⚗️ 炼金开始{debug_hint}\n"
f"- 类型:物品炼金\n"
f"- 投入材料:{material_names} × {times}\n"
f"- 预计耗时:{time_str}\n"
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
f"- 状态:炼金进行中..."
)
def _resolve_item(
@@ -437,12 +506,356 @@ class WPSAlchemyGame(WPSAPI):
return item.quantity
return 0
def _check_cooldown(self, user_id: int) -> Tuple[bool, Optional[str]]:
"""检查用户是否在冷却中"""
cursor = get_db().conn.cursor()
cursor.execute(
"""
SELECT alchemy_id, expected_end_time, alchemy_type
FROM alchemy_records
WHERE user_id = ? AND status = 'in_progress'
ORDER BY start_time DESC
LIMIT 1
""",
(user_id,),
)
record = cursor.fetchone()
if not record:
return False, None
expected_end = datetime.fromisoformat(record["expected_end_time"])
now = datetime.now()
if now >= expected_end:
# 已过期,自动结算
try:
self.settle_alchemy(record["alchemy_id"])
except Exception as e:
logger.Log(
"Error",
f"{ConsoleFrontColor.RED}自动结算过期炼金失败: {e}{ConsoleFrontColor.RESET}",
)
return False, None
# 仍在冷却中
remaining = expected_end - now
remaining_minutes = int(remaining.total_seconds() / 60) + 1
alchemy_type_name = "积分炼金" if record["alchemy_type"] == "point" else "物品炼金"
return True, (
f"❌ 炼金冷却中\n"
f"- 上次炼金类型:{alchemy_type_name}\n"
f"- 预计完成:{expected_end.strftime('%Y-%m-%d %H:%M')}\n"
f"- 剩余时间:约 {remaining_minutes} 分钟\n"
f"- 请等待冷却结束后再试"
)
def _create_alchemy_record(
self, user_id: int, chat_id: int, alchemy_type: str, input_data: Dict
) -> int:
"""创建炼金记录"""
start_time = datetime.now()
expected_end_time = start_time + timedelta(minutes=self._cooldown_minutes)
cursor = get_db().conn.cursor()
cursor.execute(
"""
INSERT INTO alchemy_records
(user_id, chat_id, alchemy_type, input_data, start_time, expected_end_time, status)
VALUES (?, ?, ?, ?, ?, ?, 'in_progress')
""",
(
user_id,
chat_id,
alchemy_type,
json.dumps(input_data),
start_time.isoformat(),
expected_end_time.isoformat(),
),
)
alchemy_id = cursor.lastrowid
get_db().conn.commit()
return alchemy_id
def settle_alchemy(self, alchemy_id: int) -> Tuple[bool, str, Optional[Dict]]:
"""结算炼金"""
cursor = get_db().conn.cursor()
cursor.execute(
"SELECT * FROM alchemy_records WHERE alchemy_id = ?",
(alchemy_id,),
)
record = cursor.fetchone()
if not record:
return False, "❌ 炼金记录不存在", None
if record["status"] != "in_progress":
return False, f"❌ 炼金已结算(状态:{record['status']}", None
user_id = record["user_id"]
chat_id = record["chat_id"]
alchemy_type = record["alchemy_type"]
input_data = json.loads(record["input_data"])
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
fortune_value = fortune_system.get_fortune_value(user_id)
result_data: Dict = {}
message_lines = ["# 🔮 炼金结算\n"]
try:
if alchemy_type == "point":
# 积分炼金结算
points = input_data["points"]
multiplier, phase_label, phase_text = self._draw_point_multiplier(
fortune_value
)
reward = int(points * multiplier)
if reward:
config_api.adjust_user_points_sync(
user_id, reward, f"炼金收益({phase_label}"
)
ash_reward = 0
if multiplier == 0.0:
ash_reward = min(points // 10, 99)
if ash_reward > 0:
backpack.add_item(user_id, self.ASH_ITEM_ID, ash_reward)
final_points = config_api.get_user_points(user_id)
extra_line = ""
if ash_reward > 0:
extra_line = f"- 额外获得:{self.ASH_ITEM_NAME} × `{ash_reward}`\n"
message_lines.extend([
f"- 投入积分:`{points}`\n",
f"- 结果:{phase_text}\n",
f"- 获得倍率:×{multiplier:.1f},返还 `+{reward}` 积分\n",
extra_line,
f"- 当前积分:`{final_points}`",
])
result_data = {
"multiplier": multiplier,
"reward_points": reward,
"ash_reward": ash_reward,
}
elif alchemy_type == "item":
# 物品炼金结算
materials = input_data["materials"]
times = input_data["times"]
# 解析材料
resolved: List[BackpackItemDefinition] = []
for material_id in materials:
try:
definition = backpack._get_definition(material_id)
resolved.append(definition)
except Exception:
# 如果材料不存在使用ID
pass
recipe = self._recipes.get(tuple(sorted(materials)))
adjusted_rate = (
clamp01(recipe.base_success_rate + fortune_value * self._fortune_coeff)
if recipe
else 0.0
)
success_count = 0
fail_count = 0
rewards: Dict[str, int] = {}
for _ in range(times):
if recipe and random.random() < adjusted_rate:
reward_id = recipe.success_item_id
success_count += 1
else:
reward_id = (
recipe.fail_item_id if recipe else self.ASH_ITEM_ID
)
fail_count += 1
backpack.add_item(user_id, reward_id, 1)
rewards[reward_id] = rewards.get(reward_id, 0) + 1
details = []
for item_id, count in rewards.items():
try:
definition = backpack._get_definition(item_id)
item_name = definition.name
except Exception:
item_name = item_id
details.append(f"- {item_name} × **{count}**")
success_line = (
f"- 成功次数:`{success_count}`"
if recipe
else "- 成功次数:`0`(未知配方必定失败)"
)
fail_line = (
f"- 失败次数:`{fail_count}`"
if recipe
else f"- 失败次数:`{times}`"
)
rate_line = (
f"- 基础成功率:`{recipe.base_success_rate:.2%}`"
if recipe
else "- ✅ 未知配方仅产出炉灰"
)
rewards_block = "\n".join(details) if details else "- (无物品获得)"
material_names = "".join([item.name for item in resolved])
message_lines.extend([
f"- 投入材料:{material_names} × {times}\n",
f"{success_line}\n",
f"{fail_line}\n",
f"{rate_line}\n",
"- 获得物品:\n",
f"{rewards_block}",
])
result_data = {
"success_count": success_count,
"fail_count": fail_count,
"rewards": rewards,
}
else:
return False, f"❌ 未知的炼金类型:{alchemy_type}", None
# 更新记录状态
cursor.execute(
"""
UPDATE alchemy_records
SET status = 'completed', result_data = ?
WHERE alchemy_id = ?
""",
(json.dumps(result_data), alchemy_id),
)
# 更新定时任务状态
scheduled_task_id = record.get("scheduled_task_id")
if scheduled_task_id:
try:
get_db().update_task_status(int(scheduled_task_id), STATUS_COMPLETED)
except Exception:
pass
get_db().conn.commit()
return True, "".join(message_lines), result_data
except Exception as e:
logger.Log(
"Error",
f"{ConsoleFrontColor.RED}结算炼金失败: {e}{ConsoleFrontColor.RESET}",
)
# 标记为失败
cursor.execute(
"UPDATE alchemy_records SET status = 'failed' WHERE alchemy_id = ?",
(alchemy_id,),
)
get_db().conn.commit()
return False, f"❌ 结算失败:{str(e)}", None
async def _settle_alchemy_callback(
self, alchemy_id: int, user_id: int, chat_id: int
) -> None:
"""炼金结算回调(时钟任务)"""
success, msg, rewards = self.settle_alchemy(alchemy_id)
await self.send_markdown_message(msg, chat_id, user_id)
def _get_user_alchemy_status(self, user_id: int) -> Optional[Dict]:
"""获取用户当前炼金状态"""
cursor = get_db().conn.cursor()
cursor.execute(
"""
SELECT * FROM alchemy_records
WHERE user_id = ? AND status = 'in_progress'
ORDER BY start_time DESC
LIMIT 1
""",
(user_id,),
)
record = cursor.fetchone()
return dict(record) if record else None
async def _handle_status_query(self, chat_id: int, user_id: int) -> str:
"""处理状态查询"""
record = self._get_user_alchemy_status(user_id)
if not record:
return (
"# ⚗️ 炼金状态\n"
"- 状态:无进行中的炼金\n"
"- 可以开始新的炼金"
)
alchemy_type = record["alchemy_type"]
alchemy_type_name = "积分炼金" if alchemy_type == "point" else "物品炼金"
start_time = datetime.fromisoformat(record["start_time"])
expected_end_time = datetime.fromisoformat(record["expected_end_time"])
now = datetime.now()
if now >= expected_end_time:
remaining_str = "已完成,等待结算"
else:
remaining = expected_end_time - now
remaining_minutes = int(remaining.total_seconds() / 60) + 1
remaining_str = f"{remaining_minutes} 分钟"
return (
"# ⚗️ 炼金状态\n"
f"- 状态:进行中\n"
f"- 类型:{alchemy_type_name}\n"
f"- 开始时间:{start_time.strftime('%Y-%m-%d %H:%M')}\n"
f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n"
f"- 剩余时间:{remaining_str}"
)
def recover_overdue_alchemy(self) -> None:
"""恢复过期但未结算的炼金"""
cursor = get_db().conn.cursor()
cursor.execute(
"""
SELECT alchemy_id FROM alchemy_records
WHERE status = 'in_progress' AND expected_end_time < ?
""",
(datetime.now().isoformat(),),
)
overdue_records = cursor.fetchall()
for record in overdue_records:
logger.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}发现过期炼金 {record['alchemy_id']},执行恢复结算{ConsoleFrontColor.RESET}",
)
try:
self.settle_alchemy(record["alchemy_id"])
except Exception as e:
logger.Log(
"Error",
f"{ConsoleFrontColor.RED}恢复炼金 {record['alchemy_id']} 失败: {e}{ConsoleFrontColor.RESET}",
)
if overdue_records:
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}恢复了 {len(overdue_records)} 个过期炼金{ConsoleFrontColor.RESET}",
)
def _help_message(self) -> str:
return (
"# ⚗️ 炼金指令帮助\n"
"- `炼金 <积分>`:投入积分尝试炼金\n"
"- `炼金 <材料1> <材料2> <材料3> [次数]`:使用三件材料进行炼金(可选次数,默认 1\n"
"> 建议提前备足材料及积分,谨慎开启炼金流程。"
"- `炼金 状态`:查询当前炼金状态\n"
"> 建议提前备足材料及积分,谨慎开启炼金流程。炼金需要等待一定时间后才会获得结果。"
)