Files
NewWPSBot/Plugins/WPSAPI.py
2025-11-10 21:44:28 +08:00

244 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PWF.Convention.Runtime.Config import *
from PWF.CoreModules.plugin_interface import PluginInterface
from PWF.CoreModules.flags import *
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ProjectConfig
from PWF.Convention.Runtime.Web import ToolURL
from PWF.Convention.Runtime.String import LimitStringLength
import httpx
import re
logger: ProjectConfig = Architecture.Get(ProjectConfig)
MAIN_WEBHOOK_URL = logger.FindItem("main_webhook_url", "")
logger.SaveProperties()
class MessageSender:
"""消息发送器"""
def __init__(self, webhook_url: str):
"""初始化消息发送器
Args:
webhook_url: Webhook URL
"""
self.webhook_url = webhook_url
self.client: Optional[httpx.AsyncClient] = None
async def _get_client(self) -> httpx.AsyncClient:
"""获取HTTP客户端懒加载"""
if self.client is None:
self.client = httpx.AsyncClient(timeout=10.0)
return self.client
async def send_message(self, message: Dict[str, Any]) -> bool:
"""发送消息到WPS
Args:
message: 消息字典
Returns:
是否发送成功
"""
try:
client = await self._get_client()
response = await client.post(self.webhook_url, json=message)
if response.status_code == 200:
logger.Log("Info", f"消息发送成功: {message.get('msgtype')}")
return True
else:
logger.Log("Error", f"消息发送失败: status={response.status_code}, body={response.text}")
return False
except Exception as e:
logger.Log("Error", f"发送消息异常: {e}")
return False
async def send_text(self, content: str, at_user_id: Optional[int] = None) -> bool:
"""发送文本消息
Args:
content: 文本内容
at_user_id: @用户ID可选
Returns:
是否发送成功
"""
# 如果需要@人
if at_user_id:
content = f'<at user_id="{at_user_id}"></at> {content}'
message = {
"msgtype": "text",
"text": {
"content": content
}
}
return await self.send_message(message)
async def send_markdown(self, text: str) -> bool:
"""发送Markdown消息
Args:
text: Markdown文本
Returns:
是否发送成功
"""
message = {
"msgtype": "markdown",
"markdown": {
"text": text
}
}
return await self.send_message(message)
async def send_link(self, title: str, text: str,
message_url: str = "", btn_title: str = "查看详情") -> bool:
"""发送链接消息
Args:
title: 标题
text: 文本内容
message_url: 跳转URL
btn_title: 按钮文字
Returns:
是否发送成功
"""
message = {
"msgtype": "link",
"link": {
"title": title,
"text": text,
"messageUrl": message_url,
"btnTitle": btn_title
}
}
return await self.send_message(message)
async def close(self):
"""关闭HTTP客户端"""
if self.client:
await self.client.aclose()
self.client = None
class BasicWPSInterface(PluginInterface):
@override
def is_enable_plugin(self) -> bool:
return False
def get_webhook_url(self, message: str, user_id: int) -> str:
'''
根据消息和用户ID获取Webhook URL, 返回空字符串表示不需要回复消息
Args:
message: 消息内容
user_id: 用户ID
Returns:
Webhook URL
'''
return ""
def get_webhook_request(self, data:Any|None) -> None:
pass
def get_message_sender(self, webhook_url: str) -> MessageSender:
return MessageSender(webhook_url)
def get_message_sender_type(self) -> Literal["text", "markdown", "link"]:
return "markdown"
def get_message_sender_function(self, webhook_url: str, type: Literal["text", "markdown", "link"]) -> Coroutine[Any, Any, bool]:
if type == "text":
return self.get_message_sender(webhook_url).send_text
elif type == "markdown":
return self.get_message_sender(webhook_url).send_markdown
elif type == "link":
return self.get_message_sender(webhook_url).send_link
else:
raise ValueError(f"Invalid message sender type: {type}")
def parse_message_after_at(self, message: str) -> str:
'''
已过时
'''
return message
async def send_markdown_message(self, message: str, chat_id: int, user_id: int) -> str|None:
webhook_url = self.get_webhook_url(message, user_id)
if get_internal_debug():
logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, User ID: {user_id}")
if webhook_url == "" or webhook_url == None:
return None
result = await self.get_message_sender_function(webhook_url, self.get_message_sender_type())(f"""<at user_id="{user_id}"></at>
---
{message}
""")
if get_internal_verbose():
logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Result: {result}")
return None
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
message = self.parse_message_after_at(message)
if message == "":
return None
return await self.send_markdown_message(message, chat_id, user_id)
class WPSAPI(BasicWPSInterface):
@override
def is_enable_plugin(self) -> bool:
if MAIN_WEBHOOK_URL == "":
logger.Log("Error", f"{ConsoleFrontColor.RED}WPSAPI未配置主Webhook URL{ConsoleFrontColor.RESET}")
return MAIN_WEBHOOK_URL != ""
@override
def get_webhook_url(self, message: str, user_id: int) -> str:
return MAIN_WEBHOOK_URL
@override
def get_webhook_request(self, data:Any|None) -> None:
return None
@override
def wake_up(self) -> None:
logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPI核心插件已加载{ConsoleFrontColor.RESET}")
self.register_plugin("say")
self.register_plugin("")
class WPSAPIHelp(WPSAPI):
@override
def dependencies(self) -> List[Type]:
return [WPSAPI]
@override
def is_enable_plugin(self) -> bool:
return True
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
mapper: Dict[str, List[str]] = {}
for key in PluginInterface.plugin_instances.keys():
plugin = PluginInterface.plugin_instances.get(key)
if plugin:
if plugin.__class__.__name__ not in mapper:
mapper[plugin.__class__.__name__] = []
mapper[plugin.__class__.__name__].append(key)
desc = ""
for plugin_name, keys in mapper.items():
desc += f"- {plugin_name}: {", ".join(keys)}\n"
return await self.send_markdown_message(f"""# 指令入口
{desc}
""", chat_id, user_id)
@override
def wake_up(self) -> None:
logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPIHelp 插件已加载{ConsoleFrontColor.RESET}")
self.register_plugin("help")
self.register_plugin("帮助")
logger.SaveProperties()