"""冒险系统游戏模块""" import random import time import logging import re from datetime import datetime from games.base import BaseGame from utils.parser import CommandParser from core.database import get_db from typing import * logger = logging.getLogger(__name__) class AdventureGame(BaseGame): """冒险系统游戏""" def __init__(self): """初始化游戏""" super().__init__() self.db = get_db() # 奖品池配置 self.prize_pool: List[Tuple[int, float, str]] = [ # (权重, 倍率, 描述) (300, 1, "少量积分"), (250, 2, "中等积分"), (200, 2, "大量积分"), (150, 5, "丰厚积分"), (100, 10, "丰厚积分"), (50, 100, "🌟 巨额积分"), (10, 1000, "💎 传说积分"), ] self.total_weight: int = 0 for weight,_,_ in self.prize_pool: self.total_weight += weight def _parse_time_string(self, time_str: str) -> int: """解析时间字符串,支持 h/m/s 格式 支持的格式示例: - "1h30m10s" -> 5410秒 - "30m" -> 1800秒 - "10s" -> 10秒 - "1h30m" -> 5400秒 - "3600" -> 3600秒(纯数字,按秒处理) Args: time_str: 时间字符串 Returns: 解析后的秒数,如果解析失败返回None """ if not time_str: return None # 如果是纯数字,直接返回 if time_str.isdigit(): return int(time_str) # 使用正则表达式匹配 h/m/s 格式,确保整个字符串匹配 pattern = r'^(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$' match = re.match(pattern, time_str.lower()) if not match: return None hours = int(match.group(1) or 0) minutes = int(match.group(2) or 0) seconds = int(match.group(3) or 0) # 如果所有值都是0,返回None if hours == 0 and minutes == 0 and seconds == 0: return None total_seconds = hours * 3600 + minutes * 60 + seconds return total_seconds def _format_time(self, seconds: int) -> str: """将秒数格式化为 "X时X分X秒" 格式 Args: seconds: 秒数 Returns: 格式化的时间字符串,如 "1时30分10秒"、"30分10秒"、"10秒" """ if seconds < 0: seconds = 0 hours = seconds // 3600 minutes = (seconds % 3600) // 60 secs = seconds % 60 parts = [] if hours > 0: parts.append(f"{hours}时") if minutes > 0: parts.append(f"{minutes}分") if secs > 0 or not parts: parts.append(f"{secs}秒") return "".join(parts) async def handle(self, command: str, chat_id: int, user_id: int) -> str: """处理冒险相关指令 Args: command: 指令,如 ".adventure", ".adventure 10", ".adventure stats" chat_id: 会话ID user_id: 用户ID Returns: 回复消息 """ try: # 提取参数 _, args = CommandParser.extract_command_args(command) args = args.strip().lower() # 冒险说明 if args in ['help', '帮助', 'info']: return self._get_adventure_help() # 放弃当前冒险(按最低倍率结算已冒险时间) if args in ['abandon', '放弃']: return await self._abandon_adventure(chat_id, user_id) # 默认:冒险耗时1秒 else: # 解析消耗时间 cost_time = 1 # 默认消耗1秒 if args: parsed_time = self._parse_time_string(args) if parsed_time is not None: cost_time = parsed_time else: return f"❌ 时间格式错误!请使用以下格式:\n- 纯数字(秒):`.adventure 60`\n- 时分秒格式:`.adventure 1h30m10s`\n- 分钟秒格式:`.adventure 30m10s`\n- 只有秒:`.adventure 10s`" return await self._perform_adventure(chat_id, user_id, cost_time) except Exception as e: logger.error(f"处理冒险指令错误: {e}", exc_info=True) return f"❌ 处理指令出错: {str(e)}" async def _perform_adventure(self, chat_id: int, user_id: int, cost_time: int) -> str: """执行冒险耗时 Args: chat_id: 会话ID(使用0作为用户级标识) user_id: 用户ID cost_time: 消耗时间(秒) Returns: 抽奖结果消息 """ # 参数验证 if cost_time < 1: return "❌ 冒险时间至少需要1秒!" # 查询冒险状态(使用chat_id=0表示用户级状态) state = self.db.get_game_state(0, user_id, 'adventure') current_time = int(time.time()) # 情况1:用户已有未完成的冒险 if state: try: state_data = state['state_data'] start_time = state_data.get('start_time', 0) saved_cost_time = state_data.get('cost_time', 1) end_time = start_time + saved_cost_time remaining_seconds = end_time - current_time # 情况1.1:冒险已完成(时间已到或过期) if remaining_seconds <= 0: # 选择奖品池 prize_pool = self.prize_pool # 执行抽奖 reward = self._draw_prize(prize_pool) reward_points = int(reward['value'] * saved_cost_time) # 处理奖励 self.db.add_points(user_id, reward_points, "adventure", f"冒险奖励") # 删除冒险状态 self.db.delete_game_state(0, user_id, 'adventure') # 获取更新后的积分信息 updated_points = self.db.get_user_points(user_id) # 格式化输出 time_str = self._format_time(saved_cost_time) text = f"## ⚡️ 冒险结果\n\n" text += f"**消耗时间**: {time_str}\n\n" text += f"**{reward['description']}**: 获得 {reward_points} 积分\n\n" text += f"**当前积分**: {updated_points['points']} 分\n\n" text += "---\n\n" text += "💡 提示:冒险进行时不能炼金!" return text # 情况1.2:冒险未完成,返回等待提示 wait_msg = self._format_time(remaining_seconds) saved_time_str = self._format_time(saved_cost_time) text = f"## ⚡️ 冒险进行中\n\n" text += f"你正在进行一次冒险,还需等待 **{wait_msg}** 才能完成。\n\n" text += f"**当前冒险时长**: {saved_time_str}\n\n" text += "---\n\n" text += "💡 提示:冒险期间无法进行炼金,请耐心等待!" return text except Exception as e: # 状态数据异常,删除损坏状态并允许重新开始 logger.error(f"冒险状态数据异常: {e}", exc_info=True) self.db.delete_game_state(0, user_id, 'adventure') # 继续执行到情况3(开始新冒险) # 情况3:用户没有冒险状态,开始新的冒险 state_data = { 'start_time': current_time, 'cost_time': cost_time } # 保存冒险状态 self.db.save_game_state(0, user_id, 'adventure', state_data) # 计算预计完成时间 end_time = current_time + cost_time end_datetime = datetime.fromtimestamp(end_time) current_datetime = datetime.fromtimestamp(current_time) # 判断是否跨天:如果完成日期和当前日期不同,或跨年,则显示完整日期时间 if (end_datetime.date() != current_datetime.date() or end_datetime.year != current_datetime.year): end_time_str = end_datetime.strftime('%Y-%m-%d %H:%M:%S') else: end_time_str = end_datetime.strftime('%H:%M:%S') cost_time_str = self._format_time(cost_time) text = f"## ⚡️ 冒险开始\n\n" text += f"你已经开始了冒险之旅,本次冒险将持续 **{cost_time_str}**。\n\n" text += f"**预计完成时间**: {end_time_str}\n\n" text += "---\n\n" text += "💡 提示:冒险期间无法进行炼金,完成后使用 `.adventure` 获取奖励!" return text async def _abandon_adventure(self, chat_id: int, user_id: int) -> str: """放弃当前冒险,按最低倍率结算已冒险时间 Args: chat_id: 会话ID(使用0作为用户级标识) user_id: 用户ID Returns: 放弃结果消息 """ try: # 查询冒险状态 state = self.db.get_game_state(0, user_id, 'adventure') if not state: return "❌ 当前没有进行中的冒险,可使用 `.adventure` 开始新的冒险。" state_data = state.get('state_data', {}) start_time = state_data.get('start_time') cost_time = state_data.get('cost_time') if start_time is None or cost_time is None: # 状态异常,清理并提示 self.db.delete_game_state(0, user_id, 'adventure') return "⚠️ 冒险状态异常已清理,请使用 `.adventure` 重新开始。" current_time = int(time.time()) elapsed_seconds = max(0, current_time - int(start_time)) if elapsed_seconds < 1: elapsed_seconds = 1 # 计算最低倍率 try: min_multiplier = min(m for _, m, _ in self.prize_pool) except Exception: # 兜底:若奖池异常,按0.5处理 min_multiplier = 0.5 reward_points = int(min_multiplier * elapsed_seconds) if reward_points < 0: reward_points = 0 # 发放奖励并清理状态 if reward_points > 0: self.db.add_points(user_id, reward_points, "adventure", "冒险放弃奖励") self.db.delete_game_state(0, user_id, 'adventure') # 查询当前积分 updated_points = self.db.get_user_points(user_id) # 输出 elapsed_time_str = self._format_time(elapsed_seconds) text = f"## ⚡️ 冒险放弃\n\n" text += f"**已计入时间**: {elapsed_time_str}\n\n" text += f"**最低倍率**: {min_multiplier} 倍\n\n" text += f"**获得积分**: {reward_points} 分\n\n" text += f"**当前积分**: {updated_points['points']} 分\n\n" text += "---\n\n" text += "💡 提示:可随时使用 `.adventure` 再次踏上冒险之旅!" return text except Exception as e: logger.error(f"放弃冒险时出错: {e}", exc_info=True) # 失败时不影响原状态,返回提示 return f"❌ 放弃冒险失败:{str(e)}" def _draw_prize(self, prize_pool: list) -> dict: """从奖品池中抽取奖品 Args: prize_pool: 奖品池,格式为 (权重, 倍率, 描述) Returns: 奖品信息,格式为 {'value': 倍率, 'description': 描述} """ # 生成随机数 rand = random.random()*self.total_weight cumulative_prob = 0.0 for weight, multiplier, description in prize_pool: cumulative_prob += weight if rand <= cumulative_prob: return { 'value': multiplier, 'description': description } # 兜底返回第一个奖品 return { 'value': prize_pool[0][1], 'description': prize_pool[0][2] } def _get_adventure_help(self) -> str: """获取冒险帮助信息 Returns: 帮助信息消息 """ text = f"## ⚡️ 冒险系统\n\n" text += f"### 基础用法\n" text += f"- `.adventure` - 消耗1秒进行冒险\n" text += f"- `.adventure 60` - 消耗60秒进行冒险\n" text += f"- `.adventure 1h30m10s` - 消耗1小时30分10秒进行冒险\n" text += f"- `.adventure 30m` - 消耗30分钟进行冒险\n" text += f"- `.adventure 10s` - 消耗10秒进行冒险\n\n" text += f"**时间格式说明**:支持时分秒组合,如 `1h30m10s`、`30m`、`10s`,也支持纯数字(按秒计算)。\n\n" text += f"### 其他功能\n" text += f"- `.adventure abandon` - 放弃当前冒险,按最低倍率结算已冒险时间\n" text += f"- `.adventure 放弃` - 放弃当前冒险,按最低倍率结算已冒险时间\n" text += f"- `.adventure help` - 查看帮助\n\n" return text def get_help(self) -> str: """获取帮助信息""" return self._get_adventure_help()