from __future__ import annotations import json import random from collections import defaultdict, Counter from dataclasses import dataclass from datetime import datetime, timedelta from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple, override from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig from PWF.CoreModules.database import get_db, STATUS_COMPLETED from PWF.CoreModules.plugin_interface import DatabaseModel from PWF.CoreModules.flags import get_internal_debug from .WPSAPI import GuideEntry, GuideSection, WPSAPI from .WPSBackpackSystem import ( BackpackItemDefinition, BackpackItemTier, WPSBackpackSystem, ) from .WPSConfigSystem import WPSConfigAPI from .WPSStoreSystem import WPSStoreSystem from .WPSFortuneSystem import WPSFortuneSystem logger: ProjectConfig = Architecture.Get(ProjectConfig) FORTUNE_COEFF: float = logger.FindItem("alchemy_fortune_coeff", 0.03) COOLDOWN_MINUTES: int = logger.FindItem("alchemy_cooldown_minutes", 2) MAX_POINTS_PER_BATCH: int = logger.FindItem("alchemy_max_points_per_batch", 10000) POINT_ALCHEMY_FEE: int = logger.FindItem("alchemy_point_fee", 100) POOL_INITIAL_BALANCE: int = logger.FindItem("alchemy_pool_initial_balance", 10000) REWARD_SLOT_CONFIG = logger.FindItem("alchemy_reward_slots", sorted([ (50,0.001), (40,0.004), (30,0.016), (20,0.05), (10,0.1), (5,0.15), (1,0.2) ], key=lambda x: x[0])) COMMON_BONUS_RATE: float = logger.FindItem("alchemy_common_bonus_rate", 0.3333333) COMMON_BONUS_BLACKLIST: Sequence[str] = logger.FindItem("alchemy_common_blacklist", []) logger.SaveProperties() def clamp01(value: float) -> float: return max(0.0, min(1.0, value)) @dataclass(frozen=True) class AlchemyRecipe: materials: Tuple[str, str, str] success_item_id: str fail_item_id: str base_success_rate: float class WPSAlchemyGame(WPSAPI): ASH_ITEM_ID = "alchemy_ash" ASH_ITEM_NAME = "炉灰" SLAG_ITEM_ID = "alchemy_slag" SLAG_ITEM_NAME = "炉渣" MAX_BATCH_TIMES = 99 def __init__(self) -> None: super().__init__() self._recipes: Dict[Tuple[str, str, str], AlchemyRecipe] = {} self._material_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set) self._success_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set) self._fail_index: Dict[str, Set[Tuple[str, str, str]]] = defaultdict(set) self._fortune_coeff = FORTUNE_COEFF # 从配置读取冷却时间(分钟) self._cooldown_minutes = 0 if get_internal_debug() else COOLDOWN_MINUTES self._cooldown_ms = int(self._cooldown_minutes * 60 * 1000) # 从配置读取单次积分炼金上限 self._max_points_per_batch = MAX_POINTS_PER_BATCH self._point_fee = max(1, int(POINT_ALCHEMY_FEE)) self._pool_initial_balance = max(0, int(POOL_INITIAL_BALANCE)) self._reward_slot_config = REWARD_SLOT_CONFIG self._reward_total_probability = sum( probability for _, probability in self._reward_slot_config ) self._common_bonus_rate = clamp01(float(COMMON_BONUS_RATE)) self._common_bonus_blacklist = self._parse_identifier_collection( COMMON_BONUS_BLACKLIST ) self._pool_balance_cache: Optional[int] = None logger.SaveProperties() def get_guide_subtitle(self) -> str: return "积分炼金与材料合成系统" def get_guide_metadata(self) -> Dict[str, str]: pool_balance = self._get_reward_pool_balance(refresh=True) return { "配方数量": str(len(self._recipes)), "冷却时间(分钟)": str(self._cooldown_minutes), "单次费用上限": str(self._max_points_per_batch), "当前奖池": str(pool_balance), "单次费用": str(self._point_fee), } def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "炼金", "identifier": "炼金", "description": "投入积分或三件材料,等待冷却后获取结果。", "metadata": {"别名": "alchemy"}, }, ) def collect_item_entries(self) -> Sequence[GuideEntry]: return ( { "title": self.ASH_ITEM_NAME, "identifier": self.ASH_ITEM_ID, "description": "炼金失败后获得的基础材料,可再次参与配方或出售。", }, { "title": self.SLAG_ITEM_NAME, "identifier": self.SLAG_ITEM_ID, "description": "由 `炉灰` 合成,部分园艺/商店配方会引用该物品。", }, ) def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "积分炼金", "description": ( f"`炼金 <次数>` 按次扣除费用(每次 {self._point_fee} 分)," "所有费用注入奖池,再依据挡位概率领取奖池百分比奖励。" ), "icon": "💎", }, { "title": "材料炼金", "description": ( "`炼金 <材料1> <材料2> <材料3> [次数]` 支持批量执行,配方信息可通过 `炼金配方` 查询。" ), "icon": "🧪", }, { "title": "冷却与恢复", "description": ( f"默认冷却 {self._cooldown_minutes} 分钟,任务调度用于结算完成;" "重启后会自动恢复未结算记录。" ), "icon": "⏲️", }, ) def collect_additional_sections(self) -> Sequence[GuideSection]: sections = list(super().collect_additional_sections()) core_recipes = self._build_core_recipes() if core_recipes: sections.append( GuideSection( title="基础配方", entries=core_recipes, layout="grid", section_id="alchemy-core", description="系统内置的基础配方,可在没有额外模块时直接使用。", ) ) all_recipes = self._build_all_recipe_entries() if all_recipes: sections.append( GuideSection( title="所有炼金配方", entries=all_recipes, layout="list", section_id="alchemy-all", description="当前已注册的全部炼金配方列表。", ) ) return tuple(sections) def _build_core_recipes(self) -> Sequence[GuideEntry]: entries: List[GuideEntry] = [] entries.append( GuideEntry( title="炉灰 → 炉渣", identifier=f"{self.ASH_ITEM_ID} × 3", description="循环利用基础产物,将多余炉灰转化为更稀有的炉渣。", category="基础配方", metadata={ "成功率": "100%", "失败产物": self.ASH_ITEM_ID, }, icon="🔥", details=[ { "type": "list", "items": [ f"材料:{self.ASH_ITEM_ID} × 3", f"成功产物:{self.SLAG_ITEM_ID}", f"失败产物:{self.ASH_ITEM_ID}", ], } ], ) ) return entries def _build_all_recipe_entries(self) -> Sequence[GuideEntry]: if not self._recipes: return () try: backpack: Optional[WPSBackpackSystem] = Architecture.Get(WPSBackpackSystem) except Exception: # pylint: disable=broad-except backpack = None name_cache: Dict[str, str] = {} def resolve_name(item_id: str) -> str: if not item_id: return item_id if item_id in name_cache: return name_cache[item_id] display = item_id if backpack is not None: try: definition = backpack._get_definition(item_id) # noqa: SLF001 display = definition.name except Exception: # pylint: disable=broad-except display = item_id name_cache[item_id] = display return display def format_materials(materials: Tuple[str, str, str]) -> str: counter = Counter(materials) parts: List[str] = [] for item_id in sorted(counter.keys()): count = counter[item_id] label = f"{resolve_name(item_id)}({item_id})" if count > 1: label += f" × {count}" parts.append(label) return " + ".join(parts) entries: List[GuideEntry] = [] for _materials_key, recipe in sorted( self._recipes.items(), key=lambda item: (item[1].success_item_id, item[0]), ): materials_text = format_materials(recipe.materials) success_display = resolve_name(recipe.success_item_id) fail_display = resolve_name(recipe.fail_item_id) details = [ f"材料:{materials_text}", f"成功产物:{success_display}({recipe.success_item_id})", f"失败产物:{fail_display}({recipe.fail_item_id})", ] entries.append( GuideEntry( title=success_display or recipe.success_item_id, identifier=recipe.success_item_id, description=f"{materials_text} → {success_display}({recipe.success_item_id})", metadata={ "成功率": f"{recipe.base_success_rate:.2%}", "失败产物": f"{fail_display}({recipe.fail_item_id})", }, details=[ { "type": "list", "items": [ *details, f"基础成功率:{recipe.base_success_rate:.2%}", ], } ], ) ) return tuple(entries) @override def dependencies(self) -> List[type]: return [WPSAPI, WPSBackpackSystem, WPSConfigAPI, WPSFortuneSystem, WPSStoreSystem] _POOL_TABLE = "alchemy_reward_pool" def register_db_model(self) -> DatabaseModel | List[DatabaseModel]: """注册炼金记录数据库表与奖池表""" records_model = DatabaseModel( table_name="alchemy_records", column_defs={ "alchemy_id": "INTEGER PRIMARY KEY AUTOINCREMENT", "user_id": "INTEGER NOT NULL", "chat_id": "INTEGER NOT NULL", "alchemy_type": "TEXT NOT NULL", "input_data": "TEXT NOT NULL", "start_time": "TEXT NOT NULL", "expected_end_time": "TEXT NOT NULL", "status": "TEXT NOT NULL", "result_data": "TEXT", "scheduled_task_id": "INTEGER", }, ) pool_model = DatabaseModel( table_name=self._POOL_TABLE, column_defs={ "pool_id": "INTEGER PRIMARY KEY", "balance": "INTEGER NOT NULL", "updated_at": "TEXT NOT NULL", }, ) return [records_model, pool_model] @override def wake_up(self) -> None: logger.Log( "Info", f"{ConsoleFrontColor.GREEN}WPSAlchemyGame 插件已加载{ConsoleFrontColor.RESET}", ) self.register_plugin("alchemy") self.register_plugin("炼金") self._register_alchemy_items() self._ensure_reward_pool() # 恢复过期炼金 self.recover_overdue_alchemy() def _register_alchemy_items(self) -> None: backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) try: backpack.register_item( self.ASH_ITEM_ID, self.ASH_ITEM_NAME, BackpackItemTier.COMMON, "炼金失败时残留的炉灰,可作为低阶材料或出售。", ) except Exception as exc: logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册炉灰物品时出现问题: {exc}{ConsoleFrontColor.RESET}", ) else: try: store_system = Architecture.Get(WPSStoreSystem) store_system.register_mode( item_id=self.ASH_ITEM_ID, price=8, limit_amount=999, ) except Exception as exc: logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册炉灰商店模式时出现问题: {exc}{ConsoleFrontColor.RESET}", ) try: backpack.register_item( self.SLAG_ITEM_ID, self.SLAG_ITEM_NAME, BackpackItemTier.COMMON, "经高温提炼后的炉渣,可在特殊任务中使用。", ) except Exception as exc: logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册炉渣物品时出现问题: {exc}{ConsoleFrontColor.RESET}", ) try: self.register_recipe( (self.ASH_ITEM_ID, self.ASH_ITEM_ID, self.ASH_ITEM_ID), self.SLAG_ITEM_ID, self.ASH_ITEM_ID, 1.0, ) except Exception as exc: logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}注册炉渣配方时出现问题: {exc}{ConsoleFrontColor.RESET}", ) def register_recipe( self, materials: Tuple[str, str, str]|Sequence[str], success_item_id: str, fail_item_id: str, base_success_rate: float, ) -> None: if len(materials) != 3: raise ValueError("炼金配方必须提供三种材料") sorted_materials = tuple(sorted(mat.strip() for mat in materials)) if any(not material for material in sorted_materials): raise ValueError("炼金材料 ID 不能为空") clamped_rate = clamp01(base_success_rate) if clamped_rate != base_success_rate: raise ValueError("配方成功率必须位于 0~1 之间") sorted_key = tuple(sorted_materials) existing = self._recipes.get(sorted_key) if existing: self._unindex_recipe(sorted_key, existing) recipe = AlchemyRecipe( materials=sorted_key, success_item_id=success_item_id.strip(), fail_item_id=fail_item_id.strip() or self.ASH_ITEM_ID, base_success_rate=base_success_rate, ) self._recipes[sorted_key] = recipe self._index_recipe(sorted_key, recipe) logger.Log( "Info", f"{ConsoleFrontColor.CYAN}已注册炼金配方 {sorted_materials} -> {success_item_id} ({base_success_rate:.2%}){ConsoleFrontColor.RESET}", ) def _index_recipe( self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe ) -> None: for material in recipe.materials: self._material_index[material].add(materials_key) self._success_index[recipe.success_item_id].add(materials_key) self._fail_index[recipe.fail_item_id].add(materials_key) def _unindex_recipe( self, materials_key: Tuple[str, str, str], recipe: AlchemyRecipe ) -> None: for material in recipe.materials: material_set = self._material_index.get(material) if material_set and materials_key in material_set: material_set.discard(materials_key) if not material_set: del self._material_index[material] success_set = self._success_index.get(recipe.success_item_id) if success_set and materials_key in success_set: success_set.discard(materials_key) if not success_set: del self._success_index[recipe.success_item_id] fail_set = self._fail_index.get(recipe.fail_item_id) if fail_set and materials_key in fail_set: fail_set.discard(materials_key) if not fail_set: del self._fail_index[recipe.fail_item_id] def get_recipes_using_item(self, item_id: str) -> List[AlchemyRecipe]: if not item_id: return [] material_keys = sorted(self._material_index.get(item_id, set())) return [self._recipes[key] for key in material_keys] def get_recipes_producing_item( self, item_id: str ) -> Dict[str, List[AlchemyRecipe]]: if not item_id: return {"success": [], "fail": []} success_keys = sorted( self._success_index.get(item_id, set()), key=lambda key: ( -self._recipes[key].base_success_rate, self._recipes[key].materials, ), ) fail_keys = sorted( self._fail_index.get(item_id, set()), key=lambda key: ( -self._recipes[key].base_success_rate, self._recipes[key].materials, ), ) return { "success": [self._recipes[key] for key in success_keys], "fail": [self._recipes[key] for key in fail_keys], } @staticmethod def _parse_identifier_collection(raw: object) -> Set[str]: if raw is None: return set() items: Iterable[str] if isinstance(raw, str): stripped = raw.strip() if not stripped: return set() try: parsed = json.loads(stripped) if isinstance(parsed, (list, tuple, set)): items = [str(x) for x in parsed] else: items = [str(parsed)] except json.JSONDecodeError: items = [token.strip() for token in stripped.split(",")] elif isinstance(raw, Iterable): items = [str(item) for item in raw] else: items = [str(raw)] normalized = { item.strip().lower() for item in items if isinstance(item, str) and item.strip() } return normalized def _ensure_reward_pool(self) -> None: cursor = get_db().conn.cursor() cursor.execute( f"SELECT balance FROM {self._POOL_TABLE} WHERE pool_id = 1" ) row = cursor.fetchone() if row is None: now = datetime.now().isoformat() cursor.execute( f"INSERT INTO {self._POOL_TABLE} (pool_id, balance, updated_at) VALUES (1, ?, ?)", (self._pool_initial_balance, now), ) get_db().conn.commit() self._pool_balance_cache = self._pool_initial_balance elif self._pool_balance_cache is None: self._pool_balance_cache = int(row["balance"]) def _get_reward_pool_balance(self, *, refresh: bool = False) -> int: if refresh or self._pool_balance_cache is None: self._ensure_reward_pool() cursor = get_db().conn.cursor() cursor.execute( f"SELECT balance FROM {self._POOL_TABLE} WHERE pool_id = 1" ) row = cursor.fetchone() balance = int(row["balance"]) if row else 0 self._pool_balance_cache = balance return self._pool_balance_cache or 0 def _update_reward_pool(self, delta: int) -> int: self._ensure_reward_pool() cursor = get_db().conn.cursor() cursor.execute( f"UPDATE {self._POOL_TABLE} SET balance = balance + ?, updated_at = ? WHERE pool_id = 1", (delta, datetime.now().isoformat()), ) get_db().conn.commit() return self._get_reward_pool_balance(refresh=True) def _set_reward_pool(self, balance: int) -> int: self._ensure_reward_pool() cursor = get_db().conn.cursor() cursor.execute( f"UPDATE {self._POOL_TABLE} SET balance = ?, updated_at = ? WHERE pool_id = 1", (balance, datetime.now().isoformat()), ) get_db().conn.commit() self._pool_balance_cache = balance return balance async def callback( self, message: str, chat_id: int, user_id: int ) -> Optional[str]: payload = self.parse_message_after_at(message).strip() if not payload: return await self.send_markdown_message( self._help_message(), chat_id, user_id ) tokens = [token.strip() for token in payload.split() if token.strip()] if not tokens: return await self.send_markdown_message( self._help_message(), chat_id, user_id ) # 处理状态查询命令 if len(tokens) == 1 and tokens[0] in ["状态", "status"]: response = await self._handle_status_query(chat_id, user_id) return await self.send_markdown_message(response, chat_id, user_id) if len(tokens) == 1 and tokens[0].isdigit(): times = int(tokens[0]) response = await self._handle_point_alchemy( chat_id, user_id, times ) return await self.send_markdown_message(response, chat_id, user_id) if len(tokens) >= 3: materials = tokens[:3] times = 1 if len(tokens) >= 4: try: times = int(tokens[3]) except ValueError: return await self.send_markdown_message( "❌ 炼金次数必须是整数", chat_id, user_id ) response = await self._handle_item_alchemy( chat_id, user_id, materials, times ) return await self.send_markdown_message(response, chat_id, user_id) return await self.send_markdown_message( self._help_message(), chat_id, user_id ) async def _handle_point_alchemy( self, chat_id: int, user_id: int, times: int ) -> str: if times <= 0: return "❌ 炼金次数必须大于 0" if times > self.MAX_BATCH_TIMES: return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES} 次" # 检查冷却 is_on_cooldown, cooldown_msg = self._check_cooldown(user_id) if is_on_cooldown: return cooldown_msg total_cost = self._point_fee * times config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI) current_points = config_api.get_user_points(user_id) if total_cost > self._max_points_per_batch: return f"❌ 单次炼金总费用不能超过 {self._max_points_per_batch} 分" if current_points < total_cost: return ( f"❌ 积分不足,需要 {total_cost} 分,当前仅有 {current_points} 分" ) # 扣除积分 await config_api.adjust_user_points( chat_id, user_id, -total_cost, "炼金消耗" ) # 奖池增加投入 pool_balance = self._update_reward_pool(total_cost) # 创建炼金记录 input_data = { "type": "point", "times": times, "cost_per_attempt": self._point_fee, "pool_deposit": total_cost, } alchemy_id = self._create_alchemy_record( user_id, chat_id, "point", input_data ) # 注册定时任务 task_id = self.register_clock( self._settle_alchemy_callback, self._cooldown_ms, kwargs={ "alchemy_id": alchemy_id, "user_id": user_id, "chat_id": chat_id, }, ) # 更新记录的任务ID cursor = get_db().conn.cursor() cursor.execute( "UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?", (task_id, alchemy_id), ) get_db().conn.commit() # 计算预计完成时间 cursor = get_db().conn.cursor() cursor.execute( "SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?", (alchemy_id,), ) record = cursor.fetchone() expected_end_time = datetime.fromisoformat(record["expected_end_time"]) debug_hint = " **[DEBUG模式]**" if get_internal_debug() else "" time_str = "立即结算" if self._cooldown_minutes == 0 else f"{self._cooldown_minutes} 分钟" reward_probability_line = "" slot_brief_line = "" if self._reward_slot_config: reward_probability_line = ( f"- 有奖励挡位总概率:{self._reward_total_probability:.2%}\n" ) preview = ", ".join( f"{percent:.0f}%|{probability:.2%}" for percent, probability in self._reward_slot_config[:5] ) if preview: slot_brief_line = f"- 挡位示例:{preview}{' …' if len(self._reward_slot_config) > 5 else ''}\n" return ( f"# ⚗️ 炼金开始{debug_hint}\n" f"- 类型:积分炼金\n" f"- 炼金次数:`{times}`(每次费用 {self._point_fee} 分)\n" f"- 本次消耗:`{total_cost}` 分\n" f"- 当前奖池:`{pool_balance}` 分\n" f"- 预计耗时:{time_str}\n" f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n" f"{reward_probability_line}" f"{slot_brief_line}" f"- 状态:炼金进行中..." ) def _draw_reward_slot( self, fortune_value: float ) -> Tuple[Optional[float], float]: if not self._reward_slot_config: return None, 0.0 offset = fortune_value * self._fortune_coeff landing = clamp01(random.random() + offset) cumulative = 0.0 for percent, probability in self._reward_slot_config: cumulative += probability if landing <= cumulative: return percent, probability return None, max(0.0, 1.0 - cumulative) @staticmethod def _calculate_slot_reward(pool_balance: int, percent: float) -> int: if percent <= 0 or pool_balance <= 0: return 0 reward = pool_balance * (percent / 100.0) return int(reward) def _should_drop_common_bonus(self) -> bool: if self._common_bonus_rate <= 0: return False return random.random() < self._common_bonus_rate def _draw_common_bonus_item( self, backpack: WPSBackpackSystem ) -> Optional[BackpackItemDefinition]: candidates = backpack.get_items_by_tier( BackpackItemTier.COMMON, blacklist=self._common_bonus_blacklist, ) if not candidates: return None return random.choice(candidates) async def _handle_item_alchemy( self, chat_id: int, user_id: int, materials: Sequence[str], times: int, ) -> str: if times <= 0: return "❌ 炼金次数必须大于 0" if times > self.MAX_BATCH_TIMES: return f"❌ 每次最多只能炼金 {self.MAX_BATCH_TIMES} 次" # 检查冷却 is_on_cooldown, cooldown_msg = self._check_cooldown(user_id) if is_on_cooldown: return cooldown_msg resolved: List[BackpackItemDefinition] = [] for identifier in materials: resolved_item = self._resolve_item(identifier) if resolved_item is None: return f"❌ 未找到材料 `{identifier}`,请确认已注册" resolved.append(resolved_item) material_ids = [item.item_id for item in resolved] backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) material_usage = Counter(item.item_id for item in resolved) name_map = {item.item_id: item.name for item in resolved} current_quantities: Dict[str, int] = {} for item_id, single_batch_count in material_usage.items(): required_amount = single_batch_count * times current = self._get_user_quantity(user_id, item_id) current_quantities[item_id] = current if current < required_amount: item_name = name_map.get(item_id, item_id) return ( f"❌ 材料 `{item_name}` 数量不足,需要 {required_amount} 个,当前仅有 {current} 个" ) # 扣除材料 for item_id, single_batch_count in material_usage.items(): required_amount = single_batch_count * times backpack.set_item_quantity( user_id, item_id, current_quantities[item_id] - required_amount ) # 创建炼金记录 input_data = { "type": "item", "materials": material_ids, "times": times, } alchemy_id = self._create_alchemy_record( user_id, chat_id, "item", input_data ) # 注册定时任务 task_id = self.register_clock( self._settle_alchemy_callback, self._cooldown_ms, kwargs={ "alchemy_id": alchemy_id, "user_id": user_id, "chat_id": chat_id, }, ) # 更新记录的任务ID cursor = get_db().conn.cursor() cursor.execute( "UPDATE alchemy_records SET scheduled_task_id = ? WHERE alchemy_id = ?", (task_id, alchemy_id), ) get_db().conn.commit() # 计算预计完成时间 cursor = get_db().conn.cursor() cursor.execute( "SELECT expected_end_time FROM alchemy_records WHERE alchemy_id = ?", (alchemy_id,), ) record = cursor.fetchone() expected_end_time = datetime.fromisoformat(record["expected_end_time"]) debug_hint = " **[DEBUG模式]**" if get_internal_debug() else "" time_str = "立即结算" if self._cooldown_minutes == 0 else f"{self._cooldown_minutes} 分钟" material_names = "、".join([item.name for item in resolved]) return ( f"# ⚗️ 炼金开始{debug_hint}\n" f"- 类型:物品炼金\n" f"- 投入材料:{material_names} × {times}\n" f"- 预计耗时:{time_str}\n" f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}\n" f"- 状态:炼金进行中..." ) def _resolve_item( self, identifier: str ) -> Optional[BackpackItemDefinition]: identifier_lower = identifier.strip().lower() cursor = get_db().conn.cursor() cursor.execute( f""" SELECT item_id FROM {WPSBackpackSystem.ITEMS_TABLE} WHERE lower(item_id) = ? OR lower(name) = ? LIMIT 1 """, (identifier_lower, identifier_lower), ) row = cursor.fetchone() item_id = row["item_id"] if row else identifier.strip() backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) try: return backpack._get_definition(item_id) except Exception: return None def _get_user_quantity(self, user_id: int, item_id: str) -> int: backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) for item in backpack.get_user_items(user_id): if item.item_id == item_id: return item.quantity return 0 def _check_cooldown(self, user_id: int) -> Tuple[bool, Optional[str]]: """检查用户是否在冷却中""" cursor = get_db().conn.cursor() cursor.execute( """ SELECT alchemy_id, expected_end_time, alchemy_type FROM alchemy_records WHERE user_id = ? AND status = 'in_progress' ORDER BY start_time DESC LIMIT 1 """, (user_id,), ) record = cursor.fetchone() if not record: return False, None expected_end = datetime.fromisoformat(record["expected_end_time"]) now = datetime.now() if now >= expected_end: # 已过期,自动结算 try: self.settle_alchemy(record["alchemy_id"]) except Exception as e: logger.Log( "Error", f"{ConsoleFrontColor.RED}自动结算过期炼金失败: {e}{ConsoleFrontColor.RESET}", ) return False, None # 仍在冷却中 remaining = expected_end - now remaining_minutes = int(remaining.total_seconds() / 60) + 1 alchemy_type_name = "积分炼金" if record["alchemy_type"] == "point" else "物品炼金" return True, ( f"❌ 炼金冷却中\n" f"- 上次炼金类型:{alchemy_type_name}\n" f"- 预计完成:{expected_end.strftime('%Y-%m-%d %H:%M')}\n" f"- 剩余时间:约 {remaining_minutes} 分钟\n" f"- 请等待冷却结束后再试" ) def _create_alchemy_record( self, user_id: int, chat_id: int, alchemy_type: str, input_data: Dict ) -> int: """创建炼金记录""" start_time = datetime.now() expected_end_time = start_time + timedelta(minutes=self._cooldown_minutes) cursor = get_db().conn.cursor() cursor.execute( """ INSERT INTO alchemy_records (user_id, chat_id, alchemy_type, input_data, start_time, expected_end_time, status) VALUES (?, ?, ?, ?, ?, ?, 'in_progress') """, ( user_id, chat_id, alchemy_type, json.dumps(input_data), start_time.isoformat(), expected_end_time.isoformat(), ), ) alchemy_id = cursor.lastrowid get_db().conn.commit() return alchemy_id def settle_alchemy(self, alchemy_id: int) -> Tuple[bool, str, Optional[Dict]]: """结算炼金""" import sqlite3 cursor = get_db().conn.cursor() cursor.execute( "SELECT * FROM alchemy_records WHERE alchemy_id = ?", (alchemy_id,), ) record: sqlite3.Row = cursor.fetchone() if not record: return False, "❌ 炼金记录不存在", None if record["status"] != "in_progress": return False, f"❌ 炼金已结算(状态:{record['status']})", None user_id = record["user_id"] chat_id = record["chat_id"] alchemy_type = record["alchemy_type"] input_data = json.loads(record["input_data"]) config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI) backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem) fortune_value = fortune_system.get_fortune_value(user_id) result_data: Dict = {} message_lines = ["# 🔮 炼金结算\n"] try: if alchemy_type == "point": # 积分炼金结算(奖池模式) times = int(input_data.get("times", 1)) cost_per_attempt = int(input_data.get("cost_per_attempt", self._point_fee)) total_cost = cost_per_attempt * times pool_balance = self._get_reward_pool_balance(refresh=True) slot_counter: Dict[float, int] = defaultdict(int) empty_count = 0 total_reward_points = 0 ash_reward = 0 bonus_items: Dict[str, int] = {} attempt_records: List[Dict[str, Any]] = [] for _ in range(times): slot_percent, _ = self._draw_reward_slot(fortune_value) if slot_percent is None: empty_count += 1 attempt_records.append({"slot": None, "reward": 0}) continue slot_counter[slot_percent] += 1 reward_points = self._calculate_slot_reward(pool_balance, slot_percent) if reward_points > 0: pool_balance = self._update_reward_pool(-reward_points) total_reward_points += reward_points config_api.adjust_user_points_sync( user_id, reward_points, f"炼金奖池奖励({slot_percent:.0f}%挡位)", ) attempt_record: Dict[str, Any] = { "slot": slot_percent, "reward": reward_points, } if slot_percent <= 0: backpack.add_item(user_id, self.ASH_ITEM_ID, 1) ash_reward += 1 attempt_record["ash"] = True if slot_percent < 50 and self._should_drop_common_bonus(): bonus_item = self._draw_common_bonus_item(backpack) if bonus_item is not None: backpack.add_item(user_id, bonus_item.item_id, 1) bonus_items[bonus_item.item_id] = bonus_items.get(bonus_item.item_id, 0) + 1 attempt_record["bonus_item"] = bonus_item.item_id attempt_records.append(attempt_record) final_points = config_api.get_user_points(user_id) slot_lines: List[str] = [] if slot_counter: for percent in sorted(slot_counter.keys()): slot_lines.append( f" - {percent:.0f}% 挡位:{slot_counter[percent]} 次" ) if empty_count > 0: slot_lines.append(f" - 空白挡位:{empty_count} 次") message_lines.extend([ f"- 炼金次数:`{times}`(单次扣除 {cost_per_attempt} 分)\n", f"- 本次总消耗:`{total_cost}` 分\n", f"- 奖池剩余:`{pool_balance}` 分\n", f"- 总计返还:`+{total_reward_points}` 分\n", f"- 当前积分:`{final_points}`\n", ]) if slot_lines: message_lines.append("- 挡位统计:\n" + "\n".join(slot_lines)) if ash_reward > 0: message_lines.append( f"- 额外获得:{self.ASH_ITEM_NAME} × `{ash_reward}`" ) if bonus_items: bonus_lines: List[str] = [] for item_id, count in bonus_items.items(): try: definition = backpack._get_definition(item_id) item_name = definition.name except Exception: item_name = item_id bonus_lines.append(f" - {item_name} × {count}") message_lines.append("\n- 普通物品掉落:\n" + "\n".join(bonus_lines)) result_data = { "times": times, "total_cost": total_cost, "total_reward": total_reward_points, "slots": {str(percent): count for percent, count in slot_counter.items()}, "empty": empty_count, "ash_reward": ash_reward, "bonus_items": bonus_items, "pool_balance": pool_balance, "attempts": attempt_records, } elif alchemy_type == "item": # 物品炼金结算 materials = input_data["materials"] times = input_data["times"] # 解析材料 resolved: List[BackpackItemDefinition] = [] for material_id in materials: try: definition = backpack._get_definition(material_id) resolved.append(definition) except Exception: # 如果材料不存在,使用ID pass recipe = self._recipes.get(tuple(sorted(materials))) adjusted_rate = ( clamp01(recipe.base_success_rate + fortune_value * self._fortune_coeff) if recipe else 0.0 ) success_count = 0 fail_count = 0 rewards: Dict[str, int] = {} for _ in range(times): if recipe and random.random() < adjusted_rate: reward_id = recipe.success_item_id success_count += 1 else: reward_id = ( recipe.fail_item_id if recipe else self.ASH_ITEM_ID ) fail_count += 1 backpack.add_item(user_id, reward_id, 1) rewards[reward_id] = rewards.get(reward_id, 0) + 1 details = [] for item_id, count in rewards.items(): try: definition = backpack._get_definition(item_id) item_name = definition.name except Exception: item_name = item_id details.append(f"- {item_name} × **{count}**") success_line = ( f"- 成功次数:`{success_count}`" if recipe else "- 成功次数:`0`(未知配方必定失败)" ) fail_line = ( f"- 失败次数:`{fail_count}`" if recipe else f"- 失败次数:`{times}`" ) rate_line = ( f"- 基础成功率:`{recipe.base_success_rate:.2%}`" if recipe else "- ✅ 未知配方仅产出炉灰" ) rewards_block = "\n".join(details) if details else "- (无物品获得)" material_names = "、".join([item.name for item in resolved]) message_lines.extend([ f"- 投入材料:{material_names} × {times}\n", f"{success_line}\n", f"{fail_line}\n", f"{rate_line}\n", "- 获得物品:\n", f"{rewards_block}", ]) result_data = { "success_count": success_count, "fail_count": fail_count, "rewards": rewards, } else: return False, f"❌ 未知的炼金类型:{alchemy_type}", None # 更新记录状态 cursor.execute( """ UPDATE alchemy_records SET status = 'completed', result_data = ? WHERE alchemy_id = ? """, (json.dumps(result_data), alchemy_id), ) # 更新定时任务状态 scheduled_task_id: int = record["scheduled_task_id"] if scheduled_task_id: get_db().update_task_status(scheduled_task_id, STATUS_COMPLETED) return True, "".join(message_lines), result_data except Exception as e: logger.Log( "Error", f"{ConsoleFrontColor.RED}结算炼金失败: {e}{ConsoleFrontColor.RESET}", ) # 标记为失败 cursor.execute( "UPDATE alchemy_records SET status = 'failed' WHERE alchemy_id = ?", (alchemy_id,), ) get_db().conn.commit() return False, f"❌ 结算失败:{str(e)}", None async def _settle_alchemy_callback( self, alchemy_id: int, user_id: int, chat_id: int ) -> None: """炼金结算回调(时钟任务)""" success, msg, rewards = self.settle_alchemy(alchemy_id) await self.send_markdown_message(msg, chat_id, user_id) def _get_user_alchemy_status(self, user_id: int) -> Optional[Dict]: """获取用户当前炼金状态""" cursor = get_db().conn.cursor() cursor.execute( """ SELECT * FROM alchemy_records WHERE user_id = ? AND status = 'in_progress' ORDER BY start_time DESC LIMIT 1 """, (user_id,), ) record = cursor.fetchone() return dict(record) if record else None async def _handle_status_query(self, chat_id: int, user_id: int) -> str: """处理状态查询""" record = self._get_user_alchemy_status(user_id) if not record: return ( "# ⚗️ 炼金状态\n" "- 状态:无进行中的炼金\n" f"- 当前奖池:`{self._get_reward_pool_balance(refresh=True)}` 分\n" "- 可以开始新的炼金" ) alchemy_type = record["alchemy_type"] alchemy_type_name = "积分炼金" if alchemy_type == "point" else "物品炼金" start_time = datetime.fromisoformat(record["start_time"]) expected_end_time = datetime.fromisoformat(record["expected_end_time"]) now = datetime.now() if now >= expected_end_time: remaining_str = "已完成,等待结算" else: remaining = expected_end_time - now remaining_minutes = int(remaining.total_seconds() / 60) + 1 remaining_str = f"约 {remaining_minutes} 分钟" lines = [ "# ⚗️ 炼金状态", "- 状态:进行中", f"- 类型:{alchemy_type_name}", f"- 开始时间:{start_time.strftime('%Y-%m-%d %H:%M')}", f"- 预计完成:{expected_end_time.strftime('%Y-%m-%d %H:%M')}", f"- 剩余时间:{remaining_str}", ] if alchemy_type == "point": try: input_data = json.loads(record["input_data"]) except Exception: input_data = {} times = int(input_data.get("times", 1)) cost_per_attempt = int(input_data.get("cost_per_attempt", self._point_fee)) total_cost = cost_per_attempt * times lines.extend( [ f"- 计划次数:{times}", f"- 单次费用:{cost_per_attempt} 分", f"- 总投入:{total_cost} 分", ] ) lines.append( f"- 当前奖池:`{self._get_reward_pool_balance(refresh=True)}` 分" ) return "\n".join(lines) def recover_overdue_alchemy(self) -> None: """恢复过期但未结算的炼金""" cursor = get_db().conn.cursor() cursor.execute( """ SELECT alchemy_id FROM alchemy_records WHERE status = 'in_progress' AND expected_end_time < ? """, (datetime.now().isoformat(),), ) overdue_records = cursor.fetchall() for record in overdue_records: logger.Log( "Warning", f"{ConsoleFrontColor.YELLOW}发现过期炼金 {record['alchemy_id']},执行恢复结算{ConsoleFrontColor.RESET}", ) try: self.settle_alchemy(record["alchemy_id"]) except Exception as e: logger.Log( "Error", f"{ConsoleFrontColor.RED}恢复炼金 {record['alchemy_id']} 失败: {e}{ConsoleFrontColor.RESET}", ) if overdue_records: logger.Log( "Info", f"{ConsoleFrontColor.GREEN}恢复了 {len(overdue_records)} 个过期炼金{ConsoleFrontColor.RESET}", ) def _help_message(self) -> str: return ( "# ⚗️ 炼金指令帮助\n" f"- `炼金 <次数>`:按次数扣费参与积分炼金(每次 {self._point_fee} 分,单次总费用不超过 {self._max_points_per_batch} 分)\n" "- `炼金 <材料1> <材料2> <材料3> [次数]`:使用三件材料进行炼金(可选次数,默认 1)\n" "- `炼金 状态`:查询当前炼金状态\n" "> 积分炼金费用将进入奖池,根据挡位概率抽取奖池奖励;部分挡位会额外掉落普通物品。" ) class WPSAlchemyRecipeLookup(WPSAPI): def __init__(self) -> None: super().__init__() self._alchemy: Optional[WPSAlchemyGame] = None self._backpack: Optional[WPSBackpackSystem] = None def get_guide_subtitle(self) -> str: return "查询指定物品涉及的炼金配方" def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "炼金配方", "identifier": "炼金配方", "description": "展示物品作为材料、成功产物或失败产物的所有配方。", "metadata": {"别名": "alchemy_recipe"}, }, ) def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "查询格式", "description": "`炼金配方 <物品ID>` 或 `炼金配方 <物品名称>`,忽略大小写。", }, { "title": "输出结构", "description": "结果按材料/成功/失败三类分组,列出配方材料与成功率。", }, ) def dependencies(self) -> List[type]: return [WPSAlchemyGame, WPSBackpackSystem] def is_enable_plugin(self) -> bool: return True def wake_up(self) -> None: self._alchemy = Architecture.Get(WPSAlchemyGame) self._backpack = Architecture.Get(WPSBackpackSystem) self.register_plugin("炼金配方") self.register_plugin("alchemy_recipe") logger.Log( "Info", f"{ConsoleFrontColor.GREEN}WPSAlchemyRecipeLookup 插件已加载{ConsoleFrontColor.RESET}", ) async def callback( self, message: str, chat_id: int, user_id: int ) -> Optional[str]: payload = self.parse_message_after_at(message).strip() if not payload: return await self.send_markdown_message(self._help_text(), chat_id, user_id) backpack = self._backpack or Architecture.Get(WPSBackpackSystem) definition = self._resolve_definition(payload, backpack) if definition is None: return await self.send_markdown_message( f"❌ 未找到物品 `{payload}`,请确认输入的物品 ID 或名称。", chat_id, user_id, ) alchemy = self._alchemy or Architecture.Get(WPSAlchemyGame) material_recipes = alchemy.get_recipes_using_item(definition.item_id) produce_map = alchemy.get_recipes_producing_item(definition.item_id) success_recipes = produce_map["success"] fail_recipes = produce_map["fail"] message_text = self._format_markdown( definition, material_recipes, success_recipes, fail_recipes, backpack, ) return await self.send_markdown_message(message_text, chat_id, user_id) def _resolve_definition( self, identifier: str, backpack: WPSBackpackSystem ) -> Optional[BackpackItemDefinition]: lowered = identifier.strip().lower() cursor = get_db().conn.cursor() cursor.execute( f""" SELECT item_id FROM {WPSBackpackSystem.ITEMS_TABLE} WHERE lower(item_id) = ? OR lower(name) = ? LIMIT 1 """, (lowered, lowered), ) row = cursor.fetchone() item_id = row["item_id"] if row else identifier.strip() try: return backpack._get_definition(item_id) # noqa: SLF001 except Exception: return None def _format_markdown( self, target: BackpackItemDefinition, material_recipes: List[AlchemyRecipe], success_recipes: List[AlchemyRecipe], fail_recipes: List[AlchemyRecipe], backpack: WPSBackpackSystem, ) -> str: lines = [ f"# 🔍 炼金配方查询|{target.name}", f"- 物品 ID:`{target.item_id}`", "---", ] lines.append("## 作为配方材料") lines.extend( self._format_recipe_entries(material_recipes, backpack) or ["- 暂无记录"] ) lines.append("\n## 作为成功产物") lines.extend( self._format_recipe_entries(success_recipes, backpack, role="success") or ["- 暂无记录"] ) lines.append("\n## 作为失败产物") lines.extend( self._format_recipe_entries(fail_recipes, backpack, role="fail") or ["- 暂无记录"] ) return "\n".join(lines) def _format_recipe_entries( self, recipes: List[AlchemyRecipe], backpack: WPSBackpackSystem, *, role: str = "material", ) -> List[str]: if not recipes: return [] entries: List[str] = [] for recipe in recipes: materials = self._summarize_materials(recipe, backpack) success_name = self._resolve_item_name(recipe.success_item_id, backpack) fail_name = self._resolve_item_name(recipe.fail_item_id, backpack) rate = f"{recipe.base_success_rate:.0%}" if role == "material": entry = ( f"- 材料:{materials}|成功产物:`{success_name}`|" f"失败产物:`{fail_name}`|成功率:{rate}" ) elif role == "success": entry = ( f"- 材料:{materials}|成功率:{rate}|" f"失败产物:`{fail_name}`" ) else: entry = ( f"- 材料:{materials}|成功率:{rate}|" f"成功产物:`{success_name}`" ) entries.append(entry) return entries def _summarize_materials( self, recipe: AlchemyRecipe, backpack: WPSBackpackSystem ) -> str: counter = Counter(recipe.materials) parts: List[str] = [] for item_id, count in sorted(counter.items()): name = self._resolve_item_name(item_id, backpack) if count == 1: parts.append(f"`{name}`") else: parts.append(f"`{name}` × {count}") return " + ".join(parts) def _resolve_item_name( self, item_id: str, backpack: WPSBackpackSystem ) -> str: try: definition = backpack._get_definition(item_id) # noqa: SLF001 return definition.name except Exception: return item_id def _help_text(self) -> str: return ( "# ⚗️ 炼金配方查询帮助\n" "- `炼金配方 <物品ID>`\n" "- `炼金配方 <物品名称>`\n" "> 输入需要精确匹配注册物品,名称不区分大小写。\n" "> 积分炼金费用将进入奖池,根据挡位概率抽取奖池奖励;部分挡位会额外掉落普通物品。" ) __all__ = ["WPSAlchemyGame", "WPSAlchemyRecipeLookup"]