Files
WPSBot/games/adventure.py

366 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""冒险系统游戏模块"""
import random
import time
import logging
import re
from datetime import datetime
from games.base import BaseGame
from utils.parser import CommandParser
from core.database import get_db
from typing import *
logger = logging.getLogger(__name__)
class AdventureGame(BaseGame):
"""冒险系统游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.db = get_db()
# 奖品池配置
self.prize_pool: List[Tuple[int, float, str]] = [
# (权重, 倍率, 描述)
(300, 1, "少量积分"),
(250, 2, "中等积分"),
(200, 2, "大量积分"),
(150, 5, "丰厚积分"),
(100, 10, "丰厚积分"),
(50, 100, "🌟 巨额积分"),
(10, 1000, "💎 传说积分"),
]
self.total_weight: int = 0
for weight,_,_ in self.prize_pool:
self.total_weight += weight
def _parse_time_string(self, time_str: str) -> int:
"""解析时间字符串,支持 h/m/s 格式
支持的格式示例:
- "1h30m10s" -> 5410秒
- "30m" -> 1800秒
- "10s" -> 10秒
- "1h30m" -> 5400秒
- "3600" -> 3600秒纯数字按秒处理
Args:
time_str: 时间字符串
Returns:
解析后的秒数如果解析失败返回None
"""
if not time_str:
return None
# 如果是纯数字,直接返回
if time_str.isdigit():
return int(time_str)
# 使用正则表达式匹配 h/m/s 格式,确保整个字符串匹配
pattern = r'^(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$'
match = re.match(pattern, time_str.lower())
if not match:
return None
hours = int(match.group(1) or 0)
minutes = int(match.group(2) or 0)
seconds = int(match.group(3) or 0)
# 如果所有值都是0返回None
if hours == 0 and minutes == 0 and seconds == 0:
return None
total_seconds = hours * 3600 + minutes * 60 + seconds
return total_seconds
def _format_time(self, seconds: int) -> str:
"""将秒数格式化为 "X时X分X秒" 格式
Args:
seconds: 秒数
Returns:
格式化的时间字符串,如 "1时30分10秒""30分10秒""10秒"
"""
if seconds < 0:
seconds = 0
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
parts = []
if hours > 0:
parts.append(f"{hours}")
if minutes > 0:
parts.append(f"{minutes}")
if secs > 0 or not parts:
parts.append(f"{secs}")
return "".join(parts)
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理冒险相关指令
Args:
command: 指令,如 ".adventure", ".adventure 10", ".adventure stats"
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip().lower()
# 冒险说明
if args in ['help', '帮助', 'info']:
return self._get_adventure_help()
# 放弃当前冒险(按最低倍率结算已冒险时间)
if args in ['abandon', '放弃']:
return await self._abandon_adventure(chat_id, user_id)
# 默认冒险耗时1秒
else:
# 解析消耗时间
cost_time = 1 # 默认消耗1秒
if args:
parsed_time = self._parse_time_string(args)
if parsed_time is not None:
cost_time = parsed_time
else:
return f"❌ 时间格式错误!请使用以下格式:\n- 纯数字(秒):`.adventure 60`\n- 时分秒格式:`.adventure 1h30m10s`\n- 分钟秒格式:`.adventure 30m10s`\n- 只有秒:`.adventure 10s`"
return await self._perform_adventure(chat_id, user_id, cost_time)
except Exception as e:
logger.error(f"处理冒险指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
async def _perform_adventure(self, chat_id: int, user_id: int, cost_time: int) -> str:
"""执行冒险耗时
Args:
chat_id: 会话ID使用0作为用户级标识
user_id: 用户ID
cost_time: 消耗时间(秒)
Returns:
抽奖结果消息
"""
# 参数验证
if cost_time < 1:
return "❌ 冒险时间至少需要1秒"
# 查询冒险状态使用chat_id=0表示用户级状态
state = self.db.get_game_state(0, user_id, 'adventure')
current_time = int(time.time())
# 情况1用户已有未完成的冒险
if state:
try:
state_data = state['state_data']
start_time = state_data.get('start_time', 0)
saved_cost_time = state_data.get('cost_time', 1)
end_time = start_time + saved_cost_time
remaining_seconds = end_time - current_time
# 情况1.1:冒险已完成(时间已到或过期)
if remaining_seconds <= 0:
# 选择奖品池
prize_pool = self.prize_pool
# 执行抽奖
reward = self._draw_prize(prize_pool)
reward_points = int(reward['value'] * saved_cost_time)
# 处理奖励
self.db.add_points(user_id, reward_points, "adventure", f"冒险奖励")
# 删除冒险状态
self.db.delete_game_state(0, user_id, 'adventure')
# 获取更新后的积分信息
updated_points = self.db.get_user_points(user_id)
# 格式化输出
time_str = self._format_time(saved_cost_time)
text = f"## ⚡️ 冒险结果\n\n"
text += f"**消耗时间**: {time_str}\n\n"
text += f"**{reward['description']}**: 获得 {reward_points} 积分\n\n"
text += f"**当前积分**: {updated_points['points']}\n\n"
text += "---\n\n"
text += "💡 提示:冒险进行时不能炼金!"
return text
# 情况1.2:冒险未完成,返回等待提示
wait_msg = self._format_time(remaining_seconds)
saved_time_str = self._format_time(saved_cost_time)
text = f"## ⚡️ 冒险进行中\n\n"
text += f"你正在进行一次冒险,还需等待 **{wait_msg}** 才能完成。\n\n"
text += f"**当前冒险时长**: {saved_time_str}\n\n"
text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,请耐心等待!"
return text
except Exception as e:
# 状态数据异常,删除损坏状态并允许重新开始
logger.error(f"冒险状态数据异常: {e}", exc_info=True)
self.db.delete_game_state(0, user_id, 'adventure')
# 继续执行到情况3开始新冒险
# 情况3用户没有冒险状态开始新的冒险
state_data = {
'start_time': current_time,
'cost_time': cost_time
}
# 保存冒险状态
self.db.save_game_state(0, user_id, 'adventure', state_data)
# 计算预计完成时间
end_time = current_time + cost_time
end_datetime = datetime.fromtimestamp(end_time)
current_datetime = datetime.fromtimestamp(current_time)
# 判断是否跨天:如果完成日期和当前日期不同,或跨年,则显示完整日期时间
if (end_datetime.date() != current_datetime.date() or
end_datetime.year != current_datetime.year):
end_time_str = end_datetime.strftime('%Y-%m-%d %H:%M:%S')
else:
end_time_str = end_datetime.strftime('%H:%M:%S')
cost_time_str = self._format_time(cost_time)
text = f"## ⚡️ 冒险开始\n\n"
text += f"你已经开始了冒险之旅,本次冒险将持续 **{cost_time_str}**。\n\n"
text += f"**预计完成时间**: {end_time_str}\n\n"
text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,完成后使用 `.adventure` 获取奖励!"
return text
async def _abandon_adventure(self, chat_id: int, user_id: int) -> str:
"""放弃当前冒险,按最低倍率结算已冒险时间
Args:
chat_id: 会话ID使用0作为用户级标识
user_id: 用户ID
Returns:
放弃结果消息
"""
try:
# 查询冒险状态
state = self.db.get_game_state(0, user_id, 'adventure')
if not state:
return "❌ 当前没有进行中的冒险,可使用 `.adventure` 开始新的冒险。"
state_data = state.get('state_data', {})
start_time = state_data.get('start_time')
cost_time = state_data.get('cost_time')
if start_time is None or cost_time is None:
# 状态异常,清理并提示
self.db.delete_game_state(0, user_id, 'adventure')
return "⚠️ 冒险状态异常已清理,请使用 `.adventure` 重新开始。"
current_time = int(time.time())
elapsed_seconds = max(0, current_time - int(start_time))
if elapsed_seconds < 1:
elapsed_seconds = 1
# 计算最低倍率
try:
min_multiplier = min(m for _, m, _ in self.prize_pool)
except Exception:
# 兜底若奖池异常按0.5处理
min_multiplier = 0.5
reward_points = int(min_multiplier * elapsed_seconds)
if reward_points < 0:
reward_points = 0
# 发放奖励并清理状态
if reward_points > 0:
self.db.add_points(user_id, reward_points, "adventure", "冒险放弃奖励")
self.db.delete_game_state(0, user_id, 'adventure')
# 查询当前积分
updated_points = self.db.get_user_points(user_id)
# 输出
elapsed_time_str = self._format_time(elapsed_seconds)
text = f"## ⚡️ 冒险放弃\n\n"
text += f"**已计入时间**: {elapsed_time_str}\n\n"
text += f"**最低倍率**: {min_multiplier}\n\n"
text += f"**获得积分**: {reward_points}\n\n"
text += f"**当前积分**: {updated_points['points']}\n\n"
text += "---\n\n"
text += "💡 提示:可随时使用 `.adventure` 再次踏上冒险之旅!"
return text
except Exception as e:
logger.error(f"放弃冒险时出错: {e}", exc_info=True)
# 失败时不影响原状态,返回提示
return f"❌ 放弃冒险失败:{str(e)}"
def _draw_prize(self, prize_pool: list) -> dict:
"""从奖品池中抽取奖品
Args:
prize_pool: 奖品池,格式为 (权重, 倍率, 描述)
Returns:
奖品信息,格式为 {'value': 倍率, 'description': 描述}
"""
# 生成随机数
rand = random.random()*self.total_weight
cumulative_prob = 0.0
for weight, multiplier, description in prize_pool:
cumulative_prob += weight
if rand <= cumulative_prob:
return {
'value': multiplier,
'description': description
}
# 兜底返回第一个奖品
return {
'value': prize_pool[0][1],
'description': prize_pool[0][2]
}
def _get_adventure_help(self) -> str:
"""获取冒险帮助信息
Returns:
帮助信息消息
"""
text = f"## ⚡️ 冒险系统\n\n"
text += f"### 基础用法\n"
text += f"- `.adventure` - 消耗1秒进行冒险\n"
text += f"- `.adventure 60` - 消耗60秒进行冒险\n"
text += f"- `.adventure 1h30m10s` - 消耗1小时30分10秒进行冒险\n"
text += f"- `.adventure 30m` - 消耗30分钟进行冒险\n"
text += f"- `.adventure 10s` - 消耗10秒进行冒险\n\n"
text += f"**时间格式说明**:支持时分秒组合,如 `1h30m10s`、`30m`、`10s`,也支持纯数字(按秒计算)。\n\n"
text += f"### 其他功能\n"
text += f"- `.adventure abandon` - 放弃当前冒险,按最低倍率结算已冒险时间\n"
text += f"- `.adventure 放弃` - 放弃当前冒险,按最低倍率结算已冒险时间\n"
text += f"- `.adventure help` - 查看帮助\n\n"
return text
def get_help(self) -> str:
"""获取帮助信息"""
return self._get_adventure_help()