Files
NewsGetWPSBot/Plugins/WPSAPI.py
2025-11-19 11:52:24 +08:00

1491 lines
50 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from PWF.Convention.Runtime.Config import *
from PWF.CoreModules.plugin_interface import PluginInterface
from PWF.CoreModules.flags import *
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ProjectConfig
from PWF.Convention.Runtime.Web import ToolURL
from PWF.Convention.Runtime.String import LimitStringLength
from fastapi.responses import HTMLResponse
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Sequence, TypedDict, override, Union
import httpx
import re
from datetime import datetime, timedelta
from llama_index.core.tools import FunctionTool
from llama_index.llms.ollama import Ollama
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.agent.workflow.react_agent import ReActAgent
from bs4 import BeautifulSoup
# Shared ProjectConfig instance; the rest of this file also uses it as the logger via logger.Log(...).
logger: ProjectConfig = Architecture.Get(ProjectConfig)
# Main webhook URL read from the "main_webhook_url" config item.
# NOTE(review): the second argument looks like the default used when the item is missing - confirm.
MAIN_WEBHOOK_URL = logger.FindItem("main_webhook_url", "")
# NOTE(review): presumably writes the configuration (including the item looked up above) back to storage - confirm.
logger.SaveProperties()
def get_target_web_page_url(year:str, month:str, day:str) -> str:
    """Build the mrxwlb.com URL of the news transcript page for a date.

    Args:
        year: Year component, inserted verbatim.
        month: Month component, inserted verbatim.
        day: Day component, inserted verbatim.

    Returns:
        The page URL, made of a ``year/month/day`` path followed by a slug
        that concatenates the three components.
    """
    date_path = f"{year}/{month}/{day}"
    page_slug = f"{year}{month}{day}日新闻联播文字版"
    return f"http://mrxwlb.com/{date_path}/{page_slug}/"
