1491 lines
50 KiB
Python
1491 lines
50 KiB
Python
from PWF.Convention.Runtime.Config import *
|
||
from PWF.CoreModules.plugin_interface import PluginInterface
|
||
from PWF.CoreModules.flags import *
|
||
from PWF.Convention.Runtime.Architecture import Architecture
|
||
from PWF.Convention.Runtime.GlobalConfig import ProjectConfig
|
||
from PWF.Convention.Runtime.Web import ToolURL
|
||
from PWF.Convention.Runtime.String import LimitStringLength
|
||
from fastapi.responses import HTMLResponse
|
||
from dataclasses import dataclass, field
|
||
from typing import Any, Dict, Iterable, List, Optional, Sequence, TypedDict, override, Union
|
||
import httpx
|
||
import re
|
||
from datetime import datetime, timedelta
|
||
from llama_index.core.tools import FunctionTool
|
||
from llama_index.llms.ollama import Ollama
|
||
from llama_index.core.agent.workflow import AgentWorkflow
|
||
from llama_index.core.agent.workflow.react_agent import ReActAgent
|
||
from bs4 import BeautifulSoup
|
||
|
||
# Project configuration object resolved from the Architecture registry.
# The rest of this module also uses it for logging (via its Log method).
logger: ProjectConfig = Architecture.Get(ProjectConfig)
# Main webhook URL read from the "main_webhook_url" config item; "" is passed
# as the second argument (presumably the fallback value — confirm).
MAIN_WEBHOOK_URL = logger.FindItem("main_webhook_url", "")
# NOTE(review): presumably persists the config so the key above is written
# back with its default — confirm against ProjectConfig.
logger.SaveProperties()
|
||
|
||
def get_target_web_page_url(year:str, month:str, day:str) -> str:
    """Build the mrxwlb.com URL of the news transcript page for one date.

    Args:
        year: Year component, inserted verbatim.
        month: Month component, inserted verbatim (no zero padding is applied here).
        day: Day component, inserted verbatim (no zero padding is applied here).

    Returns:
        The page URL, with the date used both as the path and in the page slug.
    """
    date_path = f"{year}/{month}/{day}"
    page_slug = f"{year}年{month}月{day}日新闻联播文字版"
    return f"http://mrxwlb.com/{date_path}/{page_slug}/"
