Files
WPSBot/games/adventure.py
2025-10-29 17:28:02 +08:00

219 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""冒险系统游戏模块"""
import random
import time
import logging
from datetime import datetime
from games.base import BaseGame
from utils.parser import CommandParser
from core.database import get_db
from typing import *
logger = logging.getLogger(__name__)
class AdventureGame(BaseGame):
"""冒险系统游戏"""
def __init__(self):
"""初始化游戏"""
super().__init__()
self.db = get_db()
# 奖品池配置
self.prize_pool: List[Tuple[int, float, str]] = [
# (权重, 倍率, 描述)
(500, 0.5, "少量积分"),
(350, 1, "中等积分"),
(200, 2, "大量积分"),
(100, 5, "丰厚积分"),
(50, 10, "丰厚积分"),
(10, 100, "🌟 巨额积分"),
(1, 1000, "💎 传说积分"),
]
self.total_weight: int = 0
for weight,_,_ in self.prize_pool:
self.total_weight += weight
async def handle(self, command: str, chat_id: int, user_id: int) -> str:
"""处理冒险相关指令
Args:
command: 指令,如 ".adventure", ".adventure 10", ".adventure stats"
chat_id: 会话ID
user_id: 用户ID
Returns:
回复消息
"""
try:
# 提取参数
_, args = CommandParser.extract_command_args(command)
args = args.strip().lower()
# 冒险说明
if args in ['help', '帮助', 'info']:
return self._get_adventure_help()
# 默认冒险耗时1分钟
else:
# 解析消耗时间
cost_time = 1 # 默认消耗1分钟
if args.isdigit():
cost_time = int(args)
return await self._perform_adventure(chat_id, user_id, cost_time)
except Exception as e:
logger.error(f"处理冒险指令错误: {e}", exc_info=True)
return f"❌ 处理指令出错: {str(e)}"
async def _perform_adventure(self, chat_id: int, user_id: int, cost_time: int) -> str:
"""执行冒险耗时
Args:
chat_id: 会话ID使用0作为用户级标识
user_id: 用户ID
cost_time: 消耗时间(分钟)
Returns:
抽奖结果消息
"""
# 参数验证
if cost_time < 1:
return "❌ 冒险时间至少需要1分钟"
# 查询冒险状态使用chat_id=0表示用户级状态
state = self.db.get_game_state(0, user_id, 'adventure')
current_time = int(time.time())
# 情况1用户已有未完成的冒险
if state:
try:
state_data = state['state_data']
start_time = state_data.get('start_time', 0)
saved_cost_time = state_data.get('cost_time', 1)
end_time = start_time + saved_cost_time * 60
remaining_seconds = end_time - current_time
# 情况1.1:冒险已完成(时间已到或过期)
if remaining_seconds <= 0:
# 选择奖品池
prize_pool = self.prize_pool
# 执行抽奖
reward = self._draw_prize(prize_pool)
reward_points = int(reward['value'] * saved_cost_time)
# 处理奖励
self.db.add_points(user_id, reward_points, "adventure", f"冒险奖励")
# 删除冒险状态
self.db.delete_game_state(0, user_id, 'adventure')
# 获取更新后的积分信息
updated_points = self.db.get_user_points(user_id)
# 格式化输出
text = f"## ⚡️ 冒险结果\n\n"
text += f"**消耗时间**: {saved_cost_time} 分钟\n\n"
text += f"**{reward['description']}**: 获得 {reward_points} 积分\n\n"
text += f"**当前积分**: {updated_points['points']}\n\n"
text += "---\n\n"
text += "💡 提示:冒险进行时不能炼金!"
return text
# 情况1.2:冒险未完成,返回等待提示
remaining_minutes = remaining_seconds // 60
remaining_secs = remaining_seconds % 60
if remaining_minutes > 0:
wait_msg = f"{remaining_minutes}{remaining_secs}"
else:
wait_msg = f"{remaining_secs}"
text = f"## ⚡️ 冒险进行中\n\n"
text += f"你正在进行一次冒险,还需等待 **{wait_msg}** 才能完成。\n\n"
text += f"**当前冒险时长**: {saved_cost_time} 分钟\n\n"
text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,请耐心等待!"
return text
except Exception as e:
# 状态数据异常,删除损坏状态并允许重新开始
logger.error(f"冒险状态数据异常: {e}", exc_info=True)
self.db.delete_game_state(0, user_id, 'adventure')
# 继续执行到情况3开始新冒险
# 情况3用户没有冒险状态开始新的冒险
state_data = {
'start_time': current_time,
'cost_time': cost_time
}
# 保存冒险状态
self.db.save_game_state(0, user_id, 'adventure', state_data)
# 计算预计完成时间
end_time = current_time + cost_time * 60
end_datetime = datetime.fromtimestamp(end_time)
end_time_str = end_datetime.strftime('%H:%M:%S')
text = f"## ⚡️ 冒险开始\n\n"
text += f"你已经开始了冒险之旅,本次冒险将持续 **{cost_time}** 分钟。\n\n"
text += f"**预计完成时间**: {end_time_str}\n\n"
text += "---\n\n"
text += "💡 提示:冒险期间无法进行炼金,完成后使用 `.adventure` 获取奖励!"
return text
def _draw_prize(self, prize_pool: list) -> dict:
"""从奖品池中抽取奖品
Args:
prize_pool: 奖品池,格式为 (权重, 倍率, 描述)
Returns:
奖品信息,格式为 {'value': 倍率, 'description': 描述}
"""
# 生成随机数
rand = random.random()*self.total_weight
cumulative_prob = 0.0
for weight, multiplier, description in prize_pool:
cumulative_prob += weight
if rand <= cumulative_prob:
return {
'value': multiplier,
'description': description
}
# 兜底返回第一个奖品
return {
'value': prize_pool[0][1],
'description': prize_pool[0][2]
}
def _get_adventure_help(self) -> str:
"""获取冒险帮助信息
Returns:
帮助信息消息
"""
text = f"## ⚡️ 冒险系统\n\n"
text += f"### 基础用法\n"
text += f"- `.adventure` - 消耗1分钟进行冒险\n"
text += f"- `.adventure time` - 消耗time分钟进行冒险, 最少一分钟\n"
text += f"### 其他功能\n"
text += f"- `.adventure help` - 查看帮助\n\n"
return text
def get_help(self) -> str:
"""获取帮助信息"""
return self._get_adventure_help()