Files
WPSBot/utils/message.py

175 lines
5.0 KiB
Python
Raw Normal View History

2025-10-28 13:00:35 +08:00
"""WPS消息构造和发送工具"""
import httpx
import logging
from typing import Dict, Any, Optional
2025-10-29 12:19:25 +08:00
from config import GetWebhookURL
2025-10-28 13:00:35 +08:00
logger = logging.getLogger(__name__)
class MessageSender:
"""消息发送器"""
2025-10-29 12:16:12 +08:00
def __init__(self, webhook_url: Optional[str] = None):
2025-10-28 13:00:35 +08:00
"""初始化消息发送器
Args:
webhook_url: Webhook URL
"""
2025-10-29 12:16:12 +08:00
if webhook_url is None:
2025-10-29 12:19:25 +08:00
webhook_url = GetWebhookURL()
2025-10-28 13:00:35 +08:00
self.webhook_url = webhook_url
self.client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
"""获取HTTP客户端懒加载"""
if self.client is None:
self.client = httpx.AsyncClient(timeout=10.0)
return self.client
async def send_message(self, message: Dict[str, Any]) -> bool:
"""发送消息到WPS
Args:
message: 消息字典
Returns:
是否发送成功
"""
try:
client = await self._get_client()
response = await client.post(self.webhook_url, json=message)
if response.status_code == 200:
logger.info(f"消息发送成功: {message.get('msgtype')}")
return True
else:
logger.error(f"消息发送失败: status={response.status_code}, body={response.text}")
return False
except Exception as e:
logger.error(f"发送消息异常: {e}", exc_info=True)
return False
async def send_text(self, content: str, at_user_id: Optional[int] = None) -> bool:
"""发送文本消息
Args:
content: 文本内容
at_user_id: @用户ID可选
Returns:
是否发送成功
"""
# 如果需要@人
if at_user_id:
content = f'<at user_id="{at_user_id}"></at> {content}'
message = {
"msgtype": "text",
"text": {
"content": content
}
}
return await self.send_message(message)
async def send_markdown(self, text: str) -> bool:
"""发送Markdown消息
Args:
text: Markdown文本
Returns:
是否发送成功
"""
message = {
"msgtype": "markdown",
"markdown": {
"text": text
}
}
return await self.send_message(message)
async def send_link(self, title: str, text: str,
message_url: str = "", btn_title: str = "查看详情") -> bool:
"""发送链接消息
Args:
title: 标题
text: 文本内容
message_url: 跳转URL
btn_title: 按钮文字
Returns:
是否发送成功
"""
message = {
"msgtype": "link",
"link": {
"title": title,
"text": text,
"messageUrl": message_url,
"btnTitle": btn_title
}
}
return await self.send_message(message)
async def close(self):
"""关闭HTTP客户端"""
if self.client:
await self.client.aclose()
self.client = None
# 全局消息发送器实例
_sender_instance: Optional[MessageSender] = None
def get_message_sender() -> MessageSender:
"""获取全局消息发送器实例(单例模式)"""
global _sender_instance
if _sender_instance is None:
_sender_instance = MessageSender()
2025-10-29 12:19:25 +08:00
else:
# 更新Webhook URL以确保使用最新的值
_sender_instance.webhook_url = GetWebhookURL()
2025-10-28 13:00:35 +08:00
return _sender_instance
async def send_private_message(user_id: int, content: str, msg_type: str = 'text') -> bool:
"""发送私聊消息到用户个人webhook URL
Args:
user_id: 目标用户ID
content: 消息内容
msg_type: 消息类型 ('text' 'markdown')
Returns:
是否发送成功如果用户没有个人URL则返回False
"""
from core.database import get_db
# 从数据库获取用户webhook URL
db = get_db()
webhook_url = db.get_user_webhook_url(user_id)
if not webhook_url:
logger.warning(f"用户 {user_id} 没有注册个人webhook URL无法发送私聊消息")
return False
# 创建MessageSender实例使用用户的个人URL
sender = MessageSender(webhook_url=webhook_url)
try:
# 根据msg_type调用相应方法
if msg_type == 'markdown':
return await sender.send_markdown(content)
else:
return await sender.send_text(content)
except Exception as e:
logger.error(f"发送私聊消息失败: user_id={user_id}, error={e}", exc_info=True)
return False
finally:
# 关闭HTTP客户端
await sender.close()