Files
NewWPSBot/Plugins/WPSAPI.py
2025-11-07 15:53:09 +08:00

217 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PWF.Convention.Runtime.Config import *
from PWF.CoreModules.plugin_interface import PluginInterface
from PWF.CoreModules.flags import *
from PWF.Convention.Runtime.GlobalConfig import ProjectConfig
from PWF.Convention.Runtime.Web import ToolURL
from PWF.Convention.Runtime.String import LimitStringLength
import httpx
import re
# Shared project configuration object; the rest of this module also logs through config.Log.
config = ProjectConfig()
# Webhook URL looked up under the "main_webhook_url" config item, with "" passed as the fallback.
# WPSAPI treats an empty value as "not configured" and disables itself.
MAIN_WEBHOOK_URL = config.FindItem("main_webhook_url", "")
# NOTE(review): presumably writes the configuration back to storage so the item above
# shows up for editing — confirm against ProjectConfig.SaveProperties.
config.SaveProperties()
class MessageSender:
    """Send JSON message payloads to one webhook URL over HTTP."""

    def __init__(self, webhook_url: str):
        """Store the target webhook; the HTTP client is not created here.

        Args:
            webhook_url: URL that every message is POSTed to.
        """
        self.webhook_url = webhook_url
        # Created on first use by _get_client() and reset to None by close().
        self.client: Optional[httpx.AsyncClient] = None

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the HTTP client, creating it with a 10 second timeout on first use."""
        if self.client is not None:
            return self.client
        self.client = httpx.AsyncClient(timeout=10.0)
        return self.client

    async def send_message(self, message: Dict[str, Any]) -> bool:
        """POST a message payload to the webhook as JSON.

        Any exception raised while sending is logged and reported as a
        failure instead of propagating to the caller.

        Args:
            message: Payload dictionary; its "msgtype" entry is used in the
                success log line.

        Returns:
            True only when the response status code is exactly 200.
        """
        try:
            http = await self._get_client()
            reply = await http.post(self.webhook_url, json=message)
            if reply.status_code != 200:
                config.Log("Error", f"消息发送失败: status={reply.status_code}, body={reply.text}")
                return False
            config.Log("Info", f"消息发送成功: {message.get('msgtype')}")
            return True
        except Exception as exc:
            config.Log("Error", f"发送消息异常: {exc}")
            return False

    async def send_text(self, content: str, at_user_id: Optional[int] = None) -> bool:
        """Send a plain-text message, optionally mentioning a user.

        Args:
            content: Text to send.
            at_user_id: User to mention. When truthy, an ``<at user_id="...">``
                tag is placed in front of the text; None and 0 add nothing.

        Returns:
            True if the message was sent successfully.
        """
        body = f'<at user_id="{at_user_id}"></at> {content}' if at_user_id else content
        return await self.send_message({"msgtype": "text", "text": {"content": body}})

    async def send_markdown(self, text: str) -> bool:
        """Send a Markdown message.

        Args:
            text: Markdown source to send.

        Returns:
            True if the message was sent successfully.
        """
        return await self.send_message({"msgtype": "markdown", "markdown": {"text": text}})

    async def send_link(self, title: str, text: str,
                        message_url: str = "", btn_title: str = "查看详情") -> bool:
        """Send a link message.

        Args:
            title: Title of the message.
            text: Body text of the message.
            message_url: URL sent as "messageUrl".
            btn_title: Button caption sent as "btnTitle".

        Returns:
            True if the message was sent successfully.
        """
        link_fields = {
            "title": title,
            "text": text,
            "messageUrl": message_url,
            "btnTitle": btn_title,
        }
        return await self.send_message({"msgtype": "link", "link": link_fields})

    async def close(self):
        """Close the HTTP client if one exists, so a later send creates a new one."""
        if not self.client:
            return
        await self.client.aclose()
        self.client = None
class BasicWPSInterface(PluginInterface):
    """Base plugin that forwards an incoming chat message to a webhook.

    The base class is disabled and returns an empty webhook URL, so it sends
    nothing by itself; subclasses override is_enable_plugin() and
    get_webhook_url() to make it active.
    """

    @override
    def is_enable_plugin(self) -> bool:
        """Return False: the base class is never enabled on its own."""
        return False

    def get_webhook_url(self, message: str, user_id: int) -> str:
        """Return the webhook URL to reply through for this message and user.

        An empty string means no reply should be sent.

        Args:
            message: Message content.
            user_id: ID of the user who sent the message.

        Returns:
            The webhook URL; always "" in the base class.
        """
        return ""

    def get_webhook_request(self, data: Any|None) -> None:
        """Do nothing in the base class.

        NOTE(review): presumably a hook for incoming webhook request data —
        confirm against PluginInterface.
        """
        pass

    def get_message_sender(self, webhook_url: str) -> MessageSender:
        """Create a new MessageSender for ``webhook_url`` (one per call)."""
        return MessageSender(webhook_url)

    def get_message_sender_type(self) -> Literal["text", "markdown", "link"]:
        """Return the message type used for replies; "markdown" by default.

        Returning "link" is not usable with send_markdown_message(): that
        method calls the send function with a single argument, while
        MessageSender.send_link requires both ``title`` and ``text``.
        """
        return "markdown"

    def get_message_sender_function(self, webhook_url: str, type: Literal["text", "markdown", "link"]) -> Coroutine[Any, Any, bool]:
        """Return the bound send method of a fresh MessageSender for ``type``.

        NOTE(review): the value returned is a bound coroutine *function*
        (calling it yields the coroutine), so the return annotation is
        looser than the real contract; it is left unchanged here.

        Args:
            webhook_url: Webhook the new sender will post to.
            type: Which send method to pick: "text", "markdown" or "link".

        Raises:
            ValueError: If ``type`` is not one of the three supported values.
        """
        if type == "text":
            return self.get_message_sender(webhook_url).send_text
        elif type == "markdown":
            return self.get_message_sender(webhook_url).send_markdown
        elif type == "link":
            return self.get_message_sender(webhook_url).send_link
        else:
            raise ValueError(f"Invalid message sender type: {type}")

    # Bot-name pattern: captures whatever follows "@<non-space token><whitespace>".
    # DOTALL lets the captured text span multiple lines.
    AT_PATTERN = re.compile(r'@[^\s]+\s+(.+)', re.DOTALL)

    def parse_message_after_at(self, message: str) -> str:
        """Return the text that follows the first @mention in ``message``.

        The pattern is applied with search(), so the mention does not have to
        be at the start of the message. If nothing matches (no mention, or a
        mention with no text after it) the stripped message is returned as is.
        """
        # Strip leading and trailing whitespace first.
        message = message.strip()
        # Try to extract the content after the @mention.
        at_match = BasicWPSInterface.AT_PATTERN.search(message)
        if at_match:
            return at_match.group(1).strip()
        return message

    async def _close_sender(self, send_function: Any) -> None:
        """Close the MessageSender that a bound send method belongs to, if any."""
        owner = getattr(send_function, "__self__", None)
        if not isinstance(owner, MessageSender):
            return
        try:
            await owner.close()
        except Exception as e:
            # Cleanup is best-effort: a failing close must not break the reply flow.
            config.Log("Error", f"关闭消息发送器异常: {e}")

    async def send_markdown_message(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Send ``message`` through the webhook chosen for this user.

        Nothing is sent when get_webhook_url() returns an empty value.
        Despite the name, the message type comes from
        get_message_sender_type().

        Args:
            message: Text to send.
            chat_id: Chat ID; not used by this method.
            user_id: User ID, passed to get_webhook_url().

        Returns:
            Always None.
        """
        webhook_url = self.get_webhook_url(message, user_id)
        if get_internal_debug():
            config.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, User ID: {user_id}")
        if not webhook_url:
            return None
        send = self.get_message_sender_function(webhook_url, self.get_message_sender_type())
        try:
            result = await send(message)
        finally:
            # Each call gets a brand-new MessageSender whose HTTP client would
            # otherwise stay open, so release it once the send is done.
            await self._close_sender(send)
        if get_internal_verbose():
            config.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Result: {result}")
        return None

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Handle an incoming message: drop the @mention prefix and forward the rest.

        Returns:
            Always None; an empty message after parsing is ignored.
        """
        message = self.parse_message_after_at(message)
        if message == "":
            return None
        return await self.send_markdown_message(message, chat_id, user_id)
class WPSAPI(BasicWPSInterface):
    """Plugin that replies through the module-level MAIN_WEBHOOK_URL."""

    @override
    def is_enable_plugin(self) -> bool:
        """Return True when MAIN_WEBHOOK_URL is non-empty; log an error otherwise."""
        configured = MAIN_WEBHOOK_URL != ""
        if not configured:
            config.Log("Error", f"{ConsoleFrontColor.RED}WPSAPI未配置主Webhook URL{ConsoleFrontColor.RESET}")
        return configured

    @override
    def get_webhook_url(self, message: str, user_id: int) -> str:
        """Return MAIN_WEBHOOK_URL for every message and user."""
        return MAIN_WEBHOOK_URL

    @override
    def get_webhook_request(self, data:Any|None) -> None:
        """Ignore ``data`` and return None."""
        return

    @override
    def wake_up(self) -> None:
        """Log that the plugin is loaded and register it under "say" and ""."""
        config.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPI核心插件已加载{ConsoleFrontColor.RESET}")
        # NOTE(review): the empty name presumably makes this the handler for
        # messages without a command — confirm against register_plugin.
        for plugin_name in ("say", ""):
            self.register_plugin(plugin_name)
# NOTE(review): indentation is lost in this copy of the file, so it is unclear whether
# this call runs at module level or is the last statement of WPSAPI.wake_up — confirm
# against the original file before re-indenting.
config.SaveProperties()