Files
WPSBot/games/adventure.py

366 lines
14 KiB
Python
Raw Normal View History

2025-10-29 17:23:24 +08:00
"""冒险系统游戏模块"""
import random
import time
import logging
import re
2025-10-29 17:23:24 +08:00
from datetime import datetime
from games.base import BaseGame
from utils.parser import CommandParser
from core.database import get_db
from typing import *
logger = logging.getLogger(__name__)
class AdventureGame(BaseGame):
"""冒险系统游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.db = get_db()
# 奖品池配置
self.prize_pool: List[Tuple[int, float, str]] = [
# (权重, 倍率, 描述)
(300, 1, "少量积分"),
(250, 2, "中等积分"),
2025-10-29 17:23:24 +08:00
(200, 2, "大量积分"),
(150, 5, "丰厚积分"),
(100, 10, "丰厚积分"),
(50, 100, "🌟 巨额积分"),
(10, 1000, "💎 传说积分"),
2025-10-29 17:23:24 +08:00
]
self.total_weight: int = 0
2025-10-29 17:28:02 +08:00
for weight,_,_ in self.prize_pool:
2025-10-29 17:23:24 +08:00
self.total_weight += weight
def _parse_time_string(self, time_str: str) -> int:
"""解析时间字符串,支持 h/m/s 格式
支持的格式示例
- "1h30m10s" -> 5410
- "30m" -> 1800
- "10s" -> 10
- "1h30m" -> 5400
- "3600" -> 3600纯数字按秒处理
Args:
time_str: 时间字符串
Returns:
解析后的秒数如果解析失败返回None
"""
if not time_str:
return None
# 如果是纯数字,直接返回
if time_str.isdigit():
return int(time_str)
# 使用正则表达式匹配 h/m/s 格式,确保整个字符串匹配
pattern = r'^(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$'
match = re.match(pattern, time_str.lower())
if not match:
return None
hours = int(match.group(1) or 0)
minutes = int(match.group(2) or 0)
seconds = int(match.group(3) or 0)
# 如果所有值都是0返回None
if hours == 0 and minutes == 0 and seconds == 0:
return None
total_seconds = hours * 3600 + minutes * 60 + seconds
return total_seconds
def _format_time(self, seconds: int) -> str:
"""将秒数格式化为 "X时X分X秒" 格式
Args:
seconds: 秒数
Returns:
格式化的时间字符串 "1时30分10秒""30分10秒""10秒"
"""
if seconds < 0:
seconds = 0
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
parts = []
if hours > 0:
parts.append(f"{hours}")
if minutes > 0:
parts.append(f"{minutes}")
if secs > 0 or not parts:
parts.append(f"{secs}")
return "".join(parts)
2025-10-29 17:23:24 +08:00
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理冒险相关指令
Args:
command: 指令 ".adventure", ".adventure 10", ".adventure stats"
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip().lower()
# 冒险说明
if args in ['help', '帮助', 'info']:
return self._get_adventure_help()
# 放弃当前冒险(按最低倍率结算已冒险时间)
if args in ['abandon', '放弃']:
return await self._abandon_adventure(chat_id, user_id)
# 默认冒险耗时1秒
2025-10-29 17:23:24 +08:00
else:
# 解析消耗时间
cost_time = 1 # 默认消耗1秒
if args:
parsed_time = self._parse_time_string(args)
if parsed_time is not None:
cost_time = parsed_time
else:
return f"❌ 时间格式错误!请使用以下格式:\n- 纯数字(秒):`.adventure 60`\n- 时分秒格式:`.adventure 1h30m10s`\n- 分钟秒格式:`.adventure 30m10s`\n- 只有秒:`.adventure 10s`"
2025-10-29 17:23:24 +08:00
return await self._perform_adventure(chat_id, user_id, cost_time)
except Exception as e:
logger.error(f"处理冒险指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
async def _perform_adventure(self, chat_id: int, user_id: int, cost_time: int) -> str:
"""执行冒险耗时
Args:
chat_id: 会话ID使用0作为用户级标识
user_id: 用户ID
cost_time: 消耗时间
2025-10-29 17:23:24 +08:00
Returns:
抽奖结果消息
"""
# 参数验证
if cost_time < 1:
return "❌ 冒险时间至少需要1秒"
2025-10-29 17:23:24 +08:00
# 查询冒险状态使用chat_id=0表示用户级状态
state = self.db.get_game_state(0, user_id, 'adventure')
current_time = int(time.time())
# 情况1用户已有未完成的冒险
if state:
try:
state_data = state['state_data']
start_time = state_data.get('start_time', 0)
saved_cost_time = state_data.get('cost_time', 1)
end_time = start_time + saved_cost_time
2025-10-29 17:23:24 +08:00
remaining_seconds = end_time - current_time
# 情况1.1:冒险已完成(时间已到或过期)
if remaining_seconds <= 0:
# 选择奖品池
prize_pool = self.prize_pool
# 执行抽奖
reward = self._draw_prize(prize_pool)
reward_points = int(reward['value'] * saved_cost_time)
# 处理奖励
self.db.add_points(user_id, reward_points, "adventure", f"冒险奖励")
# 删除冒险状态
self.db.delete_game_state(0, user_id, 'adventure')
# 获取更新后的积分信息
updated_points = self.db.get_user_points(user_id)
# 格式化输出
time_str = self._format_time(saved_cost_time)
2025-10-29 17:23:24 +08:00
text = f"## ⚡️ 冒险结果\n\n"
text += f"**消耗时间**: {time_str}\n\n"
2025-10-29 17:23:24 +08:00
text += f"**{reward['description']}**: 获得 {reward_points} 积分\n\n"
text += f"**当前积分**: {updated_points['points']}\n\n"
text += "---\n\n"
text += "💡 提示:冒险进行时不能炼金!"
return text
# 情况1.2:冒险未完成,返回等待提示
wait_msg = self._format_time(remaining_seconds)
saved_time_str = self._format_time(saved_cost_time)
2025-10-29 17:23:24 +08:00
text = f"## ⚡️ 冒险进行中\n\n"
text += f"你正在进行一次冒险,还需等待 **{wait_msg}** 才能完成。\n\n"
text += f"**当前冒险时长**: {saved_time_str}\n\n"
2025-10-29 17:23:24 +08:00
text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,请耐心等待!"
return text
except Exception as e:
# 状态数据异常,删除损坏状态并允许重新开始
logger.error(f"冒险状态数据异常: {e}", exc_info=True)
self.db.delete_game_state(0, user_id, 'adventure')
# 继续执行到情况3开始新冒险
# 情况3用户没有冒险状态开始新的冒险
state_data = {
'start_time': current_time,
'cost_time': cost_time
}
# 保存冒险状态
self.db.save_game_state(0, user_id, 'adventure', state_data)
# 计算预计完成时间
end_time = current_time + cost_time
2025-10-29 17:23:24 +08:00
end_datetime = datetime.fromtimestamp(end_time)
2025-10-31 18:01:33 +08:00
current_datetime = datetime.fromtimestamp(current_time)
# 判断是否跨天:如果完成日期和当前日期不同,或跨年,则显示完整日期时间
if (end_datetime.date() != current_datetime.date() or
end_datetime.year != current_datetime.year):
end_time_str = end_datetime.strftime('%Y-%m-%d %H:%M:%S')
else:
end_time_str = end_datetime.strftime('%H:%M:%S')
cost_time_str = self._format_time(cost_time)
2025-10-29 17:23:24 +08:00
text = f"## ⚡️ 冒险开始\n\n"
text += f"你已经开始了冒险之旅,本次冒险将持续 **{cost_time_str}**。\n\n"
2025-10-29 17:23:24 +08:00
text += f"**预计完成时间**: {end_time_str}\n\n"
text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,完成后使用 `.adventure` 获取奖励!"
return text
async def _abandon_adventure(self, chat_id: int, user_id: int) -> str:
"""放弃当前冒险,按最低倍率结算已冒险时间
Args:
chat_id: 会话ID使用0作为用户级标识
user_id: 用户ID
Returns:
放弃结果消息
"""
try:
# 查询冒险状态
state = self.db.get_game_state(0, user_id, 'adventure')
if not state:
return "❌ 当前没有进行中的冒险,可使用 `.adventure` 开始新的冒险。"
state_data = state.get('state_data', {})
start_time = state_data.get('start_time')
cost_time = state_data.get('cost_time')
if start_time is None or cost_time is None:
# 状态异常,清理并提示
self.db.delete_game_state(0, user_id, 'adventure')
return "⚠️ 冒险状态异常已清理,请使用 `.adventure` 重新开始。"
current_time = int(time.time())
elapsed_seconds = max(0, current_time - int(start_time))
if elapsed_seconds < 1:
elapsed_seconds = 1
# 计算最低倍率
try:
min_multiplier = min(m for _, m, _ in self.prize_pool)
except Exception:
# 兜底若奖池异常按0.5处理
min_multiplier = 0.5
reward_points = int(min_multiplier * elapsed_seconds)
if reward_points < 0:
reward_points = 0
# 发放奖励并清理状态
if reward_points > 0:
self.db.add_points(user_id, reward_points, "adventure", "冒险放弃奖励")
self.db.delete_game_state(0, user_id, 'adventure')
# 查询当前积分
updated_points = self.db.get_user_points(user_id)
# 输出
elapsed_time_str = self._format_time(elapsed_seconds)
text = f"## ⚡️ 冒险放弃\n\n"
text += f"**已计入时间**: {elapsed_time_str}\n\n"
text += f"**最低倍率**: {min_multiplier}\n\n"
text += f"**获得积分**: {reward_points}\n\n"
text += f"**当前积分**: {updated_points['points']}\n\n"
text += "---\n\n"
text += "💡 提示:可随时使用 `.adventure` 再次踏上冒险之旅!"
return text
except Exception as e:
logger.error(f"放弃冒险时出错: {e}", exc_info=True)
# 失败时不影响原状态,返回提示
return f"❌ 放弃冒险失败:{str(e)}"
2025-10-29 17:23:24 +08:00
def _draw_prize(self, prize_pool: list) -> dict:
"""从奖品池中抽取奖品
Args:
prize_pool: 奖品池格式为 (权重, 倍率, 描述)
Returns:
奖品信息格式为 {'value': 倍率, 'description': 描述}
"""
# 生成随机数
rand = random.random()*self.total_weight
cumulative_prob = 0.0
for weight, multiplier, description in prize_pool:
cumulative_prob += weight
if rand <= cumulative_prob:
return {
'value': multiplier,
'description': description
}
# 兜底返回第一个奖品
return {
'value': prize_pool[0][1],
'description': prize_pool[0][2]
}
def _get_adventure_help(self) -> str:
"""获取冒险帮助信息
Returns:
帮助信息消息
"""
text = f"## ⚡️ 冒险系统\n\n"
text += f"### 基础用法\n"
text += f"- `.adventure` - 消耗1秒进行冒险\n"
text += f"- `.adventure 60` - 消耗60秒进行冒险\n"
text += f"- `.adventure 1h30m10s` - 消耗1小时30分10秒进行冒险\n"
text += f"- `.adventure 30m` - 消耗30分钟进行冒险\n"
text += f"- `.adventure 10s` - 消耗10秒进行冒险\n\n"
text += f"**时间格式说明**:支持时分秒组合,如 `1h30m10s`、`30m`、`10s`,也支持纯数字(按秒计算)。\n\n"
2025-10-29 17:23:24 +08:00
text += f"### 其他功能\n"
text += f"- `.adventure abandon` - 放弃当前冒险,按最低倍率结算已冒险时间\n"
text += f"- `.adventure 放弃` - 放弃当前冒险,按最低倍率结算已冒险时间\n"
2025-10-29 17:23:24 +08:00
text += f"- `.adventure help` - 查看帮助\n\n"
return text
def get_help(self) -> str:
"""获取帮助信息"""
return self._get_adventure_help()