# Base URL of the Ollama server; passed to NewsAIAgent by NewsAIPlugin below.
ollama_url = "http://www.liubai.site:11434"
class NewsAIAgent:
    """News AI agent: a tool-calling agent built on LlamaIndex and Ollama.

    The agent exposes four tools to the LLM (current date, yesterday's date,
    date parsing and news fetching) and answers questions through a
    ReActAgent wrapped in an AgentWorkflow.
    """

    def __init__(self, ollama_url: str = "http://www.liubai.site:11434"):
        """Create the LLM client and build the agent workflow.

        Args:
            ollama_url: Base URL of the Ollama server.
        """
        self.ollama_url = ollama_url
        self.client: Optional[httpx.AsyncClient] = None
        self.llm = Ollama(model="qwen2.5:7b", base_url=ollama_url, request_timeout=600.0)
        self.workflow: Optional[AgentWorkflow] = None
        self._initialize_agent()

    def _initialize_agent(self):
        """Register the tool functions and build the agent workflow."""
        tools = [
            FunctionTool.from_defaults(
                fn=self._get_current_date,
                name="get_current_date",
                description="获取当前日期,返回格式为'年-月-日',例如'2025-11-19'"
            ),
            FunctionTool.from_defaults(
                fn=self._get_yesterday_date,
                name="get_yesterday_date",
                description="获取昨天的日期,返回格式为'年-月-日',例如'2025-11-18'"
            ),
            FunctionTool.from_defaults(
                fn=self._get_news_content,
                name="get_news_content",
                description="获取指定日期的新闻联播文字内容。参数date格式为'年-月-日',例如'2025-11-19'。返回该日期的新闻内容文本。"
            ),
            FunctionTool.from_defaults(
                fn=self._parse_date_from_text,
                name="parse_date_from_text",
                description="从文本中解析日期。支持格式:'2025年11月19日''2025-11-19''2025/11/19''今天''昨天'等。返回格式为'年-月-日'"
            ),
        ]
        # System prompt: explicit step-by-step instructions so the model always
        # resolves a date and fetches the news before answering.
        system_prompt = """你是一个新闻分析助手,专门回答关于新闻联播的问题。
【重要】你必须按照以下步骤使用工具:
步骤1: 确定日期
- 如果用户问"今天""今日",调用 get_current_date
- 如果用户问"昨天""昨日",调用 get_yesterday_date
- 如果用户提到具体日期(如"2025年11月17日"),调用 parse_date_from_text
步骤2: 获取新闻内容
- 拿到日期后,必须调用 get_news_content(date="YYYY-MM-DD") 获取新闻
- 注意date参数格式必须是 "年-月-日",例如 "2025-11-19"
步骤3: 回答问题
- 仔细阅读获取到的新闻内容
- 基于新闻内容准确回答用户的问题
- 如果新闻中没有相关信息,明确告知用户
【禁止】直接回答问题而不调用工具!你必须先获取新闻内容才能回答。"""
        agent = ReActAgent(
            llm=self.llm,
            tools=tools,
            verbose=True,
            system_prompt=system_prompt,
        )
        self.workflow = AgentWorkflow(
            agents=[agent],
            timeout=600.0,
        )

    async def _get_client(self) -> "httpx.AsyncClient":
        """Return the shared HTTP client, creating it on first use."""
        if self.client is None:
            self.client = httpx.AsyncClient(timeout=120.0)
        return self.client

    def _get_current_date(self) -> str:
        """Return today's date (local time) formatted as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    def _get_yesterday_date(self) -> str:
        """Return yesterday's date (local time) formatted as ``YYYY-MM-DD``."""
        return (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    def _parse_date_from_text(self, text: str) -> str:
        """Extract a date from free text.

        Args:
            text: Text that may contain an explicit date (``2025年11月19日``,
                ``2025-11-19``, ``2025/11/19``) or a relative word for today
                or yesterday.

        Returns:
            The date as ``YYYY-MM-DD``. Explicit dates win over relative words;
            when nothing is recognised, today's date is returned.
        """
        date_patterns = [
            r'(\d{4})年(\d{1,2})月(\d{1,2})日',
            r'(\d{4})-(\d{1,2})-(\d{1,2})',
            r'(\d{4})/(\d{1,2})/(\d{1,2})',
        ]
        for pattern in date_patterns:
            match = re.search(pattern, text)
            if match:
                year, month, day = match.groups()
                return f"{year}-{month.zfill(2)}-{day.zfill(2)}"
        # Relative dates: "today" takes precedence when both words appear, and
        # it is also the fallback when neither appears.
        mentions_today = "今天" in text or "今日" in text
        mentions_yesterday = "昨天" in text or "昨日" in text
        if mentions_yesterday and not mentions_today:
            return self._get_yesterday_date()
        return self._get_current_date()

    def _get_news_content(self, date: str) -> str:
        """Return the news transcript for a date, using a file cache.

        Args:
            date: Date string formatted as ``YYYY-MM-DD``, e.g. ``2025-11-19``.

        Returns:
            The cleaned page text, or a human-readable error message when the
            date is malformed, the page is too short, or fetching fails. This
            method never raises, because its result is fed back to the LLM.
        """
        format_error = "日期格式错误,请使用'年-月-日'格式,例如'2025-11-19'"
        try:
            parts = date.split('-')
            if len(parts) != 3:
                return format_error
            try:
                # Validate all three parts as a real calendar date. This also
                # guarantees that only plain integers reach the cache path and
                # the URL (the year used to be inserted unchecked).
                parsed = datetime(int(parts[0]), int(parts[1]), int(parts[2]))
            except ValueError:
                return format_error
            # Leading zeros are dropped because the site's URLs use bare numbers.
            year, month, day = str(parsed.year), str(parsed.month), str(parsed.day)

            cache_key = f"news_cache/{year}/{month}/{day}"
            cached_file = ProjectConfig().GetFile(cache_key, False)
            if cached_file.Exists():
                cached_content = cached_file.LoadAsText()
                logger.Log("Info", f"从缓存加载新闻: {date}")
                return cached_content

            logger.Log("Info", f"从网页抓取新闻: {date}")
            # Fetched synchronously because this tool function is synchronous.
            import requests
            # The URL contains Chinese characters; requests handles the encoding.
            url = get_target_web_page_url(year, month, day)
            # Browser-like headers so the request looks like a normal page visit.
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            }
            logger.Log("Info", f"请求URL: {url}")
            response = requests.get(url, headers=headers, timeout=30, allow_redirects=True)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')
            # Drop script/style elements so only visible text remains.
            for script in soup(["script", "style"]):
                script.decompose()
            content = soup.get_text()
            # Collapse whitespace: strip every line, split into phrases, and
            # join the non-empty ones with single spaces.
            lines = (line.strip() for line in content.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
            content = ' '.join(chunk for chunk in chunks if chunk)
            if not content or len(content) < 100:
                return f"未能获取到{year}{month}{day}日的新闻内容"

            ProjectConfig().GetFile(cache_key, True).SaveAsText(content)
            logger.Log("Info", f"新闻内容已缓存: {date}")
            return content
        except Exception as e:
            # Boundary handler: report the failure as text instead of raising.
            logger.Log("Error", f"获取新闻失败: {e}")
            return f"获取新闻时出错: {str(e)}"

    async def answer_question(self, query: str) -> str:
        """Answer a user question about the news.

        Args:
            query: The user's question.

        Returns:
            The agent's answer, or an error message if the workflow fails.
        """
        try:
            logger.Log("Info", "=" * 50)
            logger.Log("Info", f"用户提问: {query}")
            logger.Log("Info", f"使用模型: {self.llm.model}")
            logger.Log("Info", "=" * 50)
            # The workflow's run() takes the question as the user_msg keyword.
            result = await self.workflow.run(user_msg=query)
            logger.Log("Info", f"Agent执行完成结果类型: {type(result)}")
            # Pull the answer text out of whichever attribute the result has.
            if hasattr(result, 'response'):
                answer = str(result.response)
                logger.Log("Info", "从result.response提取答案")
            elif hasattr(result, 'message'):
                answer = str(result.message)
                logger.Log("Info", "从result.message提取答案")
            else:
                answer = str(result)
                logger.Log("Info", "直接转换result为字符串")
            logger.Log("Info", f"最终答案长度: {len(answer)} 字符")
            return answer
        except Exception as e:
            logger.Log("Error", f"回答问题失败: {e}")
            import traceback
            error_trace = traceback.format_exc()
            logger.Log("Error", f"详细错误:\n{error_trace}")
            return f"处理问题时出错: {str(e)}"

    async def close(self):
        """Close the HTTP client if one was created."""
        if self.client:
            await self.client.aclose()
            self.client = None
class GuideEntry(TypedDict, total=False):
    """A single illustrated-guide entry; every key is optional (total=False)."""
    # Display title of the entry.
    title: str
    # Identifier; the renderers in this file show it as inline code.
    identifier: str
    # Descriptive text of the entry.
    description: str
    # Category label shown next to the identifier.
    category: str
    # Extra key/value pairs rendered as a list under the entry.
    metadata: Dict[str, str]
    # Icon text (the entries in this file use emoji).
    icon: str
    # Short badge text rendered beside the title.
    badge: str
    # Links; the HTML renderer reads the "href" and "label" keys of each item.
    links: Sequence[Dict[str, str]]
    # Extra tag chips.
    tags: Sequence[str]
    # Detail blocks: plain strings, or dicts whose "type" is "list", "steps" or "table".
    details: Sequence[Union[str, Dict[str, Any]]]
    # Group name; the HTML renderer emits it as a data-group attribute.
    group: str
@dataclass(frozen=True)
class GuideSection:
    """A section of the illustrated guide: a titled group of entries."""
    # Section heading.
    title: str
    # Entries shown in this section (empty by default).
    entries: Sequence[GuideEntry] = field(default_factory=tuple)
    # Optional text shown under the heading.
    description: str = ""
    # "grid" selects the grid layout in the HTML renderer; any other value selects the list layout.
    layout: str = "grid"
    # Optional HTML id attribute for the section element.
    section_id: str | None = None
@dataclass(frozen=True)
class GuidePage:
    """A complete illustrated-guide page."""
    # Page title.
    title: str
    # Sections rendered in order.
    sections: Sequence[GuideSection] = field(default_factory=tuple)
    # Optional subtitle shown under the title.
    subtitle: str = ""
    # Page-level key/value metadata.
    metadata: Dict[str, str] = field(default_factory=dict)
    # Related links grouped by label; the HTML renderer reads "href" and "label" of each link.
    related_links: Dict[str, Sequence[Dict[str, str]]] = field(default_factory=dict)
def render_markdown_page(page: "GuidePage") -> str:
    """Render a guide page as Markdown (kept as a fallback renderer).

    Args:
        page: The page to render; its title, subtitle, metadata and sections
            are read.

    Returns:
        The Markdown text, with lines joined by newlines.
    """

    def _render_section(section: "GuideSection") -> str:
        """Render one section: heading, description, then one bullet per entry."""
        lines: List[str] = [f"## {section.title}"]
        if section.description:
            lines.append(section.description)
        if not section.entries:
            lines.append("> 暂无内容。")
            return "\n".join(lines)
        for entry in section.entries:
            title = entry.get("title", "未命名")
            identifier = entry.get("identifier")
            desc = entry.get("description", "")
            category = entry.get("category")
            metadata = entry.get("metadata", {})
            bullet = f"- **{title}**"
            # Separate the optional parts so they do not run into the title.
            if identifier:
                bullet += f" `{identifier}`"
            if category:
                bullet += f" · {category}"
            lines.append(bullet)
            # Two-space indent makes these proper nested list items.
            if desc:
                lines.append(f"  - {desc}")
            for meta_key, meta_value in metadata.items():
                lines.append(f"  - {meta_key}: {meta_value}")
        return "\n".join(lines)

    lines: List[str] = [f"# {page.title}"]
    if page.subtitle:
        lines.append(page.subtitle)
    if page.metadata:
        lines.append("")
        lines.extend(f"- {key}: {value}" for key, value in page.metadata.items())
    for section in page.sections:
        lines.append("")
        lines.append(_render_section(section))
    return "\n".join(lines)
def render_html_page(page: "GuidePage") -> str:
    """Render a guide page as a standalone, dark-themed HTML document.

    Args:
        page: The page to render; its title, subtitle, metadata, related
            links and sections are read.

    Returns:
        The complete HTML document as a string. All dynamic text is
        HTML-escaped before insertion.
    """
    import html

    def escape(text: Optional[Any]) -> str:
        """HTML-escape a value (including both quote characters); None becomes ""."""
        if text is None:
            return ""
        # str() guards against non-string metadata values.
        return html.escape(str(text), quote=True)

    def render_metadata(metadata: Dict[str, str]) -> str:
        """Render the page metadata as a grid of cards."""
        if not metadata:
            return ""
        cards = []
        for key, value in metadata.items():
            cards.append(
                f"""
<div class="meta-card">
<div class="meta-key">{escape(key)}</div>
<div class="meta-value">{escape(value)}</div>
</div>
"""
            )
        return f'<section class="meta-grid">{"".join(cards)}</section>'

    def render_links(links: Optional[Sequence[Dict[str, str]]]) -> str:
        """Render an entry's links; they open in a new tab."""
        if not links:
            return ""
        items = []
        for link in links:
            href = escape(link.get("href", "#"))
            label = escape(link.get("label", "前往"))
            # rel="noopener noreferrer" stops the opened page from reaching
            # back into this one through window.opener.
            items.append(
                f'<a class="entry-link" href="{href}" target="_blank" rel="noopener noreferrer">{label}</a>'
            )
        return "".join(items)

    def render_tags(tags: Optional[Sequence[str]]) -> str:
        """Render an entry's extra tags as chips."""
        if not tags:
            return ""
        chips = "".join(f'<span class="entry-tag">{escape(tag)}</span>' for tag in tags)
        return f'<div class="entry-tags-extra">{chips}</div>'

    def render_details(details: Optional[Sequence[Union[str, Dict[str, Any]]]]) -> str:
        """Render detail blocks: paragraphs, lists, numbered steps and tables."""
        if not details:
            return ""
        blocks: List[str] = []
        for detail in details:
            if isinstance(detail, str):
                blocks.append(f'<p class="entry-detail-paragraph">{escape(detail)}</p>')
            elif isinstance(detail, dict):
                kind = detail.get("type")
                if kind == "list":
                    items = "".join(
                        f'<li>{escape(str(item))}</li>' for item in detail.get("items", [])
                    )
                    blocks.append(f'<ul class="entry-detail-list">{items}</ul>')
                elif kind == "steps":
                    items = "".join(
                        f'<li><span class="step-index">{idx+1}</span><span>{escape(str(item))}</span></li>'
                        for idx, item in enumerate(detail.get("items", []))
                    )
                    blocks.append(f'<ol class="entry-detail-steps">{items}</ol>')
                elif kind == "table":
                    rows = []
                    for row in detail.get("rows", []):
                        cols = "".join(f"<td>{escape(str(col))}</td>" for col in row)
                        rows.append(f"<tr>{cols}</tr>")
                    head = ""
                    headers = detail.get("headers")
                    if headers:
                        head = "".join(f"<th>{escape(str(col))}</th>" for col in headers)
                        head = f"<thead><tr>{head}</tr></thead>"
                    blocks.append(
                        f'<table class="entry-detail-table">{head}<tbody>{"".join(rows)}</tbody></table>'
                    )
                # Dicts with any other "type" are ignored.
        if not blocks:
            return ""
        return f'<div class="entry-details">{"".join(blocks)}</div>'

    def render_entry(entry: "GuideEntry") -> str:
        """Render a single entry card."""
        icon = escape(entry.get("icon"))
        badge = escape(entry.get("badge"))
        title = escape(entry.get("title"))
        identifier = escape(entry.get("identifier"))
        description = escape(entry.get("description"))
        category = escape(entry.get("category"))
        metadata_items = [
            f'<li><span>{escape(meta_key)}</span><span>{escape(str(meta_value))}</span></li>'
            for meta_key, meta_value in entry.get("metadata", {}).items()
        ]
        metadata_html = ""
        if metadata_items:
            metadata_html = f'<ul class="entry-meta">{"".join(metadata_items)}</ul>'
        identifier_html = f'<code class="entry-id">{identifier}</code>' if identifier else ""
        category_html = f'<span class="entry-category">{category}</span>' if category else ""
        badge_html = f'<span class="entry-badge">{badge}</span>' if badge else ""
        icon_html = f'<div class="entry-icon">{icon}</div>' if icon else ""
        links_html = render_links(entry.get("links"))
        tags_html = render_tags(entry.get("tags"))
        details_html = render_details(entry.get("details"))
        group = escape(entry.get("group"))
        group_attr = f' data-group="{group}"' if group else ""
        return f"""
<article class="entry-card"{group_attr}>
{icon_html}
<div class="entry-content">
<header>
<h3>{title}{badge_html}</h3>
<div class="entry-tags">{identifier_html}{category_html}</div>
</header>
<p class="entry-desc">{description}</p>
{metadata_html}
{tags_html}
{details_html}
{links_html}
</div>
</article>
"""

    def render_section(section: "GuideSection") -> str:
        """Render one section with its header and entry cards."""
        layout_class = "entries-grid" if section.layout == "grid" else "entries-list"
        section_attr = f' id="{escape(section.section_id)}"' if section.section_id else ""
        cards = "".join(render_entry(entry) for entry in section.entries)
        description_html = (
            f'<p class="section-desc">{escape(section.description)}</p>'
            if section.description
            else ""
        )
        if not cards:
            cards = '<div class="empty-placeholder">暂无内容</div>'
        return f"""
<section class="guide-section"{section_attr}>
<div class="section-header">
<h2>{escape(section.title)}</h2>
{description_html}
</div>
<div class="{layout_class}">
{cards}
</div>
</section>
"""

    def render_related(related: Dict[str, Sequence[Dict[str, str]]]) -> str:
        """Render the related-links area; labels without links are skipped."""
        if not related:
            return ""
        blocks: List[str] = []
        for label, links in related.items():
            if not links:
                continue
            items = "".join(
                f'<a class="related-link" href="{escape(link.get("href", "#"))}">{escape(link.get("label", ""))}</a>'
                for link in links
            )
            blocks.append(
                f"""
<div class="related-block">
<div class="related-label">{escape(label)}</div>
<div class="related-items">{items}</div>
</div>
"""
            )
        if not blocks:
            return ""
        return f'<section class="related-section">{"".join(blocks)}</section>'

    sections_html = "".join(render_section(section) for section in page.sections)
    metadata_html = render_metadata(page.metadata)
    related_html = render_related(page.related_links)
    subtitle_html = f'<p class="hero-subtitle">{escape(page.subtitle)}</p>' if page.subtitle else ""
    # CSS braces are doubled because the whole document is one f-string.
    return f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>{escape(page.title)}</title>
<style>
:root {{
color-scheme: dark;
--bg-primary: radial-gradient(120% 120% at 10% 10%, #222 0%, #050505 100%);
--bg-card: rgba(255, 255, 255, 0.06);
--bg-card-hover: rgba(255, 255, 255, 0.12);
--border-soft: rgba(255, 255, 255, 0.1);
--text-main: #f5f5f7;
--text-sub: rgba(245, 245, 247, 0.64);
--accent: linear-gradient(135deg, #4b7bec, #34e7e4);
--accent-strong: linear-gradient(135deg, #ff9f1a, #ff3f34);
}}
* {{
box-sizing: border-box;
}}
body {{
margin: 0;
font-family: "SF Pro Display", "SF Pro SC", "PingFang SC", "Microsoft YaHei", sans-serif;
background: var(--bg-primary);
color: var(--text-main);
min-height: 100vh;
padding: 0 0 64px;
}}
a {{
color: #9fc9ff;
text-decoration: none;
}}
a:hover {{
text-decoration: underline;
}}
header.hero {{
padding: 80px 24px 40px;
text-align: center;
position: relative;
}}
header.hero::after {{
content: "";
position: absolute;
inset: 0;
background: radial-gradient(circle at 50% -20%, rgba(255, 255, 255, 0.18), transparent 55%);
pointer-events: none;
z-index: -1;
}}
.hero-title {{
font-size: clamp(32px, 5vw, 48px);
font-weight: 700;
margin: 0;
background: var(--accent);
-webkit-background-clip: text;
color: transparent;
letter-spacing: 0.8px;
}}
.hero-subtitle {{
margin: 16px auto 0;
max-width: 640px;
font-size: 18px;
color: var(--text-sub);
line-height: 1.6;
}}
main {{
width: min(1100px, 92vw);
margin: 0 auto;
}}
.meta-grid {{
display: grid;
gap: 20px;
margin: 0 auto 48px;
padding: 0 8px;
grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
}}
.related-section {{
margin: 0 auto 48px;
padding: 0 8px;
display: grid;
gap: 18px;
}}
.related-block {{
display: grid;
gap: 8px;
}}
.related-label {{
font-size: 13px;
text-transform: uppercase;
letter-spacing: 0.5px;
color: var(--text-sub);
}}
.related-items {{
display: flex;
gap: 10px;
flex-wrap: wrap;
}}
.related-link {{
display: inline-flex;
align-items: center;
padding: 6px 14px;
border-radius: 999px;
background: rgba(255, 255, 255, 0.08);
font-size: 13px;
letter-spacing: 0.4px;
transition: background 0.2s ease;
}}
.related-link:hover {{
background: rgba(255, 255, 255, 0.16);
text-decoration: none;
}}
.meta-card {{
border-radius: 18px;
padding: 22px;
border: 1px solid var(--border-soft);
background: rgba(255, 255, 255, 0.04);
backdrop-filter: blur(16px);
box-shadow: 0 12px 30px rgba(0, 0, 0, 0.35);
}}
.meta-key {{
font-size: 13px;
letter-spacing: 1px;
text-transform: uppercase;
color: var(--text-sub);
}}
.meta-value {{
margin-top: 6px;
font-size: 22px;
font-weight: 600;
}}
.guide-section {{
margin: 0 auto 56px;
padding: 0 8px;
}}
.section-header {{
display: flex;
flex-direction: column;
gap: 12px;
margin-bottom: 28px;
}}
.section-header h2 {{
margin: 0;
font-size: clamp(26px, 3vw, 32px);
letter-spacing: 0.5px;
}}
.section-desc {{
margin: 0;
font-size: 16px;
color: var(--text-sub);
max-width: 640px;
}}
.entries-grid {{
display: grid;
gap: 20px;
grid-template-columns: repeat(auto-fit, minmax(260px, 1fr));
}}
.entries-list {{
display: flex;
flex-direction: column;
gap: 18px;
}}
.entry-card {{
display: flex;
gap: 18px;
border-radius: 20px;
padding: 24px;
border: 1px solid transparent;
background: var(--bg-card);
transition: border 0.2s ease, background 0.2s ease, transform 0.2s ease;
}}
.entry-card:hover {{
background: var(--bg-card-hover);
border-color: rgba(255, 255, 255, 0.18);
transform: translateY(-4px);
}}
.entry-icon {{
font-size: 36px;
}}
.entry-content {{
flex: 1;
}}
.entry-content header {{
display: flex;
flex-direction: column;
gap: 8px;
margin-bottom: 10px;
}}
.entry-content h3 {{
margin: 0;
font-size: 20px;
display: inline-flex;
align-items: center;
gap: 10px;
}}
.entry-badge {{
display: inline-flex;
padding: 4px 10px;
border-radius: 999px;
font-size: 12px;
letter-spacing: 0.5px;
background: var(--accent-strong);
}}
.entry-tags {{
display: flex;
gap: 12px;
flex-wrap: wrap;
}}
.entry-tags-extra {{
display: flex;
gap: 8px;
flex-wrap: wrap;
margin-top: 10px;
}}
.entry-id {{
background: rgba(255, 255, 255, 0.1);
border-radius: 999px;
padding: 4px 10px;
font-size: 12px;
letter-spacing: 0.5px;
}}
.entry-category {{
font-size: 12px;
color: var(--text-sub);
}}
.entry-desc {{
margin: 0 0 12px;
color: var(--text-sub);
line-height: 1.6;
}}
.entry-meta {{
list-style: none;
margin: 0;
padding: 0;
display: grid;
gap: 6px;
font-size: 13px;
color: var(--text-sub);
}}
.entry-meta li {{
display: flex;
justify-content: space-between;
}}
.entry-link {{
display: inline-flex;
align-items: center;
gap: 6px;
margin-top: 10px;
font-size: 14px;
}}
.entry-tag {{
display: inline-flex;
align-items: center;
padding: 4px 12px;
border-radius: 999px;
background: rgba(255, 255, 255, 0.08);
font-size: 12px;
letter-spacing: 0.4px;
}}
.entry-details {{
margin-top: 14px;
display: grid;
gap: 10px;
}}
.entry-detail-paragraph {{
margin: 0;
color: var(--text-sub);
line-height: 1.6;
}}
.entry-detail-list, .entry-detail-steps {{
margin: 0;
padding-left: 20px;
color: var(--text-sub);
}}
.entry-detail-steps {{
list-style: none;
padding-left: 0;
}}
.entry-detail-steps li {{
display: grid;
grid-template-columns: 28px 1fr;
align-items: start;
gap: 10px;
margin-bottom: 6px;
}}
.entry-detail-steps .step-index {{
width: 28px;
height: 28px;
border-radius: 50%;
display: inline-flex;
align-items: center;
justify-content: center;
background: rgba(255, 255, 255, 0.12);
font-size: 12px;
}}
.entry-detail-table {{
width: 100%;
border-collapse: collapse;
border-radius: 14px;
overflow: hidden;
}}
.entry-detail-table th,
.entry-detail-table td {{
border: 1px solid rgba(255, 255, 255, 0.1);
padding: 8px 12px;
font-size: 13px;
text-align: left;
}}
.entry-detail-table th {{
background: rgba(255, 255, 255, 0.08);
font-weight: 600;
}}
.entry-detail-table tr:nth-child(even) {{
background: rgba(255, 255, 255, 0.04);
}}
.empty-placeholder {{
padding: 40px;
text-align: center;
border-radius: 20px;
border: 1px dashed rgba(255, 255, 255, 0.2);
color: var(--text-sub);
}}
@media (max-width: 680px) {{
.entry-card {{
flex-direction: column;
}}
.entry-content h3 {{
font-size: 18px;
}}
.meta-grid {{
grid-template-columns: 1fr 1fr;
}}
}}
@media (max-width: 480px) {{
.meta-grid {{
grid-template-columns: 1fr;
}}
}}
</style>
</head>
<body>
<header class="hero">
<h1 class="hero-title">{escape(page.title)}</h1>
{subtitle_html}
</header>
<main>
{metadata_html}
{related_html}
{sections_html}
</main>
</body>
</html>
"""
class MessageSender:
    """Sends messages to a WPS webhook over HTTP."""

    def __init__(self, webhook_url: str):
        """Remember the target webhook; the HTTP client is created on first use.

        Args:
            webhook_url: URL that messages are POSTed to.
        """
        self.webhook_url = webhook_url
        self.client: Optional[httpx.AsyncClient] = None

    async def _get_client(self) -> "httpx.AsyncClient":
        """Return the HTTP client, creating it with a 10 second timeout if needed."""
        if self.client is not None:
            return self.client
        self.client = httpx.AsyncClient(timeout=10.0)
        return self.client

    async def send_message(self, message: Dict[str, Any]) -> bool:
        """POST a message payload to the webhook as JSON.

        Args:
            message: Payload dictionary.

        Returns:
            True when the webhook answers with HTTP 200; False on any other
            status or on any exception (which is logged, not raised).
        """
        try:
            http = await self._get_client()
            reply = await http.post(self.webhook_url, json=message)
            if reply.status_code != 200:
                logger.Log("Error", f"消息发送失败: status={reply.status_code}, body={reply.text}")
                return False
            logger.Log("Info", f"消息发送成功: {message.get('msgtype')}")
            return True
        except Exception as e:
            logger.Log("Error", f"发送消息异常: {e}")
            return False

    async def send_text(self, content: str, at_user_id: Optional[int] = None) -> bool:
        """Send a plain-text message, optionally mentioning a user.

        Args:
            content: Text to send.
            at_user_id: When truthy, an ``<at>`` tag for this user id is
                prepended to the text.

        Returns:
            Whether sending succeeded.
        """
        body = f'<at user_id="{at_user_id}"></at> {content}' if at_user_id else content
        payload = {"msgtype": "text", "text": {"content": body}}
        return await self.send_message(payload)

    async def send_markdown(self, text: str) -> bool:
        """Send a Markdown message.

        Args:
            text: Markdown text.

        Returns:
            Whether sending succeeded.
        """
        payload = {"msgtype": "markdown", "markdown": {"text": text}}
        return await self.send_message(payload)

    async def send_link(self, title: str, text: str,
                        message_url: str = "", btn_title: str = "查看详情") -> bool:
        """Send a link-card message.

        Args:
            title: Card title.
            text: Card text.
            message_url: URL the card jumps to.
            btn_title: Button caption.

        Returns:
            Whether sending succeeded.
        """
        card = {
            "title": title,
            "text": text,
            "messageUrl": message_url,
            "btnTitle": btn_title,
        }
        return await self.send_message({"msgtype": "link", "link": card})

    async def close(self):
        """Close the HTTP client if one was created."""
        if not self.client:
            return
        await self.client.aclose()
        self.client = None
class BasicWPSInterface(PluginInterface):
    """Base plugin that forwards a callback message to a webhook as Markdown.

    Subclasses choose the target webhook by overriding ``get_webhook_url``.
    """

    # Optional resolver from user id to display name.
    # NOTE(review): a plain function assigned on the class is bound as a method
    # when read through ``self`` and would receive ``self`` as its first
    # argument - confirm how callers install this hook.
    user_id_to_username: Optional[Callable[[int], str]] = None

    @override
    def is_enable_plugin(self) -> bool:
        """The base interface itself is never enabled."""
        return False

    def get_webhook_url(self, message: str, chat_id: int, user_id: int) -> str:
        """Return the webhook URL for a message; "" means no reply is needed.

        Args:
            message: Message content.
            chat_id: Chat id.
            user_id: User id.

        Returns:
            The webhook URL (always "" in this base class).
        """
        return ""

    def get_webhook_request(self, data:Any|None) -> None:
        """Hook for incoming webhook request data; does nothing here."""
        pass

    def get_message_sender(self, webhook_url: str) -> MessageSender:
        """Create a new sender bound to the given webhook URL."""
        return MessageSender(webhook_url)

    def get_message_sender_type(self) -> Literal["text", "markdown", "link"]:
        """Return the message type used by ``send_markdown_message``."""
        return "markdown"

    def get_message_sender_function(self, webhook_url: str, type: Literal["text", "markdown", "link"]) -> Callable[..., Coroutine[Any, Any, bool]]:
        """Return the bound send method of a new sender for a message type.

        Args:
            webhook_url: Target webhook URL.
            type: One of "text", "markdown" or "link".

        Returns:
            The sender's ``send_text``, ``send_markdown`` or ``send_link``
            bound method; calling it yields a coroutine resolving to bool.

        Raises:
            ValueError: If ``type`` is not a supported message type.
        """
        method_names = {
            "text": "send_text",
            "markdown": "send_markdown",
            "link": "send_link",
        }
        if type not in method_names:
            raise ValueError(f"Invalid message sender type: {type}")
        return getattr(self.get_message_sender(webhook_url), method_names[type])

    def parse_message_after_at(self, message: str) -> str:
        """Deprecated; returns the message unchanged."""
        return message

    async def send_markdown_message(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Send a message to the chat's webhook, prefixed with an @-mention header.

        Args:
            message: Message body.
            chat_id: Chat id, used to resolve the webhook.
            user_id: User id to mention.

        Returns:
            Always None; nothing is sent when no webhook URL is resolved.
        """
        webhook_url = self.get_webhook_url(message, chat_id, user_id)
        if get_internal_debug():
            logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Chat ID: {chat_id}, User ID: {user_id}")
        if not webhook_url:
            return None
        username: str = self.user_id_to_username(user_id) if self.user_id_to_username else ""
        send_message = f"""## <at user_id="{user_id}">{username}</at>
---
{message}
"""
        send = self.get_message_sender_function(webhook_url, self.get_message_sender_type())
        try:
            result = await send(send_message)
        finally:
            # A sender is created per message; close it so its HTTP client
            # (and the connections it holds) is released instead of leaking.
            owner = getattr(send, "__self__", None)
            if isinstance(owner, MessageSender):
                await owner.close()
        if get_internal_verbose():
            logger.Log("Info", f"Webhook URL: {webhook_url}, Message: {LimitStringLength(message)}, Result: {result}")
        return None

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Forward a non-empty message to the webhook as Markdown."""
        message = self.parse_message_after_at(message)
        if message == "":
            return None
        return await self.send_markdown_message(message, chat_id, user_id)
class WPSAPI(BasicWPSInterface):
    """Core WPS plugin base class; also provides the illustrated-guide template."""

    # Headings of the four built-in guide sections.
    guide_section_labels: Dict[str, str] = {
        "commands": "指令一览",
        "items": "物品与资源",
        "recipes": "配方与合成",
        "guides": "系统指引",
    }

    # ----- guide content hooks (subclasses override these) -----
    # The original class body defined get_guide_subtitle, get_guide_metadata,
    # collect_command_entries and collect_guide_entries twice; only the later
    # definitions took effect, so each hook is now defined once with that
    # effective implementation.

    def get_guide_title(self) -> str:
        """Return the page title (the concrete class name by default)."""
        return self.__class__.__name__

    def get_guide_subtitle(self) -> str:
        """Return the page subtitle."""
        return "核心 Webhook 转发插件"

    def get_guide_metadata(self) -> Dict[str, str]:
        """Return page metadata; reports whether the main webhook is configured."""
        return {
            "Webhook 状态": "已配置" if MAIN_WEBHOOK_URL else "未配置",
        }

    def collect_command_entries(self) -> Sequence[GuideEntry]:
        """Return the entries of the commands section."""
        return (
            {
                "title": "say",
                "identifier": "say",
                "description": "将后续消息内容以 Markdown 形式发送到主 Webhook。",
                "metadata": {"别名": ""},
                "icon": "🗣️",
                "badge": "核心",
            },
        )

    def collect_item_entries(self) -> Sequence[GuideEntry]:
        """Return the entries of the items section (none by default)."""
        return ()

    def collect_recipe_entries(self) -> Sequence[GuideEntry]:
        """Return the entries of the recipes section (none by default)."""
        return ()

    def collect_guide_entries(self) -> Sequence[GuideEntry]:
        """Return the entries of the system-guide section."""
        return (
            {
                "title": "Webhook 绑定",
                "description": (
                    "在项目配置中设置 `main_webhook_url` 后插件自动启用,"
                    "所有注册的命令将调用 `send_markdown_message` 发送富文本。"
                ),
                "icon": "🔗",
            },
            {
                "title": "消息格式",
                "description": (
                    "默认使用 Markdown 模式发送,支持 `聊天ID` 与 `用户ID` 的 @ 提醒。"
                ),
                "icon": "📝",
            },
        )

    def collect_additional_sections(self) -> Sequence[GuideSection]:
        """Return extra sections appended after the built-in ones (none by default)."""
        return ()

    # ----- guide assembly and rendering -----

    def collect_guide_sections(self) -> Sequence[GuideSection]:
        """Build the section list, skipping built-in sections without entries."""
        sections: List[GuideSection] = []
        command_entries = tuple(self.collect_command_entries())
        if command_entries:
            sections.append(
                GuideSection(
                    title=self.guide_section_labels["commands"],
                    entries=command_entries,
                    layout="list",
                )
            )
        item_entries = tuple(self.collect_item_entries())
        if item_entries:
            sections.append(
                GuideSection(
                    title=self.guide_section_labels["items"],
                    entries=item_entries,
                )
            )
        recipe_entries = tuple(self.collect_recipe_entries())
        if recipe_entries:
            sections.append(
                GuideSection(
                    title=self.guide_section_labels["recipes"],
                    entries=recipe_entries,
                )
            )
        guide_entries = tuple(self.collect_guide_entries())
        if guide_entries:
            sections.append(
                GuideSection(
                    title=self.guide_section_labels["guides"],
                    entries=guide_entries,
                    layout="list",
                )
            )
        sections.extend(self.collect_additional_sections())
        return tuple(sections)

    def build_guide_page(self) -> GuidePage:
        """Assemble the full guide page; metadata values are converted to str."""
        metadata: Dict[str, str] = {
            key: str(value) for key, value in self.get_guide_metadata().items()
        }
        related = self.get_related_links()
        return GuidePage(
            title=self.get_guide_title(),
            subtitle=self.get_guide_subtitle(),
            sections=self.collect_guide_sections(),
            metadata=metadata,
            related_links=related,
        )

    def render_guide_page(self, page: GuidePage) -> str:
        """Render a guide page to HTML."""
        return render_html_page(page)

    def render_guide(self) -> str:
        """Build and render this plugin's guide page."""
        return self.render_guide_page(self.build_guide_page())

    def get_guide_response(self, content: str) -> HTMLResponse:
        """Wrap rendered HTML in an HTMLResponse."""
        return HTMLResponse(content)

    @override
    def generate_router_illustrated_guide(self):
        """Return an async handler that serves this plugin's guide page."""
        async def handler() -> HTMLResponse:
            return self.get_guide_response(self.render_guide())
        return handler

    # ----- related links (parent chain and subclasses) -----

    def get_related_links(self) -> Dict[str, Sequence[Dict[str, str]]]:
        """Collect links to parent classes (up to WPSAPI) and to all subclasses.

        Only classes whose module name starts with "Plugins." are linked.
        """
        links: Dict[str, Sequence[Dict[str, str]]] = {}
        parents = []
        for base in self.__class__.__mro__[1:]:
            if not issubclass(base, WPSAPI):
                continue
            if base.__module__.startswith("Plugins."):
                parents.append(base)
            if base is WPSAPI:
                break
        if parents:
            # Reversed so the chain reads from the root class down.
            parents_links = [self._build_class_link(cls) for cls in reversed(parents)]
            links["父类链"] = tuple(filter(None, parents_links))
        child_links = [self._build_class_link(child) for child in self._iter_subclasses(self.__class__)]
        child_links = [link for link in child_links if link]
        if child_links:
            links["子类"] = tuple(child_links)
        return links

    def _build_class_link(self, cls: type) -> Optional[Dict[str, str]]:
        """Return a label/href link for a plugin class, or None for other classes."""
        if not hasattr(cls, "__module__") or not cls.__module__.startswith("Plugins."):
            return None
        path = f"/api/{cls.__name__}"
        return {
            "label": cls.__name__,
            "href": path,
        }

    def _iter_subclasses(self, cls: type) -> List[type]:
        """Return all WPSAPI plugin subclasses of ``cls``, depth-first."""
        collected: List[type] = []
        for subclass in cls.__subclasses__():
            if not issubclass(subclass, WPSAPI):
                continue
            if not subclass.__module__.startswith("Plugins."):
                continue
            collected.append(subclass)
            collected.extend(self._iter_subclasses(subclass))
        return collected

    # ----- plugin behaviour -----

    @override
    def is_enable_plugin(self) -> bool:
        """Enable the plugin only when a main webhook URL is configured."""
        if MAIN_WEBHOOK_URL == "":
            logger.Log("Error", f"{ConsoleFrontColor.RED}WPSAPI未配置主Webhook URL{ConsoleFrontColor.RESET}")
        return MAIN_WEBHOOK_URL != ""

    def get_main_webhook_url(self) -> str:
        """Return the configured main webhook URL."""
        return MAIN_WEBHOOK_URL

    @override
    def get_webhook_url(self, message: str, chat_id: int, user_id: int) -> str:
        """Return the chat's stored webhook URL, falling back to the main one."""
        webhook_url = ProjectConfig().GetFile(f"webhook_url/{chat_id}",True).LoadAsText()
        if webhook_url == "":
            webhook_url = self.get_main_webhook_url()
        return webhook_url

    @override
    def get_webhook_request(self, data:Any|None) -> None:
        """Ignore incoming webhook request data."""
        return None

    @override
    def wake_up(self) -> None:
        """Log that the plugin loaded and register the "say" and empty commands."""
        logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPI核心插件已加载{ConsoleFrontColor.RESET}")
        self.register_plugin("say")
        self.register_plugin("")
class WPSAPIHelp(WPSAPI):
    """Help plugin: replies with every registered command grouped by plugin class."""

    @override
    def dependencies(self) -> List[Type]:
        """Declare WPSAPI as this plugin's dependency."""
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        """The help plugin is always enabled."""
        return True

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Send a Markdown list of plugin class names and their registered commands."""
        commands_by_plugin: Dict[str, List[str]] = {}
        for command, plugin in PluginInterface.plugin_instances.items():
            if not plugin:
                continue
            commands_by_plugin.setdefault(plugin.__class__.__name__, []).append(command)
        # One bullet line per plugin class, each ending with a newline.
        desc = "".join(
            f"- {plugin_name}: {', '.join(commands)}\n"
            for plugin_name, commands in commands_by_plugin.items()
        )
        return await self.send_markdown_message(f"""# 指令入口
{desc}
""", chat_id, user_id)

    @override
    def wake_up(self) -> None:
        """Log that the plugin loaded and register the help commands."""
        logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPIHelp 插件已加载{ConsoleFrontColor.RESET}")
        for command in ("help", "帮助"):
            self.register_plugin(command)
class WPSAPIWebhook(WPSAPI):
    """Plugin that binds a chat to its own webhook URL."""

    @override
    def dependencies(self) -> List[Type]:
        """Declare WPSAPI as this plugin's dependency."""
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        """The webhook registration plugin is always enabled."""
        return True

    @override
    def wake_up(self) -> None:
        """Log that the plugin loaded and register the registration commands."""
        logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSAPIWebhook 插件已加载{ConsoleFrontColor.RESET}")
        self.register_plugin("chat_url_register")
        self.register_plugin("会话注册")

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Store the message as the webhook URL for this chat.

        The text comes straight from a chat user and is later used as the
        target of HTTP POSTs, so it is stripped and must be an http(s) URL.
        An empty message is still accepted: it stores an empty value, which
        makes ``get_webhook_url`` fall back to the main webhook.

        Args:
            message: The webhook URL to register for the chat.
            chat_id: Chat the URL is stored under.
            user_id: User to mention in the confirmation.

        Returns:
            The result of ``send_markdown_message`` (None).
        """
        from urllib.parse import urlparse

        webhook_url = message.strip()
        if webhook_url:
            try:
                parsed = urlparse(webhook_url)
                is_valid = parsed.scheme in ("http", "https") and bool(parsed.netloc)
            except ValueError:
                # urlparse rejects some malformed hosts (e.g. a broken IPv6 literal).
                is_valid = False
            if not is_valid:
                return await self.send_markdown_message(
                    "会话注册失败: 请提供有效的 http(s) Webhook 地址", chat_id, user_id
                )
        ProjectConfig().GetFile(f"webhook_url/{chat_id}",True).SaveAsText(webhook_url)
        return await self.send_markdown_message("会话注册成功", chat_id, user_id)
class NewsAIPlugin(WPSAPI):
    """News question-answering plugin backed by NewsAIAgent."""

    def __init__(self):
        """Create the plugin and its agent, pointed at the module-level Ollama URL."""
        super().__init__()
        self.ai_agent = NewsAIAgent(ollama_url)

    @override
    def dependencies(self) -> List[Type]:
        """Declare WPSAPI as this plugin's dependency."""
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        """The news plugin is always enabled."""
        return True

    def get_guide_title(self) -> str:
        """Return the guide page title."""
        return "新闻AI智能问答"

    def get_guide_subtitle(self) -> str:
        """Return the guide page subtitle."""
        return "基于AI的新闻联播智能问答系统"

    def get_guide_metadata(self) -> Dict[str, str]:
        """Return guide metadata, reporting the model the agent actually uses."""
        return {
            # Read from the agent so the page cannot drift from the configured
            # model (the hard-coded value named a different model).
            "AI模型": str(self.ai_agent.llm.model),
            "数据源": "每日新闻联播",
            "功能": "智能问答",
        }

    def collect_command_entries(self) -> Sequence[GuideEntry]:
        """Return the command entries shown in the guide.

        NOTE(review): wake_up registers only the "default" command; the
        identifiers listed here are not registered in this class - confirm
        they are routed elsewhere.
        """
        return (
            {
                "title": "问新闻",
                "identifier": "ask_news",
                "description": "询问新闻内容,支持自动识别日期或查询今天/昨天的新闻。",
                "metadata": {"别名": "新闻问答, 问"},
                "icon": "🤖",
                "badge": "AI",
                "details": [
                    {
                        "type": "list",
                        "items": [
                            "支持日期格式2024年11月19日、2024-11-19、2024/11/19",
                            "支持相对日期:今天、昨天、今日、昨日",
                            "自动使用AI分析新闻内容并回答问题",
                            "示例:问 今天有什么重要新闻?",
                            "示例:问 2024年11月19日 经济相关的新闻有哪些?",
                        ]
                    }
                ]
            },
            {
                "title": "新闻摘要",
                "identifier": "news_summary",
                "description": "获取指定日期新闻的AI摘要。",
                "metadata": {"别名": "摘要"},
                "icon": "📝",
                "badge": "AI",
            },
        )

    def collect_guide_entries(self) -> Sequence[GuideEntry]:
        """Return the feature descriptions shown in the guide."""
        return (
            {
                "title": "智能问答",
                "description": (
                    "使用AI理解用户问题从新闻内容中提取相关信息进行回答。"
                    "支持自然语言提问,可以询问特定主题、人物、事件等。"
                ),
                "icon": "💬",
            },
            {
                "title": "日期识别",
                "description": (
                    "自动从问题中识别日期,支持多种格式。"
                    "如果未指定日期,默认查询当天新闻。"
                ),
                "icon": "📅",
            },
            {
                "title": "内容抓取",
                "description": (
                    "自动从新闻网站抓取指定日期的新闻联播文字版内容。"
                ),
                "icon": "🌐",
            },
        )

    @override
    def wake_up(self) -> None:
        """Log that the plugin loaded and register it under "default"."""
        logger.Log("Info", f"{ConsoleFrontColor.GREEN}NewsAIPlugin 新闻AI智能问答插件已加载{ConsoleFrontColor.RESET}")
        self.register_plugin("default")

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        """Answer a user question, or send usage help for an empty message.

        Any exception is logged with its traceback and reported to the user
        as an error message instead of propagating.
        """
        try:
            if not message or message.strip() == "":
                help_text = """# 📰 新闻AI智能问答使用帮助
**直接提问即可,智能体会自动:**
1. 识别你想查询的日期
2. 获取对应日期的新闻内容
3. 基于新闻内容回答你的问题
**支持的日期格式:**
- 今天、昨天、今日、昨日
- 2025年11月19日
- 2025-11-19
- 2025/11/19
**示例问题:**
- `今天有什么重要新闻?`
- `2025年11月17日有什么新闻`
- `昨天的新闻中有关于经济的内容吗?`
- `今天习近平主席有什么活动?`
- `请总结今天新闻联播的主要内容`
**特性:**
✨ 智能理解问题意图
🤖 自动调用工具获取信息
💾 新闻内容自动缓存
🚀 基于LlamaIndex + Ollama"""
                return await self.send_markdown_message(help_text, chat_id, user_id)
            # Let the agent answer, then wrap the answer for display.
            answer = await self.ai_agent.answer_question(message)
            formatted_answer = f"""📰 **新闻AI智能问答**
{answer}
---
*由 LlamaIndex + Ollama 驱动*"""
            return await self.send_markdown_message(formatted_answer, chat_id, user_id)
        except Exception as e:
            logger.Log("Error", f"新闻AI问答异常: {e}")
            import traceback
            error_detail = traceback.format_exc()
            logger.Log("Error", f"详细错误: {error_detail}")
            error_msg = f"""❌ **处理问题时出错**
错误信息:{str(e)}
请稍后重试或联系管理员。"""
            return await self.send_markdown_message(error_msg, chat_id, user_id)
# NOTE(review): presumably persists the project configuration once the module has finished loading - confirm.
logger.SaveProperties()