"""Callback路由处理""" import logging from fastapi import APIRouter, Request from fastapi.responses import JSONResponse from core.models import CallbackRequest from core.database import get_db from utils.message import get_message_sender from utils.parser import CommandParser from utils.rate_limit import get_rate_limiter logger = logging.getLogger(__name__) router = APIRouter() @router.get("/callback") async def callback_verify(): """Callback可用性校验 - WPS会发送GET请求验证""" logger.info("收到Callback验证请求") return JSONResponse({"result": "ok"}) @router.post("/callback") async def callback_receive(request: Request): """接收WPS Callback消息""" try: # 解析请求数据 data = await request.json() logger.info(f"收到消息: chatid={data.get('chatid')}, creator={data.get('creator')}") logger.info(f"消息内容: {data.get('content')}") logger.info(f"完整callback数据: {data}") # 验证请求 try: callback_data = CallbackRequest(**data) except Exception as e: logger.error(f"请求数据验证失败: {e}") return JSONResponse({"result": "ok"}) # 仍返回ok以避免重试 # 解析指令 parse_result = CommandParser.parse(callback_data.content) if not parse_result: # 不是有效指令,忽略 logger.debug("非有效指令,忽略") return JSONResponse({"result": "ok"}) game_type, command = parse_result logger.info(f"识别指令: game_type={game_type}, command={command}") # 检查限流 rate_limiter = get_rate_limiter() if not rate_limiter.is_allowed(): remaining = rate_limiter.get_remaining() reset_time = int(rate_limiter.get_reset_time()) sender = get_message_sender() await sender.send_text( f"⚠️ 消息发送过于频繁,请等待 {reset_time} 秒后再试\n" f"剩余配额: {remaining}" ) return JSONResponse({"result": "ok"}) # 更新用户信息 db = get_db() db.get_or_create_user(callback_data.creator) # 处理指令 response_text = await handle_command( game_type=game_type, command=command, chat_id=callback_data.chatid, user_id=callback_data.creator ) # 发送回复 if response_text: sender = get_message_sender() # AI 对话:统一按 Markdown 发送(按任务决策) if game_type == 'ai_chat': try: await sender.send_markdown(response_text) except Exception as send_md_err: logger.error(f"发送Markdown消息失败,改用文本发送: {send_md_err}") await sender.send_text(response_text) else: # 其他模块保持原有启发式:以 # 开头视为 Markdown,否则文本 if response_text.startswith('#'): await sender.send_markdown(response_text) else: await sender.send_text(response_text) return JSONResponse({"result": "ok"}) except Exception as e: logger.error(f"处理Callback异常: {e}", exc_info=True) # 仍然返回ok,避免WPS重试 return JSONResponse({"result": "ok"}) async def handle_command(game_type: str, command: str, chat_id: int, user_id: int) -> str: """处理游戏指令 Args: game_type: 游戏类型 command: 完整指令 chat_id: 会话ID user_id: 用户ID Returns: 回复文本 """ try: # 帮助指令 if game_type == 'help': from games.base import get_help_message return get_help_message() # 统计指令 if game_type == 'stats': from games.base import get_stats_message return get_stats_message(user_id) # 注册系统 if game_type == 'register': return await handle_register_command(command, chat_id, user_id) # 骰娘游戏 if game_type == 'dice': from games.dice import DiceGame game = DiceGame() return await game.handle(command, chat_id, user_id) # 石头剪刀布 if game_type == 'rps': from games.rps import RPSGame game = RPSGame() return await game.handle(command, chat_id, user_id) # 运势占卜 if game_type == 'fortune': from games.fortune import FortuneGame game = FortuneGame() return await game.handle(command, chat_id, user_id) # 猜数字 if game_type == 'guess': from games.guess import GuessGame game = GuessGame() return await game.handle(command, chat_id, user_id) # 问答游戏 if game_type == 'quiz': from games.quiz import QuizGame game = QuizGame() return await game.handle(command, chat_id, user_id) # 成语接龙 if game_type == 'idiom': from games.idiom import IdiomGame game = IdiomGame() return await game.handle(command, chat_id, user_id) # 五子棋 if game_type == 'gomoku': from games.gomoku import GomokuGame game = GomokuGame() return await game.handle(command, chat_id, user_id) # 积分系统 if game_type == 'points': from games.points import PointsGame game = PointsGame() return await game.handle(command, chat_id, user_id) # 炼金系统 if game_type == 'alchemy': from games.alchemy import AlchemyGame game = AlchemyGame() return await game.handle(command, chat_id, user_id) # 冒险系统 if game_type == 'adventure': from games.adventure import AdventureGame game = AdventureGame() return await game.handle(command, chat_id, user_id) # 积分赠送系统 if game_type == 'gift': from games.gift import GiftGame game = GiftGame() return await game.handle(command, chat_id, user_id) # 复述功能 if game_type == 'say': # 提取参数并原样返回 _, args = CommandParser.extract_command_args(command) args = args.strip() if not args: return "用法:.say 你想让我说的话\n别名:.说 / .复述" return args # 私聊功能 if game_type == 'talk': return await handle_talk_command(command, chat_id, user_id) # AI对话系统 if game_type == 'ai_chat': from games.ai_chat import AIChatGame game = AIChatGame() return await game.handle(command, chat_id, user_id) # 赌场系统 if game_type == 'casino': from games.casino import CasinoGame game = CasinoGame() return await game.handle(command, chat_id, user_id) # 未知游戏类型 logger.warning(f"未知游戏类型: {game_type}") return "❌ 未知的游戏类型" except Exception as e: logger.error(f"处理游戏指令异常: {e}", exc_info=True) return f"❌ 处理指令时出错: {str(e)}" async def handle_register_command(command: str, chat_id: int, user_id: int) -> str: """处理注册命令 Args: command: 完整指令 ".register name" 或 ".register url " chat_id: 会话ID user_id: 用户ID Returns: 注册结果消息 """ try: # 提取参数 _, args = CommandParser.extract_command_args(command) args = args.strip() # 验证参数 if not args: return "❌ 请提供要注册的内容!\n\n正确格式:\n`.register <名称>` - 注册用户名\n`.register url ` - 注册webhook URL\n\n示例:\n`.register 张三`\n`.register url https://example.com/webhook?key=xxx`" # 检查是否为url子命令 parts = args.split(maxsplit=1) if parts and parts[0].lower() == 'url': # 处理URL注册 if len(parts) < 2: return "❌ 请提供webhook URL!\n\n正确格式:`.register url `\n\n示例:\n`.register url https://example.com/webhook?key=xxx`" webhook_url = parts[1].strip() # URL验证 if not webhook_url.startswith(('http://', 'https://')): return "❌ URL格式无效!必须以 http:// 或 https:// 开头。" # 设置URL db = get_db() success = db.set_user_webhook_url(user_id, webhook_url) if success: return f"✅ Webhook URL注册成功!\n\n**您的个人URL**:{webhook_url}\n\n私聊消息将发送到此URL。" else: return "❌ 注册失败!请稍后重试。" else: # 原有的名称注册逻辑 if len(args) > 20: return "❌ 名称过长!最多支持20个字符。" # 更新用户名称 db = get_db() success = db.update_user_name(user_id, args) if success: return f"✅ 注册成功!\n\n**您的名称**:{args}\n\n之后您可以使用这个名称参与各种游戏和功能。" else: return "❌ 注册失败!请稍后重试。" except Exception as e: logger.error(f"处理注册指令错误: {e}", exc_info=True) return f"❌ 处理指令出错: {str(e)}" async def handle_talk_command(command: str, chat_id: int, user_id: int) -> str: """处理私聊命令 Args: command: 完整指令 ".talk " chat_id: 会话ID user_id: 发送者用户ID Returns: 处理结果消息 """ try: # 提取参数 _, args = CommandParser.extract_command_args(command) args = args.strip() # 验证参数 if not args: return "❌ 请提供用户名和消息内容!\n\n正确格式:`.talk <用户名> <消息内容>`\n\n示例:\n`.talk 张三 你好,想和你聊聊`\n`.talk 李四 这是一条私聊消息`" # 解析username和content(第一个单词是username,剩余部分是content) parts = args.split(maxsplit=1) if len(parts) < 2: return "❌ 请提供用户名和消息内容!\n\n正确格式:`.talk <用户名> <消息内容>`\n\n示例:\n`.talk 张三 你好,想和你聊聊`" target_username = parts[0].strip() content = parts[1].strip() if not target_username: return "❌ 用户名不能为空!\n\n正确格式:`.talk <用户名> <消息内容>`" if not content: return "❌ 消息内容不能为空!\n\n正确格式:`.talk <用户名> <消息内容>`" # 通过用户名查找目标用户 db = get_db() target_user = db.get_user_by_name(target_username) if not target_user: return f"❌ 找不到用户名为「{target_username}」的用户!\n\n提示:目标用户需要使用 `.register <名称>` 注册用户名。" target_user_id = target_user['user_id'] # 检查目标用户是否有注册名称(应该有,因为是通过名称找到的) if not target_user.get('username'): return f"❌ 用户「{target_username}」尚未注册用户名!" # 检查目标用户是否有个人webhook URL if not db.has_webhook_url(target_user_id): return f"❌ 用户「{target_username}」尚未注册个人webhook URL!\n\n提示:目标用户需要使用 `.register url ` 注册个人URL后才能接收私聊消息。" # 发送私聊消息 from utils.message import send_private_message success = await send_private_message( user_id=target_user_id, content=content, msg_type='text' ) if success: # 私聊消息发送成功,不向主URL发送提示消息 return "" else: # 发送失败时仍然需要提示用户 return f"❌ 发送私聊消息失败,请稍后重试。" except Exception as e: logger.error(f"处理私聊指令错误: {e}", exc_info=True) return f"❌ 处理指令出错: {str(e)}"