"""战斗系统核心业务逻辑服务""" from __future__ import annotations import json import math import random from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig from PWF.CoreModules.database import STATUS_COMPLETED, get_db from Plugins.WPSBackpackSystem import WPSBackpackSystem, BackpackItemTier from Plugins.WPSConfigSystem import WPSConfigAPI from Plugins.WPSFortuneSystem import WPSFortuneSystem from .combat_models import ( BattleState, CombatConfig, EQUIPMENT_REGISTRY, EquipmentDefinition, PlayerStats, SKILL_REGISTRY, SkillDefinition, VIRTUAL_EQUIPMENT, WINE_BUFFS, ) class CombatService: """战斗系统核心服务类""" def __init__(self): self._db = get_db() self._config: ProjectConfig = Architecture.Get(ProjectConfig) # 加载关键配置到实例变量 self.heal_cost = CombatConfig.get_int("combat_heal_cost") self.pvp_reward = CombatConfig.get_int("combat_pvp_reward") self.pvp_penalty = CombatConfig.get_int("combat_pvp_penalty") # ======================================================================== # 装备管理 # ======================================================================== def equip_item(self, user_id: int, item_id: str) -> Tuple[bool, str]: """ 装备物品 Args: user_id: 用户ID item_id: 物品ID Returns: (是否成功, 消息) """ # 1. 检查物品是否存在 equipment = EQUIPMENT_REGISTRY.get(item_id) if not equipment: return False, f"❌ 未知装备:{item_id}" # 2. 检查玩家是否拥有该物品 backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) user_items = backpack.get_user_items(user_id) if not any(item.item_id == item_id for item in user_items): return False, f"❌ 你没有 {equipment.name}" # 3. 确保玩家状态记录存在 self._ensure_player_status(user_id) # 4. 获取当前装备槽 slot_column = f"equipped_{equipment.slot}" cursor = self._db.conn.cursor() cursor.execute( f"SELECT {slot_column} FROM combat_player_status WHERE user_id = ?", (user_id,) ) row = cursor.fetchone() old_equipped = row[slot_column] if row else None # 5. 更新装备槽 cursor.execute( f"UPDATE combat_player_status SET {slot_column} = ? WHERE user_id = ?", (item_id, user_id) ) self._db.conn.commit() # 6. 扣除装备物品 backpack.set_item_quantity(user_id, item_id, backpack.add_item(user_id, item_id, 0) - 1) # 7. 恢复被卸下的装备 if old_equipped: backpack.add_item(user_id, old_equipped, 1) result_msg = f"✅ 已装备 {equipment.name}" if old_equipped and old_equipped != item_id: old_eq = EQUIPMENT_REGISTRY.get(old_equipped) old_name = old_eq.name if old_eq else old_equipped result_msg += f",卸下了 {old_name}" return True, result_msg def unequip_item(self, user_id: int, slot: str) -> Tuple[bool, str]: """ 卸下装备 Args: user_id: 用户ID slot: 槽位名称 (weapon/helmet/armor/boots/accessory) Returns: (是否成功, 消息) """ valid_slots = ["weapon", "helmet", "armor", "boots", "accessory"] if slot not in valid_slots: return False, f"❌ 无效的槽位:{slot},可用:{', '.join(valid_slots)}" self._ensure_player_status(user_id) slot_column = f"equipped_{slot}" cursor = self._db.conn.cursor() cursor.execute( f"SELECT {slot_column} FROM combat_player_status WHERE user_id = ?", (user_id,) ) row = cursor.fetchone() if not row or not row[slot_column]: return False, f"❌ {slot}槽位没有装备任何物品" old_item_id = row[slot_column] old_eq = EQUIPMENT_REGISTRY.get(old_item_id) old_name = old_eq.name if old_eq else old_item_id cursor.execute( f"UPDATE combat_player_status SET {slot_column} = NULL WHERE user_id = ?", (user_id,) ) self._db.conn.commit() # 8. 恢复被卸下的装备 backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) backpack.add_item(user_id, old_item_id, 1) return True, f"✅ 已卸下 {old_name}" def get_equipped_items(self, user_id: int) -> Dict[str, Optional[EquipmentDefinition]]: """ 获取玩家当前装备 Returns: {slot: EquipmentDefinition or None} """ self._ensure_player_status(user_id) cursor = self._db.conn.cursor() cursor.execute( """ SELECT equipped_weapon, equipped_helmet, equipped_armor, equipped_boots, equipped_accessory FROM combat_player_status WHERE user_id = ? """, (user_id,) ) row = cursor.fetchone() if not row: return { "weapon": None, "helmet": None, "armor": None, "boots": None, "accessory": None } return { "weapon": self._resolve_equipment(row["equipped_weapon"]), "helmet": self._resolve_equipment(row["equipped_helmet"]), "armor": self._resolve_equipment(row["equipped_armor"]), "boots": self._resolve_equipment(row["equipped_boots"]), "accessory": self._resolve_equipment(row["equipped_accessory"]), } def _resolve_equipment(self, equipment_ref: Optional[str]) -> Optional[EquipmentDefinition]: """解析装备引用(item_id 或 instance:123)""" if not equipment_ref: return None # TODO: 未来支持装备实例时,解析 "instance:123" 格式 if equipment_ref.startswith("instance:"): instance_id = int(equipment_ref.split(":")[1]) return self._get_equipment_instance(instance_id) else: return EQUIPMENT_REGISTRY.get(equipment_ref) def _get_equipment_instance(self, instance_id: int) -> Optional[EquipmentDefinition]: """获取装备实例(预留接口)""" # TODO: 实现装备实例查询 return None # ======================================================================== # 属性计算 # ======================================================================== def calculate_player_stats(self, user_id: int) -> PlayerStats: """ 计算玩家完整属性(基础+装备) Returns: PlayerStats对象 """ # 1. 获取基础属性 base_stats = self._get_base_stats(user_id) # 2. 获取装备 equipped = self.get_equipped_items(user_id) # 3. 计算总属性 total_stats = { "HP": base_stats["hp"], "ATK": base_stats["atk"], "DEF": base_stats["def_"], "SPD": base_stats["spd"], "CRIT": base_stats["crit"], "CRIT_DMG": base_stats["crit_dmg"], } for equipment in equipped.values(): if equipment: for stat, value in equipment.attributes.items(): total_stats[stat] = total_stats.get(stat, 0) + value # 4. 计算装备强度 equipment_strength = self.calculate_equipment_strength(total_stats) # 5. 获取可用技能 available_skills = self.get_available_skills(user_id) return PlayerStats( user_id=user_id, hp=total_stats["HP"], atk=total_stats["ATK"], def_=total_stats["DEF"], spd=total_stats["SPD"], crit=total_stats["CRIT"], crit_dmg=total_stats["CRIT_DMG"], equipment_strength=equipment_strength, available_skills=available_skills ) def calculate_equipment_strength(self, stats: Dict[str, int]) -> float: """ 计算装备强度(加权求和) Args: stats: 属性字典 Returns: 装备强度值 """ return ( stats.get("ATK", 0) * CombatConfig.get_float("combat_weight_atk") + stats.get("DEF", 0) * CombatConfig.get_float("combat_weight_def") + stats.get("HP", 0) * CombatConfig.get_float("combat_weight_hp") + stats.get("SPD", 0) * CombatConfig.get_float("combat_weight_spd") + stats.get("CRIT", 0) * CombatConfig.get_float("combat_weight_crit") + stats.get("CRIT_DMG", 0) * CombatConfig.get_float("combat_weight_crit_dmg") ) def get_available_skills(self, user_id: int) -> List[SkillDefinition]: """ 获取玩家可用技能(虚拟装备+实际装备) Returns: 技能列表 """ skills = [] skill_ids_seen = set() # 1. 添加虚拟装备提供的默认技能 virtual_eq = VIRTUAL_EQUIPMENT["virtual_default_skills"] for skill_id in virtual_eq.skill_ids: if skill_id in SKILL_REGISTRY and skill_id not in skill_ids_seen: skills.append(SKILL_REGISTRY[skill_id]) skill_ids_seen.add(skill_id) # 2. 添加实际装备提供的技能 equipped = self.get_equipped_items(user_id) for equipment in equipped.values(): if equipment and equipment.skill_ids: for skill_id in equipment.skill_ids: if skill_id in SKILL_REGISTRY and skill_id not in skill_ids_seen: skills.append(SKILL_REGISTRY[skill_id]) skill_ids_seen.add(skill_id) return skills # ======================================================================== # 辅助方法 # ======================================================================== def _ensure_player_status(self, user_id: int) -> None: """确保玩家状态记录存在""" cursor = self._db.conn.cursor() cursor.execute( "SELECT user_id FROM combat_player_status WHERE user_id = ?", (user_id,) ) if not cursor.fetchone(): # 创建默认状态记录 base_hp = CombatConfig.get_int("combat_base_hp") base_atk = CombatConfig.get_int("combat_base_atk") base_def = CombatConfig.get_int("combat_base_def") base_spd = CombatConfig.get_int("combat_base_spd") base_crit = CombatConfig.get_int("combat_base_crit") base_crit_dmg = CombatConfig.get_int("combat_base_crit_dmg") cursor.execute( """ INSERT INTO combat_player_status (user_id, base_hp, base_atk, base_def, base_spd, base_crit, base_crit_dmg) VALUES (?, ?, ?, ?, ?, ?, ?) """, (user_id, base_hp, base_atk, base_def, base_spd, base_crit, base_crit_dmg) ) self._db.conn.commit() def _get_base_stats(self, user_id: int) -> Dict[str, int]: """获取玩家基础属性""" self._ensure_player_status(user_id) cursor = self._db.conn.cursor() cursor.execute( """ SELECT base_hp, base_atk, base_def, base_spd, base_crit, base_crit_dmg FROM combat_player_status WHERE user_id = ? """, (user_id,) ) row = cursor.fetchone() return { "hp": row["base_hp"], "atk": row["base_atk"], "def_": row["base_def"], "spd": row["base_spd"], "crit": row["base_crit"], "crit_dmg": row["base_crit_dmg"], } # ======================================================================== # 状态管理 # ======================================================================== def is_injured(self, user_id: int) -> bool: """检查玩家是否受伤""" self._ensure_player_status(user_id) cursor = self._db.conn.cursor() cursor.execute( "SELECT is_injured FROM combat_player_status WHERE user_id = ?", (user_id,) ) row = cursor.fetchone() return bool(row["is_injured"]) if row else False def set_injured(self, user_id: int, injured: bool) -> None: """设置玩家受伤状态""" self._ensure_player_status(user_id) cursor = self._db.conn.cursor() cursor.execute( "UPDATE combat_player_status SET is_injured = ? WHERE user_id = ?", (1 if injured else 0, user_id) ) self._db.conn.commit() def heal_player(self, user_id: int) -> Tuple[bool, str]: """ 治疗玩家(消耗积分解除受伤状态) Returns: (是否成功, 消息) """ if not self.is_injured(user_id): return False, "❌ 你没有受伤" config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI) user_points = config_api.get_user_points(user_id) if user_points < self.heal_cost: return False, f"❌ 积分不足,治疗需要 {self.heal_cost} 积分,当前仅有 {user_points} 积分" # 扣除积分 config_api.adjust_user_points_sync(user_id, -self.heal_cost, "治疗费用") # 解除受伤状态 self.set_injured(user_id, False) new_points = config_api.get_user_points(user_id) return True, f"✅ 治疗成功,花费 {self.heal_cost} 积分,当前剩余 {new_points} 积分" def get_player_status(self, user_id: int) -> Dict[str, Any]: """ 获取玩家完整状态 Returns: 状态字典 """ cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM combat_player_status WHERE user_id = ?", (user_id,) ) row = cursor.fetchone() if not row: self._ensure_player_status(user_id) return self.get_player_status(user_id) return dict(row) # ======================================================================== # 冒险系统 - 计算方法 # ======================================================================== def calculate_adventure_time( self, stage: int, equipment_strength: float, wine_buffs: List[str] ) -> int: """ 计算冒险实际耗时(分钟) Args: stage: 阶段数 equipment_strength: 装备强度 wine_buffs: 使用的果酒列表 Returns: 实际耗时(分钟) """ # 0. Debug模式:冒险时长为0,立即结算 from PWF.CoreModules.flags import get_internal_debug if get_internal_debug(): self._config.Log( "Info", f"{ConsoleFrontColor.YELLOW}[DEBUG模式] 冒险时长设为0{ConsoleFrontColor.RESET}" ) return 0 # 1. 计算基础时间(线性增长,上限24小时) base_time = CombatConfig.get_int("combat_adventure_base_time") max_time = CombatConfig.get_int("combat_adventure_max_time") # 3. 果酒时间缩减buff wine_time_reduction = 0.0 for wine_id in wine_buffs: if wine_id in WINE_BUFFS: wine_time_reduction += WINE_BUFFS[wine_id].get("time_reduction", 0.0) # 4. 最终时间 actual_time = min(max_time, base_time * stage) actual_time = actual_time * (1 - wine_time_reduction) actual_time = max(1, int(actual_time)) # 最少1分钟 return actual_time def calculate_success_rate( self, stage: int, equipment_strength: float, fortune_value: float, wine_buffs: List[str] ) -> Tuple[float, str]: """ 计算冒险成功率 Returns: 成功率(0.0-1.0) 描述性文本 """ # 1. 基础成功率 base_rate = CombatConfig.get_float("combat_adventure_base_success_rate") stage_penalty = CombatConfig.get_float("combat_adventure_stage_penalty") base_success = base_rate - (stage - 1) * stage_penalty # 2. 装备加成 equip_coeff = CombatConfig.get_float("combat_adventure_equipment_coeff") equip_bonus = math.log2(1 + equipment_strength * equip_coeff) # 3. 运势加成 fortune_coeff = CombatConfig.get_float("combat_adventure_fortune_coeff") fortune_bonus = fortune_value * fortune_coeff # 4. 果酒buff加成 buff_bonus = 0.0 for wine_id in wine_buffs: if wine_id in WINE_BUFFS: buff_bonus += WINE_BUFFS[wine_id].get("success_rate", 0.0) # 5. 最终成功率 max_rate = CombatConfig.get_float("combat_adventure_max_success_rate") total = base_success + equip_bonus + fortune_bonus + buff_bonus # 6.拼接描述 desc = f"{int(base_success*100)}%基础" desc += f"+{int(equip_bonus*100)}%装备" desc += f"+{int(fortune_bonus*100)}%运势" desc += f"+{int(buff_bonus*100)}%buff" return (min(max_rate, total), desc) def check_food_requirement(self, stage: int, food_items: List[str]) -> Tuple[bool, str, int]: """ 检查食物(可选,用于提供buff) Args: stage: 阶段数 food_items: 食物item_id列表(可为空) Returns: (是否满足, 提示消息, 推荐的食物数量) """ # 食物是可选的,不强制要求 # 如果提供食物(果酒),可以获得buff效果(时间缩减、收益提升等) # 计算推荐的食物数量(仅供参考) base_time = CombatConfig.get_int("combat_adventure_base_time") max_time = CombatConfig.get_int("combat_adventure_max_time") stage_time = min(base_time * (2 ** (stage - 1)), max_time) food_support_time = CombatConfig.get_int("combat_food_support_time") recommended_food = math.ceil(stage_time / food_support_time) # 如果没有提供食物,给出提示但允许继续 #if len(food_items) == 0: # msg = f"ℹ️ 未使用食物。推荐使用 {recommended_food} 个果酒以获得buff加成" # return True, msg, recommended_food return True, "", recommended_food # ======================================================================== # 冒险系统 - 流程方法 # ======================================================================== def abort_current_adventure(self, user_id: int) -> Tuple[bool, str]: """ 立即放弃当前冒险(无奖励,清理状态) Args: user_id: 玩家ID Returns: (是否成功, 消息) """ cursor = self._db.conn.cursor() cursor.execute( "SELECT current_adventure_id FROM combat_player_status WHERE user_id = ?", (user_id,), ) status_row = cursor.fetchone() if not status_row or not status_row["current_adventure_id"]: return False, "❌ 当前没有进行中的冒险" adventure_id = status_row["current_adventure_id"] cursor.execute( """ SELECT adventure_id, stage, status, scheduled_task_id FROM combat_adventure_records WHERE adventure_id = ? """, (adventure_id,), ) adventure = cursor.fetchone() if not adventure: cursor.execute( "UPDATE combat_player_status SET current_adventure_id = NULL WHERE user_id = ?", (user_id,), ) self._db.conn.commit() return False, "❌ 未找到冒险记录,状态已重置" if adventure["status"] != "in_progress": cursor.execute( "UPDATE combat_player_status SET current_adventure_id = NULL WHERE user_id = ?", (user_id,), ) self._db.conn.commit() return False, f"❌ 冒险已结算(状态:{adventure['status']})" scheduled_task_id = adventure["scheduled_task_id"] if scheduled_task_id: try: self._db.update_task_status(int(scheduled_task_id), STATUS_COMPLETED) except Exception as exc: # pylint: disable=broad-except self._config.Log( "Warning", f"{ConsoleFrontColor.YELLOW}取消冒险定时任务失败: {exc}{ConsoleFrontColor.RESET}", ) cursor.execute( """ UPDATE combat_adventure_records SET status = ?, rewards = NULL, scheduled_task_id = NULL WHERE adventure_id = ? AND status = 'in_progress' """, ("abandoned", adventure_id), ) cursor.execute( "UPDATE combat_player_status SET current_adventure_id = NULL WHERE user_id = ?", (user_id,), ) self._db.conn.commit() stage = adventure["stage"] self._config.Log( "Info", f"{ConsoleFrontColor.LIGHTCYAN_EX}用户 {user_id} 放弃冒险 {adventure_id}(阶段 {stage}){ConsoleFrontColor.RESET}", ) return True, f"⚠️ 已放弃第 {stage} 阶段冒险,本阶段奖励作废" def get_last_adventure_record(self, user_id: int) -> Optional[Dict[str, Any]]: """ 获取最近一次冒险记录(无论状态) """ cursor = self._db.conn.cursor() cursor.execute( """ SELECT * FROM combat_adventure_records WHERE user_id = ? ORDER BY adventure_id DESC LIMIT 1 """, (user_id,), ) row = cursor.fetchone() return dict(row) if row else None def get_adventure_by_id(self, adventure_id: int) -> Optional[Dict[str, Any]]: """ 根据冒险ID获取冒险记录 """ cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM combat_adventure_records WHERE adventure_id = ?", (adventure_id,), ) row = cursor.fetchone() return dict(row) if row else None def start_adventure( self, user_id: int, chat_id: int, stage: int, food_items: List[str], register_callback: Optional[object] = None ) -> Tuple[bool, str, Optional[int]]: """ 开始冒险 Args: user_id: 用户ID chat_id: 会话ID stage: 阶段数 food_items: 食物item_id列表 register_callback: 插件实例(用于注册时钟任务) Returns: (是否成功, 消息, 冒险ID) """ # 1. 检查玩家状态 if self.is_injured(user_id): return False, "❌ 你处于受伤状态,无法冒险。使用 `治疗` 恢复", None status = self.get_player_status(user_id) if status.get("current_adventure_id"): return False, "❌ 你已经在冒险中", None if status.get("current_battle_id"): return False, "❌ 你正在战斗中,无法开始冒险", None # 2. 检查食物(可选) can_adventure, food_hint, recommended = self.check_food_requirement(stage, food_items) # 3. 检查并消耗食物(如果提供了食物) if food_items: backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) food_count: Dict[str, int] = {} for food_id in food_items: food_count[food_id] = food_count.get(food_id, 0) + 1 # 验证拥有 user_items = backpack.get_user_items(user_id) for food_id, count in food_count.items(): owned = sum(item.quantity for item in user_items if item.item_id == food_id) if owned < count: return False, f"❌ {food_id} 数量不足,还需要 {count - owned} 个, 当前只有{owned}个", None # 消耗食物 for food_id, count in food_count.items(): current = next((item.quantity for item in user_items if item.item_id == food_id), 0) backpack.set_item_quantity(user_id, food_id, current - count) # 4. 创建装备快照 stats = self.calculate_player_stats(user_id) equipped = self.get_equipped_items(user_id) equipment_snapshot = { "stats": { "HP": stats.hp, "ATK": stats.atk, "DEF": stats.def_, "SPD": stats.spd, "CRIT": stats.crit, "CRIT_DMG": stats.crit_dmg, }, "equipment_strength": stats.equipment_strength, "equipped": { slot: eq.item_id if eq else None for slot, eq in equipped.items() } } # 5. 获取运势值 fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem) fortune_value = fortune_system.get_fortune_value(user_id) if fortune_system else 0.0 # 6. 计算冒险参数 actual_time = self.calculate_adventure_time(stage, stats.equipment_strength, food_items) success_rate, success_rate_desc = self.calculate_success_rate(stage, stats.equipment_strength, fortune_value, food_items) # 7. 创建冒险记录 start_time = datetime.now() expected_end = start_time + timedelta(minutes=actual_time) cursor = self._db.conn.cursor() cursor.execute( """ INSERT INTO combat_adventure_records (user_id, chat_id, stage, equipment_snapshot, foods_used, start_time, expected_end_time, status, fortune_value, equipment_strength, success_rate) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( user_id, chat_id, stage, json.dumps(equipment_snapshot), json.dumps(food_items), start_time.isoformat(), expected_end.isoformat(), "in_progress", fortune_value, stats.equipment_strength, success_rate ) ) adventure_id = cursor.lastrowid # 8. 更新玩家状态 cursor.execute( "UPDATE combat_player_status SET current_adventure_id = ? WHERE user_id = ?", (adventure_id, user_id) ) self._db.conn.commit() # 9. 注册时钟任务 task_id = None if register_callback: delay_ms = int(actual_time * 60 * 1000) task_id = register_callback.register_clock( register_callback._settle_adventure_callback, delay_ms, kwargs={ "adventure_id": adventure_id, "user_id": user_id, "chat_id": chat_id } ) cursor.execute( "UPDATE combat_adventure_records SET scheduled_task_id = ? WHERE adventure_id = ?", (task_id, adventure_id) ) self._db.conn.commit() # Debug模式提示 from PWF.CoreModules.flags import get_internal_debug debug_hint = " **[DEBUG模式]**" if get_internal_debug() else "" # 时间格式化 if actual_time == 0: time_str = "立即结算" else: time_str = f"{actual_time} 分钟" # 构建消息 msg_lines = [ f"✅ 开始第 {stage} 阶段冒险{debug_hint}", f"- 预计耗时:{time_str}", f"- 成功率: {success_rate*100:.2f}% ({success_rate_desc})", f"- 预计完成:{expected_end.strftime('%Y-%m-%d %H:%M')}" ] # 如果有食物提示,添加到消息中 if food_hint: msg_lines.append(f"\n{food_hint}") msg = "\n".join(msg_lines) return True, msg, adventure_id def settle_adventure(self, adventure_id: int) -> Tuple[bool, str, Optional[Dict]]: """ 结算冒险 Args: adventure_id: 冒险ID Returns: (是否成功, 消息, 奖励字典) """ # 1. 获取冒险记录 cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM combat_adventure_records WHERE adventure_id = ?", (adventure_id,) ) adventure = cursor.fetchone() if not adventure: return False, "❌ 冒险记录不存在", None if adventure["status"] != "in_progress": return False, f"❌ 冒险已结算(状态:{adventure['status']})", None user_id = adventure["user_id"] stage = adventure["stage"] success_rate = adventure["success_rate"] # 2. 判定成功/失败 roll = random.random() success = roll < success_rate # 3. 生成奖励 rewards = None if success: fortune_value = adventure["fortune_value"] foods_used = json.loads(adventure["foods_used"]) rewards = self.generate_adventure_rewards(stage, fortune_value, foods_used) # 发放奖励 self._grant_rewards(user_id, rewards) else: # 失败:设置受伤状态 self.set_injured(user_id, True) # 4. 更新冒险记录 cursor.execute( """ UPDATE combat_adventure_records SET status = ?, rewards = ? WHERE adventure_id = ? """, ("success" if success else "failed", json.dumps(rewards) if rewards else None, adventure_id) ) # 5. 清除玩家冒险状态 cursor.execute( "UPDATE combat_player_status SET current_adventure_id = NULL WHERE user_id = ?", (user_id,) ) self._db.conn.commit() # 6. 生成结算消息 if success: msg = self._format_success_message(stage, rewards) else: msg = ( f"# 💔 冒险失败\n" f"- 阶段:第 {stage} 阶段\n" f"- 成功率:{success_rate*100:.1f}%\n" f"- 结果:失败(骰子:{roll*100:.1f}%)\n" f"- 你受了伤,需要消耗 100 积分治疗" ) return success, msg, rewards def generate_adventure_rewards( self, stage: int, fortune_value: float, wine_buffs: List[str] ) -> Dict[str, Any]: """ 生成冒险奖励 Returns: 奖励字典 {type: item_id/amount, ...} """ from .combat_models import ( ADVENTURE_MATERIALS, ADVENTURE_SEEDS, ADVENTURE_SOUVENIRS, COMBAT_POTIONS, EQUIPMENT_REGISTRY, ) rewards = {} # 1. 基础积分奖励 base_points = CombatConfig.get_int("combat_loot_points_base") points_per_stage = CombatConfig.get_int("combat_loot_points_per_stage") # 运势影响 fortune_mult = CombatConfig.get_float("combat_loot_fortune_multiplier") fortune_bonus = 1.0 + fortune_value * fortune_mult # 果酒收益加成 reward_boost = 1.0 for wine_id in wine_buffs: if wine_id in WINE_BUFFS: reward_boost += WINE_BUFFS[wine_id].get("reward_boost", 0.0) points = int((base_points + points_per_stage * stage) * fortune_bonus * reward_boost) rewards["points"] = points # 2. 掉落物品(权重随机) loot_weights = { "equipment": CombatConfig.get_int("combat_loot_weight_equipment"), "material": CombatConfig.get_int("combat_loot_weight_material"), "souvenir": CombatConfig.get_int("combat_loot_weight_souvenir"), "potion": CombatConfig.get_int("combat_loot_weight_potion"), "seed": CombatConfig.get_int("combat_loot_weight_seed"), } # 根据阶段决定掉落数量 num_drops = max(0, random.randint(0, stage) + random.randint(-3,3)) rewards["items"] = [] for _ in range(num_drops): loot_type = random.choices( list(loot_weights.keys()), weights=list(loot_weights.values()) )[0] item = self._generate_loot_item(loot_type, stage) if item: rewards["items"].append(item) return rewards def _get_tier_weights_by_stage(self, stage: int) -> Dict[BackpackItemTier, int]: """ 根据阶段计算品质权重 Args: stage: 当前冒险阶段 Returns: 品质权重字典 {BackpackItemTier: weight} """ weights = {} # COMMON: 阶段1+,权重随阶段下降 weights[BackpackItemTier.COMMON] = max(100 - stage * 3, 20) # RARE: 阶段3+,权重随阶段上升 if stage >= 3: weights[BackpackItemTier.RARE] = min(30 + stage * 4, 80) else: weights[BackpackItemTier.RARE] = 0 # EPIC: 阶段6+,权重随阶段上升 if stage >= 6: weights[BackpackItemTier.EPIC] = min(15 + stage * 3, 60) else: weights[BackpackItemTier.EPIC] = 0 # LEGENDARY: 阶段10+,权重随阶段上升 if stage >= 10: weights[BackpackItemTier.LEGENDARY] = min(5 + stage * 2, 40) else: weights[BackpackItemTier.LEGENDARY] = 0 return weights def _generate_loot_item(self, loot_type: str, stage: int) -> Optional[Dict[str, Any]]: """生成单个掉落物品""" from .combat_models import ( ADVENTURE_MATERIALS, ADVENTURE_SEEDS, ADVENTURE_SOUVENIRS, COMBAT_POTIONS, EQUIPMENT_REGISTRY, ) if loot_type == "equipment": # 根据阶段获取品质权重 tier_weights = self._get_tier_weights_by_stage(stage) # 过滤掉权重为0的品质 available_tiers = {t: w for t, w in tier_weights.items() if w > 0} if not available_tiers: return None tier = random.choices( list(available_tiers.keys()), weights=list(available_tiers.values()) )[0] # 筛选该品质的装备 candidates = [eq for eq in EQUIPMENT_REGISTRY.values() if eq.tier == tier] if candidates: equipment = random.choice(candidates) return {"type": "equipment", "item_id": equipment.item_id, "quantity": 1} elif loot_type == "material": # 根据阶段获取品质权重 tier_weights = self._get_tier_weights_by_stage(stage) # 过滤掉权重为0的品质 available_tiers = {t: w for t, w in tier_weights.items() if w > 0} if not available_tiers: return None # 按品质筛选材料 materials_by_tier = {} for item_id, (name, tier, desc) in ADVENTURE_MATERIALS.items(): if tier not in materials_by_tier: materials_by_tier[tier] = [] materials_by_tier[tier].append((item_id, (name, tier, desc))) # 选择品质 tier = random.choices( list(available_tiers.keys()), weights=list(available_tiers.values()) )[0] # 从该品质的材料中随机选择 if tier in materials_by_tier: item_id, (_, _, _) = random.choice(materials_by_tier[tier]) quantity = random.randint(1, 3) return {"type": "material", "item_id": item_id, "quantity": quantity} elif loot_type == "souvenir": # 根据阶段获取品质权重 tier_weights = self._get_tier_weights_by_stage(stage) # 过滤掉权重为0的品质 available_tiers = {t: w for t, w in tier_weights.items() if w > 0} if not available_tiers: return None # 按品质筛选纪念品 souvenirs_by_tier = {} for item_id, (name, tier, sell_price, desc) in ADVENTURE_SOUVENIRS.items(): if tier not in souvenirs_by_tier: souvenirs_by_tier[tier] = [] souvenirs_by_tier[tier].append(item_id) # 选择品质 tier = random.choices( list(available_tiers.keys()), weights=list(available_tiers.values()) )[0] # 从该品质的纪念品中随机选择 if tier in souvenirs_by_tier: item_id = random.choice(souvenirs_by_tier[tier]) return {"type": "souvenir", "item_id": item_id, "quantity": 1} elif loot_type == "potion": # 根据阶段获取品质权重 tier_weights = self._get_tier_weights_by_stage(stage) # 过滤掉权重为0的品质 available_tiers = {t: w for t, w in tier_weights.items() if w > 0} if not available_tiers: return None # 按品质筛选药剂 potions_by_tier = {} for item_id, (name, tier, desc) in COMBAT_POTIONS.items(): if tier not in potions_by_tier: potions_by_tier[tier] = [] potions_by_tier[tier].append(item_id) # 选择品质 tier = random.choices( list(available_tiers.keys()), weights=list(available_tiers.values()) )[0] # 从该品质的药剂中随机选择 if tier in potions_by_tier: item_id = random.choice(potions_by_tier[tier]) quantity = random.randint(1, 2) return {"type": "potion", "item_id": item_id, "quantity": quantity} elif loot_type == "seed": # 根据阶段获取品质权重 tier_weights = self._get_tier_weights_by_stage(stage) # 过滤掉权重为0的品质 available_tiers = {t: w for t, w in tier_weights.items() if w > 0} if not available_tiers: return None # 按品质筛选种子 seeds_by_tier = {} for item_id, (name, tier, desc) in ADVENTURE_SEEDS.items(): if tier not in seeds_by_tier: seeds_by_tier[tier] = [] seeds_by_tier[tier].append(item_id) # 选择品质 tier = random.choices( list(available_tiers.keys()), weights=list(available_tiers.values()) )[0] # 从该品质的种子中随机选择 if tier in seeds_by_tier: item_id = random.choice(seeds_by_tier[tier]) return {"type": "seed", "item_id": item_id, "quantity": 1} return None def _grant_rewards(self, user_id: int, rewards: Dict[str, Any]) -> None: """发放奖励""" # 1. 积分 if "points" in rewards: config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI) config_api.adjust_user_points_sync(user_id, rewards["points"], "冒险奖励") # 2. 物品 if "items" in rewards: backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) for item in rewards["items"]: backpack.add_item(user_id, item["item_id"], item["quantity"]) def _format_success_message(self, stage: int, rewards: Dict[str, Any]) -> str: """格式化成功消息""" lines = [ f"# 🎉 冒险成功", f"- 阶段:第 {stage} 阶段", f"", f"**获得奖励:**", ] if "points" in rewards: lines.append(f"- 积分:`+{rewards['points']}`") if "items" in rewards and rewards["items"]: lines.append("\n**物品:**") backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem) for item in rewards["items"]: try: item_def = backpack._get_definition(item["item_id"]) lines.append(f"- {item_def.name} × {item['quantity']}") except: lines.append(f"- {item['item_id']} × {item['quantity']}") lines.append("\n> 使用 `继续冒险` 进入下一阶段,或停止冒险") return "\n".join(lines) # ======================================================================== # 冒险系统 - 恢复机制 # ======================================================================== def recover_overdue_adventures(self) -> None: """恢复过期但未结算的冒险""" cursor = self._db.conn.cursor() cursor.execute( """ SELECT * FROM combat_adventure_records WHERE status = 'in_progress' AND expected_end_time < ? """, (datetime.now().isoformat(),) ) overdue_adventures = cursor.fetchall() for adventure in overdue_adventures: self._config.Log( "Warning", f"{ConsoleFrontColor.YELLOW}发现过期冒险 {adventure['adventure_id']},执行恢复结算{ConsoleFrontColor.RESET}" ) # 同步结算 self.settle_adventure(adventure["adventure_id"]) if overdue_adventures: self._config.Log( "Info", f"{ConsoleFrontColor.GREEN}恢复了 {len(overdue_adventures)} 个过期冒险{ConsoleFrontColor.RESET}" ) # ======================================================================== # PVP系统 - 挑战管理 # ======================================================================== def create_pvp_challenge( self, challenger_id: int, target_id: int, timeout_minutes: int = 15 ) -> Tuple[bool, str, Optional[int]]: """ 发起PVP挑战 Returns: (是否成功, 消息, 挑战ID) """ # 1. 验证双方状态 challenger_status = self.get_player_status(challenger_id) target_status = self.get_player_status(target_id) if challenger_status.get("is_injured"): return False, "❌ 你受了伤,无法发起挑战", None if target_status.get("is_injured"): return False, "❌ 对方受了伤,无法接受挑战", None if challenger_status.get("current_adventure_id"): return False, "❌ 你正在冒险中,无法发起挑战", None if target_status.get("current_adventure_id"): return False, "❌ 对方正在冒险中,无法接受挑战", None # 2. 检查积分 config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI) challenger_points = config_api.get_user_points(0, challenger_id) target_points = config_api.get_user_points(0, target_id) if challenger_points < 100: return False, "❌ 你的积分不足100,无法发起挑战", None if target_points < 100: return False, "❌ 对方积分不足100,无法接受挑战", None # 3. 创建挑战记录 cursor = self._db.conn.cursor() expire_time = datetime.now() + timedelta(minutes=timeout_minutes) cursor.execute( """ INSERT INTO combat_pvp_challenges (challenger_id, target_id, status, created_at, expires_at) VALUES (?, ?, 'pending', ?, ?) """, (challenger_id, target_id, datetime.now().isoformat(), expire_time.isoformat()) ) challenge_id = cursor.lastrowid self._db.conn.commit() msg = ( f"# ⚔️ PVP挑战已发送\n" f"- 挑战者:用户 {challenger_id}\n" f"- 目标:用户 {target_id}\n" f"- 过期时间:{expire_time.strftime('%H:%M')}\n" f"\n> 对方需使用 `接受挑战 {challenge_id}` 接受,或 `拒绝挑战 {challenge_id}` 拒绝" ) return True, msg, challenge_id def accept_challenge( self, challenge_id: int, target_id: int ) -> Tuple[bool, str, Optional[int]]: """ 接受挑战 Returns: (是否成功, 消息, 战斗ID) """ cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM combat_pvp_challenges WHERE challenge_id = ?", (challenge_id,) ) challenge = cursor.fetchone() if not challenge: return False, "❌ 挑战不存在", None if challenge["target_id"] != target_id: return False, "❌ 这不是发给你的挑战", None if challenge["status"] != "pending": return False, f"❌ 挑战已失效(状态:{challenge['status']})", None # 检查是否过期 if datetime.now() > datetime.fromisoformat(challenge["expires_at"]): cursor.execute( "UPDATE combat_pvp_challenges SET status = 'expired' WHERE challenge_id = ?", (challenge_id,) ) self._db.conn.commit() return False, "❌ 挑战已过期", None # 更新挑战状态 cursor.execute( "UPDATE combat_pvp_challenges SET status = 'accepted' WHERE challenge_id = ?", (challenge_id,) ) # 创建战斗 battle_id = self._create_battle(challenge["challenger_id"], target_id) self._db.conn.commit() msg = ( f"# ⚔️ 挑战已接受\n" f"- 战斗ID:{battle_id}\n" f"\n> 战斗开始!使用 `战斗 {battle_id} [技能]` 进行操作" ) return True, msg, battle_id def reject_challenge( self, challenge_id: int, target_id: int ) -> Tuple[bool, str]: """拒绝挑战""" cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM combat_pvp_challenges WHERE challenge_id = ?", (challenge_id,) ) challenge = cursor.fetchone() if not challenge: return False, "❌ 挑战不存在" if challenge["target_id"] != target_id: return False, "❌ 这不是发给你的挑战" if challenge["status"] != "pending": return False, f"❌ 挑战已失效(状态:{challenge['status']})" cursor.execute( "UPDATE combat_pvp_challenges SET status = 'rejected' WHERE challenge_id = ?", (challenge_id,) ) self._db.conn.commit() return True, "✅ 已拒绝挑战" def _create_battle(self, player1_id: int, player2_id: int) -> int: """ 创建战斗实例 Returns: battle_id """ from .combat_models import BattleState # 获取双方属性快照 stats1 = self.calculate_player_stats(player1_id) stats2 = self.calculate_player_stats(player2_id) # 初始化战斗状态 battle_state = BattleState( player1_id=player1_id, player2_id=player2_id, player1_hp=stats1.total_hp, player2_hp=stats2.total_hp, player1_max_hp=stats1.total_hp, player2_max_hp=stats2.total_hp, player1_stats=stats1.__dict__, player2_stats=stats2.__dict__, current_turn=player1_id if stats1.total_spd >= stats2.total_spd else player2_id, turn_number=1, cooldowns={}, battle_log=[] ) cursor = self._db.conn.cursor() cursor.execute( """ INSERT INTO combat_pvp_battles (player1_id, player2_id, status, battle_state, created_at, last_action_time) VALUES (?, ?, 'active', ?, ?, ?) """, ( player1_id, player2_id, json.dumps(battle_state.__dict__), datetime.now().isoformat(), datetime.now().isoformat() ) ) return cursor.lastrowid # ======================================================================== # PVP系统 - 战斗执行 # ======================================================================== def execute_battle_action( self, battle_id: int, user_id: int, skill_name: str ) -> Tuple[bool, str]: """ 执行战斗动作 Args: battle_id: 战斗ID user_id: 操作用户 skill_name: 技能名称 Returns: (是否成功, 消息) """ from .combat_models import BattleState, SKILL_REGISTRY # 1. 获取战斗状态 cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM combat_pvp_battles WHERE battle_id = ?", (battle_id,) ) battle_row = cursor.fetchone() if not battle_row: return False, "❌ 战斗不存在" if battle_row["status"] != "active": return False, f"❌ 战斗已结束(状态:{battle_row['status']})" # 2. 解析战斗状态 battle_state = BattleState(**json.loads(battle_row["battle_state"])) # 3. 检查回合 if battle_state.current_turn != user_id: return False, "❌ 现在不是你的回合" # 4. 获取技能 skill = SKILL_REGISTRY.get(skill_name) if not skill: return False, f"❌ 技能 {skill_name} 不存在" # 5. 检查技能是否可用 if not self._can_use_skill(user_id, skill, battle_state): return False, f"❌ 技能 {skill_name} 暂不可用(冷却中)" # 6. 执行技能效果 log_entry = self._execute_skill_effect(user_id, skill, battle_state) battle_state.battle_log.append(log_entry) # 7. 更新冷却 if skill.cooldown > 0: cooldown_key = f"{user_id}_{skill.skill_id}" battle_state.cooldowns[cooldown_key] = skill.cooldown # 8. 检查战斗是否结束 winner_id = None if battle_state.player1_hp <= 0: winner_id = battle_state.player2_id elif battle_state.player2_hp <= 0: winner_id = battle_state.player1_id if winner_id: return self._end_battle(battle_id, winner_id, battle_state) # 9. 切换回合 battle_state.current_turn = ( battle_state.player2_id if battle_state.current_turn == battle_state.player1_id else battle_state.player1_id ) battle_state.turn_number += 1 # 减少所有技能冷却 for key in list(battle_state.cooldowns.keys()): battle_state.cooldowns[key] -= 1 if battle_state.cooldowns[key] <= 0: del battle_state.cooldowns[key] # 10. 保存状态 cursor.execute( """ UPDATE combat_pvp_battles SET battle_state = ?, last_action_time = ? WHERE battle_id = ? """, (json.dumps(battle_state.__dict__), datetime.now().isoformat(), battle_id) ) self._db.conn.commit() # 11. 生成消息 msg = self._format_battle_message(battle_state, log_entry) return True, msg def _can_use_skill(self, user_id: int, skill, battle_state) -> bool: """检查技能是否可用""" cooldown_key = f"{user_id}_{skill.skill_id}" return cooldown_key not in battle_state.cooldowns def _execute_skill_effect(self, user_id: int, skill, battle_state) -> Dict[str, Any]: """ 执行技能效果(DSL引擎) Returns: 战斗日志条目 """ # 判断施法者和目标 is_player1 = (user_id == battle_state.player1_id) caster_hp = battle_state.player1_hp if is_player1 else battle_state.player2_hp target_hp = battle_state.player2_hp if is_player1 else battle_state.player1_hp caster_stats = battle_state.player1_stats if is_player1 else battle_state.player2_stats target_stats = battle_state.player2_stats if is_player1 else battle_state.player1_stats log_entry = { "turn": battle_state.turn_number, "caster": user_id, "skill": skill.name, "effects": [] } # 执行DSL效果 for effect in skill.effects: effect_type = effect.get("type") if effect_type == "damage": # 伤害计算 base_dmg = effect.get("base", 0) atk_ratio = effect.get("atk_ratio", 0.0) raw_damage = base_dmg + caster_stats["total_atk"] * atk_ratio # 暴击判定 crit_rate = caster_stats["total_crit"] is_crit = random.random() < crit_rate if is_crit: crit_dmg = caster_stats["total_crit_dmg"] raw_damage *= crit_dmg # 防御减伤 defense_reduction = self._calculate_defense_reduction(target_stats["total_def"]) final_damage = int(raw_damage * (1 - defense_reduction)) # 应用伤害 target_hp -= final_damage log_entry["effects"].append({ "type": "damage", "value": final_damage, "is_crit": is_crit }) elif effect_type == "heal": # 治疗 base_heal = effect.get("base", 0) hp_ratio = effect.get("hp_ratio", 0.0) max_hp = battle_state.player1_max_hp if is_player1 else battle_state.player2_max_hp heal_amount = int(base_heal + max_hp * hp_ratio) caster_hp = min(caster_hp + heal_amount, max_hp) log_entry["effects"].append({ "type": "heal", "value": heal_amount }) elif effect_type == "shield": # 护盾(简化为临时HP) shield_amount = effect.get("value", 0) caster_hp += shield_amount log_entry["effects"].append({ "type": "shield", "value": shield_amount }) # 更新战斗状态 if is_player1: battle_state.player1_hp = max(0, caster_hp) battle_state.player2_hp = max(0, target_hp) else: battle_state.player2_hp = max(0, caster_hp) battle_state.player1_hp = max(0, target_hp) return log_entry def _calculate_defense_reduction(self, defense: float) -> float: """计算防御减伤率""" def_coef = CombatConfig.get_float("combat_pvp_defense_coefficient") return defense / (defense + def_coef) def _end_battle( self, battle_id: int, winner_id: int, battle_state ) -> Tuple[bool, str]: """结束战斗""" loser_id = ( battle_state.player2_id if winner_id == battle_state.player1_id else battle_state.player1_id ) # 1. 更新战斗状态 cursor = self._db.conn.cursor() cursor.execute( """ UPDATE combat_pvp_battles SET status = 'finished', winner_id = ?, battle_state = ? WHERE battle_id = ? """, (winner_id, json.dumps(battle_state.__dict__), battle_id) ) # 2. 积分转移 reward = CombatConfig.get_int("combat_pvp_reward_points") config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI) loser_points = config_api.get_user_points(0, loser_id) actual_reward = min(reward, loser_points) config_api.adjust_user_points_sync(loser_id, -actual_reward, f"PVP战斗失败(战斗{battle_id})") config_api.adjust_user_points_sync(winner_id, actual_reward, f"PVP战斗胜利(战斗{battle_id})") self._db.conn.commit() # 3. 生成结算消息 msg = ( f"# 🏆 战斗结束\n" f"- 胜利者:用户 {winner_id}\n" f"- 失败者:用户 {loser_id}\n" f"- 积分转移:{actual_reward} 点\n" f"\n**战斗统计:**\n" f"- 总回合数:{battle_state.turn_number}\n" f"- 剩余HP:{battle_state.player1_hp if winner_id == battle_state.player1_id else battle_state.player2_hp}" ) return True, msg def surrender_battle( self, battle_id: int, user_id: int ) -> Tuple[bool, str]: """投降""" cursor = self._db.conn.cursor() cursor.execute( "SELECT * FROM combat_pvp_battles WHERE battle_id = ?", (battle_id,) ) battle_row = cursor.fetchone() if not battle_row: return False, "❌ 战斗不存在" if battle_row["status"] != "active": return False, f"❌ 战斗已结束(状态:{battle_row['status']})" # 确定胜者 from .combat_models import BattleState battle_state = BattleState(**json.loads(battle_row["battle_state"])) winner_id = ( battle_state.player2_id if user_id == battle_state.player1_id else battle_state.player1_id ) return self._end_battle(battle_id, winner_id, battle_state) def _format_battle_message(self, battle_state, log_entry: Dict) -> str: """格式化战斗消息""" lines = [ f"# ⚔️ 回合 {battle_state.turn_number}", f"", f"**{log_entry['caster']} 使用了 {log_entry['skill']}**", ] for effect in log_entry["effects"]: if effect["type"] == "damage": crit_text = "【暴击】" if effect.get("is_crit") else "" lines.append(f"- 造成 {effect['value']} 点伤害 {crit_text}") elif effect["type"] == "heal": lines.append(f"- 恢复 {effect['value']} 点HP") elif effect["type"] == "shield": lines.append(f"- 获得 {effect['value']} 点护盾") lines.append("") lines.append(f"**当前HP:**") lines.append(f"- 玩家1:{battle_state.player1_hp}/{battle_state.player1_max_hp}") lines.append(f"- 玩家2:{battle_state.player2_hp}/{battle_state.player2_max_hp}") lines.append("") lines.append(f"> 当前回合:用户 {battle_state.current_turn}") return "\n".join(lines) def check_battle_timeout(self) -> None: """检查战斗超时""" timeout_seconds = CombatConfig.get_int("combat_pvp_turn_timeout") cursor = self._db.conn.cursor() cursor.execute( """ SELECT * FROM combat_pvp_battles WHERE status = 'active' """, ) active_battles = cursor.fetchall() for battle_row in active_battles: last_action = datetime.fromisoformat(battle_row["last_action_time"]) if (datetime.now() - last_action).total_seconds() > timeout_seconds: # 超时,判定当前回合者失败 from .combat_models import BattleState battle_state = BattleState(**json.loads(battle_row["battle_state"])) winner_id = ( battle_state.player2_id if battle_state.current_turn == battle_state.player1_id else battle_state.player1_id ) self._config.Log( "Warning", f"{ConsoleFrontColor.YELLOW}战斗 {battle_row['battle_id']} 超时,玩家 {battle_state.current_turn} 失败{ConsoleFrontColor.RESET}" ) self._end_battle(battle_row["battle_id"], winner_id, battle_state) # 全局服务实例(单例模式) _service_instance: Optional[CombatService] = None def get_combat_service() -> CombatService: """获取战斗服务单例""" global _service_instance if _service_instance is None: _service_instance = CombatService() return _service_instance