|
||
|
||
# Base URL of the Ollama server (same value as NewsAIAgent's default argument).
ollama_url = "http://www.liubai.site:11434"
|
||
|
||
class NewsAIAgent:
    """News Q&A agent: a LlamaIndex ReAct agent backed by an Ollama model.

    The agent is given four tools (current date, yesterday's date, date
    parsing, news retrieval) and a system prompt telling it to fetch the
    transcript for the requested day before answering.
    """

    # Explicit date spellings accepted by _parse_date_from_text, tried in order.
    _DATE_PATTERNS = (
        re.compile(r'(\d{4})年(\d{1,2})月(\d{1,2})日'),
        re.compile(r'(\d{4})-(\d{1,2})-(\d{1,2})'),
        re.compile(r'(\d{4})/(\d{1,2})/(\d{1,2})'),
    )

    # Message returned to the LLM when the date argument is not a valid date.
    _DATE_FORMAT_ERROR = "日期格式错误,请使用'年-月-日'格式,例如'2025-11-19'"

    # Browser-like request headers used when scraping the transcript page.
    _REQUEST_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }

    def __init__(self, ollama_url: str = "http://www.liubai.site:11434"):
        """Create the LLM handle and build the agent workflow.

        Args:
            ollama_url: Base URL of the Ollama server.
        """
        self.ollama_url = ollama_url
        self.client: Optional[httpx.AsyncClient] = None
        self.llm = Ollama(model="qwen2.5:7b", base_url=ollama_url, request_timeout=600.0)
        self.workflow: Optional[AgentWorkflow] = None
        self._initialize_agent()

    def _initialize_agent(self):
        """Register the tool functions and build the ReAct agent workflow."""
        tools = [
            FunctionTool.from_defaults(
                fn=self._get_current_date,
                name="get_current_date",
                description="获取当前日期,返回格式为'年-月-日',例如'2025-11-19'"
            ),
            FunctionTool.from_defaults(
                fn=self._get_yesterday_date,
                name="get_yesterday_date",
                description="获取昨天的日期,返回格式为'年-月-日',例如'2025-11-18'"
            ),
            FunctionTool.from_defaults(
                fn=self._get_news_content,
                name="get_news_content",
                description="获取指定日期的新闻联播文字内容。参数date格式为'年-月-日',例如'2025-11-19'。返回该日期的新闻内容文本。"
            ),
            FunctionTool.from_defaults(
                fn=self._parse_date_from_text,
                name="parse_date_from_text",
                description="从文本中解析日期。支持格式:'2025年11月19日'、'2025-11-19'、'2025/11/19'、'今天'、'昨天'等。返回格式为'年-月-日'"
            ),
        ]

        # System prompt: forces the model to resolve the date and fetch the
        # transcript through the tools before it answers.
        system_prompt = """你是一个新闻分析助手,专门回答关于新闻联播的问题。

【重要】你必须按照以下步骤使用工具:

步骤1: 确定日期
- 如果用户问"今天"或"今日",调用 get_current_date
- 如果用户问"昨天"或"昨日",调用 get_yesterday_date
- 如果用户提到具体日期(如"2025年11月17日"),调用 parse_date_from_text

步骤2: 获取新闻内容
- 拿到日期后,必须调用 get_news_content(date="YYYY-MM-DD") 获取新闻
- 注意:date参数格式必须是 "年-月-日",例如 "2025-11-19"

步骤3: 回答问题
- 仔细阅读获取到的新闻内容
- 基于新闻内容准确回答用户的问题
- 如果新闻中没有相关信息,明确告知用户

【禁止】直接回答问题而不调用工具!你必须先获取新闻内容才能回答。"""

        agent = ReActAgent(
            llm=self.llm,
            tools=tools,
            verbose=True,
            system_prompt=system_prompt,
        )

        self.workflow = AgentWorkflow(
            agents=[agent],
            timeout=600.0,
        )

    async def _get_client(self) -> "httpx.AsyncClient":
        """Return the shared async HTTP client, creating it on first use.

        Nothing visible in this class calls this helper; it is kept for callers
        elsewhere and is paired with close().
        """
        if self.client is None:
            self.client = httpx.AsyncClient(timeout=120.0)
        return self.client

    def _get_current_date(self) -> str:
        """Return today's local date as 'YYYY-MM-DD'."""
        return datetime.now().strftime("%Y-%m-%d")

    def _get_yesterday_date(self) -> str:
        """Return yesterday's local date as 'YYYY-MM-DD'."""
        return (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    def _parse_date_from_text(self, text: str) -> str:
        """Extract a date from free text.

        Explicit dates ('2025年11月19日', '2025-11-19', '2025/11/19') win over
        the relative words for today/yesterday. When nothing matches, today's
        date is returned.

        Args:
            text: Text that may contain a date.

        Returns:
            The date as 'YYYY-MM-DD' (month and day zero-padded).
        """
        for pattern in self._DATE_PATTERNS:
            match = pattern.search(text)
            if match:
                year, month, day = match.groups()
                return f"{year}-{month.zfill(2)}-{day.zfill(2)}"

        if "今天" in text or "今日" in text:
            return self._get_current_date()
        if "昨天" in text or "昨日" in text:
            return self._get_yesterday_date()

        # Fall back to today when no date could be recognised.
        return self._get_current_date()

    @staticmethod
    def _extract_page_text(html: str) -> str:
        """Strip script/style tags from an HTML page and collapse its whitespace."""
        soup = BeautifulSoup(html, 'html.parser')
        for tag in soup(["script", "style"]):
            tag.decompose()

        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        return ' '.join(chunk for chunk in chunks if chunk)

    def _get_news_content(self, date: str) -> str:
        """Return the news transcript for a date, using the file cache when possible.

        This is a synchronous tool function: it never raises, and reports every
        failure as a message string for the LLM.

        Args:
            date: Date as 'YYYY-MM-DD', e.g. '2025-11-19'.

        Returns:
            The transcript text, or a human-readable error message.
        """
        try:
            # The argument comes from the LLM and ends up in a cache path and a
            # URL, so accept only a real calendar date.
            try:
                parsed = datetime.strptime(date.strip(), "%Y-%m-%d")
            except (AttributeError, TypeError, ValueError):
                return self._DATE_FORMAT_ERROR

            # The site's URL scheme uses month/day without leading zeros.
            year, month, day = str(parsed.year), str(parsed.month), str(parsed.day)

            cache_key = f"news_cache/{year}/{month}/{day}"
            cached_file = ProjectConfig().GetFile(cache_key, False)
            if cached_file.Exists():
                cached_content = cached_file.LoadAsText()
                logger.Log("Info", f"从缓存加载新闻: {date}")
                return cached_content

            logger.Log("Info", f"从网页抓取新闻: {date}")

            # Synchronous fetch: tool functions are called synchronously.
            import requests

            url = get_target_web_page_url(year, month, day)
            logger.Log("Info", f"请求URL: {url}")
            response = requests.get(url, headers=self._REQUEST_HEADERS, timeout=30, allow_redirects=True)
            response.raise_for_status()

            # NOTE(review): response.text relies on requests' charset guess;
            # confirm the site declares its encoding in the HTTP headers.
            content = self._extract_page_text(response.text)

            # Very short pages are treated as "no transcript available".
            if not content or len(content) < 100:
                return f"未能获取到{year}年{month}月{day}日的新闻内容"

            ProjectConfig().GetFile(cache_key, True).SaveAsText(content)
            logger.Log("Info", f"新闻内容已缓存: {date}")

            return content

        except Exception as e:
            logger.Log("Error", f"获取新闻失败: {e}")
            return f"获取新闻时出错: {str(e)}"

    async def answer_question(self, query: str) -> str:
        """Run the agent workflow on a user question.

        Args:
            query: The user's question.

        Returns:
            The agent's answer, or an error message if the run failed.
        """
        try:
            separator = "=" * 50
            logger.Log("Info", separator)
            logger.Log("Info", f"用户提问: {query}")
            logger.Log("Info", f"使用模型: {self.llm.model}")
            logger.Log("Info", separator)

            # The workflow takes the question through the user_msg keyword.
            result = await self.workflow.run(user_msg=query)

            logger.Log("Info", f"Agent执行完成,结果类型: {type(result)}")

            # The result type varies; pick the first attribute that carries text.
            if hasattr(result, 'response'):
                answer = str(result.response)
                logger.Log("Info", "从result.response提取答案")
            elif hasattr(result, 'message'):
                answer = str(result.message)
                logger.Log("Info", "从result.message提取答案")
            else:
                answer = str(result)
                logger.Log("Info", "直接转换result为字符串")

            logger.Log("Info", f"最终答案长度: {len(answer)} 字符")

            return answer

        except Exception as e:
            logger.Log("Error", f"回答问题失败: {e}")
            import traceback
            error_trace = traceback.format_exc()
            logger.Log("Error", f"详细错误:\n{error_trace}")
            return f"处理问题时出错: {str(e)}"

    async def close(self):
        """Close the async HTTP client if one was created."""
        if self.client:
            await self.client.aclose()
            self.client = None
|
||
|
||
class GuideEntry(TypedDict, total=False):
    """A single guide entry; every key is optional (total=False)."""

    # Display title of the entry (rendered as the card heading).
    title: str
    # Identifier shown next to the title (rendered in a <code> chip by the HTML renderer).
    identifier: str
    # Descriptive text shown under the heading.
    description: str
    # Category label shown beside the identifier.
    category: str
    # Extra key/value pairs listed under the description.
    metadata: Dict[str, str]
    # Icon text rendered in its own block of the card.
    icon: str
    # Badge text rendered inside the heading.
    badge: str
    # Links; the HTML renderer reads the "href" and "label" keys of each dict.
    links: Sequence[Dict[str, str]]
    # Additional tag chips.
    tags: Sequence[str]
    # Detail blocks: plain strings become paragraphs; dicts are dispatched on
    # their "type" key ("list", "steps" or "table") by the HTML renderer.
    details: Sequence[Union[str, Dict[str, Any]]]
    # Group name, emitted as the card's data-group attribute.
    group: str
|
||
|
||
|
||
@dataclass(frozen=True)
class GuideSection:
    """One section of a guide page (immutable)."""

    # Section heading.
    title: str
    # Entries shown in this section; empty by default.
    entries: Sequence[GuideEntry] = field(default_factory=tuple)
    # Optional text shown under the heading.
    description: str = ""
    # "grid" selects the grid layout in the HTML renderer; any other value selects the list layout.
    layout: str = "grid"
    # Optional HTML id attribute for the section element.
    section_id: str | None = None
|
||
|
||
|
||
@dataclass(frozen=True)
class GuidePage:
    """A complete guide page (immutable)."""

    # Page title.
    title: str
    # Sections in display order; empty by default.
    sections: Sequence[GuideSection] = field(default_factory=tuple)
    # Optional subtitle shown under the title.
    subtitle: str = ""
    # Page-level key/value pairs.
    metadata: Dict[str, str] = field(default_factory=dict)
    # Related links grouped by label; each link dict is read for "href" and "label".
    related_links: Dict[str, Sequence[Dict[str, str]]] = field(default_factory=dict)
|
||
|
||
|
||
def render_markdown_page(page: GuidePage) -> str:
    """Render a guide page as Markdown (kept as a fallback renderer).

    Args:
        page: The page to render.

    Returns:
        The Markdown text: an H1 title, optional subtitle and metadata bullets,
        then one H2 block per section.
    """

    def _render_section(section: GuideSection) -> str:
        """Render one section: heading, description, then one bullet per entry."""
        out: List[str] = [f"## {section.title}"]
        if section.description:
            out.append(section.description)
        if not section.entries:
            out.append("> 暂无内容。")
            return "\n".join(out)

        for item in section.entries:
            # Heading bullet: title, then optional identifier and category.
            head_parts = [f"- **{item.get('title', '未命名')}**"]
            ident = item.get("identifier")
            if ident:
                head_parts.append(f"`{ident}`")
            kind = item.get("category")
            if kind:
                head_parts.append(f"{kind}")
            out.append("|".join(head_parts))

            summary = item.get("description", "")
            if summary:
                out.append(f" - {summary}")
            out.extend(
                f" - {meta_key}:{meta_value}"
                for meta_key, meta_value in item.get("metadata", {}).items()
            )
        return "\n".join(out)

    parts: List[str] = [f"# {page.title}"]
    if page.subtitle:
        parts.append(page.subtitle)
    if page.metadata:
        parts.append("")
        parts.extend(f"- {key}:{value}" for key, value in page.metadata.items())
    for section in page.sections:
        parts.extend(("", _render_section(section)))
    return "\n".join(parts)
|
||
|
||
|
||
# Static stylesheet for the guide page. Kept as a plain string (not an
# f-string) so CSS braces need no escaping.
_GUIDE_PAGE_CSS = """
    :root {
        color-scheme: dark;
        --bg-primary: radial-gradient(120% 120% at 10% 10%, #222 0%, #050505 100%);
        --bg-card: rgba(255, 255, 255, 0.06);
        --bg-card-hover: rgba(255, 255, 255, 0.12);
        --border-soft: rgba(255, 255, 255, 0.1);
        --text-main: #f5f5f7;
        --text-sub: rgba(245, 245, 247, 0.64);
        --accent: linear-gradient(135deg, #4b7bec, #34e7e4);
        --accent-strong: linear-gradient(135deg, #ff9f1a, #ff3f34);
    }
    * {
        box-sizing: border-box;
    }
    body {
        margin: 0;
        font-family: "SF Pro Display", "SF Pro SC", "PingFang SC", "Microsoft YaHei", sans-serif;
        background: var(--bg-primary);
        color: var(--text-main);
        min-height: 100vh;
        padding: 0 0 64px;
    }
    a {
        color: #9fc9ff;
        text-decoration: none;
    }
    a:hover {
        text-decoration: underline;
    }
    header.hero {
        padding: 80px 24px 40px;
        text-align: center;
        position: relative;
    }
    header.hero::after {
        content: "";
        position: absolute;
        inset: 0;
        background: radial-gradient(circle at 50% -20%, rgba(255, 255, 255, 0.18), transparent 55%);
        pointer-events: none;
        z-index: -1;
    }
    .hero-title {
        font-size: clamp(32px, 5vw, 48px);
        font-weight: 700;
        margin: 0;
        background: var(--accent);
        -webkit-background-clip: text;
        color: transparent;
        letter-spacing: 0.8px;
    }
    .hero-subtitle {
        margin: 16px auto 0;
        max-width: 640px;
        font-size: 18px;
        color: var(--text-sub);
        line-height: 1.6;
    }
    main {
        width: min(1100px, 92vw);
        margin: 0 auto;
    }
    .meta-grid {
        display: grid;
        gap: 20px;
        margin: 0 auto 48px;
        padding: 0 8px;
        grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
    }
    .related-section {
        margin: 0 auto 48px;
        padding: 0 8px;
        display: grid;
        gap: 18px;
    }
    .related-block {
        display: grid;
        gap: 8px;
    }
    .related-label {
        font-size: 13px;
        text-transform: uppercase;
        letter-spacing: 0.5px;
        color: var(--text-sub);
    }
    .related-items {
        display: flex;
        gap: 10px;
        flex-wrap: wrap;
    }
    .related-link {
        display: inline-flex;
        align-items: center;
        padding: 6px 14px;
        border-radius: 999px;
        background: rgba(255, 255, 255, 0.08);
        font-size: 13px;
        letter-spacing: 0.4px;
        transition: background 0.2s ease;
    }
    .related-link:hover {
        background: rgba(255, 255, 255, 0.16);
        text-decoration: none;
    }
    .meta-card {
        border-radius: 18px;
        padding: 22px;
        border: 1px solid var(--border-soft);
        background: rgba(255, 255, 255, 0.04);
        backdrop-filter: blur(16px);
        box-shadow: 0 12px 30px rgba(0, 0, 0, 0.35);
    }
    .meta-key {
        font-size: 13px;
        letter-spacing: 1px;
        text-transform: uppercase;
        color: var(--text-sub);
    }
    .meta-value {
        margin-top: 6px;
        font-size: 22px;
        font-weight: 600;
    }
    .guide-section {
        margin: 0 auto 56px;
        padding: 0 8px;
    }
    .section-header {
        display: flex;
        flex-direction: column;
        gap: 12px;
        margin-bottom: 28px;
    }
    .section-header h2 {
        margin: 0;
        font-size: clamp(26px, 3vw, 32px);
        letter-spacing: 0.5px;
    }
    .section-desc {
        margin: 0;
        font-size: 16px;
        color: var(--text-sub);
        max-width: 640px;
    }
    .entries-grid {
        display: grid;
        gap: 20px;
        grid-template-columns: repeat(auto-fit, minmax(260px, 1fr));
    }
    .entries-list {
        display: flex;
        flex-direction: column;
        gap: 18px;
    }
    .entry-card {
        display: flex;
        gap: 18px;
        border-radius: 20px;
        padding: 24px;
        border: 1px solid transparent;
        background: var(--bg-card);
        transition: border 0.2s ease, background 0.2s ease, transform 0.2s ease;
    }
    .entry-card:hover {
        background: var(--bg-card-hover);
        border-color: rgba(255, 255, 255, 0.18);
        transform: translateY(-4px);
    }
    .entry-icon {
        font-size: 36px;
    }
    .entry-content {
        flex: 1;
    }
    .entry-content header {
        display: flex;
        flex-direction: column;
        gap: 8px;
        margin-bottom: 10px;
    }
    .entry-content h3 {
        margin: 0;
        font-size: 20px;
        display: inline-flex;
        align-items: center;
        gap: 10px;
    }
    .entry-badge {
        display: inline-flex;
        padding: 4px 10px;
        border-radius: 999px;
        font-size: 12px;
        letter-spacing: 0.5px;
        background: var(--accent-strong);
    }
    .entry-tags {
        display: flex;
        gap: 12px;
        flex-wrap: wrap;
    }
    .entry-tags-extra {
        display: flex;
        gap: 8px;
        flex-wrap: wrap;
        margin-top: 10px;
    }
    .entry-id {
        background: rgba(255, 255, 255, 0.1);
        border-radius: 999px;
        padding: 4px 10px;
        font-size: 12px;
        letter-spacing: 0.5px;
    }
    .entry-category {
        font-size: 12px;
        color: var(--text-sub);
    }
    .entry-desc {
        margin: 0 0 12px;
        color: var(--text-sub);
        line-height: 1.6;
    }
    .entry-meta {
        list-style: none;
        margin: 0;
        padding: 0;
        display: grid;
        gap: 6px;
        font-size: 13px;
        color: var(--text-sub);
    }
    .entry-meta li {
        display: flex;
        justify-content: space-between;
    }
    .entry-link {
        display: inline-flex;
        align-items: center;
        gap: 6px;
        margin-top: 10px;
        font-size: 14px;
    }
    .entry-tag {
        display: inline-flex;
        align-items: center;
        padding: 4px 12px;
        border-radius: 999px;
        background: rgba(255, 255, 255, 0.08);
        font-size: 12px;
        letter-spacing: 0.4px;
    }
    .entry-details {
        margin-top: 14px;
        display: grid;
        gap: 10px;
    }
    .entry-detail-paragraph {
        margin: 0;
        color: var(--text-sub);
        line-height: 1.6;
    }
    .entry-detail-list, .entry-detail-steps {
        margin: 0;
        padding-left: 20px;
        color: var(--text-sub);
    }
    .entry-detail-steps {
        list-style: none;
        padding-left: 0;
    }
    .entry-detail-steps li {
        display: grid;
        grid-template-columns: 28px 1fr;
        align-items: start;
        gap: 10px;
        margin-bottom: 6px;
    }
    .entry-detail-steps .step-index {
        width: 28px;
        height: 28px;
        border-radius: 50%;
        display: inline-flex;
        align-items: center;
        justify-content: center;
        background: rgba(255, 255, 255, 0.12);
        font-size: 12px;
    }
    .entry-detail-table {
        width: 100%;
        border-collapse: collapse;
        border-radius: 14px;
        overflow: hidden;
    }
    .entry-detail-table th,
    .entry-detail-table td {
        border: 1px solid rgba(255, 255, 255, 0.1);
        padding: 8px 12px;
        font-size: 13px;
        text-align: left;
    }
    .entry-detail-table th {
        background: rgba(255, 255, 255, 0.08);
        font-weight: 600;
    }
    .entry-detail-table tr:nth-child(even) {
        background: rgba(255, 255, 255, 0.04);
    }
    .empty-placeholder {
        padding: 40px;
        text-align: center;
        border-radius: 20px;
        border: 1px dashed rgba(255, 255, 255, 0.2);
        color: var(--text-sub);
    }
    @media (max-width: 680px) {
        .entry-card {
            flex-direction: column;
        }
        .entry-content h3 {
            font-size: 18px;
        }
        .meta-grid {
            grid-template-columns: 1fr 1fr;
        }
    }
    @media (max-width: 480px) {
        .meta-grid {
            grid-template-columns: 1fr;
        }
    }
"""


def render_html_page(page: "GuidePage") -> str:
    """Render a guide page as a standalone, dark-themed HTML document.

    All text taken from the page (titles, descriptions, metadata, link targets
    and labels, detail items) is HTML-escaped before being inserted.

    Args:
        page: The page to render; its title, subtitle, metadata,
            related_links and sections are read.

    Returns:
        The complete HTML document as a string.
    """
    # Local import so the block does not depend on a file-level import.
    from html import escape as _html_escape

    def escape(text: Optional[str]) -> str:
        """HTML-escape text (&, <, >, and both quote characters); falsy input yields ""."""
        if not text:
            return ""
        return _html_escape(str(text), quote=True)

    def render_metadata(metadata: Dict[str, str]) -> str:
        """Render page-level metadata as a grid of key/value cards."""
        if not metadata:
            return ""
        cards = [
            f"""
            <div class="meta-card">
                <div class="meta-key">{escape(key)}</div>
                <div class="meta-value">{escape(value)}</div>
            </div>
            """
            for key, value in metadata.items()
        ]
        return f'<section class="meta-grid">{"".join(cards)}</section>'

    def render_links(links: Optional[Sequence[Dict[str, str]]]) -> str:
        """Render an entry's links as anchors that open in a new tab."""
        if not links:
            return ""
        items = []
        for link in links:
            href = escape(link.get("href", "#"))
            label = escape(link.get("label", "前往"))
            items.append(f'<a class="entry-link" href="{href}" target="_blank">{label}</a>')
        return "".join(items)

    def render_tags(tags: Optional[Sequence[str]]) -> str:
        """Render an entry's tags as chips."""
        if not tags:
            return ""
        chips = "".join(f'<span class="entry-tag">{escape(tag)}</span>' for tag in tags)
        return f'<div class="entry-tags-extra">{chips}</div>'

    def render_details(details: Optional[Sequence[Union[str, Dict[str, Any]]]]) -> str:
        """Render detail blocks: strings as paragraphs, dicts by their "type" key.

        Supported dict types are "list", "steps" and "table"; any other dict
        (or non-str, non-dict value) is ignored.
        """
        if not details:
            return ""
        blocks: List[str] = []
        for detail in details:
            if isinstance(detail, str):
                blocks.append(f'<p class="entry-detail-paragraph">{escape(detail)}</p>')
            elif isinstance(detail, dict):
                kind = detail.get("type")
                if kind == "list":
                    items = "".join(
                        f'<li>{escape(str(item))}</li>' for item in detail.get("items", [])
                    )
                    blocks.append(f'<ul class="entry-detail-list">{items}</ul>')
                elif kind == "steps":
                    items = "".join(
                        f'<li><span class="step-index">{idx+1}</span><span>{escape(str(item))}</span></li>'
                        for idx, item in enumerate(detail.get("items", []))
                    )
                    blocks.append(f'<ol class="entry-detail-steps">{items}</ol>')
                elif kind == "table":
                    rows = []
                    for row in detail.get("rows", []):
                        cols = "".join(f"<td>{escape(str(col))}</td>" for col in row)
                        rows.append(f"<tr>{cols}</tr>")
                    head = ""
                    headers = detail.get("headers")
                    if headers:
                        head = "".join(f"<th>{escape(str(col))}</th>" for col in headers)
                        head = f"<thead><tr>{head}</tr></thead>"
                    blocks.append(
                        f'<table class="entry-detail-table">{head}<tbody>{"".join(rows)}</tbody></table>'
                    )
        if not blocks:
            return ""
        return f'<div class="entry-details">{"".join(blocks)}</div>'

    def render_entry(entry: "GuideEntry") -> str:
        """Render one entry as an <article> card; absent fields produce no markup."""
        icon = escape(entry.get("icon"))
        badge = escape(entry.get("badge"))
        title = escape(entry.get("title"))
        identifier = escape(entry.get("identifier"))
        description = escape(entry.get("description"))
        category = escape(entry.get("category"))
        metadata_items = [
            f'<li><span>{escape(meta_key)}</span><span>{escape(str(meta_value))}</span></li>'
            for meta_key, meta_value in entry.get("metadata", {}).items()
        ]
        metadata_html = ""
        if metadata_items:
            metadata_html = f'<ul class="entry-meta">{"".join(metadata_items)}</ul>'
        identifier_html = f'<code class="entry-id">{identifier}</code>' if identifier else ""
        category_html = f'<span class="entry-category">{category}</span>' if category else ""
        badge_html = f'<span class="entry-badge">{badge}</span>' if badge else ""
        icon_html = f'<div class="entry-icon">{icon}</div>' if icon else ""
        links_html = render_links(entry.get("links"))
        tags_html = render_tags(entry.get("tags"))
        details_html = render_details(entry.get("details"))
        group = escape(entry.get("group"))
        group_attr = f' data-group="{group}"' if group else ""
        return f"""
        <article class="entry-card"{group_attr}>
            {icon_html}
            <div class="entry-content">
                <header>
                    <h3>{title}{badge_html}</h3>
                    <div class="entry-tags">{identifier_html}{category_html}</div>
                </header>
                <p class="entry-desc">{description}</p>
                {metadata_html}
                {tags_html}
                {details_html}
                {links_html}
            </div>
        </article>
        """

    def render_section(section: "GuideSection") -> str:
        """Render one section; an empty section shows a placeholder."""
        layout_class = "entries-grid" if section.layout == "grid" else "entries-list"
        section_attr = f' id="{escape(section.section_id)}"' if section.section_id else ""
        cards = "".join(render_entry(entry) for entry in section.entries)
        description_html = (
            f'<p class="section-desc">{escape(section.description)}</p>'
            if section.description
            else ""
        )
        if not cards:
            cards = '<div class="empty-placeholder">暂无内容</div>'
        return f"""
        <section class="guide-section"{section_attr}>
            <div class="section-header">
                <h2>{escape(section.title)}</h2>
                {description_html}
            </div>
            <div class="{layout_class}">
                {cards}
            </div>
        </section>
        """

    def render_related(related: Dict[str, Sequence[Dict[str, str]]]) -> str:
        """Render related links grouped by label; groups without links are skipped."""
        if not related:
            return ""
        blocks: List[str] = []
        for label, links in related.items():
            if not links:
                continue
            items = "".join(
                f'<a class="related-link" href="{escape(link.get("href", "#"))}">{escape(link.get("label", ""))}</a>'
                for link in links
            )
            blocks.append(
                f"""
                <div class="related-block">
                    <div class="related-label">{escape(label)}</div>
                    <div class="related-items">{items}</div>
                </div>
                """
            )
        if not blocks:
            return ""
        return f'<section class="related-section">{"".join(blocks)}</section>'

    sections_html = "".join(render_section(section) for section in page.sections)
    metadata_html = render_metadata(page.metadata)
    related_html = render_related(page.related_links)
    subtitle_html = f'<p class="hero-subtitle">{escape(page.subtitle)}</p>' if page.subtitle else ""

    return f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <title>{escape(page.title)}</title>
    <style>{_GUIDE_PAGE_CSS}</style>
</head>
<body>
    <header class="hero">
        <h1 class="hero-title">{escape(page.title)}</h1>
        {subtitle_html}
    </header>
    <main>
        {metadata_html}
        {related_html}
        {sections_html}
    </main>
</body>
</html>
"""
|
||
|
||
|
||
class MessageSender:
    """Posts JSON messages (text, markdown or link) to a webhook URL."""

    def __init__(self, webhook_url: str):
        """Remember the target webhook; the HTTP client is created lazily.

        Args:
            webhook_url: Webhook URL that messages are POSTed to.
        """
        self.webhook_url = webhook_url
        self.client: Optional[httpx.AsyncClient] = None

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the async HTTP client, creating it on first use."""
        if self.client is None:
            self.client = httpx.AsyncClient(timeout=10.0)
        return self.client

    async def send_message(self, message: Dict[str, Any]) -> bool:
        """POST a message payload to the webhook.

        Args:
            message: JSON-serialisable message dictionary.

        Returns:
            True when the webhook answered with HTTP 200; False for any other
            status or when an exception occurred (the failure is logged).
        """
        try:
            http = await self._get_client()
            reply = await http.post(self.webhook_url, json=message)

            if reply.status_code != 200:
                logger.Log("Error", f"消息发送失败: status={reply.status_code}, body={reply.text}")
                return False

            logger.Log("Info", f"消息发送成功: {message.get('msgtype')}")
            return True

        except Exception as e:
            logger.Log("Error", f"发送消息异常: {e}")
            return False

    async def send_text(self, content: str, at_user_id: Optional[int] = None) -> bool:
        """Send a plain-text message, optionally mentioning a user.

        Args:
            content: Text content.
            at_user_id: User id to mention; when truthy an <at> tag is prepended.

        Returns:
            Whether the message was sent successfully.
        """
        body = f'<at user_id="{at_user_id}"></at> {content}' if at_user_id else content
        payload = {
            "msgtype": "text",
            "text": {
                "content": body
            }
        }
        return await self.send_message(payload)

    async def send_markdown(self, text: str) -> bool:
        """Send a Markdown message.

        Args:
            text: Markdown text.

        Returns:
            Whether the message was sent successfully.
        """
        payload = {
            "msgtype": "markdown",
            "markdown": {
                "text": text
            }
        }
        return await self.send_message(payload)

    async def send_link(self, title: str, text: str,
                        message_url: str = "", btn_title: str = "查看详情") -> bool:
        """Send a link card message.

        Args:
            title: Card title.
            text: Card text.
            message_url: URL opened by the card.
            btn_title: Button caption.

        Returns:
            Whether the message was sent successfully.
        """
        card = {
            "title": title,
            "text": text,
            "messageUrl": message_url,
            "btnTitle": btn_title
        }
        return await self.send_message({"msgtype": "link", "link": card})

    async def close(self):
        """Close the HTTP client if one was created."""
        if self.client:
            await self.client.aclose()
            self.client = None
|
||
|
||
|
||
class BasicWPSInterface(PluginInterface):
    """Base plugin that answers a chat message by posting to a webhook.

    Subclasses decide where to reply by overriding get_webhook_url; the
    default implementation returns "" so nothing is sent.
    """

    # Optional hook mapping a user id to a display name used in the reply header.
    user_id_to_username: Optional[Callable[[int], str]] = None

    @override
    def is_enable_plugin(self) -> bool:
        """This base class is disabled as a plugin."""
        return False

    def get_webhook_url(self, message: str, chat_id: int, user_id: int) -> str:
        '''
        Return the webhook URL to reply to; an empty string means no reply is sent.

        Args:
            message: Message content
            chat_id: Chat id
            user_id: User id
        Returns:
            Webhook URL
        '''
        return ""

    def get_webhook_request(self, data:Any|None) -> None:
        """Hook for incoming webhook data; the base implementation does nothing."""
        pass

    def get_message_sender(self, webhook_url: str) -> MessageSender:
        """Create a new MessageSender for the given webhook URL."""
        return MessageSender(webhook_url)

    def get_message_sender_type(self) -> Literal["text", "markdown", "link"]:
        """Return the message type used for replies ("markdown" by default)."""
        return "markdown"

    def get_message_sender_function(self, webhook_url: str, type: Literal["text", "markdown", "link"]) -> Callable[..., Coroutine[Any, Any, bool]]:
        """Return the bound send method of a sender for the given message type.

        Args:
            webhook_url: Webhook URL the sender targets.
            type: One of "text", "markdown" or "link".

        Returns:
            The sender's coroutine function (send_text / send_markdown / send_link).

        Raises:
            ValueError: If type is not one of the supported values.
        """
        if type == "text":
            return self.get_message_sender(webhook_url).send_text
        elif type == "markdown":
            return self.get_message_sender(webhook_url).send_markdown
        elif type == "link":
            return self.get_message_sender(webhook_url).send_link
        else:
            raise ValueError(f"Invalid message sender type: {type}")

    def parse_message_after_at(self, message: str) -> str:
        '''
        Deprecated; returns the message unchanged.
        '''
        return message

    async def send_markdown_message(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Send the message to the resolved webhook, prefixed with an @-mention header.

        Args:
            message: Message body.
            chat_id: Chat id, passed to get_webhook_url.
            user_id: User id, used for the mention and passed to get_webhook_url.

        Returns:
            Always None.
        """
        webhook_url = self.get_webhook_url(message, chat_id, user_id)
        if get_internal_debug():
            logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Chat ID: {chat_id}, User ID: {user_id}")
        # Empty string or None both mean "no reply".
        if not webhook_url:
            return None

        username: str = self.user_id_to_username(user_id) if self.user_id_to_username else ""
        send_message = f"""## <at user_id="{user_id}">{username}</at>
---
{message}
"""

        send = self.get_message_sender_function(webhook_url, self.get_message_sender_type())
        try:
            result = await send(send_message)
        finally:
            # A sender (and its HTTP client) is created per message; close it so
            # the client is not leaked. MessageSender re-creates its client
            # lazily, so a sender that is reused later keeps working.
            # NOTE(review): if a subclass shares one sender across concurrent
            # sends, closing here could interrupt another in-flight request.
            owner = getattr(send, "__self__", None)
            if isinstance(owner, MessageSender):
                await owner.close()
        if get_internal_verbose():
            logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Result: {result}")
        return None

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Plugin entry point: forward a non-empty message to send_markdown_message."""
        message = self.parse_message_after_at(message)
        if message == "":
            return None
        return await self.send_markdown_message(message, chat_id, user_id)
|
||
|
||
|
||
class WPSAPI(BasicWPSInterface):
|
||
"""核心 WPS 插件基类,提供图鉴模板设施。"""
|
||
|
||
# Headings of the built-in guide sections, keyed by section kind
# (looked up by collect_guide_sections).
guide_section_labels: Dict[str, str] = {
    "commands": "指令一览",
    "items": "物品与资源",
    "recipes": "配方与合成",
    "guides": "系统指引",
}
|
||
|
||
def get_guide_title(self) -> str:
    """Return the guide page title; defaults to the class name."""
    return self.__class__.__name__
|
||
|
||
def get_guide_subtitle(self) -> str:
    """Return the guide page subtitle; empty by default."""
    return ""
|
||
|
||
def get_guide_metadata(self) -> Dict[str, str]:
    """Return page-level metadata for the guide; empty by default."""
    return {}
|
||
|
||
def collect_command_entries(self) -> Sequence[GuideEntry]:
    """Return entries for the "commands" section; none by default."""
    return ()
|
||
|
||
def collect_item_entries(self) -> Sequence[GuideEntry]:
    """Return entries for the "items" section; none by default."""
    return ()
|
||
|
||
def collect_recipe_entries(self) -> Sequence[GuideEntry]:
    """Return entries for the "recipes" section; none by default."""
    return ()
|
||
|
||
def collect_guide_entries(self) -> Sequence[GuideEntry]:
    """Return entries for the "guides" section; none by default."""
    return ()
|
||
|
||
def collect_additional_sections(self) -> Sequence[GuideSection]:
    """Return extra sections appended after the built-in ones; none by default."""
    return ()
|
||
|
||
def collect_guide_sections(self) -> Sequence[GuideSection]:
    """Assemble the guide sections in display order.

    The built-in sections (commands, items, recipes, guides) are included only
    when their collector returns entries; sections from
    collect_additional_sections are appended last.

    Returns:
        A tuple of GuideSection objects.
    """
    # (label key, collector, explicit layout or None for the GuideSection default)
    plan = (
        ("commands", self.collect_command_entries, "list"),
        ("items", self.collect_item_entries, None),
        ("recipes", self.collect_recipe_entries, None),
        ("guides", self.collect_guide_entries, "list"),
    )

    collected: List[GuideSection] = []
    for label_key, collector, layout in plan:
        entries = tuple(collector())
        if not entries:
            continue
        options = {"layout": layout} if layout is not None else {}
        collected.append(
            GuideSection(
                title=self.guide_section_labels[label_key],
                entries=entries,
                **options,
            )
        )

    collected.extend(tuple(self.collect_additional_sections()))
    return tuple(collected)
|
||
|
||
def build_guide_page(self) -> GuidePage:
    """Build the GuidePage model for this plugin.

    Metadata values are coerced to ``str``. Title, subtitle, sections and
    related links come from the corresponding ``get_*`` / ``collect_*`` methods.
    """
    metadata: Dict[str, str] = {
        key: str(value) for key, value in self.get_guide_metadata().items()
    }
    related = self.get_related_links()

    return GuidePage(
        title=self.get_guide_title(),
        subtitle=self.get_guide_subtitle(),
        sections=self.collect_guide_sections(),
        metadata=metadata,
        related_links=related,
    )
|
||
|
||
def render_guide_page(self, page: GuidePage) -> str:
    """Render ``page`` by delegating to ``render_html_page`` and return the result."""
    rendered = render_html_page(page)
    return rendered
|
||
|
||
def render_guide(self) -> str:
    """Build this plugin's guide page and render it in one step."""
    page = self.build_guide_page()
    return self.render_guide_page(page)
|
||
|
||
def get_guide_response(self, content: str) -> HTMLResponse:
    """Wrap already-rendered ``content`` in an ``HTMLResponse``."""
    response = HTMLResponse(content)
    return response
|
||
|
||
@override
|
||
def generate_router_illustrated_guide(self):
|
||
async def handler() -> HTMLResponse:
|
||
return self.get_guide_response(self.render_guide())
|
||
|
||
return handler
|
||
|
||
def get_related_links(self) -> Dict[str, Sequence[Dict[str, str]]]:
    """Collect links to related plugin classes, grouped by relationship.

    Returns:
        A dict that may contain two keys:
        - "父类链" (parent chain): links for WPSAPI-derived ancestors defined
          in a ``Plugins.`` module, ordered from the most distant ancestor to
          the nearest one.
        - "子类" (subclasses): links for every class returned by
          ``_iter_subclasses``.
        "父类链" is set whenever at least one qualifying ancestor exists.
        "子类" is set only when at least one link was produced.
    """
    own_class = self.__class__
    related: Dict[str, Sequence[Dict[str, str]]] = {}

    # Walk the MRO (excluding the class itself) up to and including WPSAPI.
    ancestors = []
    for candidate in own_class.__mro__[1:]:
        if issubclass(candidate, WPSAPI):
            if candidate.__module__.startswith("Plugins."):
                ancestors.append(candidate)
            if candidate is WPSAPI:
                break

    if ancestors:
        ancestor_links = [self._build_class_link(cls) for cls in reversed(ancestors)]
        related["父类链"] = tuple(link for link in ancestor_links if link)

    descendant_links = [
        self._build_class_link(child) for child in self._iter_subclasses(own_class)
    ]
    descendant_links = tuple(link for link in descendant_links if link)
    if descendant_links:
        related["子类"] = descendant_links

    return related
|
||
|
||
def _build_class_link(self, cls: type) -> Optional[Dict[str, str]]:
|
||
if not hasattr(cls, "__module__") or not cls.__module__.startswith("Plugins."):
|
||
return None
|
||
path = f"/api/{cls.__name__}"
|
||
return {
|
||
"label": cls.__name__,
|
||
"href": path,
|
||
}
|
||
|
||
def _iter_subclasses(self, cls: type) -> List[type]:
    """Return every qualifying descendant of ``cls``, depth-first, without duplicates.

    A subclass qualifies when it derives from WPSAPI and its module name starts
    with ``"Plugins."``. A subclass that does not qualify is skipped together
    with its whole subtree, because the recursion only descends into
    qualifying classes.

    With multiple inheritance the same class is reachable through more than
    one parent and would otherwise be listed once per path. Each class is
    therefore kept only at its first position.

    Args:
        cls: The class whose subclass tree is walked.

    Returns:
        A list of classes in depth-first pre-order, each appearing once.
    """
    collected: List[type] = []
    for subclass in cls.__subclasses__():
        if not issubclass(subclass, WPSAPI):
            continue
        if not subclass.__module__.startswith("Plugins."):
            continue
        collected.append(subclass)
        collected.extend(self._iter_subclasses(subclass))
    # dict.fromkeys keeps first-seen order while dropping repeats.
    return list(dict.fromkeys(collected))
|
||
|
||
def get_guide_subtitle(self) -> str:
    """Return the fixed subtitle shown on the core webhook plugin's guide page."""
    subtitle = "核心 Webhook 转发插件"
    return subtitle
|
||
|
||
def get_guide_metadata(self) -> Dict[str, str]:
    """Report whether the main webhook URL is configured.

    The single entry "Webhook 状态" is "已配置" (configured) when
    MAIN_WEBHOOK_URL is truthy and "未配置" (not configured) otherwise.
    """
    if MAIN_WEBHOOK_URL:
        status = "已配置"
    else:
        status = "未配置"
    return {"Webhook 状态": status}
|
||
|
||
def collect_command_entries(self) -> Sequence[GuideEntry]:
    """Describe the ``say`` command for the guide page.

    Returns:
        A one-element tuple holding the entry for ``say`` (alias "说").
    """
    say_entry = {
        "title": "say",
        "identifier": "say",
        "description": "将后续消息内容以 Markdown 形式发送到主 Webhook。",
        "metadata": {"别名": "说"},
        "icon": "🗣️",
        "badge": "核心",
    }
    return (say_entry,)
|
||
|
||
def collect_guide_entries(self) -> Sequence[GuideEntry]:
    """Return the two guidance entries for the core webhook plugin.

    The first covers webhook binding, the second the message format.
    """
    binding_entry = {
        "title": "Webhook 绑定",
        "description": (
            "在项目配置中设置 `main_webhook_url` 后插件自动启用,"
            "所有注册的命令将调用 `send_markdown_message` 发送富文本。"
        ),
        "icon": "🔗",
    }
    format_entry = {
        "title": "消息格式",
        "description": (
            "默认使用 Markdown 模式发送,支持 `聊天ID` 与 `用户ID` 的 @ 提醒。"
        ),
        "icon": "📝",
    }
    return (binding_entry, format_entry)
|
||
|
||
@override
def is_enable_plugin(self) -> bool:
    """Enable the plugin only when a main webhook URL is configured.

    Logs an error when MAIN_WEBHOOK_URL is the empty string.

    Returns:
        ``True`` when MAIN_WEBHOOK_URL is not the empty string.
    """
    webhook_missing = MAIN_WEBHOOK_URL == ""
    if webhook_missing:
        logger.Log("Error", f"{ConsoleFrontColor.RED}WPSAPI未配置主Webhook URL{ConsoleFrontColor.RESET}")
    return not webhook_missing
|
||
|
||
def get_main_webhook_url(self) -> str:
    """Return the module-level MAIN_WEBHOOK_URL read from the project config."""
    main_url = MAIN_WEBHOOK_URL
    return main_url
|
||
|
||
@override
def get_webhook_url(self, message: str, chat_id: int, user_id: int) -> str:
    """Resolve the webhook URL to use for ``chat_id``.

    Reads the text stored under ``webhook_url/<chat_id>`` in the project
    config. When that text is the empty string, the main webhook URL is used.
    ``message`` and ``user_id`` are not consulted.

    NOTE(review): the ``True`` passed to ``GetFile`` presumably requests
    creation of a missing file — confirm against ProjectConfig.
    """
    per_chat_file = ProjectConfig().GetFile(f"webhook_url/{chat_id}", True)
    registered_url = per_chat_file.LoadAsText()
    if registered_url == "":
        return self.get_main_webhook_url()
    return registered_url
|
||
|
||
@override
|
||
def get_webhook_request(self, data:Any|None) -> None:
|
||
return None
|
||
|
||
@override
def wake_up(self) -> None:
    """Log that the core plugin is loaded, then register its command keywords.

    The plugin is registered under "say" and, after that, under "说".
    """
    logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPI核心插件已加载{ConsoleFrontColor.RESET}")
    for keyword in ("say", "说"):
        self.register_plugin(keyword)
|
||
|
||
class WPSAPIHelp(WPSAPI):
    """Help plugin: replies with the command keywords registered per plugin class."""

    @override
    def dependencies(self) -> List[Type]:
        """Declare WPSAPI as this plugin's only dependency."""
        required: List[Type] = [WPSAPI]
        return required

    @override
    def is_enable_plugin(self) -> bool:
        """Always enabled, regardless of the main webhook configuration."""
        return True

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Send a Markdown list of registered keywords grouped by plugin class.

        Walks ``PluginInterface.plugin_instances`` and skips falsy instances.
        The output has one ``- <ClassName>: <kw1>, <kw2>`` line per plugin
        class, in first-seen order. ``message`` is not used.
        """
        keywords_by_plugin: Dict[str, List[str]] = {}
        for keyword, instance in PluginInterface.plugin_instances.items():
            if not instance:
                continue
            class_name = instance.__class__.__name__
            keywords_by_plugin.setdefault(class_name, []).append(keyword)

        listing = "".join(
            f"- {class_name}: {', '.join(keywords)}\n"
            for class_name, keywords in keywords_by_plugin.items()
        )
        return await self.send_markdown_message(f"""# 指令入口
{listing}
""", chat_id, user_id)

    @override
    def wake_up(self) -> None:
        """Log that the plugin is loaded and register it as "help" and "帮助"."""
        logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPIHelp 插件已加载{ConsoleFrontColor.RESET}")
        for keyword in ("help", "帮助"):
            self.register_plugin(keyword)
|
||
|
||
class WPSAPIWebhook(WPSAPI):
    """Plugin that lets a chat register its own webhook URL.

    The URL is stored under ``webhook_url/<chat_id>`` in the project config,
    the same location ``WPSAPI.get_webhook_url`` reads from.
    """

    @override
    def dependencies(self) -> List[Type]:
        """Declare WPSAPI as this plugin's only dependency."""
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        """Always enabled, regardless of the main webhook configuration."""
        return True

    @override
    def wake_up(self) -> None:
        """Log that the plugin is loaded and register its two command keywords."""
        logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPIWebhook 插件已加载{ConsoleFrontColor.RESET}")
        self.register_plugin("chat_url_register")
        self.register_plugin("会话注册")

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Store ``message`` as the webhook URL for ``chat_id``.

        The message comes straight from a chat user and later becomes the
        target of outgoing requests, so it is normalised and checked first:

        - Surrounding whitespace is stripped. Otherwise a whitespace-only
          message would be stored as a non-empty "URL" and override the main
          webhook.
        - An empty value is still stored as before. ``get_webhook_url`` treats
          an empty stored value as "use the main webhook", so this clears the
          registration.
        - Any other value must be an ``http``/``https`` URL with a host.
          Anything else is rejected with an error reply and nothing is saved.

        Args:
            message: The candidate webhook URL (text following the command).
            chat_id: Chat whose webhook registration is updated.
            user_id: User who issued the command.

        Returns:
            Whatever ``send_markdown_message`` returns for the reply.
        """
        from urllib.parse import urlsplit

        webhook_url = (message or "").strip()
        if webhook_url:
            try:
                parts = urlsplit(webhook_url)
                is_valid = parts.scheme in ("http", "https") and bool(parts.netloc)
            except ValueError:
                # urlsplit raises ValueError for malformed input such as an
                # unbalanced IPv6 bracket.
                is_valid = False
            if not is_valid:
                return await self.send_markdown_message(
                    "会话注册失败:请提供以 http:// 或 https:// 开头的 Webhook 地址",
                    chat_id,
                    user_id,
                )

        ProjectConfig().GetFile(f"webhook_url/{chat_id}", True).SaveAsText(webhook_url)
        return await self.send_markdown_message("会话注册成功", chat_id, user_id)
|
||
|
||
class NewsAIPlugin(WPSAPI):
    """News Q&A plugin: forwards chat questions to a NewsAIAgent and replies in Markdown."""

    # Fallback label for the guide page; this is the model name NewsAIAgent
    # passes to Ollama.
    _DEFAULT_MODEL_NAME = "qwen2.5:7b"

    # Usage help sent when the incoming message is empty or whitespace only.
    _HELP_TEXT = """# 📰 新闻AI智能问答使用帮助

**直接提问即可,智能体会自动:**
1. 识别你想查询的日期
2. 获取对应日期的新闻内容
3. 基于新闻内容回答你的问题

**支持的日期格式:**
- 今天、昨天、今日、昨日
- 2025年11月19日
- 2025-11-19
- 2025/11/19

**示例问题:**
- `今天有什么重要新闻?`
- `2025年11月17日有什么新闻?`
- `昨天的新闻中有关于经济的内容吗?`
- `今天习近平主席有什么活动?`
- `请总结今天新闻联播的主要内容`

**特性:**
✨ 智能理解问题意图
🤖 自动调用工具获取信息
💾 新闻内容自动缓存
🚀 基于LlamaIndex + Ollama"""

    def __init__(self):
        """Initialise the base plugin and create the agent for ``ollama_url``."""
        super().__init__()
        self.ai_agent = NewsAIAgent(ollama_url)

    @override
    def dependencies(self) -> List[Type]:
        """Declare WPSAPI as this plugin's only dependency."""
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        """Always enabled, regardless of the main webhook configuration."""
        return True

    def get_guide_title(self) -> str:
        """Return the guide page title."""
        return "新闻AI智能问答"

    def get_guide_subtitle(self) -> str:
        """Return the guide page subtitle."""
        return "基于AI的新闻联播智能问答系统"

    def get_guide_metadata(self) -> Dict[str, str]:
        """Return guide metadata: model name, data source and feature label.

        The model name is read from the agent's LLM so the page cannot drift
        from the model actually in use. Previously it was a hard-coded label
        that did not match the model NewsAIAgent configures. If the agent or
        its model attribute is unavailable, ``_DEFAULT_MODEL_NAME`` is shown.
        """
        agent = getattr(self, "ai_agent", None)
        llm = getattr(agent, "llm", None)
        model_name = getattr(llm, "model", None) or self._DEFAULT_MODEL_NAME
        return {
            "AI模型": str(model_name),
            "数据源": "每日新闻联播",
            "功能": "智能问答",
        }

    def collect_command_entries(self) -> Sequence[GuideEntry]:
        """Describe the question and summary commands for the guide page.

        NOTE(review): ``wake_up`` registers only the "default" keyword. The
        identifiers and aliases listed here ("ask_news", "问", "摘要", ...)
        are not registered anywhere in this class — confirm they are routed
        elsewhere or update the guide.
        """
        return (
            {
                "title": "问新闻",
                "identifier": "ask_news",
                "description": "询问新闻内容,支持自动识别日期或查询今天/昨天的新闻。",
                "metadata": {"别名": "新闻问答, 问"},
                "icon": "🤖",
                "badge": "AI",
                "details": [
                    {
                        "type": "list",
                        "items": [
                            "支持日期格式:2024年11月19日、2024-11-19、2024/11/19",
                            "支持相对日期:今天、昨天、今日、昨日",
                            "自动使用AI分析新闻内容并回答问题",
                            "示例:问 今天有什么重要新闻?",
                            "示例:问 2024年11月19日 经济相关的新闻有哪些?",
                        ]
                    }
                ]
            },
            {
                "title": "新闻摘要",
                "identifier": "news_summary",
                "description": "获取指定日期新闻的AI摘要。",
                "metadata": {"别名": "摘要"},
                "icon": "📝",
                "badge": "AI",
            },
        )

    def collect_guide_entries(self) -> Sequence[GuideEntry]:
        """Return the guidance entries: Q&A, date recognition, content fetching."""
        return (
            {
                "title": "智能问答",
                "description": (
                    "使用AI理解用户问题,从新闻内容中提取相关信息进行回答。"
                    "支持自然语言提问,可以询问特定主题、人物、事件等。"
                ),
                "icon": "💬",
            },
            {
                "title": "日期识别",
                "description": (
                    "自动从问题中识别日期,支持多种格式。"
                    "如果未指定日期,默认查询当天新闻。"
                ),
                "icon": "📅",
            },
            {
                "title": "内容抓取",
                "description": (
                    "自动从新闻网站抓取指定日期的新闻联播文字版内容。"
                ),
                "icon": "🌐",
            },
        )

    @override
    def wake_up(self) -> None:
        """Log that the plugin is loaded and register it under the "default" keyword."""
        logger.Log("Info", f"{ConsoleFrontColor.GREEN}NewsAIPlugin 新闻AI智能问答插件已加载{ConsoleFrontColor.RESET}")
        self.register_plugin("default")

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Answer a user's question about the news.

        An empty or whitespace-only message gets the usage help. Any other
        message is passed to the agent and its answer is sent back wrapped in
        a Markdown frame. Any exception is logged with its traceback and
        reported to the chat as an error message.

        Args:
            message: The user's question.
            chat_id: Chat to reply to.
            user_id: User who asked.

        Returns:
            Whatever ``send_markdown_message`` returns for the reply.
        """
        try:
            if not message or not message.strip():
                return await self.send_markdown_message(self._HELP_TEXT, chat_id, user_id)

            # Let the agent answer the question.
            answer = await self.ai_agent.answer_question(message)

            # Wrap the answer in the reply frame.
            formatted_answer = f"""📰 **新闻AI智能问答**

{answer}

---
*由 LlamaIndex + Ollama 驱动*"""

            return await self.send_markdown_message(formatted_answer, chat_id, user_id)

        except Exception as e:
            # Top-level boundary for this command: log everything, then tell
            # the user instead of letting the exception escape.
            logger.Log("Error", f"新闻AI问答异常: {e}")
            import traceback
            error_detail = traceback.format_exc()
            logger.Log("Error", f"详细错误: {error_detail}")
            error_msg = f"""❌ **处理问题时出错**

错误信息:{str(e)}

请稍后重试或联系管理员。"""
            return await self.send_markdown_message(error_msg, chat_id, user_id)
|
||
|
||
# Module-level call placed after the plugin class definitions above; the same
# call also appears near the top of the file, right after the config lookup.
# NOTE(review): presumably persists the ProjectConfig properties — confirm.
logger.SaveProperties()