Files
NewWPSBot/Plugins/WPSCombatSystem/combat_service.py

1634 lines
58 KiB
Python
Raw Normal View History

2025-11-10 14:59:07 +08:00
"""战斗系统核心业务逻辑服务"""
from __future__ import annotations
import json
import math
import random
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import STATUS_COMPLETED, get_db
2025-11-10 14:59:07 +08:00
from Plugins.WPSBackpackSystem import WPSBackpackSystem, BackpackItemTier
from Plugins.WPSConfigSystem import WPSConfigAPI
from Plugins.WPSFortuneSystem import WPSFortuneSystem
from .combat_models import (
BattleState,
CombatConfig,
EQUIPMENT_REGISTRY,
EquipmentDefinition,
PlayerStats,
SKILL_REGISTRY,
SkillDefinition,
VIRTUAL_EQUIPMENT,
WINE_BUFFS,
)
class CombatService:
"""战斗系统核心服务类"""
def __init__(self):
self._db = get_db()
self._config: ProjectConfig = Architecture.Get(ProjectConfig)
# 加载关键配置到实例变量
self.heal_cost = CombatConfig.get_int("combat_heal_cost")
self.pvp_reward = CombatConfig.get_int("combat_pvp_reward")
self.pvp_penalty = CombatConfig.get_int("combat_pvp_penalty")
2025-11-10 14:59:07 +08:00
# ========================================================================
# 装备管理
# ========================================================================
def equip_item(self, user_id: int, item_id: str) -> Tuple[bool, str]:
"""
装备物品
Args:
user_id: 用户ID
item_id: 物品ID
Returns:
(是否成功, 消息)
"""
# 1. 检查物品是否存在
equipment = EQUIPMENT_REGISTRY.get(item_id)
if not equipment:
return False, f"❌ 未知装备:{item_id}"
# 2. 检查玩家是否拥有该物品
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
user_items = backpack.get_user_items(user_id)
if not any(item.item_id == item_id for item in user_items):
return False, f"❌ 你没有 {equipment.name}"
# 3. 确保玩家状态记录存在
self._ensure_player_status(user_id)
# 4. 获取当前装备槽
slot_column = f"equipped_{equipment.slot}"
cursor = self._db.conn.cursor()
cursor.execute(
f"SELECT {slot_column} FROM combat_player_status WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
old_equipped = row[slot_column] if row else None
# 5. 更新装备槽
cursor.execute(
f"UPDATE combat_player_status SET {slot_column} = ? WHERE user_id = ?",
(item_id, user_id)
)
self._db.conn.commit()
# 6. 扣除装备物品
backpack.set_item_quantity(user_id, item_id, backpack.add_item(user_id, item_id, 0) - 1)
# 7. 恢复被卸下的装备
if old_equipped:
backpack.add_item(user_id, old_equipped, 1)
2025-11-10 14:59:07 +08:00
result_msg = f"✅ 已装备 {equipment.name}"
if old_equipped and old_equipped != item_id:
old_eq = EQUIPMENT_REGISTRY.get(old_equipped)
old_name = old_eq.name if old_eq else old_equipped
result_msg += f",卸下了 {old_name}"
return True, result_msg
def unequip_item(self, user_id: int, slot: str) -> Tuple[bool, str]:
"""
卸下装备
Args:
user_id: 用户ID
slot: 槽位名称 (weapon/helmet/armor/boots/accessory)
Returns:
(是否成功, 消息)
"""
valid_slots = ["weapon", "helmet", "armor", "boots", "accessory"]
if slot not in valid_slots:
return False, f"❌ 无效的槽位:{slot},可用:{', '.join(valid_slots)}"
self._ensure_player_status(user_id)
slot_column = f"equipped_{slot}"
cursor = self._db.conn.cursor()
cursor.execute(
f"SELECT {slot_column} FROM combat_player_status WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if not row or not row[slot_column]:
return False, f"{slot}槽位没有装备任何物品"
old_item_id = row[slot_column]
old_eq = EQUIPMENT_REGISTRY.get(old_item_id)
old_name = old_eq.name if old_eq else old_item_id
cursor.execute(
f"UPDATE combat_player_status SET {slot_column} = NULL WHERE user_id = ?",
(user_id,)
)
self._db.conn.commit()
# 8. 恢复被卸下的装备
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
backpack.add_item(user_id, old_item_id, 1)
2025-11-10 14:59:07 +08:00
return True, f"✅ 已卸下 {old_name}"
def get_equipped_items(self, user_id: int) -> Dict[str, Optional[EquipmentDefinition]]:
"""
获取玩家当前装备
Returns:
{slot: EquipmentDefinition or None}
"""
self._ensure_player_status(user_id)
cursor = self._db.conn.cursor()
cursor.execute(
"""
SELECT equipped_weapon, equipped_helmet, equipped_armor,
equipped_boots, equipped_accessory
FROM combat_player_status WHERE user_id = ?
""",
(user_id,)
)
row = cursor.fetchone()
if not row:
return {
"weapon": None,
"helmet": None,
"armor": None,
"boots": None,
"accessory": None
}
return {
"weapon": self._resolve_equipment(row["equipped_weapon"]),
"helmet": self._resolve_equipment(row["equipped_helmet"]),
"armor": self._resolve_equipment(row["equipped_armor"]),
"boots": self._resolve_equipment(row["equipped_boots"]),
"accessory": self._resolve_equipment(row["equipped_accessory"]),
}
def _resolve_equipment(self, equipment_ref: Optional[str]) -> Optional[EquipmentDefinition]:
"""解析装备引用item_id 或 instance:123"""
if not equipment_ref:
return None
# TODO: 未来支持装备实例时,解析 "instance:123" 格式
if equipment_ref.startswith("instance:"):
instance_id = int(equipment_ref.split(":")[1])
return self._get_equipment_instance(instance_id)
else:
return EQUIPMENT_REGISTRY.get(equipment_ref)
def _get_equipment_instance(self, instance_id: int) -> Optional[EquipmentDefinition]:
"""获取装备实例(预留接口)"""
# TODO: 实现装备实例查询
return None
# ========================================================================
# 属性计算
# ========================================================================
def calculate_player_stats(self, user_id: int) -> PlayerStats:
"""
计算玩家完整属性基础+装备
Returns:
PlayerStats对象
"""
# 1. 获取基础属性
base_stats = self._get_base_stats(user_id)
# 2. 获取装备
equipped = self.get_equipped_items(user_id)
# 3. 计算总属性
total_stats = {
"HP": base_stats["hp"],
"ATK": base_stats["atk"],
"DEF": base_stats["def_"],
"SPD": base_stats["spd"],
"CRIT": base_stats["crit"],
"CRIT_DMG": base_stats["crit_dmg"],
}
for equipment in equipped.values():
if equipment:
for stat, value in equipment.attributes.items():
total_stats[stat] = total_stats.get(stat, 0) + value
# 4. 计算装备强度
equipment_strength = self.calculate_equipment_strength(total_stats)
# 5. 获取可用技能
available_skills = self.get_available_skills(user_id)
return PlayerStats(
user_id=user_id,
hp=total_stats["HP"],
atk=total_stats["ATK"],
def_=total_stats["DEF"],
spd=total_stats["SPD"],
crit=total_stats["CRIT"],
crit_dmg=total_stats["CRIT_DMG"],
equipment_strength=equipment_strength,
available_skills=available_skills
)
def calculate_equipment_strength(self, stats: Dict[str, int]) -> float:
"""
计算装备强度加权求和
Args:
stats: 属性字典
Returns:
装备强度值
"""
return (
stats.get("ATK", 0) * CombatConfig.get_float("combat_weight_atk") +
stats.get("DEF", 0) * CombatConfig.get_float("combat_weight_def") +
stats.get("HP", 0) * CombatConfig.get_float("combat_weight_hp") +
stats.get("SPD", 0) * CombatConfig.get_float("combat_weight_spd") +
stats.get("CRIT", 0) * CombatConfig.get_float("combat_weight_crit") +
stats.get("CRIT_DMG", 0) * CombatConfig.get_float("combat_weight_crit_dmg")
2025-11-10 14:59:07 +08:00
)
def get_available_skills(self, user_id: int) -> List[SkillDefinition]:
"""
获取玩家可用技能虚拟装备+实际装备
Returns:
技能列表
"""
skills = []
skill_ids_seen = set()
# 1. 添加虚拟装备提供的默认技能
virtual_eq = VIRTUAL_EQUIPMENT["virtual_default_skills"]
for skill_id in virtual_eq.skill_ids:
if skill_id in SKILL_REGISTRY and skill_id not in skill_ids_seen:
skills.append(SKILL_REGISTRY[skill_id])
skill_ids_seen.add(skill_id)
# 2. 添加实际装备提供的技能
equipped = self.get_equipped_items(user_id)
for equipment in equipped.values():
if equipment and equipment.skill_ids:
for skill_id in equipment.skill_ids:
if skill_id in SKILL_REGISTRY and skill_id not in skill_ids_seen:
skills.append(SKILL_REGISTRY[skill_id])
skill_ids_seen.add(skill_id)
return skills
# ========================================================================
# 辅助方法
# ========================================================================
def _ensure_player_status(self, user_id: int) -> None:
"""确保玩家状态记录存在"""
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT user_id FROM combat_player_status WHERE user_id = ?",
(user_id,)
)
if not cursor.fetchone():
# 创建默认状态记录
base_hp = CombatConfig.get_int("combat_base_hp")
base_atk = CombatConfig.get_int("combat_base_atk")
base_def = CombatConfig.get_int("combat_base_def")
base_spd = CombatConfig.get_int("combat_base_spd")
base_crit = CombatConfig.get_int("combat_base_crit")
base_crit_dmg = CombatConfig.get_int("combat_base_crit_dmg")
2025-11-10 14:59:07 +08:00
cursor.execute(
"""
INSERT INTO combat_player_status
(user_id, base_hp, base_atk, base_def, base_spd, base_crit, base_crit_dmg)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(user_id, base_hp, base_atk, base_def, base_spd, base_crit, base_crit_dmg)
)
self._db.conn.commit()
def _get_base_stats(self, user_id: int) -> Dict[str, int]:
"""获取玩家基础属性"""
self._ensure_player_status(user_id)
cursor = self._db.conn.cursor()
cursor.execute(
"""
SELECT base_hp, base_atk, base_def, base_spd, base_crit, base_crit_dmg
FROM combat_player_status WHERE user_id = ?
""",
(user_id,)
)
row = cursor.fetchone()
return {
"hp": row["base_hp"],
"atk": row["base_atk"],
"def_": row["base_def"],
"spd": row["base_spd"],
"crit": row["base_crit"],
"crit_dmg": row["base_crit_dmg"],
}
# ========================================================================
# 状态管理
# ========================================================================
def is_injured(self, user_id: int) -> bool:
"""检查玩家是否受伤"""
self._ensure_player_status(user_id)
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT is_injured FROM combat_player_status WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
return bool(row["is_injured"]) if row else False
def set_injured(self, user_id: int, injured: bool) -> None:
"""设置玩家受伤状态"""
self._ensure_player_status(user_id)
cursor = self._db.conn.cursor()
cursor.execute(
"UPDATE combat_player_status SET is_injured = ? WHERE user_id = ?",
(1 if injured else 0, user_id)
)
self._db.conn.commit()
def heal_player(self, user_id: int) -> Tuple[bool, str]:
"""
治疗玩家消耗积分解除受伤状态
Returns:
(是否成功, 消息)
"""
if not self.is_injured(user_id):
return False, "❌ 你没有受伤"
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
user_points = config_api.get_user_points(user_id)
if user_points < self.heal_cost:
return False, f"❌ 积分不足,治疗需要 {self.heal_cost} 积分,当前仅有 {user_points} 积分"
# 扣除积分
config_api.adjust_user_points_sync(user_id, -self.heal_cost, "治疗费用")
2025-11-10 14:59:07 +08:00
# 解除受伤状态
self.set_injured(user_id, False)
new_points = config_api.get_user_points(user_id)
return True, f"✅ 治疗成功,花费 {self.heal_cost} 积分,当前剩余 {new_points} 积分"
def get_player_status(self, user_id: int) -> Dict[str, Any]:
"""
获取玩家完整状态
Returns:
状态字典
"""
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT * FROM combat_player_status WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if not row:
self._ensure_player_status(user_id)
return self.get_player_status(user_id)
return dict(row)
# ========================================================================
# 冒险系统 - 计算方法
# ========================================================================
def calculate_adventure_time(
self,
stage: int,
equipment_strength: float,
wine_buffs: List[str]
) -> int:
"""
计算冒险实际耗时分钟
Args:
stage: 阶段数
equipment_strength: 装备强度
wine_buffs: 使用的果酒列表
Returns:
实际耗时分钟
"""
# 0. Debug模式冒险时长为0立即结算
from PWF.CoreModules.flags import get_internal_debug
if get_internal_debug():
self._config.Log(
"Info",
f"{ConsoleFrontColor.YELLOW}[DEBUG模式] 冒险时长设为0{ConsoleFrontColor.RESET}"
)
return 0
2025-11-12 23:58:42 +08:00
# 1. 计算基础时间线性增长上限24小时
base_time = CombatConfig.get_int("combat_adventure_base_time")
max_time = CombatConfig.get_int("combat_adventure_max_time")
2025-11-10 14:59:07 +08:00
# 3. 果酒时间缩减buff
wine_time_reduction = 0.0
for wine_id in wine_buffs:
if wine_id in WINE_BUFFS:
wine_time_reduction += WINE_BUFFS[wine_id].get("time_reduction", 0.0)
# 4. 最终时间
2025-11-12 23:58:42 +08:00
actual_time = min(max_time, base_time * stage)
2025-11-10 14:59:07 +08:00
actual_time = actual_time * (1 - wine_time_reduction)
actual_time = max(1, int(actual_time)) # 最少1分钟
return actual_time
def calculate_success_rate(
self,
stage: int,
equipment_strength: float,
fortune_value: float,
wine_buffs: List[str]
) -> Tuple[float, str]:
2025-11-10 14:59:07 +08:00
"""
计算冒险成功率
Returns:
成功率0.0-1.0
描述性文本
2025-11-10 14:59:07 +08:00
"""
# 1. 基础成功率
base_rate = CombatConfig.get_float("combat_adventure_base_success_rate")
stage_penalty = CombatConfig.get_float("combat_adventure_stage_penalty")
2025-11-10 14:59:07 +08:00
base_success = base_rate - (stage - 1) * stage_penalty
2025-11-10 14:59:07 +08:00
# 2. 装备加成
equip_coeff = CombatConfig.get_float("combat_adventure_equipment_coeff")
equip_bonus = math.log2(1 + equipment_strength * equip_coeff)
2025-11-10 14:59:07 +08:00
# 3. 运势加成
fortune_coeff = CombatConfig.get_float("combat_adventure_fortune_coeff")
2025-11-10 14:59:07 +08:00
fortune_bonus = fortune_value * fortune_coeff
# 4. 果酒buff加成
buff_bonus = 0.0
for wine_id in wine_buffs:
if wine_id in WINE_BUFFS:
buff_bonus += WINE_BUFFS[wine_id].get("success_rate", 0.0)
# 5. 最终成功率
max_rate = CombatConfig.get_float("combat_adventure_max_success_rate")
2025-11-10 14:59:07 +08:00
total = base_success + equip_bonus + fortune_bonus + buff_bonus
# 6.拼接描述
desc = f"{int(base_success*100)}%基础"
2025-11-11 11:34:55 +08:00
desc += f"+{int(equip_bonus*100)}%装备"
desc += f"+{int(fortune_bonus*100)}%运势"
desc += f"+{int(buff_bonus*100)}%buff"
return (min(max_rate, total), desc)
2025-11-10 14:59:07 +08:00
def check_food_requirement(self, stage: int, food_items: List[str]) -> Tuple[bool, str, int]:
"""
检查食物可选用于提供buff
Args:
stage: 阶段数
food_items: 食物item_id列表可为空
Returns:
(是否满足, 提示消息, 推荐的食物数量)
"""
# 食物是可选的,不强制要求
# 如果提供食物果酒可以获得buff效果时间缩减、收益提升等
# 计算推荐的食物数量(仅供参考)
base_time = CombatConfig.get_int("combat_adventure_base_time")
max_time = CombatConfig.get_int("combat_adventure_max_time")
2025-11-10 14:59:07 +08:00
stage_time = min(base_time * (2 ** (stage - 1)), max_time)
food_support_time = CombatConfig.get_int("combat_food_support_time")
2025-11-10 14:59:07 +08:00
recommended_food = math.ceil(stage_time / food_support_time)
# 如果没有提供食物,给出提示但允许继续
#if len(food_items) == 0:
# msg = f" 未使用食物。推荐使用 {recommended_food} 个果酒以获得buff加成"
# return True, msg, recommended_food
2025-11-10 14:59:07 +08:00
return True, "", recommended_food
# ========================================================================
# 冒险系统 - 流程方法
# ========================================================================
def abort_current_adventure(self, user_id: int) -> Tuple[bool, str]:
"""
立即放弃当前冒险无奖励清理状态
Args:
user_id: 玩家ID
Returns:
(是否成功, 消息)
"""
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT current_adventure_id FROM combat_player_status WHERE user_id = ?",
(user_id,),
)
status_row = cursor.fetchone()
if not status_row or not status_row["current_adventure_id"]:
return False, "❌ 当前没有进行中的冒险"
adventure_id = status_row["current_adventure_id"]
cursor.execute(
"""
SELECT adventure_id, stage, status, scheduled_task_id
FROM combat_adventure_records
WHERE adventure_id = ?
""",
(adventure_id,),
)
adventure = cursor.fetchone()
if not adventure:
cursor.execute(
"UPDATE combat_player_status SET current_adventure_id = NULL WHERE user_id = ?",
(user_id,),
)
self._db.conn.commit()
return False, "❌ 未找到冒险记录,状态已重置"
if adventure["status"] != "in_progress":
cursor.execute(
"UPDATE combat_player_status SET current_adventure_id = NULL WHERE user_id = ?",
(user_id,),
)
self._db.conn.commit()
return False, f"❌ 冒险已结算(状态:{adventure['status']}"
scheduled_task_id = adventure["scheduled_task_id"]
if scheduled_task_id:
try:
self._db.update_task_status(int(scheduled_task_id), STATUS_COMPLETED)
except Exception as exc: # pylint: disable=broad-except
self._config.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}取消冒险定时任务失败: {exc}{ConsoleFrontColor.RESET}",
)
cursor.execute(
"""
UPDATE combat_adventure_records
SET status = ?, rewards = NULL, scheduled_task_id = NULL
WHERE adventure_id = ? AND status = 'in_progress'
""",
("abandoned", adventure_id),
)
cursor.execute(
"UPDATE combat_player_status SET current_adventure_id = NULL WHERE user_id = ?",
(user_id,),
)
self._db.conn.commit()
stage = adventure["stage"]
self._config.Log(
"Info",
f"{ConsoleFrontColor.LIGHTCYAN_EX}用户 {user_id} 放弃冒险 {adventure_id}(阶段 {stage}{ConsoleFrontColor.RESET}",
)
return True, f"⚠️ 已放弃第 {stage} 阶段冒险,本阶段奖励作废"
def get_last_adventure_record(self, user_id: int) -> Optional[Dict[str, Any]]:
"""
获取最近一次冒险记录无论状态
"""
cursor = self._db.conn.cursor()
cursor.execute(
"""
SELECT * FROM combat_adventure_records
WHERE user_id = ?
ORDER BY adventure_id DESC
LIMIT 1
""",
(user_id,),
)
row = cursor.fetchone()
2025-11-12 16:37:10 +08:00
return dict(row) if row else None
def get_adventure_by_id(self, adventure_id: int) -> Optional[Dict[str, Any]]:
"""
根据冒险ID获取冒险记录
"""
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT * FROM combat_adventure_records WHERE adventure_id = ?",
(adventure_id,),
)
row = cursor.fetchone()
return dict(row) if row else None
2025-11-10 14:59:07 +08:00
def start_adventure(
self,
user_id: int,
chat_id: int,
stage: int,
food_items: List[str],
register_callback: Optional[object] = None
) -> Tuple[bool, str, Optional[int]]:
"""
开始冒险
Args:
user_id: 用户ID
chat_id: 会话ID
stage: 阶段数
food_items: 食物item_id列表
register_callback: 插件实例用于注册时钟任务
Returns:
(是否成功, 消息, 冒险ID)
"""
# 1. 检查玩家状态
if self.is_injured(user_id):
return False, "❌ 你处于受伤状态,无法冒险。使用 `治疗` 恢复", None
status = self.get_player_status(user_id)
if status.get("current_adventure_id"):
return False, "❌ 你已经在冒险中", None
if status.get("current_battle_id"):
return False, "❌ 你正在战斗中,无法开始冒险", None
# 2. 检查食物(可选)
can_adventure, food_hint, recommended = self.check_food_requirement(stage, food_items)
# 3. 检查并消耗食物(如果提供了食物)
if food_items:
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
food_count: Dict[str, int] = {}
for food_id in food_items:
food_count[food_id] = food_count.get(food_id, 0) + 1
# 验证拥有
user_items = backpack.get_user_items(user_id)
for food_id, count in food_count.items():
owned = sum(item.quantity for item in user_items if item.item_id == food_id)
2025-11-10 14:59:07 +08:00
if owned < count:
return False, f"{food_id} 数量不足,还需要 {count - owned} 个, 当前只有{owned}", None
2025-11-10 14:59:07 +08:00
# 消耗食物
for food_id, count in food_count.items():
current = next((item.quantity for item in user_items if item.item_id == food_id), 0)
backpack.set_item_quantity(user_id, food_id, current - count)
# 4. 创建装备快照
stats = self.calculate_player_stats(user_id)
equipped = self.get_equipped_items(user_id)
equipment_snapshot = {
"stats": {
"HP": stats.hp,
"ATK": stats.atk,
"DEF": stats.def_,
"SPD": stats.spd,
"CRIT": stats.crit,
"CRIT_DMG": stats.crit_dmg,
},
"equipment_strength": stats.equipment_strength,
"equipped": {
slot: eq.item_id if eq else None
for slot, eq in equipped.items()
}
}
# 5. 获取运势值
fortune_system: WPSFortuneSystem = Architecture.Get(WPSFortuneSystem)
fortune_value = fortune_system.get_fortune_value(user_id) if fortune_system else 0.0
# 6. 计算冒险参数
actual_time = self.calculate_adventure_time(stage, stats.equipment_strength, food_items)
success_rate, success_rate_desc = self.calculate_success_rate(stage, stats.equipment_strength, fortune_value, food_items)
2025-11-10 14:59:07 +08:00
# 7. 创建冒险记录
start_time = datetime.now()
expected_end = start_time + timedelta(minutes=actual_time)
cursor = self._db.conn.cursor()
cursor.execute(
"""
INSERT INTO combat_adventure_records
(user_id, chat_id, stage, equipment_snapshot, foods_used,
start_time, expected_end_time, status, fortune_value,
equipment_strength, success_rate)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
user_id, chat_id, stage,
json.dumps(equipment_snapshot),
json.dumps(food_items),
start_time.isoformat(),
expected_end.isoformat(),
"in_progress",
fortune_value,
stats.equipment_strength,
success_rate
)
)
adventure_id = cursor.lastrowid
# 8. 更新玩家状态
cursor.execute(
"UPDATE combat_player_status SET current_adventure_id = ? WHERE user_id = ?",
(adventure_id, user_id)
)
self._db.conn.commit()
# 9. 注册时钟任务
task_id = None
if register_callback:
delay_ms = int(actual_time * 60 * 1000)
task_id = register_callback.register_clock(
register_callback._settle_adventure_callback,
delay_ms,
kwargs={
"adventure_id": adventure_id,
"user_id": user_id,
"chat_id": chat_id
}
)
cursor.execute(
"UPDATE combat_adventure_records SET scheduled_task_id = ? WHERE adventure_id = ?",
(task_id, adventure_id)
)
self._db.conn.commit()
# Debug模式提示
from PWF.CoreModules.flags import get_internal_debug
debug_hint = " **[DEBUG模式]**" if get_internal_debug() else ""
# 时间格式化
if actual_time == 0:
time_str = "立即结算"
else:
time_str = f"{actual_time} 分钟"
# 构建消息
msg_lines = [
f"✅ 开始第 {stage} 阶段冒险{debug_hint}",
f"- 预计耗时:{time_str}",
f"- 成功率: {success_rate*100:.2f}% ({success_rate_desc})",
2025-11-10 14:59:07 +08:00
f"- 预计完成:{expected_end.strftime('%Y-%m-%d %H:%M')}"
]
# 如果有食物提示,添加到消息中
if food_hint:
msg_lines.append(f"\n{food_hint}")
msg = "\n".join(msg_lines)
return True, msg, adventure_id
def settle_adventure(self, adventure_id: int) -> Tuple[bool, str, Optional[Dict]]:
"""
结算冒险
Args:
adventure_id: 冒险ID
Returns:
(是否成功, 消息, 奖励字典)
"""
# 1. 获取冒险记录
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT * FROM combat_adventure_records WHERE adventure_id = ?",
(adventure_id,)
)
adventure = cursor.fetchone()
if not adventure:
return False, "❌ 冒险记录不存在", None
if adventure["status"] != "in_progress":
return False, f"❌ 冒险已结算(状态:{adventure['status']}", None
user_id = adventure["user_id"]
stage = adventure["stage"]
success_rate = adventure["success_rate"]
# 2. 判定成功/失败
roll = random.random()
success = roll < success_rate
# 3. 生成奖励
rewards = None
if success:
fortune_value = adventure["fortune_value"]
foods_used = json.loads(adventure["foods_used"])
rewards = self.generate_adventure_rewards(stage, fortune_value, foods_used)
# 发放奖励
self._grant_rewards(user_id, rewards)
else:
# 失败:设置受伤状态
self.set_injured(user_id, True)
# 4. 更新冒险记录
cursor.execute(
"""
UPDATE combat_adventure_records
SET status = ?, rewards = ?
WHERE adventure_id = ?
""",
("success" if success else "failed", json.dumps(rewards) if rewards else None, adventure_id)
)
# 5. 清除玩家冒险状态
cursor.execute(
"UPDATE combat_player_status SET current_adventure_id = NULL WHERE user_id = ?",
(user_id,)
)
self._db.conn.commit()
# 6. 生成结算消息
if success:
msg = self._format_success_message(stage, rewards)
else:
msg = (
f"# 💔 冒险失败\n"
f"- 阶段:第 {stage} 阶段\n"
f"- 成功率:{success_rate*100:.1f}%\n"
f"- 结果:失败(骰子:{roll*100:.1f}%\n"
f"- 你受了伤,需要消耗 100 积分治疗"
)
return success, msg, rewards
def generate_adventure_rewards(
self,
stage: int,
fortune_value: float,
wine_buffs: List[str]
) -> Dict[str, Any]:
"""
生成冒险奖励
Returns:
奖励字典 {type: item_id/amount, ...}
"""
from .combat_models import (
ADVENTURE_MATERIALS,
ADVENTURE_SEEDS,
ADVENTURE_SOUVENIRS,
COMBAT_POTIONS,
EQUIPMENT_REGISTRY,
)
rewards = {}
# 1. 基础积分奖励
base_points = CombatConfig.get_int("combat_loot_points_base")
points_per_stage = CombatConfig.get_int("combat_loot_points_per_stage")
2025-11-10 14:59:07 +08:00
# 运势影响
fortune_mult = CombatConfig.get_float("combat_loot_fortune_multiplier")
2025-11-10 14:59:07 +08:00
fortune_bonus = 1.0 + fortune_value * fortune_mult
# 果酒收益加成
reward_boost = 1.0
for wine_id in wine_buffs:
if wine_id in WINE_BUFFS:
reward_boost += WINE_BUFFS[wine_id].get("reward_boost", 0.0)
points = int((base_points + points_per_stage * stage) * fortune_bonus * reward_boost)
rewards["points"] = points
# 2. 掉落物品(权重随机)
loot_weights = {
"equipment": CombatConfig.get_int("combat_loot_weight_equipment"),
"material": CombatConfig.get_int("combat_loot_weight_material"),
"souvenir": CombatConfig.get_int("combat_loot_weight_souvenir"),
"potion": CombatConfig.get_int("combat_loot_weight_potion"),
"seed": CombatConfig.get_int("combat_loot_weight_seed"),
2025-11-10 14:59:07 +08:00
}
# 根据阶段决定掉落数量
num_drops = min(1 + stage // 3, 3) # 1-3件物品
rewards["items"] = []
for _ in range(num_drops):
loot_type = random.choices(
list(loot_weights.keys()),
weights=list(loot_weights.values())
)[0]
item = self._generate_loot_item(loot_type, stage)
if item:
rewards["items"].append(item)
return rewards
def _generate_loot_item(self, loot_type: str, stage: int) -> Optional[Dict[str, Any]]:
"""生成单个掉落物品"""
from .combat_models import (
ADVENTURE_MATERIALS,
ADVENTURE_SEEDS,
ADVENTURE_SOUVENIRS,
COMBAT_POTIONS,
EQUIPMENT_REGISTRY,
)
if loot_type == "equipment":
# 根据阶段提升装备品质概率
tier_weights = {
BackpackItemTier.COMMON: max(50 - stage * 5, 10),
BackpackItemTier.RARE: 30 + stage * 3,
BackpackItemTier.EPIC: 15 + stage * 2,
BackpackItemTier.LEGENDARY: min(5 + stage, 20),
}
tier = random.choices(
list(tier_weights.keys()),
weights=list(tier_weights.values())
)[0]
# 筛选该品质的装备
candidates = [eq for eq in EQUIPMENT_REGISTRY.values() if eq.tier == tier]
if candidates:
equipment = random.choice(candidates)
return {"type": "equipment", "item_id": equipment.item_id, "quantity": 1}
elif loot_type == "material":
# 材料品质也随阶段提升
materials = list(ADVENTURE_MATERIALS.items())
# 高阶段有更高概率掉稀有材料
idx = min(stage // 2, len(materials) - 1)
2025-11-10 22:30:16 +08:00
item_id, (_, tier, _) = random.choice(materials[max(0, idx-1):])
2025-11-10 14:59:07 +08:00
quantity = random.randint(1, 3)
return {"type": "material", "item_id": item_id, "quantity": quantity}
elif loot_type == "souvenir":
souvenirs = list(ADVENTURE_SOUVENIRS.keys())
item_id = random.choice(souvenirs)
return {"type": "souvenir", "item_id": item_id, "quantity": 1}
elif loot_type == "potion":
potions = list(COMBAT_POTIONS.keys())
item_id = random.choice(potions)
quantity = random.randint(1, 2)
return {"type": "potion", "item_id": item_id, "quantity": quantity}
elif loot_type == "seed":
if stage >= 5: # 高阶段才掉落稀有种子
seeds = list(ADVENTURE_SEEDS.keys())
item_id = random.choice(seeds)
return {"type": "seed", "item_id": item_id, "quantity": 1}
return None
def _grant_rewards(self, user_id: int, rewards: Dict[str, Any]) -> None:
"""发放奖励"""
# 1. 积分
if "points" in rewards:
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
config_api.adjust_user_points_sync(user_id, rewards["points"], "冒险奖励")
2025-11-10 14:59:07 +08:00
# 2. 物品
if "items" in rewards:
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
for item in rewards["items"]:
backpack.add_item(user_id, item["item_id"], item["quantity"])
def _format_success_message(self, stage: int, rewards: Dict[str, Any]) -> str:
"""格式化成功消息"""
lines = [
f"# 🎉 冒险成功",
f"- 阶段:第 {stage} 阶段",
f"",
f"**获得奖励:**",
]
if "points" in rewards:
lines.append(f"- 积分:`+{rewards['points']}`")
if "items" in rewards and rewards["items"]:
lines.append("\n**物品:**")
backpack: WPSBackpackSystem = Architecture.Get(WPSBackpackSystem)
for item in rewards["items"]:
try:
item_def = backpack._get_definition(item["item_id"])
lines.append(f"- {item_def.name} × {item['quantity']}")
except:
lines.append(f"- {item['item_id']} × {item['quantity']}")
lines.append("\n> 使用 `继续冒险` 进入下一阶段,或停止冒险")
return "\n".join(lines)
# ========================================================================
# 冒险系统 - 恢复机制
# ========================================================================
def recover_overdue_adventures(self) -> None:
"""恢复过期但未结算的冒险"""
cursor = self._db.conn.cursor()
cursor.execute(
"""
SELECT * FROM combat_adventure_records
WHERE status = 'in_progress' AND expected_end_time < ?
""",
(datetime.now().isoformat(),)
)
overdue_adventures = cursor.fetchall()
for adventure in overdue_adventures:
self._config.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}发现过期冒险 {adventure['adventure_id']},执行恢复结算{ConsoleFrontColor.RESET}"
)
# 同步结算
self.settle_adventure(adventure["adventure_id"])
if overdue_adventures:
self._config.Log(
"Info",
f"{ConsoleFrontColor.GREEN}恢复了 {len(overdue_adventures)} 个过期冒险{ConsoleFrontColor.RESET}"
)
# ========================================================================
# PVP系统 - 挑战管理
# ========================================================================
def create_pvp_challenge(
self,
challenger_id: int,
target_id: int,
timeout_minutes: int = 15
) -> Tuple[bool, str, Optional[int]]:
"""
发起PVP挑战
Returns:
(是否成功, 消息, 挑战ID)
"""
# 1. 验证双方状态
challenger_status = self.get_player_status(challenger_id)
target_status = self.get_player_status(target_id)
if challenger_status.get("is_injured"):
return False, "❌ 你受了伤,无法发起挑战", None
if target_status.get("is_injured"):
return False, "❌ 对方受了伤,无法接受挑战", None
if challenger_status.get("current_adventure_id"):
return False, "❌ 你正在冒险中,无法发起挑战", None
if target_status.get("current_adventure_id"):
return False, "❌ 对方正在冒险中,无法接受挑战", None
# 2. 检查积分
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
challenger_points = config_api.get_user_points(0, challenger_id)
target_points = config_api.get_user_points(0, target_id)
if challenger_points < 100:
return False, "❌ 你的积分不足100无法发起挑战", None
if target_points < 100:
return False, "❌ 对方积分不足100无法接受挑战", None
# 3. 创建挑战记录
cursor = self._db.conn.cursor()
expire_time = datetime.now() + timedelta(minutes=timeout_minutes)
cursor.execute(
"""
INSERT INTO combat_pvp_challenges
(challenger_id, target_id, status, created_at, expires_at)
VALUES (?, ?, 'pending', ?, ?)
""",
(challenger_id, target_id, datetime.now().isoformat(), expire_time.isoformat())
)
challenge_id = cursor.lastrowid
self._db.conn.commit()
msg = (
f"# ⚔️ PVP挑战已发送\n"
f"- 挑战者:用户 {challenger_id}\n"
f"- 目标:用户 {target_id}\n"
f"- 过期时间:{expire_time.strftime('%H:%M')}\n"
f"\n> 对方需使用 `接受挑战 {challenge_id}` 接受,或 `拒绝挑战 {challenge_id}` 拒绝"
)
return True, msg, challenge_id
def accept_challenge(
self,
challenge_id: int,
target_id: int
) -> Tuple[bool, str, Optional[int]]:
"""
接受挑战
Returns:
(是否成功, 消息, 战斗ID)
"""
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT * FROM combat_pvp_challenges WHERE challenge_id = ?",
(challenge_id,)
)
challenge = cursor.fetchone()
if not challenge:
return False, "❌ 挑战不存在", None
if challenge["target_id"] != target_id:
return False, "❌ 这不是发给你的挑战", None
if challenge["status"] != "pending":
return False, f"❌ 挑战已失效(状态:{challenge['status']}", None
# 检查是否过期
if datetime.now() > datetime.fromisoformat(challenge["expires_at"]):
cursor.execute(
"UPDATE combat_pvp_challenges SET status = 'expired' WHERE challenge_id = ?",
(challenge_id,)
)
self._db.conn.commit()
return False, "❌ 挑战已过期", None
# 更新挑战状态
cursor.execute(
"UPDATE combat_pvp_challenges SET status = 'accepted' WHERE challenge_id = ?",
(challenge_id,)
)
# 创建战斗
battle_id = self._create_battle(challenge["challenger_id"], target_id)
self._db.conn.commit()
msg = (
f"# ⚔️ 挑战已接受\n"
f"- 战斗ID{battle_id}\n"
f"\n> 战斗开始!使用 `战斗 {battle_id} [技能]` 进行操作"
)
return True, msg, battle_id
def reject_challenge(
self,
challenge_id: int,
target_id: int
) -> Tuple[bool, str]:
"""拒绝挑战"""
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT * FROM combat_pvp_challenges WHERE challenge_id = ?",
(challenge_id,)
)
challenge = cursor.fetchone()
if not challenge:
return False, "❌ 挑战不存在"
if challenge["target_id"] != target_id:
return False, "❌ 这不是发给你的挑战"
if challenge["status"] != "pending":
return False, f"❌ 挑战已失效(状态:{challenge['status']}"
cursor.execute(
"UPDATE combat_pvp_challenges SET status = 'rejected' WHERE challenge_id = ?",
(challenge_id,)
)
self._db.conn.commit()
return True, "✅ 已拒绝挑战"
def _create_battle(self, player1_id: int, player2_id: int) -> int:
"""
创建战斗实例
Returns:
battle_id
"""
from .combat_models import BattleState
# 获取双方属性快照
stats1 = self.calculate_player_stats(player1_id)
stats2 = self.calculate_player_stats(player2_id)
# 初始化战斗状态
battle_state = BattleState(
player1_id=player1_id,
player2_id=player2_id,
player1_hp=stats1.total_hp,
player2_hp=stats2.total_hp,
player1_max_hp=stats1.total_hp,
player2_max_hp=stats2.total_hp,
player1_stats=stats1.__dict__,
player2_stats=stats2.__dict__,
current_turn=player1_id if stats1.total_spd >= stats2.total_spd else player2_id,
turn_number=1,
cooldowns={},
battle_log=[]
)
cursor = self._db.conn.cursor()
cursor.execute(
"""
INSERT INTO combat_pvp_battles
(player1_id, player2_id, status, battle_state, created_at, last_action_time)
VALUES (?, ?, 'active', ?, ?, ?)
""",
(
player1_id,
player2_id,
json.dumps(battle_state.__dict__),
datetime.now().isoformat(),
datetime.now().isoformat()
)
)
return cursor.lastrowid
# ========================================================================
# PVP系统 - 战斗执行
# ========================================================================
def execute_battle_action(
self,
battle_id: int,
user_id: int,
skill_name: str
) -> Tuple[bool, str]:
"""
执行战斗动作
Args:
battle_id: 战斗ID
user_id: 操作用户
skill_name: 技能名称
Returns:
(是否成功, 消息)
"""
from .combat_models import BattleState, SKILL_REGISTRY
# 1. 获取战斗状态
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT * FROM combat_pvp_battles WHERE battle_id = ?",
(battle_id,)
)
battle_row = cursor.fetchone()
if not battle_row:
return False, "❌ 战斗不存在"
if battle_row["status"] != "active":
return False, f"❌ 战斗已结束(状态:{battle_row['status']}"
# 2. 解析战斗状态
battle_state = BattleState(**json.loads(battle_row["battle_state"]))
# 3. 检查回合
if battle_state.current_turn != user_id:
return False, "❌ 现在不是你的回合"
# 4. 获取技能
skill = SKILL_REGISTRY.get(skill_name)
if not skill:
return False, f"❌ 技能 {skill_name} 不存在"
# 5. 检查技能是否可用
if not self._can_use_skill(user_id, skill, battle_state):
return False, f"❌ 技能 {skill_name} 暂不可用(冷却中)"
# 6. 执行技能效果
log_entry = self._execute_skill_effect(user_id, skill, battle_state)
battle_state.battle_log.append(log_entry)
# 7. 更新冷却
if skill.cooldown > 0:
cooldown_key = f"{user_id}_{skill.skill_id}"
battle_state.cooldowns[cooldown_key] = skill.cooldown
# 8. 检查战斗是否结束
winner_id = None
if battle_state.player1_hp <= 0:
winner_id = battle_state.player2_id
elif battle_state.player2_hp <= 0:
winner_id = battle_state.player1_id
if winner_id:
return self._end_battle(battle_id, winner_id, battle_state)
# 9. 切换回合
battle_state.current_turn = (
battle_state.player2_id if battle_state.current_turn == battle_state.player1_id
else battle_state.player1_id
)
battle_state.turn_number += 1
# 减少所有技能冷却
for key in list(battle_state.cooldowns.keys()):
battle_state.cooldowns[key] -= 1
if battle_state.cooldowns[key] <= 0:
del battle_state.cooldowns[key]
# 10. 保存状态
cursor.execute(
"""
UPDATE combat_pvp_battles
SET battle_state = ?, last_action_time = ?
WHERE battle_id = ?
""",
(json.dumps(battle_state.__dict__), datetime.now().isoformat(), battle_id)
)
self._db.conn.commit()
# 11. 生成消息
msg = self._format_battle_message(battle_state, log_entry)
return True, msg
def _can_use_skill(self, user_id: int, skill, battle_state) -> bool:
"""检查技能是否可用"""
cooldown_key = f"{user_id}_{skill.skill_id}"
return cooldown_key not in battle_state.cooldowns
def _execute_skill_effect(self, user_id: int, skill, battle_state) -> Dict[str, Any]:
"""
执行技能效果DSL引擎
Returns:
战斗日志条目
"""
# 判断施法者和目标
is_player1 = (user_id == battle_state.player1_id)
caster_hp = battle_state.player1_hp if is_player1 else battle_state.player2_hp
target_hp = battle_state.player2_hp if is_player1 else battle_state.player1_hp
caster_stats = battle_state.player1_stats if is_player1 else battle_state.player2_stats
target_stats = battle_state.player2_stats if is_player1 else battle_state.player1_stats
log_entry = {
"turn": battle_state.turn_number,
"caster": user_id,
"skill": skill.name,
"effects": []
}
# 执行DSL效果
for effect in skill.effects:
effect_type = effect.get("type")
if effect_type == "damage":
# 伤害计算
base_dmg = effect.get("base", 0)
atk_ratio = effect.get("atk_ratio", 0.0)
raw_damage = base_dmg + caster_stats["total_atk"] * atk_ratio
# 暴击判定
crit_rate = caster_stats["total_crit"]
is_crit = random.random() < crit_rate
if is_crit:
crit_dmg = caster_stats["total_crit_dmg"]
raw_damage *= crit_dmg
# 防御减伤
defense_reduction = self._calculate_defense_reduction(target_stats["total_def"])
final_damage = int(raw_damage * (1 - defense_reduction))
# 应用伤害
target_hp -= final_damage
log_entry["effects"].append({
"type": "damage",
"value": final_damage,
"is_crit": is_crit
})
elif effect_type == "heal":
# 治疗
base_heal = effect.get("base", 0)
hp_ratio = effect.get("hp_ratio", 0.0)
max_hp = battle_state.player1_max_hp if is_player1 else battle_state.player2_max_hp
heal_amount = int(base_heal + max_hp * hp_ratio)
caster_hp = min(caster_hp + heal_amount, max_hp)
log_entry["effects"].append({
"type": "heal",
"value": heal_amount
})
elif effect_type == "shield":
# 护盾简化为临时HP
shield_amount = effect.get("value", 0)
caster_hp += shield_amount
log_entry["effects"].append({
"type": "shield",
"value": shield_amount
})
# 更新战斗状态
if is_player1:
battle_state.player1_hp = max(0, caster_hp)
battle_state.player2_hp = max(0, target_hp)
else:
battle_state.player2_hp = max(0, caster_hp)
battle_state.player1_hp = max(0, target_hp)
return log_entry
def _calculate_defense_reduction(self, defense: float) -> float:
"""计算防御减伤率"""
def_coef = CombatConfig.get_float("combat_pvp_defense_coefficient")
2025-11-10 14:59:07 +08:00
return defense / (defense + def_coef)
def _end_battle(
self,
battle_id: int,
winner_id: int,
battle_state
) -> Tuple[bool, str]:
"""结束战斗"""
loser_id = (
battle_state.player2_id if winner_id == battle_state.player1_id
else battle_state.player1_id
)
# 1. 更新战斗状态
cursor = self._db.conn.cursor()
cursor.execute(
"""
UPDATE combat_pvp_battles
SET status = 'finished', winner_id = ?, battle_state = ?
WHERE battle_id = ?
""",
(winner_id, json.dumps(battle_state.__dict__), battle_id)
)
# 2. 积分转移
reward = CombatConfig.get_int("combat_pvp_reward_points")
2025-11-10 14:59:07 +08:00
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
loser_points = config_api.get_user_points(0, loser_id)
actual_reward = min(reward, loser_points)
config_api.adjust_user_points_sync(loser_id, -actual_reward, f"PVP战斗失败战斗{battle_id}")
config_api.adjust_user_points_sync(winner_id, actual_reward, f"PVP战斗胜利战斗{battle_id}")
2025-11-10 14:59:07 +08:00
self._db.conn.commit()
# 3. 生成结算消息
msg = (
f"# 🏆 战斗结束\n"
f"- 胜利者:用户 {winner_id}\n"
f"- 失败者:用户 {loser_id}\n"
f"- 积分转移:{actual_reward}\n"
f"\n**战斗统计:**\n"
f"- 总回合数:{battle_state.turn_number}\n"
f"- 剩余HP{battle_state.player1_hp if winner_id == battle_state.player1_id else battle_state.player2_hp}"
)
return True, msg
def surrender_battle(
self,
battle_id: int,
user_id: int
) -> Tuple[bool, str]:
"""投降"""
cursor = self._db.conn.cursor()
cursor.execute(
"SELECT * FROM combat_pvp_battles WHERE battle_id = ?",
(battle_id,)
)
battle_row = cursor.fetchone()
if not battle_row:
return False, "❌ 战斗不存在"
if battle_row["status"] != "active":
return False, f"❌ 战斗已结束(状态:{battle_row['status']}"
# 确定胜者
from .combat_models import BattleState
battle_state = BattleState(**json.loads(battle_row["battle_state"]))
winner_id = (
battle_state.player2_id if user_id == battle_state.player1_id
else battle_state.player1_id
)
return self._end_battle(battle_id, winner_id, battle_state)
def _format_battle_message(self, battle_state, log_entry: Dict) -> str:
"""格式化战斗消息"""
lines = [
f"# ⚔️ 回合 {battle_state.turn_number}",
f"",
f"**{log_entry['caster']} 使用了 {log_entry['skill']}**",
]
for effect in log_entry["effects"]:
if effect["type"] == "damage":
crit_text = "【暴击】" if effect.get("is_crit") else ""
lines.append(f"- 造成 {effect['value']} 点伤害 {crit_text}")
elif effect["type"] == "heal":
lines.append(f"- 恢复 {effect['value']} 点HP")
elif effect["type"] == "shield":
lines.append(f"- 获得 {effect['value']} 点护盾")
lines.append("")
lines.append(f"**当前HP**")
lines.append(f"- 玩家1{battle_state.player1_hp}/{battle_state.player1_max_hp}")
lines.append(f"- 玩家2{battle_state.player2_hp}/{battle_state.player2_max_hp}")
lines.append("")
lines.append(f"> 当前回合:用户 {battle_state.current_turn}")
return "\n".join(lines)
def check_battle_timeout(self) -> None:
"""检查战斗超时"""
timeout_seconds = CombatConfig.get_int("combat_pvp_turn_timeout")
2025-11-10 14:59:07 +08:00
cursor = self._db.conn.cursor()
cursor.execute(
"""
SELECT * FROM combat_pvp_battles
WHERE status = 'active'
""",
)
active_battles = cursor.fetchall()
for battle_row in active_battles:
last_action = datetime.fromisoformat(battle_row["last_action_time"])
if (datetime.now() - last_action).total_seconds() > timeout_seconds:
# 超时,判定当前回合者失败
from .combat_models import BattleState
battle_state = BattleState(**json.loads(battle_row["battle_state"]))
winner_id = (
battle_state.player2_id if battle_state.current_turn == battle_state.player1_id
else battle_state.player1_id
)
self._config.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}战斗 {battle_row['battle_id']} 超时,玩家 {battle_state.current_turn} 失败{ConsoleFrontColor.RESET}"
)
self._end_battle(battle_row["battle_id"], winner_id, battle_state)
# 全局服务实例(单例模式)
_service_instance: Optional[CombatService] = None
def get_combat_service() -> CombatService:
"""获取战斗服务单例"""
global _service_instance
if _service_instance is None:
_service_instance = CombatService()
return _service_instance