from PWF.Convention.Runtime.Config import * from PWF.CoreModules.plugin_interface import PluginInterface from PWF.CoreModules.flags import * from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ProjectConfig from PWF.Convention.Runtime.Web import ToolURL from PWF.Convention.Runtime.String import LimitStringLength from fastapi.responses import HTMLResponse from dataclasses import dataclass, field from typing import Any, Dict, Iterable, List, Optional, Sequence, TypedDict, override, Union import httpx import re logger: ProjectConfig = Architecture.Get(ProjectConfig) MAIN_WEBHOOK_URL = logger.FindItem("main_webhook_url", "") logger.SaveProperties() class GuideEntry(TypedDict, total=False): """单条图鉴信息。""" title: str identifier: str description: str category: str metadata: Dict[str, str] icon: str badge: str links: Sequence[Dict[str, str]] tags: Sequence[str] details: Sequence[Union[str, Dict[str, Any]]] group: str @dataclass(frozen=True) class GuideSection: """图鉴章节。""" title: str entries: Sequence[GuideEntry] = field(default_factory=tuple) description: str = "" layout: str = "grid" section_id: str | None = None @dataclass(frozen=True) class GuidePage: """完整图鉴页面。""" title: str sections: Sequence[GuideSection] = field(default_factory=tuple) subtitle: str = "" metadata: Dict[str, str] = field(default_factory=dict) related_links: Dict[str, Sequence[Dict[str, str]]] = field(default_factory=dict) def render_markdown_page(page: GuidePage) -> str: """保留 Markdown 渲染(备用)。""" def _render_section(section: GuideSection) -> str: lines: List[str] = [f"## {section.title}"] if section.description: lines.append(section.description) if not section.entries: lines.append("> 暂无内容。") return "\n".join(lines) for entry in section.entries: title = entry.get("title", "未命名") identifier = entry.get("identifier") desc = entry.get("description", "") category = entry.get("category") metadata = entry.get("metadata", {}) bullet = f"- **{title}**" if identifier: bullet += f"|`{identifier}`" if category: bullet += f"|{category}" lines.append(bullet) if desc: lines.append(f" - {desc}") for meta_key, meta_value in metadata.items(): lines.append(f" - {meta_key}:{meta_value}") return "\n".join(lines) lines: List[str] = [f"# {page.title}"] if page.subtitle: lines.append(page.subtitle) if page.metadata: lines.append("") for key, value in page.metadata.items(): lines.append(f"- {key}:{value}") for section in page.sections: lines.append("") lines.append(_render_section(section)) return "\n".join(lines) def render_html_page(page: GuidePage) -> str: """渲染 Apple Store 风格的 HTML 页面。""" def escape(text: Optional[str]) -> str: if not text: return "" return ( text.replace("&", "&") .replace("<", "<") .replace(">", ">") .replace('"', """) ) def render_metadata(metadata: Dict[str, str]) -> str: if not metadata: return "" cards = [] for key, value in metadata.items(): cards.append( f"""
{escape(key)}
{escape(value)}
""" ) return f'
{"".join(cards)}
' def render_links(links: Optional[Sequence[Dict[str, str]]]) -> str: if not links: return "" items = [] for link in links: href = escape(link.get("href", "#")) label = escape(link.get("label", "前往")) items.append(f'{label}') return "".join(items) def render_tags(tags: Optional[Sequence[str]]) -> str: if not tags: return "" chips = "".join(f'{escape(tag)}' for tag in tags) return f'
{chips}
' def render_details(details: Optional[Sequence[Union[str, Dict[str, Any]]]]) -> str: if not details: return "" blocks: List[str] = [] for detail in details: if isinstance(detail, str): blocks.append(f'

{escape(detail)}

') elif isinstance(detail, dict): kind = detail.get("type") if kind == "list": items = "".join( f'
  • {escape(str(item))}
  • ' for item in detail.get("items", []) ) blocks.append(f'') elif kind == "steps": items = "".join( f'
  • {idx+1}{escape(str(item))}
  • ' for idx, item in enumerate(detail.get("items", [])) ) blocks.append(f'
      {items}
    ') elif kind == "table": rows = [] for row in detail.get("rows", []): cols = "".join(f"{escape(str(col))}" for col in row) rows.append(f"{cols}") head = "" headers = detail.get("headers") if headers: head = "".join(f"{escape(str(col))}" for col in headers) head = f"{head}" blocks.append( f'{head}{"".join(rows)}
    ' ) if not blocks: return "" return f'
    {"".join(blocks)}
    ' def render_entry(entry: GuideEntry) -> str: icon = escape(entry.get("icon")) badge = escape(entry.get("badge")) title = escape(entry.get("title")) identifier = escape(entry.get("identifier")) description = escape(entry.get("description")) category = escape(entry.get("category")) metadata_items = [] for meta_key, meta_value in entry.get("metadata", {}).items(): metadata_items.append( f'
  • {escape(meta_key)}{escape(str(meta_value))}
  • ' ) metadata_html = "" if metadata_items: metadata_html = f'' identifier_html = f'{identifier}' if identifier else "" category_html = f'{category}' if category else "" badge_html = f'{badge}' if badge else "" icon_html = f'
    {icon}
    ' if icon else "" links_html = render_links(entry.get("links")) tags_html = render_tags(entry.get("tags")) details_html = render_details(entry.get("details")) group = escape(entry.get("group")) group_attr = f' data-group="{group}"' if group else "" return f"""
    {icon_html}

    {title}{badge_html}

    {description}

    {metadata_html} {tags_html} {details_html} {links_html}
    """ def render_section(section: GuideSection) -> str: layout_class = "entries-grid" if section.layout == "grid" else "entries-list" section_attr = f' id="{escape(section.section_id)}"' if section.section_id else "" cards = "".join(render_entry(entry) for entry in section.entries) description_html = ( f'

    {escape(section.description)}

    ' if section.description else "" ) if not cards: cards = '
    暂无内容
    ' return f"""

    {escape(section.title)}

    {description_html}
    {cards}
    """ def render_related(related: Dict[str, Sequence[Dict[str, str]]]) -> str: if not related: return "" blocks: List[str] = [] for label, links in related.items(): if not links: continue items = "".join( f'{escape(link.get("label", ""))}' for link in links ) blocks.append( f""" """ ) if not blocks: return "" return f'' sections_html = "".join(render_section(section) for section in page.sections) metadata_html = render_metadata(page.metadata) related_html = render_related(page.related_links) subtitle_html = f'

    {escape(page.subtitle)}

    ' if page.subtitle else "" return f""" {escape(page.title)}

    {escape(page.title)}

    {subtitle_html}
    {metadata_html} {related_html} {sections_html}
    """ class MessageSender: """消息发送器""" def __init__(self, webhook_url: str): """初始化消息发送器 Args: webhook_url: Webhook URL """ self.webhook_url = webhook_url self.client: Optional[httpx.AsyncClient] = None async def _get_client(self) -> httpx.AsyncClient: """获取HTTP客户端(懒加载)""" if self.client is None: self.client = httpx.AsyncClient(timeout=10.0) return self.client async def send_message(self, message: Dict[str, Any]) -> bool: """发送消息到WPS Args: message: 消息字典 Returns: 是否发送成功 """ try: client = await self._get_client() response = await client.post(self.webhook_url, json=message) if response.status_code == 200: logger.Log("Info", f"消息发送成功: {message.get('msgtype')}") return True else: logger.Log("Error", f"消息发送失败: status={response.status_code}, body={response.text}") return False except Exception as e: logger.Log("Error", f"发送消息异常: {e}") return False async def send_text(self, content: str, at_user_id: Optional[int] = None) -> bool: """发送文本消息 Args: content: 文本内容 at_user_id: @用户ID(可选) Returns: 是否发送成功 """ # 如果需要@人 if at_user_id: content = f' {content}' message = { "msgtype": "text", "text": { "content": content } } return await self.send_message(message) async def send_markdown(self, text: str) -> bool: """发送Markdown消息 Args: text: Markdown文本 Returns: 是否发送成功 """ message = { "msgtype": "markdown", "markdown": { "text": text } } return await self.send_message(message) async def send_link(self, title: str, text: str, message_url: str = "", btn_title: str = "查看详情") -> bool: """发送链接消息 Args: title: 标题 text: 文本内容 message_url: 跳转URL btn_title: 按钮文字 Returns: 是否发送成功 """ message = { "msgtype": "link", "link": { "title": title, "text": text, "messageUrl": message_url, "btnTitle": btn_title } } return await self.send_message(message) async def close(self): """关闭HTTP客户端""" if self.client: await self.client.aclose() self.client = None class BasicWPSInterface(PluginInterface): user_id_to_username: Optional[Callable[[int], str]] = None @override def is_enable_plugin(self) -> bool: return False def get_webhook_url(self, message: str, user_id: int) -> str: ''' 根据消息和用户ID获取Webhook URL, 返回空字符串表示不需要回复消息 Args: message: 消息内容 user_id: 用户ID Returns: Webhook URL ''' return "" def get_webhook_request(self, data:Any|None) -> None: pass def get_message_sender(self, webhook_url: str) -> MessageSender: return MessageSender(webhook_url) def get_message_sender_type(self) -> Literal["text", "markdown", "link"]: return "markdown" def get_message_sender_function(self, webhook_url: str, type: Literal["text", "markdown", "link"]) -> Coroutine[Any, Any, bool]: if type == "text": return self.get_message_sender(webhook_url).send_text elif type == "markdown": return self.get_message_sender(webhook_url).send_markdown elif type == "link": return self.get_message_sender(webhook_url).send_link else: raise ValueError(f"Invalid message sender type: {type}") def parse_message_after_at(self, message: str) -> str: ''' 已过时 ''' return message async def send_markdown_message(self, message: str, chat_id: int, user_id: int) -> str|None: webhook_url = self.get_webhook_url(message, user_id) if get_internal_debug(): logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, User ID: {user_id}") if webhook_url == "" or webhook_url == None: return None username: str = self.user_id_to_username(user_id) if self.user_id_to_username else "" send_message = f"""## {username} --- {message} """ result = await self.get_message_sender_function(webhook_url, self.get_message_sender_type())(send_message) if get_internal_verbose(): logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Result: {result}") return None @override async def callback(self, message: str, chat_id: int, user_id: int) -> str|None: message = self.parse_message_after_at(message) if message == "": return None return await self.send_markdown_message(message, chat_id, user_id) class WPSAPI(BasicWPSInterface): """核心 WPS 插件基类,提供图鉴模板设施。""" guide_section_labels: Dict[str, str] = { "commands": "指令一览", "items": "物品与资源", "recipes": "配方与合成", "guides": "系统指引", } def get_guide_title(self) -> str: return self.__class__.__name__ def get_guide_subtitle(self) -> str: return "" def get_guide_metadata(self) -> Dict[str, str]: return {} def collect_command_entries(self) -> Sequence[GuideEntry]: return () def collect_item_entries(self) -> Sequence[GuideEntry]: return () def collect_recipe_entries(self) -> Sequence[GuideEntry]: return () def collect_guide_entries(self) -> Sequence[GuideEntry]: return () def collect_additional_sections(self) -> Sequence[GuideSection]: return () def collect_guide_sections(self) -> Sequence[GuideSection]: sections: List[GuideSection] = [] command_entries = tuple(self.collect_command_entries()) if command_entries: sections.append( GuideSection( title=self.guide_section_labels["commands"], entries=command_entries, layout="list", ) ) item_entries = tuple(self.collect_item_entries()) if item_entries: sections.append( GuideSection( title=self.guide_section_labels["items"], entries=item_entries, ) ) recipe_entries = tuple(self.collect_recipe_entries()) if recipe_entries: sections.append( GuideSection( title=self.guide_section_labels["recipes"], entries=recipe_entries, ) ) guide_entries = tuple(self.collect_guide_entries()) if guide_entries: sections.append( GuideSection( title=self.guide_section_labels["guides"], entries=guide_entries, layout="list", ) ) additional_sections = tuple(self.collect_additional_sections()) if additional_sections: sections.extend(additional_sections) return tuple(sections) def build_guide_page(self) -> GuidePage: metadata: Dict[str, str] = {} for key, value in self.get_guide_metadata().items(): metadata[key] = str(value) related = self.get_related_links() return GuidePage( title=self.get_guide_title(), subtitle=self.get_guide_subtitle(), sections=self.collect_guide_sections(), metadata=metadata, related_links=related, ) def render_guide_page(self, page: GuidePage) -> str: return render_html_page(page) def render_guide(self) -> str: return self.render_guide_page(self.build_guide_page()) def get_guide_response(self, content: str) -> HTMLResponse: return HTMLResponse(content) @override def generate_router_illustrated_guide(self): async def handler() -> HTMLResponse: return self.get_guide_response(self.render_guide()) return handler def get_related_links(self) -> Dict[str, Sequence[Dict[str, str]]]: links: Dict[str, Sequence[Dict[str, str]]] = {} parents = [] for base in self.__class__.__mro__[1:]: if not issubclass(base, WPSAPI): continue if base.__module__.startswith("Plugins."): parents.append(base) if base is WPSAPI: break if parents: parents_links = [self._build_class_link(cls) for cls in reversed(parents)] links["父类链"] = tuple(filter(None, parents_links)) child_links = [self._build_class_link(child) for child in self._iter_subclasses(self.__class__)] child_links = [link for link in child_links if link] if child_links: links["子类"] = tuple(child_links) return links def _build_class_link(self, cls: type) -> Optional[Dict[str, str]]: if not hasattr(cls, "__module__") or not cls.__module__.startswith("Plugins."): return None path = f"/api/{cls.__name__}" return { "label": cls.__name__, "href": path, } def _iter_subclasses(self, cls: type) -> List[type]: collected: List[type] = [] for subclass in cls.__subclasses__(): if not issubclass(subclass, WPSAPI): continue if not subclass.__module__.startswith("Plugins."): continue collected.append(subclass) collected.extend(self._iter_subclasses(subclass)) return collected def get_guide_subtitle(self) -> str: return "核心 Webhook 转发插件" def get_guide_metadata(self) -> Dict[str, str]: return { "Webhook 状态": "已配置" if MAIN_WEBHOOK_URL else "未配置", } def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "say", "identifier": "say", "description": "将后续消息内容以 Markdown 形式发送到主 Webhook。", "metadata": {"别名": "说"}, "icon": "🗣️", "badge": "核心", }, ) def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "Webhook 绑定", "description": ( "在项目配置中设置 `main_webhook_url` 后插件自动启用," "所有注册的命令将调用 `send_markdown_message` 发送富文本。" ), "icon": "🔗", }, { "title": "消息格式", "description": ( "默认使用 Markdown 模式发送,支持 `聊天ID` 与 `用户ID` 的 @ 提醒。" ), "icon": "📝", }, ) @override def is_enable_plugin(self) -> bool: if MAIN_WEBHOOK_URL == "": logger.Log("Error", f"{ConsoleFrontColor.RED}WPSAPI未配置主Webhook URL{ConsoleFrontColor.RESET}") return MAIN_WEBHOOK_URL != "" @override def get_webhook_url(self, message: str, user_id: int) -> str: return MAIN_WEBHOOK_URL @override def get_webhook_request(self, data:Any|None) -> None: return None @override def wake_up(self) -> None: logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPI核心插件已加载{ConsoleFrontColor.RESET}") self.register_plugin("say") self.register_plugin("说") class WPSAPIHelp(WPSAPI): @override def dependencies(self) -> List[Type]: return [WPSAPI] @override def is_enable_plugin(self) -> bool: return True @override async def callback(self, message: str, chat_id: int, user_id: int) -> str|None: mapper: Dict[str, List[str]] = {} for key in PluginInterface.plugin_instances.keys(): plugin = PluginInterface.plugin_instances.get(key) if plugin: if plugin.__class__.__name__ not in mapper: mapper[plugin.__class__.__name__] = [] mapper[plugin.__class__.__name__].append(key) desc = "" for plugin_name, keys in mapper.items(): desc += f"- {plugin_name}: {", ".join(keys)}\n" return await self.send_markdown_message(f"""# 指令入口 {desc} """, chat_id, user_id) @override def wake_up(self) -> None: logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPIHelp 插件已加载{ConsoleFrontColor.RESET}") self.register_plugin("help") self.register_plugin("帮助") logger.SaveProperties()