From dc1766e023545428da79411d5191618bbf821e61 Mon Sep 17 00:00:00 2001 From: ninemine <1371605831@qq.com> Date: Wed, 5 Nov 2025 17:27:15 +0800 Subject: [PATCH] Init --- CoreModules/plugin_interface.py | 21 ++ CoreRouters/callback.py | 418 +++----------------------------- README.md | 1 + Utilities/__init__.py | 2 + 4 files changed, 58 insertions(+), 384 deletions(-) create mode 100644 Utilities/__init__.py diff --git a/CoreModules/plugin_interface.py b/CoreModules/plugin_interface.py index 71ae3c6..bd68c01 100644 --- a/CoreModules/plugin_interface.py +++ b/CoreModules/plugin_interface.py @@ -16,6 +16,21 @@ class DatabaseModel(BaseModel): column_defs: Dict[str, str] = Field(default={}) class PluginInterface(ABC): + plugin_instances: Dict[str, "PluginInterface"] = {} + + def callback(self, message: str, chat_id: int, user_id: int) -> str: + ''' + 继承后重写该方法接受消息并返回消息 + 返回空字符串代表不进行反馈 + Args: + message: 消息内容 + chat_id: 会话ID + user_id: 用户ID + Returns: + str: 消息内容 + ''' + return "" + def execute(self, path:str) -> Optional[APIRouter]: ''' 继承后是否返回路由决定是否启动该插件 @@ -55,6 +70,12 @@ class PluginInterface(ABC): ''' pass + def register_plugin(self, command: str) -> None: + ''' + 将插件注册, 使其可以被命令匹配 + ''' + PluginInterface.plugin_instances[command] = self + def register_db_model(self) -> DatabaseModel: ''' 继承后重写该方法注册数据库模型 diff --git a/CoreRouters/callback.py b/CoreRouters/callback.py index c178b7a..be2f055 100644 --- a/CoreRouters/callback.py +++ b/CoreRouters/callback.py @@ -1,175 +1,65 @@ """Callback路由处理""" -import logging -import re +from ..Convention.Runtime.GlobalConfig import ProjectConfig + from fastapi import APIRouter, Request from fastapi.responses import JSONResponse -from core.models import CallbackRequest -from core.database import get_db -from utils.message import get_message_sender -from utils.parser import CommandParser -from utils.rate_limit import get_rate_limiter +from ..CoreModules.models import CallbackRequest +from ..CoreModules.plugin_interface import PluginInterface -logger = logging.getLogger(__name__) +config = ProjectConfig() +ALWAYS_RETURN_OK = config.FindItem("always_return_ok", True) # 返回ok,可以避免重试 router = APIRouter() - @router.get("/callback") async def callback_verify(): - """Callback可用性校验 - WPS会发送GET请求验证""" - logger.info("收到Callback验证请求") + """Callback可用性校验""" + config.Log("Info", "收到Callback验证请求") return JSONResponse({"result": "ok"}) @router.post("/callback") async def callback_receive(request: Request): - """接收WPS Callback消息""" + """Callback消息""" try: # 解析请求数据 data = await request.json() - logger.info(f"收到消息: chatid={data.get('chatid')}, creator={data.get('creator')}") - logger.info(f"消息内容: {data.get('content')}") - logger.info(f"完整callback数据: {data}") + config.Log("Info", f"完整callback数据: {data}") # 验证请求 try: callback_data = CallbackRequest(**data) except Exception as e: - logger.error(f"请求数据验证失败: {e}") - return JSONResponse({"result": "ok"}) # 仍返回ok以避免重试 + config.Log("Error", f"请求数据验证失败: {e}") + if ALWAYS_RETURN_OK: + return JSONResponse({"result": "ok"}) + else: + return JSONResponse({"result": "error", "message": str(e)}) # 解析指令 - parse_result = CommandParser.parse(callback_data.content) - if not parse_result: - # 不是有效指令,忽略 - logger.debug("非有效指令,忽略") - return JSONResponse({"result": "ok"}) + content = callback_data.content + command = content.split(" ")[0] + config.Log("Info", f"识别指令: command={command}") - game_type, command = parse_result - logger.info(f"识别指令: game_type={game_type}, command={command}") - - # 检查是否包含 @s 参数(私聊标志) - use_private_url = False - # 使用正则表达式匹配独立的 @s 参数(前后有空格或字符串边界) - if re.search(r'\s+@s\s+|\s+@s$|^@s\s+|^@s$', command): - use_private_url = True - # 从命令中移除 @s 参数,保持其他参数不变 - command = re.sub(r'\s+@s(\s+|$)|^@s\s+', ' ', command).strip() - logger.info(f"检测到 @s 参数,将优先使用个人URL发送反馈,清理后的命令: {command}") - - # 检查限流 - rate_limiter = get_rate_limiter() - if not rate_limiter.is_allowed(): - remaining = rate_limiter.get_remaining() - reset_time = int(rate_limiter.get_reset_time()) - - sender = get_message_sender() - await sender.send_text( - f"⚠️ 消息发送过于频繁,请等待 {reset_time} 秒后再试\n" - f"剩余配额: {remaining}" - ) - return JSONResponse({"result": "ok"}) - - # 更新用户信息 - db = get_db() - db.get_or_create_user(callback_data.creator) - - # 处理指令 - response_text = await handle_command( - game_type=game_type, - command=command, - chat_id=callback_data.chatid, - user_id=callback_data.creator - ) - - # 发送回复 - if response_text: - # 如果使用了 @s 参数,优先发送到个人URL - if use_private_url: - db = get_db() - user_webhook_url = db.get_user_webhook_url(callback_data.creator) - - if user_webhook_url: - # 有个人URL,发送到个人URL - from utils.message import send_private_message - # 判断消息类型 - if game_type == 'ai_chat': - msg_type = 'markdown' - elif response_text.startswith('#'): - msg_type = 'markdown' - else: - msg_type = 'text' - - success = await send_private_message( - user_id=callback_data.creator, - content=response_text, - msg_type=msg_type - ) - if not success: - # 如果私聊发送失败,回退到主URL - logger.warning(f"个人URL发送失败,回退到主URL: user_id={callback_data.creator}") - sender = get_message_sender() - if game_type == 'ai_chat': - try: - await sender.send_markdown(response_text) - except Exception as send_md_err: - logger.error(f"发送Markdown消息失败,改用文本发送: {send_md_err}") - await sender.send_text(response_text) - else: - if response_text.startswith('#'): - await sender.send_markdown(response_text) - else: - await sender.send_text(response_text) - # 成功发送到个人URL,不向主URL发送 - else: - # 没有个人URL,回退到主URL - logger.info(f"用户 {callback_data.creator} 没有注册个人URL,使用主URL发送") - sender = get_message_sender() - if game_type == 'ai_chat': - try: - await sender.send_markdown(response_text) - except Exception as send_md_err: - logger.error(f"发送Markdown消息失败,改用文本发送: {send_md_err}") - await sender.send_text(response_text) - else: - if response_text.startswith('#'): - await sender.send_markdown(response_text) - else: - await sender.send_text(response_text) - else: - # 没有 @s 参数,正常发送到主URL - sender = get_message_sender() - - # AI 对话:统一按 Markdown 发送(按任务决策) - if game_type == 'ai_chat': - try: - await sender.send_markdown(response_text) - except Exception as send_md_err: - logger.error(f"发送Markdown消息失败,改用文本发送: {send_md_err}") - await sender.send_text(response_text) - else: - # 其他模块保持原有启发式:以 # 开头视为 Markdown,否则文本 - if response_text.startswith('#'): - await sender.send_markdown(response_text) - else: - await sender.send_text(response_text) - - return JSONResponse({"result": "ok"}) + # TODO: 处理指令 + return await handle_command(command, content, callback_data.chatid, callback_data.creator) except Exception as e: - logger.error(f"处理Callback异常: {e}", exc_info=True) - # 仍然返回ok,避免WPS重试 - return JSONResponse({"result": "ok"}) + config.Log("Error", f"处理Callback异常: {e}", exc_info=True) + if ALWAYS_RETURN_OK: + return JSONResponse({"result": "ok"}) + else: + return JSONResponse({"result": "error", "message": str(e)}) -async def handle_command(game_type: str, command: str, +async def handle_command(command: str, content: str, chat_id: int, user_id: int) -> str: """处理游戏指令 Args: - game_type: 游戏类型 - command: 完整指令 + command: 指令 + content: 消息内容 chat_id: 会话ID user_id: 用户ID @@ -177,252 +67,12 @@ async def handle_command(game_type: str, command: str, 回复文本 """ try: - # 帮助指令 - if game_type == 'help': - from games.base import get_help_message - return get_help_message() - - # 统计指令 - if game_type == 'stats': - from games.base import get_stats_message - return get_stats_message(user_id) - - # 注册系统 - if game_type == 'register': - return await handle_register_command(command, chat_id, user_id) - - # 骰娘游戏 - if game_type == 'dice': - from games.dice import DiceGame - game = DiceGame() - return await game.handle(command, chat_id, user_id) - - # 石头剪刀布 - if game_type == 'rps': - from games.rps import RPSGame - game = RPSGame() - return await game.handle(command, chat_id, user_id) - - # 运势占卜 - if game_type == 'fortune': - from games.fortune import FortuneGame - game = FortuneGame() - return await game.handle(command, chat_id, user_id) - - # 猜数字 - if game_type == 'guess': - from games.guess import GuessGame - game = GuessGame() - return await game.handle(command, chat_id, user_id) - - # 问答游戏 - if game_type == 'quiz': - from games.quiz import QuizGame - game = QuizGame() - return await game.handle(command, chat_id, user_id) - - # 成语接龙 - if game_type == 'idiom': - from games.idiom import IdiomGame - game = IdiomGame() - return await game.handle(command, chat_id, user_id) - - # 五子棋 - if game_type == 'gomoku': - from games.gomoku import GomokuGame - game = GomokuGame() - return await game.handle(command, chat_id, user_id) - - # 积分系统 - if game_type == 'points': - from games.points import PointsGame - game = PointsGame() - return await game.handle(command, chat_id, user_id) - - # 炼金系统 - if game_type == 'alchemy': - from games.alchemy import AlchemyGame - game = AlchemyGame() - return await game.handle(command, chat_id, user_id) - - # 冒险系统 - if game_type == 'adventure': - from games.adventure import AdventureGame - game = AdventureGame() - return await game.handle(command, chat_id, user_id) - - # 积分赠送系统 - if game_type == 'gift': - from games.gift import GiftGame - game = GiftGame() - return await game.handle(command, chat_id, user_id) - - # 复述功能 - if game_type == 'say': - # 提取参数并原样返回 - _, args = CommandParser.extract_command_args(command) - args = args.strip() - if not args: - return "用法:.say 你想让我说的话\n别名:.说 / .复述" - return args - - # 私聊功能 - if game_type == 'talk': - return await handle_talk_command(command, chat_id, user_id) - - # AI对话系统 - if game_type == 'ai_chat': - from games.ai_chat import AIChatGame - game = AIChatGame() - return await game.handle(command, chat_id, user_id) - - # 赌场系统 - if game_type == 'casino': - from games.casino import CasinoGame - game = CasinoGame() - return await game.handle(command, chat_id, user_id) - - # 狼人杀系统 - if game_type == 'werewolf': - from games.werewolf import WerewolfGame - game = WerewolfGame() - return await game.handle(command, chat_id, user_id) - - # 未知游戏类型 - logger.warning(f"未知游戏类型: {game_type}") - return "❌ 未知的游戏类型" - + plugin = PluginInterface.plugin_instances.get(command, None) + if plugin: + return plugin.callback(content, chat_id, user_id) + else: + return f"❌ 未识别指令: {command}" except Exception as e: - logger.error(f"处理游戏指令异常: {e}", exc_info=True) + config.Log("Error", f"处理指令异常: {e}", exc_info=True) return f"❌ 处理指令时出错: {str(e)}" - -async def handle_register_command(command: str, chat_id: int, user_id: int) -> str: - """处理注册命令 - - Args: - command: 完整指令 ".register name" 或 ".register url " - chat_id: 会话ID - user_id: 用户ID - - Returns: - 注册结果消息 - """ - try: - # 提取参数 - _, args = CommandParser.extract_command_args(command) - args = args.strip() - - # 验证参数 - if not args: - return "❌ 请提供要注册的内容!\n\n正确格式:\n`.register <名称>` - 注册用户名\n`.register url ` - 注册webhook URL\n\n示例:\n`.register 张三`\n`.register url https://example.com/webhook?key=xxx`" - - # 检查是否为url子命令 - parts = args.split(maxsplit=1) - if parts and parts[0].lower() == 'url': - # 处理URL注册 - if len(parts) < 2: - return "❌ 请提供webhook URL!\n\n正确格式:`.register url `\n\n示例:\n`.register url https://example.com/webhook?key=xxx`" - - webhook_url = parts[1].strip() - - # URL验证 - if not webhook_url.startswith(('http://', 'https://')): - return "❌ URL格式无效!必须以 http:// 或 https:// 开头。" - - # 设置URL - db = get_db() - success = db.set_user_webhook_url(user_id, webhook_url) - - if success: - return f"✅ Webhook URL注册成功!\n\n**您的个人URL**:{webhook_url}\n\n私聊消息将发送到此URL。" - else: - return "❌ 注册失败!请稍后重试。" - else: - # 原有的名称注册逻辑 - if len(args) > 20: - return "❌ 名称过长!最多支持20个字符。" - - # 更新用户名称 - db = get_db() - success = db.update_user_name(user_id, args) - - if success: - return f"✅ 注册成功!\n\n**您的名称**:{args}\n\n之后您可以使用这个名称参与各种游戏和功能。" - else: - return "❌ 注册失败!请稍后重试。" - - except Exception as e: - logger.error(f"处理注册指令错误: {e}", exc_info=True) - return f"❌ 处理指令出错: {str(e)}" - - -async def handle_talk_command(command: str, chat_id: int, user_id: int) -> str: - """处理私聊命令 - - Args: - command: 完整指令 ".talk " - chat_id: 会话ID - user_id: 发送者用户ID - - Returns: - 处理结果消息 - """ - try: - # 提取参数 - _, args = CommandParser.extract_command_args(command) - args = args.strip() - - # 验证参数 - if not args: - return "❌ 请提供用户名和消息内容!\n\n正确格式:`.talk <用户名> <消息内容>`\n\n示例:\n`.talk 张三 你好,想和你聊聊`\n`.talk 李四 这是一条私聊消息`" - - # 解析username和content(第一个单词是username,剩余部分是content) - parts = args.split(maxsplit=1) - if len(parts) < 2: - return "❌ 请提供用户名和消息内容!\n\n正确格式:`.talk <用户名> <消息内容>`\n\n示例:\n`.talk 张三 你好,想和你聊聊`" - - target_username = parts[0].strip() - content = parts[1].strip() - - if not target_username: - return "❌ 用户名不能为空!\n\n正确格式:`.talk <用户名> <消息内容>`" - if not content: - return "❌ 消息内容不能为空!\n\n正确格式:`.talk <用户名> <消息内容>`" - - # 通过用户名查找目标用户 - db = get_db() - target_user = db.get_user_by_name(target_username) - - if not target_user: - return f"❌ 找不到用户名为「{target_username}」的用户!\n\n提示:目标用户需要使用 `.register <名称>` 注册用户名。" - - target_user_id = target_user['user_id'] - - # 检查目标用户是否有注册名称(应该有,因为是通过名称找到的) - if not target_user.get('username'): - return f"❌ 用户「{target_username}」尚未注册用户名!" - - # 检查目标用户是否有个人webhook URL - if not db.has_webhook_url(target_user_id): - return f"❌ 用户「{target_username}」尚未注册个人webhook URL!\n\n提示:目标用户需要使用 `.register url ` 注册个人URL后才能接收私聊消息。" - - # 发送私聊消息 - from utils.message import send_private_message - success = await send_private_message( - user_id=target_user_id, - content=content, - msg_type='text' - ) - - if success: - # 私聊消息发送成功,不向主URL发送提示消息 - return "" - else: - # 发送失败时仍然需要提示用户 - return f"❌ 发送私聊消息失败,请稍后重试。" - - except Exception as e: - logger.error(f"处理私聊指令错误: {e}", exc_info=True) - return f"❌ 处理指令出错: {str(e)}" - diff --git a/README.md b/README.md index 1bcfe63..f8395fd 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ just because program not running to the place where argument been referenced - **max_concurrent_requests** default: 100 - **database_path** file on [Assets](Assets), default: db.db - **plugin_dir** where plugins load, default: Plugins +- **always_return_ok** default: true ## Plugins diff --git a/Utilities/__init__.py b/Utilities/__init__.py new file mode 100644 index 0000000..e03d15a --- /dev/null +++ b/Utilities/__init__.py @@ -0,0 +1,2 @@ +"""工具模块""" +