diff --git a/.tasks/2025-11-03_1_user-webhook-url.md b/.tasks/2025-11-03_1_user-webhook-url.md new file mode 100644 index 0000000..e44427e --- /dev/null +++ b/.tasks/2025-11-03_1_user-webhook-url.md @@ -0,0 +1,462 @@ +# 背景 +文件名:2025-11-03_1_user-webhook-url.md +创建于:2025-11-03_09:38:30 +创建者:admin +主分支:main +任务分支:task/user-webhook-url_2025-11-03_1 +Yolo模式:Off + +# 任务描述 +在WPS Bot Game项目中添加用户专属webhook URL功能,允许每个用户注册自己的个人webhook URL作为私聊途径。 + +## 核心需求 +1. 用户可以通过 `.register url ` 指令注册个人webhook URL +2. 私聊消息发送功能将被封装为API接口,供其他系统调用 +3. 提供检测用户是否具有个人URL的接口,用于系统运行时确保参与用户都能被私聊 +4. 服务器启动时使用的webhook URL称为主URL,私聊用的URL称为个人URL + +## 术语定义 +- **主URL**: 服务器启动时使用的webhook URL,用于群聊消息发送 +- **个人URL**: 用户注册的专属webhook URL,用于私聊消息发送 + +## 功能要求 +1. **注册功能**: 支持 `.register url ` 指令注册/更新个人URL +2. **私聊接口**: 封装私聊消息发送功能为API接口(暂不对用户开放命令) +3. **检测接口**: 提供单个和批量检测用户是否有个人URL的接口 +4. **数据库支持**: 在users表中添加webhook_url字段 + +# 项目概览 + +## 项目结构 +``` +WPSBotGame/ +├── app.py # FastAPI主应用 +├── config.py # 配置管理 +├── core/ +│ ├── database.py # SQLite数据库操作 +│ ├── middleware.py # 中间件 +│ └── models.py # 数据模型 +├── routers/ +│ ├── callback.py # Callback路由处理 +│ ├── health.py # 健康检查 +│ └── private.py # 私聊相关API(新增) +├── games/ # 游戏模块 +│ └── ... # 各种游戏 +└── utils/ + ├── parser.py # 指令解析 + └── message.py # 消息发送 +``` + +# 分析 + +## 当前状态 +1. `users` 表已有基础字段:user_id, username, created_at, last_active +2. `routers/callback.py` 中已有 `.register` 命令处理名称注册 +3. `utils/message.py` 中的 `MessageSender` 类使用全局webhook URL发送消息 +4. 数据库已支持动态添加列(`_add_column_if_not_exists`方法) +5. `init_tables()` 方法在表创建后会进行兼容性检查,使用 `_add_column_if_not_exists` 安全添加新列 + +## 关键技术点 +1. **数据库层**: + - 在`init_tables()`中使用`_add_column_if_not_exists`添加`webhook_url`字段(TEXT类型,可为NULL) + - 确保兼容性:如果表已存在且没有该列,会自动添加 + - 添加`set_user_webhook_url(user_id, webhook_url)`方法 + - 添加`get_user_webhook_url(user_id)`方法 + - 添加`has_webhook_url(user_id)`方法 + - 添加`check_users_webhook_urls(user_ids)`批量检查方法 + +2. **注册命令扩展**: + - 修改`handle_register_command`支持`.register url `子命令 + - 保留原有的`.register `功能 + - URL验证(基本格式检查) + +3. **私聊消息发送**: + - 封装私聊消息发送功能到`utils/message.py` + - 创建`send_private_message(user_id, content, msg_type='text')`函数 + - 如果用户有个人URL则使用个人URL,否则返回错误 + +4. **API接口**: + - 创建`routers/private.py`路由文件 + - `POST /api/private/send` - 发送私聊消息 + - `GET /api/private/check/{user_id}` - 检查单个用户是否有个人URL + - `POST /api/private/check-batch` - 批量检查多个用户 + +# 提议的解决方案 + +## 方案概述 +1. **数据库扩展**: 在users表添加webhook_url字段,并实现相关CRUD方法 +2. **注册命令扩展**: 扩展`.register`命令支持`url`子命令 +3. **私聊功能封装**: 创建私聊消息发送工具函数 +4. **API接口**: 创建私聊相关的RESTful API接口 + +## 设计决策 +- 个人URL存储在users表中,与用户信息关联 +- 私聊功能暂不提供用户命令,仅作为API接口供系统调用 +- URL验证采用基本格式检查(http/https开头) +- 批量检查接口支持传入用户ID列表,返回每个用户的URL状态 + +# 当前执行步骤:"3. 执行阶段完成" + +实施清单: +1. 在core/database.py的init_tables()方法末尾添加webhook_url字段兼容性检查 +2. 在core/database.py中添加set_user_webhook_url方法 +3. 在core/database.py中添加get_user_webhook_url方法 +4. 在core/database.py中添加has_webhook_url方法 +5. 在core/database.py中添加check_users_webhook_urls方法 +6. 在core/models.py文件末尾添加PrivateMessageRequest模型 +7. 在core/models.py中添加CheckBatchRequest模型 +8. 在core/models.py中添加CheckBatchResponse模型 +9. 在core/models.py的导入中添加List类型 +10. 修改routers/callback.py的handle_register_command函数支持url子命令 +11. 在utils/message.py文件末尾添加send_private_message函数 +12. 创建新文件routers/private.py,包含所有私聊相关API接口 +13. 在app.py中导入private路由模块 +14. 在app.py中注册private路由 + +# 详细实施计划 + +## 文件1: core/database.py + +### 修改点1: 在init_tables()方法中添加webhook_url字段兼容性检查 + +**位置**: 在`init_tables()`方法的末尾,第324行`logger.info("数据库表初始化完成")`之前 + +**修改内容**: +```python +# 兼容性检查:为users表添加webhook_url字段 +self._add_column_if_not_exists('users', 'webhook_url', 'TEXT') +``` + +### 修改点2: 添加set_user_webhook_url方法 + +**位置**: 在`# ===== 用户相关操作 =====`部分,`update_user_name`方法之后(约第414行之后) + +**方法签名**: +```python +def set_user_webhook_url(self, user_id: int, webhook_url: str) -> bool: + """设置用户webhook URL + + Args: + user_id: 用户ID + webhook_url: Webhook URL + + Returns: + 是否成功 + """ +``` + +**实现逻辑**: +- 使用try-except包装 +- 确保用户存在(调用get_or_create_user) +- UPDATE users SET webhook_url = ? WHERE user_id = ? +- 记录成功/失败日志 +- 返回True/False,异常时返回False + +### 修改点3: 添加get_user_webhook_url方法 + +**位置**: 紧接`set_user_webhook_url`方法之后 + +**方法签名**: +```python +def get_user_webhook_url(self, user_id: int) -> Optional[str]: + """获取用户webhook URL + + Args: + user_id: 用户ID + + Returns: + Webhook URL,如果不存在返回None + """ +``` + +**实现逻辑**: +- SELECT webhook_url FROM users WHERE user_id = ? +- 如果查询结果为None,返回None +- 如果webhook_url为None或空字符串,返回None +- 否则返回URL字符串 + +### 修改点4: 添加has_webhook_url方法 + +**位置**: 紧接`get_user_webhook_url`方法之后 + +**方法签名**: +```python +def has_webhook_url(self, user_id: int) -> bool: + """检查用户是否有个人webhook URL + + Args: + user_id: 用户ID + + Returns: + 是否有个人URL + """ +``` + +**实现逻辑**: +- 调用get_user_webhook_url +- 检查返回值是否不为None且不为空字符串 + +### 修改点5: 添加check_users_webhook_urls方法(批量检查) + +**位置**: 紧接`has_webhook_url`方法之后 + +**方法签名**: +```python +def check_users_webhook_urls(self, user_ids: List[int]) -> Dict[int, bool]: + """批量检查用户是否有个人webhook URL + + Args: + user_ids: 用户ID列表 + + Returns: + 字典 {user_id: has_url} + """ +``` + +**实现逻辑**: +- 如果user_ids为空,返回空字典 +- 使用IN子句查询:SELECT user_id, webhook_url FROM users WHERE user_id IN (?) +- 构建结果字典:初始化为所有user_id为False +- 遍历查询结果,如果webhook_url不为None且不为空字符串,则设为True +- 返回结果字典 + +## 文件2: routers/callback.py + +### 修改点1: 修改handle_register_command函数支持url子命令 + +**位置**: 第226-260行的`handle_register_command`函数 + +**修改内容**: +- 提取命令和参数后,检查第一个参数是否为"url" +- 如果是"url",提取URL参数,验证URL格式(http/https开头),调用`db.set_user_webhook_url` +- 如果不是"url",保持原有逻辑(注册名称) +- 更新帮助信息,包含两种用法 + +**新的函数逻辑**: +```python +# 提取参数 +_, args = CommandParser.extract_command_args(command) +args = args.strip() + +# 检查是否为url子命令 +parts = args.split(maxsplit=1) +if parts and parts[0].lower() == 'url': + # 处理URL注册 + if len(parts) < 2: + return "❌ 请提供webhook URL!\n\n正确格式:`.register url `\n\n示例:\n`.register url https://example.com/webhook?key=xxx`" + webhook_url = parts[1].strip() + # URL验证 + if not webhook_url.startswith(('http://', 'https://')): + return "❌ URL格式无效!必须以 http:// 或 https:// 开头。" + # 设置URL + db = get_db() + success = db.set_user_webhook_url(user_id, webhook_url) + if success: + return f"✅ Webhook URL注册成功!\n\n**您的个人URL**:{webhook_url}\n\n私聊消息将发送到此URL。" + else: + return "❌ 注册失败!请稍后重试。" +else: + # 原有的名称注册逻辑 + ... +``` + +## 文件3: utils/message.py + +### 修改点1: 添加send_private_message函数 + +**位置**: 在文件末尾,`get_message_sender`函数之后 + +**函数签名**: +```python +async def send_private_message(user_id: int, content: str, msg_type: str = 'text') -> bool: + """发送私聊消息到用户个人webhook URL + + Args: + user_id: 目标用户ID + content: 消息内容 + msg_type: 消息类型 ('text' 或 'markdown') + + Returns: + 是否发送成功,如果用户没有个人URL则返回False + """ +``` + +**实现逻辑**: +- 从数据库获取用户webhook URL +- 如果URL不存在,记录日志并返回False +- 创建MessageSender实例(使用用户的个人URL) +- 根据msg_type调用send_text或send_markdown +- 返回发送结果 + +## 文件4: core/models.py (新增数据模型) + +### 修改点1: 添加PrivateMessageRequest模型 + +**位置**: 文件末尾 + +**模型定义**: +```python +class PrivateMessageRequest(BaseModel): + """私聊消息请求模型""" + user_id: int = Field(..., description="目标用户ID") + content: str = Field(..., description="消息内容") + msg_type: str = Field(default="text", description="消息类型: text 或 markdown") +``` + +### 修改点2: 添加CheckBatchRequest模型 + +**位置**: 紧接PrivateMessageRequest之后 + +**模型定义**: +```python +class CheckBatchRequest(BaseModel): + """批量检查请求模型""" + user_ids: List[int] = Field(..., description="用户ID列表") +``` + +### 修改点3: 添加CheckBatchResponse模型 + +**位置**: 紧接CheckBatchRequest之后 + +**模型定义**: +```python +class CheckBatchResponse(BaseModel): + """批量检查响应模型""" + results: Dict[int, bool] = Field(..., description="用户ID到是否有URL的映射") +``` + +**注意**: core/models.py需要添加`from typing import List`导入(如果尚未导入) + +## 文件5: routers/private.py (新建文件) + +### 文件结构: +```python +"""私聊相关API路由""" +import logging +from typing import List, Dict +from fastapi import APIRouter, HTTPException +from fastapi.responses import JSONResponse + +from core.database import get_db +from core.models import PrivateMessageRequest, CheckBatchRequest, CheckBatchResponse +from utils.message import send_private_message + +logger = logging.getLogger(__name__) + +router = APIRouter() +``` + +### 接口1: POST /api/private/send + +**位置**: router定义之后 + +**函数签名**: +```python +@router.post("/private/send") +async def send_private(request: PrivateMessageRequest): + """发送私聊消息 + + 请求体: + { + "user_id": 123456, + "content": "消息内容", + "msg_type": "text" // 可选,默认为"text" + } + """ +``` + +**实现逻辑**: +- 验证msg_type(必须是"text"或"markdown"),否则返回400错误 +- 调用send_private_message +- 如果返回False(用户没有个人URL或发送失败),返回400错误和相应消息 +- 成功则返回JSONResponse({"success": True, "message": "消息发送成功"}) + +### 接口2: GET /api/private/check/{user_id} + +**位置**: send_private之后 + +**函数签名**: +```python +@router.get("/private/check/{user_id}") +async def check_user_webhook(user_id: int): + """检查用户是否有个人webhook URL""" +``` + +**实现逻辑**: +- 调用db.has_webhook_url(user_id) +- 返回JSONResponse({"user_id": user_id, "has_webhook_url": bool}) + +### 接口3: POST /api/private/check-batch + +**位置**: check_user_webhook之后 + +**函数签名**: +```python +@router.post("/private/check-batch") +async def check_users_webhook_batch(request: CheckBatchRequest): + """批量检查用户是否有个人webhook URL + + 请求体: + { + "user_ids": [123456, 789012, ...] + } + """ +``` + +**实现逻辑**: +- 调用db.check_users_webhook_urls(request.user_ids) +- 返回CheckBatchResponse(results=...) + +## 文件6: app.py + +### 修改点1: 导入private路由 + +**位置**: 第13行,导入语句中 + +**修改内容**: +```python +from routers import callback, health, private +``` + +### 修改点2: 注册private路由 + +**位置**: 第75-76行,路由注册部分 + +**修改内容**: +```python +app.include_router(callback.router, prefix="/api", tags=["callback"]) +app.include_router(health.router, tags=["health"]) +app.include_router(private.router, prefix="/api", tags=["private"]) +``` + +# 任务进度 + +[2025-11-03_09:45:56] +- 已修改: + 1. core/database.py - 添加webhook_url字段兼容性检查和4个数据库方法 + 2. core/models.py - 添加3个API数据模型和List类型导入 + 3. routers/callback.py - 扩展handle_register_command支持url子命令 + 4. utils/message.py - 添加send_private_message函数 + 5. routers/private.py - 新建文件,包含3个私聊相关API接口 + 6. app.py - 导入并注册private路由 + +- 更改: + 1. 在users表中添加webhook_url字段支持(兼容性检查) + 2. 实现用户webhook URL的CRUD操作(设置、获取、检查、批量检查) + 3. 扩展.register命令支持`.register url `子命令 + 4. 封装私聊消息发送功能为独立函数 + 5. 创建私聊相关的RESTful API接口(发送、单个检查、批量检查) + 6. 注册新的API路由到FastAPI应用 + +- 原因: + 实现用户专属webhook URL注册和私聊消息发送功能,为其他系统提供API接口调用 + +- 阻碍因素: + 无 + +- 状态:未确认 + +# 最终审查 + +待审查阶段完成... + diff --git a/app.py b/app.py index 506f9c6..983ab91 100644 --- a/app.py +++ b/app.py @@ -10,7 +10,7 @@ import asyncio from config import APP_CONFIG, SESSION_TIMEOUT, SetWebhookURL, GetWebhookURL from core.middleware import ConcurrencyLimitMiddleware from core.database import get_db -from routers import callback, health +from routers import callback, health, private # 配置日志 logging.basicConfig( @@ -74,6 +74,7 @@ app.add_middleware(ConcurrencyLimitMiddleware) # 注册路由 app.include_router(callback.router, prefix="/api", tags=["callback"]) app.include_router(health.router, tags=["health"]) +app.include_router(private.router, prefix="/api", tags=["private"]) @app.get("/") diff --git a/core/database.py b/core/database.py index 0485c14..9b879dd 100644 --- a/core/database.py +++ b/core/database.py @@ -321,6 +321,9 @@ class Database: self._add_column_if_not_exists('casino_bets', 'bet_value', "TEXT") self._add_column_if_not_exists('casino_bets', 'hand_status', "TEXT") + # 兼容性检查:为users表添加webhook_url字段 + self._add_column_if_not_exists('users', 'webhook_url', 'TEXT') + logger.info("数据库表初始化完成") # ===== 用户相关操作 ===== @@ -412,6 +415,103 @@ class Database: logger.error(f"更新用户名失败: user_id={user_id}, username={username}, error={e}", exc_info=True) return False + def set_user_webhook_url(self, user_id: int, webhook_url: str) -> bool: + """设置用户webhook URL + + Args: + user_id: 用户ID + webhook_url: Webhook URL + + Returns: + 是否成功 + """ + try: + # 确保用户存在 + self.get_or_create_user(user_id) + + cursor = self.conn.cursor() + cursor.execute( + "UPDATE users SET webhook_url = ? WHERE user_id = ?", + (webhook_url, user_id) + ) + + logger.info(f"用户 {user_id} 设置webhook URL: {webhook_url}") + return True + + except Exception as e: + logger.error(f"设置用户webhook URL失败: user_id={user_id}, error={e}", exc_info=True) + return False + + def get_user_webhook_url(self, user_id: int) -> Optional[str]: + """获取用户webhook URL + + Args: + user_id: 用户ID + + Returns: + Webhook URL,如果不存在返回None + """ + cursor = self.conn.cursor() + cursor.execute( + "SELECT webhook_url FROM users WHERE user_id = ?", + (user_id,) + ) + row = cursor.fetchone() + + if not row: + return None + + webhook_url = row[0] + if not webhook_url or webhook_url.strip() == '': + return None + + return webhook_url + + def has_webhook_url(self, user_id: int) -> bool: + """检查用户是否有个人webhook URL + + Args: + user_id: 用户ID + + Returns: + 是否有个人URL + """ + webhook_url = self.get_user_webhook_url(user_id) + return webhook_url is not None + + def check_users_webhook_urls(self, user_ids: List[int]) -> Dict[int, bool]: + """批量检查用户是否有个人webhook URL + + Args: + user_ids: 用户ID列表 + + Returns: + 字典 {user_id: has_url} + """ + if not user_ids: + return {} + + # 初始化结果字典,所有用户默认为False + results = {user_id: False for user_id in user_ids} + + cursor = self.conn.cursor() + # 使用IN子句查询 + placeholders = ','.join('?' * len(user_ids)) + cursor.execute( + f"SELECT user_id, webhook_url FROM users WHERE user_id IN ({placeholders})", + user_ids + ) + + rows = cursor.fetchall() + for row in rows: + user_id = row[0] + webhook_url = row[1] + # 如果webhook_url不为None且不为空字符串,则设为True + if webhook_url and webhook_url.strip() != '': + results[user_id] = True + + return results + def get_user_display_name(self, user_id: int) -> str: """获取用户显示名称 如果用户已注册(username不为None),返回用户名;否则返回"用户{user_id}" diff --git a/core/models.py b/core/models.py index 5cf4d97..f9d8ba7 100644 --- a/core/models.py +++ b/core/models.py @@ -1,6 +1,6 @@ """数据模型定义""" from pydantic import BaseModel, Field -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, List class CallbackRequest(BaseModel): @@ -76,3 +76,20 @@ class QuizGameState(GameState): attempts: int = Field(0, description="尝试次数") max_attempts: int = Field(3, description="最大尝试次数") + +class PrivateMessageRequest(BaseModel): + """私聊消息请求模型""" + user_id: int = Field(..., description="目标用户ID") + content: str = Field(..., description="消息内容") + msg_type: str = Field(default="text", description="消息类型: text 或 markdown") + + +class CheckBatchRequest(BaseModel): + """批量检查请求模型""" + user_ids: List[int] = Field(..., description="用户ID列表") + + +class CheckBatchResponse(BaseModel): + """批量检查响应模型""" + results: Dict[int, bool] = Field(..., description="用户ID到是否有URL的映射") + diff --git a/routers/callback.py b/routers/callback.py index 4697038..0ab065b 100644 --- a/routers/callback.py +++ b/routers/callback.py @@ -202,6 +202,10 @@ async def handle_command(game_type: str, command: str, return "用法:.say 你想让我说的话\n别名:.说 / .复述" return args + # 私聊功能 + if game_type == 'talk': + return await handle_talk_command(command, chat_id, user_id) + # AI对话系统 if game_type == 'ai_chat': from games.ai_chat import AIChatGame @@ -227,7 +231,7 @@ async def handle_register_command(command: str, chat_id: int, user_id: int) -> s """处理注册命令 Args: - command: 完整指令 ".register name" + command: 完整指令 ".register name" 或 ".register url " chat_id: 会话ID user_id: 用户ID @@ -241,21 +245,112 @@ async def handle_register_command(command: str, chat_id: int, user_id: int) -> s # 验证参数 if not args: - return "❌ 请提供要注册的名称!\n\n正确格式:`.register <名称>`\n\n示例:\n`.register 张三`\n`.register 小明`" + return "❌ 请提供要注册的内容!\n\n正确格式:\n`.register <名称>` - 注册用户名\n`.register url ` - 注册webhook URL\n\n示例:\n`.register 张三`\n`.register url https://example.com/webhook?key=xxx`" - if len(args) > 20: - return "❌ 名称过长!最多支持20个字符。" - - # 更新用户名称 - db = get_db() - success = db.update_user_name(user_id, args) - - if success: - return f"✅ 注册成功!\n\n**您的名称**:{args}\n\n之后您可以使用这个名称参与各种游戏和功能。" + # 检查是否为url子命令 + parts = args.split(maxsplit=1) + if parts and parts[0].lower() == 'url': + # 处理URL注册 + if len(parts) < 2: + return "❌ 请提供webhook URL!\n\n正确格式:`.register url `\n\n示例:\n`.register url https://example.com/webhook?key=xxx`" + + webhook_url = parts[1].strip() + + # URL验证 + if not webhook_url.startswith(('http://', 'https://')): + return "❌ URL格式无效!必须以 http:// 或 https:// 开头。" + + # 设置URL + db = get_db() + success = db.set_user_webhook_url(user_id, webhook_url) + + if success: + return f"✅ Webhook URL注册成功!\n\n**您的个人URL**:{webhook_url}\n\n私聊消息将发送到此URL。" + else: + return "❌ 注册失败!请稍后重试。" else: - return "❌ 注册失败!请稍后重试。" + # 原有的名称注册逻辑 + if len(args) > 20: + return "❌ 名称过长!最多支持20个字符。" + + # 更新用户名称 + db = get_db() + success = db.update_user_name(user_id, args) + + if success: + return f"✅ 注册成功!\n\n**您的名称**:{args}\n\n之后您可以使用这个名称参与各种游戏和功能。" + else: + return "❌ 注册失败!请稍后重试。" except Exception as e: logger.error(f"处理注册指令错误: {e}", exc_info=True) return f"❌ 处理指令出错: {str(e)}" + +async def handle_talk_command(command: str, chat_id: int, user_id: int) -> str: + """处理私聊命令 + + Args: + command: 完整指令 ".talk " + chat_id: 会话ID + user_id: 发送者用户ID + + Returns: + 处理结果消息 + """ + try: + # 提取参数 + _, args = CommandParser.extract_command_args(command) + args = args.strip() + + # 验证参数 + if not args: + return "❌ 请提供用户名和消息内容!\n\n正确格式:`.talk <用户名> <消息内容>`\n\n示例:\n`.talk 张三 你好,想和你聊聊`\n`.talk 李四 这是一条私聊消息`" + + # 解析username和content(第一个单词是username,剩余部分是content) + parts = args.split(maxsplit=1) + if len(parts) < 2: + return "❌ 请提供用户名和消息内容!\n\n正确格式:`.talk <用户名> <消息内容>`\n\n示例:\n`.talk 张三 你好,想和你聊聊`" + + target_username = parts[0].strip() + content = parts[1].strip() + + if not target_username: + return "❌ 用户名不能为空!\n\n正确格式:`.talk <用户名> <消息内容>`" + if not content: + return "❌ 消息内容不能为空!\n\n正确格式:`.talk <用户名> <消息内容>`" + + # 通过用户名查找目标用户 + db = get_db() + target_user = db.get_user_by_name(target_username) + + if not target_user: + return f"❌ 找不到用户名为「{target_username}」的用户!\n\n提示:目标用户需要使用 `.register <名称>` 注册用户名。" + + target_user_id = target_user['user_id'] + + # 检查目标用户是否有注册名称(应该有,因为是通过名称找到的) + if not target_user.get('username'): + return f"❌ 用户「{target_username}」尚未注册用户名!" + + # 检查目标用户是否有个人webhook URL + if not db.has_webhook_url(target_user_id): + return f"❌ 用户「{target_username}」尚未注册个人webhook URL!\n\n提示:目标用户需要使用 `.register url ` 注册个人URL后才能接收私聊消息。" + + # 发送私聊消息 + from utils.message import send_private_message + success = await send_private_message( + user_id=target_user_id, + content=content, + msg_type='text' + ) + + if success: + return f"✅ 私聊消息已发送给「{target_username}」!" + else: + return f"❌ 发送私聊消息失败,请稍后重试。" + + except Exception as e: + logger.error(f"处理私聊指令错误: {e}", exc_info=True) + return f"❌ 处理指令出错: {str(e)}" + diff --git a/routers/private.py b/routers/private.py new file mode 100644 index 0000000..6dfffa7 --- /dev/null +++ b/routers/private.py @@ -0,0 +1,111 @@ +"""私聊相关API路由""" +import logging +from typing import List, Dict +from fastapi import APIRouter, HTTPException +from fastapi.responses import JSONResponse + +from core.database import get_db +from core.models import PrivateMessageRequest, CheckBatchRequest, CheckBatchResponse +from utils.message import send_private_message + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.post("/private/send") +async def send_private(request: PrivateMessageRequest): + """发送私聊消息 + + 请求体: + { + "user_id": 123456, + "content": "消息内容", + "msg_type": "text" // 可选,默认为"text" + } + """ + try: + # 验证msg_type + if request.msg_type not in ['text', 'markdown']: + raise HTTPException( + status_code=400, + detail="msg_type必须是'text'或'markdown'" + ) + + # 调用send_private_message + success = await send_private_message( + user_id=request.user_id, + content=request.content, + msg_type=request.msg_type + ) + + if not success: + # 检查用户是否有个人URL + db = get_db() + has_url = db.has_webhook_url(request.user_id) + if not has_url: + raise HTTPException( + status_code=400, + detail=f"用户 {request.user_id} 没有注册个人webhook URL" + ) + else: + raise HTTPException( + status_code=500, + detail="消息发送失败,请稍后重试" + ) + + return JSONResponse({ + "success": True, + "message": "消息发送成功" + }) + + except HTTPException: + raise + except Exception as e: + logger.error(f"发送私聊消息API错误: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"服务器内部错误: {str(e)}" + ) + + +@router.get("/private/check/{user_id}") +async def check_user_webhook(user_id: int): + """检查用户是否有个人webhook URL""" + try: + db = get_db() + has_webhook_url = db.has_webhook_url(user_id) + + return JSONResponse({ + "user_id": user_id, + "has_webhook_url": has_webhook_url + }) + except Exception as e: + logger.error(f"检查用户webhook URL错误: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"服务器内部错误: {str(e)}" + ) + + +@router.post("/private/check-batch") +async def check_users_webhook_batch(request: CheckBatchRequest): + """批量检查用户是否有个人webhook URL + + 请求体: + { + "user_ids": [123456, 789012, ...] + } + """ + try: + db = get_db() + results = db.check_users_webhook_urls(request.user_ids) + + return CheckBatchResponse(results=results) + except Exception as e: + logger.error(f"批量检查用户webhook URL错误: {e}", exc_info=True) + raise HTTPException( + status_code=500, + detail=f"服务器内部错误: {str(e)}" + ) + diff --git a/utils/message.py b/utils/message.py index 2527c08..1dd25fc 100644 --- a/utils/message.py +++ b/utils/message.py @@ -135,3 +135,40 @@ def get_message_sender() -> MessageSender: _sender_instance.webhook_url = GetWebhookURL() return _sender_instance + +async def send_private_message(user_id: int, content: str, msg_type: str = 'text') -> bool: + """发送私聊消息到用户个人webhook URL + + Args: + user_id: 目标用户ID + content: 消息内容 + msg_type: 消息类型 ('text' 或 'markdown') + + Returns: + 是否发送成功,如果用户没有个人URL则返回False + """ + from core.database import get_db + + # 从数据库获取用户webhook URL + db = get_db() + webhook_url = db.get_user_webhook_url(user_id) + + if not webhook_url: + logger.warning(f"用户 {user_id} 没有注册个人webhook URL,无法发送私聊消息") + return False + + # 创建MessageSender实例(使用用户的个人URL) + sender = MessageSender(webhook_url=webhook_url) + + try: + # 根据msg_type调用相应方法 + if msg_type == 'markdown': + return await sender.send_markdown(content) + else: + return await sender.send_text(content) + except Exception as e: + logger.error(f"发送私聊消息失败: user_id={user_id}, error={e}", exc_info=True) + return False + finally: + # 关闭HTTP客户端 + await sender.close() diff --git a/utils/parser.py b/utils/parser.py index 920055c..575f47d 100644 --- a/utils/parser.py +++ b/utils/parser.py @@ -73,6 +73,10 @@ class CommandParser: '.说': 'say', '.复述': 'say', + # 私聊 + '.talk': 'talk', + '.私聊': 'talk', + # 帮助 '.help': 'help', '.帮助': 'help',