Compare commits

...

10 Commits

16 changed files with 2599 additions and 18 deletions

View File

@@ -0,0 +1,40 @@
# 背景
文件名2025-11-17_1
创建于2025-11-17_19:43:26
创建者ASUS
主分支main
任务分支:未创建
Yolo模式Off
# 任务描述
新增一个红包系统,可以赠送积分,并发起一些小游戏,包括猜谜红包、手气红包、专属红包、口令红包。
# 项目概览
WPS Bot 插件化系统,已存在战斗、炼金、花园等插件,需要在既有积分/背包体系下扩展红包玩法。
# 分析
已查阅 `Plugins/WPSAPI.py``PWF/CoreModules/plugin_interface.py`:所有插件继承 `WPSAPI`,通过 `register_plugin` 绑定命令,消息发送统一走 `send_markdown_message`,依赖通过 `Architecture.Get` 获取,数据库操作直接使用 `get_db().conn`
积分记录位于 `user_info` 表,由 `WPSConfigAPI` 暴露的 `adjust_user_points/adjust_user_points_sync` 读写;签到、商店、战斗、菜园等模块都以此接口为唯一积分入口,理由是保证同一逻辑下的数据一致性。
背包、商店、战斗、菜园等复杂插件都采用“service+plugin”拆分与独立 `DatabaseModel` 定义;红包系统若要支持多种玩法,需要类似的 service 层管理红包生命周期、并通过 API 输出图鉴信息和命令帮助。
需求补充:
- 指令入口拆分:`红包|手气红包 <金额> <人数>`(缺省金额/人数分别由 `ProjectConfig` 读取,默认 100 / 4`口令红包 <金额> <人数> <口令>``专属红包 <金额> <user_id|用户名>`,统一通过 `抢红包 <红包ID> [tokens]` 领取,口令红包要求 tokens 与口令一致。猜谜红包暂缓实现。
- 金额不足直接拒绝发出;红包不限时间,可并行存在且发送者也能参与抢夺;人数不设上限但需合理处理默认值与输入校验。
- 红包发出后必须在返回消息中附带红包ID供玩家在 `抢红包` 指令中引用;需要记录红包状态、剩余份额/金额、领取日志,以及区分不同玩法的附加约束(专属白名单、口令验证等)。
# 提议的解决方案
(待补充:完成调研后填写多个可选实现思路及其优缺点。)
# 当前执行步骤:"1. 研究需求与代码结构"
# 任务进度
2025-11-17_20:29:15
- 已修改:`Plugins/WPSRedPacketService.py` `Plugins/WPSRedPacketBase.py` `Plugins/WPSRandomRedPacket.py` `Plugins/WPSPasswordRedPacket.py` `Plugins/WPSExclusiveRedPacket.py` `Plugins/WPSRedPacketClaim.py`
- 更改:实现红包核心服务、三类发包插件与抢红包入口,完成数据库表定义、指令注册与积分结算逻辑。
- 原因:支持手气/口令/专属红包以及统一的领取流程。
- 阻碍因素:无
- 状态:未确认
# 最终审查

View File

@@ -0,0 +1,217 @@
# 背景
文件名2025-11-20_1_ai_chat_plugin.md
创建于2025-11-20_11:06:10
创建者admin
主分支main
任务分支:无(不创建分支)
Yolo模式Off
# 任务描述
创建一个 AI 对话插件,支持多用户、多群聊的会话隔离,具备短期记忆功能(内存存储,无需持久化)。
## 核心需求
1. 基于 LlamaIndex + Ollama 的 AI 对话功能
2. **维护一个全局的会话历史**(不按用户/群聊分隔)
3. AI 能同时在不同群聊和不同用户对话
4. 历史消息包含时间戳、群聊ID、用户ID信息AI 能分辨消息来源
5. 最大历史消息数量可配置默认20条
6. 不需要持久化,重启后会话清空
7. 提供清空全局历史会话的指令
8. **AI 能够 @ 用户**:使用 `<at user_id="{user_id}"></at>` 格式,禁止直接输出 user_id
## 指令入口
- `ai_chat`: 对话指令
- `ai_chat_clear`: 清空当前用户的会话历史
## 技术要点
- 参考 NewsReport.py 的架构模式
- 使用内存字典存储会话历史
- 模型使用 qwen3:0.6b
- 消息格式包含上下文信息(时间、群聊、用户)
# 项目概览
这是一个基于 PWF 框架的 WPS 机器人项目,使用插件化架构。
- 插件基类WPSAPI → BasicWPSInterface → PluginInterface
- 每个插件通过 callback(message, chat_id, user_id) 处理消息
- 支持数据库、定时任务、路由等功能
- 已有多个游戏系统插件(菜园、战斗、炼金等)
# 分析
## 现有架构分析
1. **插件系统**
- 插件通过继承 WPSAPI 实现
- callback 方法接收 message已去除 command、chat_id、user_id
- 通过 register_plugin(command) 注册指令入口
- 通过 dependencies() 声明依赖关系
2. **NewsReport.py 架构**
- NewsAIAgent 类:封装 AI 逻辑LlamaIndex + Ollama
- NewsAIPlugin 类:继承 WPSAPI作为插件入口
- 使用 ProjectConfig().GetFile() 进行文件缓存
3. **会话管理需求**
- **全局单一会话历史**,不按用户/群聊分隔
- 每条消息标记 chat_id群聊ID和 user_id用户ID
- AI 能看到所有群聊、所有用户的对话历史
- 使用 List[Dict] 存储全局消息列表
- 内存存储,无需持久化
4. **配置系统**
- 使用 ProjectConfig.FindItem(key, default) 读取配置
- 使用 ProjectConfig.SaveProperties() 保存配置
- 配置存储在 Assets/config.json
## 技术选型
- **AI 框架**LlamaIndex + Ollama
- **模型**qwen3:0.6b
- **存储方式**内存列表List[Dict]- 全局单一历史
- **消息格式**:包含 timestamp, chat_id, user_id, role, content
# 提议的解决方案
## 整体架构
```
ChatAI.py
├── ChatAIAgent (AI 智能体)
│ ├── 会话字典管理
│ ├── 消息历史维护
│ └── LLM 对话调用
├── ChatAIPlugin (对话插件)
│ ├── 注册 ai_chat 指令
│ └── 处理用户对话
└── ChatAIClearPlugin (清空历史插件)
├── 注册 ai_chat_clear 指令
└── 清空会话历史
```
## 核心类设计
### 1. ChatAIAgent
- 单例模式,通过 Architecture 注册
- 维护 **全局消息历史**: List[Dict]
- 每条消息格式:
```python
{
"timestamp": "2025-11-20 11:06:10",
"chat_id": 12345,
"user_id": 67890,
"role": "user" | "assistant",
"content": "消息内容"
}
```
- AI 能够看到所有群聊、所有用户的完整对话历史
### 2. ChatAIPlugin
- 继承 WPSAPI
- 依赖 WPSAPI
- 初始化时创建/获取 ChatAIAgent
- callback 处理用户消息,调用 Agent.chat()
### 3. ChatAIClearPlugin
- 继承 WPSAPI
- 依赖 ChatAIPlugin确保 Agent 已初始化)
- callback 调用 Agent.clear_history() 清空全局历史
## 系统提示词设计
AI 需要理解的关键信息:
1. **多群聊、多用户环境**:能看到所有群聊和用户的对话历史
2. **消息格式**:每条消息包含 [时间] [群聊ID] [用户ID] 标记
3. **@ 用户格式**AI 回复时,当需要特指或回复某个用户时,使用 `<at user_id="{user_id}"></at>`
4. **禁止行为**:禁止直接输出裸露的 user_id 数字
示例提示词:
```
你是一个友好的AI助手能够同时在多个群聊中与不同用户对话。
重要规则:
- 你能看到所有群聊的对话历史每条消息都标注了时间、群聊ID和用户ID
- 当你需要特指某个用户或专门回复某个用户时,必须在回复中使用以下格式:<at user_id="用户ID"></at>
- 禁止直接输出用户ID数字始终使用 <at> 标签包裹
- 注意区分不同群聊和不同用户的对话上下文
示例对话:
[2025-11-20 10:00:00] [群聊#12345] [用户#67890]: 你好
AI回复: <at user_id="67890"></at> 你好!有什么可以帮助你的吗?
[2025-11-20 10:01:00] [群聊#12345] [用户#11111]: 今天天气怎么样?
AI回复: <at user_id="11111"></at> 抱歉,我无法获取实时天气信息。
[2025-11-20 10:02:00] [群聊#67890] [用户#22222]: 刚才12345群的用户说了什么
AI回复: <at user_id="22222"></at> 刚才群聊12345中用户67890问候了我用户11111询问了天气。
```
## 消息流程
```
用户消息 (chat_id, user_id) → ChatAIPlugin.callback()
获取当前时间
添加用户消息到全局历史带时间戳、chat_id、user_id
格式化全局历史消息为 LLM 输入
├─ 包含所有群聊、所有用户的历史
├─ 每条消息标注来源(群聊、用户、时间)
└─ AI 能理解跨群聊、跨用户的上下文
调用 Ollama LLM使用包含 @ 格式说明的系统提示词)
添加 AI 回复到全局历史(标注当前 chat_id, user_id
返回格式化的回复
```
# 当前执行步骤:"1. 创建任务文件"
# 任务进度
[2025-11-20_11:06:10]
- 已创建:.tasks/2025-11-20_1_ai_chat_plugin.md
- 更改:创建任务文件,记录任务背景和设计方案
- 原因:按照 RIPER-5 协议要求,在进入 EXECUTE 模式前创建任务文件
- 阻碍因素:无
- 状态:成功
[2025-11-20_11:13:25]
- 已修改:.tasks/2025-11-20_1_ai_chat_plugin.md
- 更改:
1. 更新核心需求,明确全局单一会话历史设计
2. 添加系统提示词设计章节
3. 说明 @ 用户格式:<at user_id="{user_id}"></at>
4. 禁止直接输出 user_id
- 原因:根据用户反馈,澄清架构设计(全局历史 vs 分用户历史)
- 阻碍因素:无
- 状态:成功
[2025-11-20_11:16:49]
- 已修改:.tasks/2025-11-20_1_ai_chat_plugin.md
- 更改:修正系统提示词设计,明确用户消息是普通文本,只有 AI 回复时才使用 <at> 格式
- 原因:用户澄清 @ 用户格式只用于 AI 回复,不用于用户消息
- 阻碍因素:无
- 状态:成功
[2025-11-20_11:22:38]
- 已创建Plugins/Others/ChatAI.py
- 更改:完整实现 AI 对话插件系统
1. ChatAIAgent 类全局会话管理、消息历史维护、LLM 对话
2. ChatAIPlugin 类:注册 ai_chat 指令,处理用户对话
3. ChatAIClearPlugin 类:注册 ai_chat_clear 指令,清空历史
4. 使用 qwen3:0.6b 模型
5. 最大历史消息数量可配置默认20
6. 系统提示词包含 @ 用户格式说明
7. 全局单一会话历史设计
- 原因:实施 EXECUTE 模式清单项 1-28
- 阻碍因素:无
- 状态:成功
[2025-11-20_11:29:24]
- 已修改Plugins/Others/ChatAI.py
- 更改:修复 AI 重复用户消息的问题
1. 从 acomplete文本补全改为 achat对话方法
2. 使用 ChatMessage 结构化消息格式
3. 分离系统提示词和对话历史
4. 优化提示词结构,更清晰地指示 AI 回复
5. 添加 llama_index.core.llms.ChatMessage 导入
- 原因:用户反馈 AI 一直重复用户说的话
- 阻碍因素:无
- 状态:未确认
# 最终审查
待完成

View File

@@ -0,0 +1,300 @@
# 背景
文件名2025-11-20_2_add_user_tools_to_ai.md
创建于2025-11-20_15:25:43
创建者admin
主分支main
任务分支:无(不创建分支)
Yolo模式Off
# 任务描述
为 AI 聊天插件ChatAI.py添加用户 ID 与用户名转换的工具调用功能,让 AI 能够主动查询用户信息。
## 具体需求
1. AI 应该能够通过工具调用实现以下功能:
- 根据 user_id 查询 username
- 根据 username 查询 user_id
- 查询所有存在的 username 列表
2. 使用 LlamaIndex 框架的高级功能FunctionTool + AgentWorkflow + ReActAgent
3. 参考 NewsReport.py 的工具调用架构实现
# 项目概览
- **项目**NewWPSBot
- **相关文件**
- `Plugins/Others/ChatAI.py` - AI 对话插件(需要改造)
- `Plugins/WPSConfigSystem.py` - 用户配置系统(需要添加查询所有用户名的方法)
- `Plugins/Others/NewsReport.py` - 参考实现(已实现工具调用)
# 分析
## WPSConfigSystem.py 现状
- ✅ 已有方法:`get_user_name(user_id: int)` - 第201-206行
- ✅ 已有方法:`find_user_id_by_username(username: str)` - 第238-248行
- ❌ 缺失方法:查询所有用户名列表
- 数据表:`user_info` (user_id, username, userurl, userpoint)
## ChatAI.py 现状第1-428行
- 当前使用简单的 `Ollama` LLM + `ChatMessage` 架构
- 通过 `llm.achat()` 进行对话
- **没有工具调用能力**
- 维护全局历史 `global_history`
- 单例模式:`ChatAIAgent.get_instance()`
## NewsReport.py 工具调用架构(参考)
```python
# 1. 导入
from llama_index.core.tools import FunctionTool
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.agent.workflow.react_agent import ReActAgent
# 2. 创建工具
tools = [
FunctionTool.from_defaults(
fn=self._tool_function,
name="tool_name",
description="工具描述"
),
]
# 3. 创建 Agent
agent = ReActAgent(
llm=self.llm,
tools=tools,
verbose=True,
system_prompt=system_prompt,
)
# 4. 创建 Workflow
self.workflow = AgentWorkflow(
agents=[agent],
timeout=600.0,
)
# 5. 执行
result = await self.workflow.run(user_msg=query)
```
## 技术要点
1. **工具函数必须是同步函数**(从 NewsReport.py 看,工具函数如 `_get_current_date()` 都是同步的)
2. **WPSConfigAPI 需要通过 Architecture.Get() 获取**
3. **系统提示词需要明确指导 AI 何时使用工具**
4. **需要保留现有的历史消息管理功能**
# 提议的解决方案
## 方案探索INNOVATE 阶段)
### 方案A完全重构为 ReActAgent 架构 ⭐ 推荐
**架构设计**
- 移除 `llm.achat()` 调用方式
- 使用 `AgentWorkflow` + `ReActAgent`
- 历史消息通过系统提示词传递
- 工具函数独立实现
**优势**
- 与 NewsReport.py 架构统一,易于维护
- ReActAgent 专门为工具调用优化
- 工具调用日志清晰verbose=True
- 扩展性强,易于添加新工具
**劣势**
- 需要大幅度重构现有代码
- 历史消息管理方式需改变
- qwen3:0.6b 小模型的工具调用能力需验证
**关键改进**
- 保留历史消息数组结构,调用时转换为提示词
- 添加错误处理和降级机制
- 可根据消息内容智能决定是否启用工具
---
### 方案B混合架构 - 保留对话 + 添加工具层
**架构设计**
- 保留 `llm.achat()` 作为主要对话方式
- AI 在回复中使用特殊标记触发工具(如 `[TOOL:get_username:123]`
- 后处理解析工具调用并再次调用 LLM
**优势**
- 改动最小,风险低
- 保留现有历史消息机制
- 对话连贯性不受影响
**劣势**
- 需要自定义工具调用协议
- 可能需要多轮 LLM 调用
- 工具调用智能程度较低
---
### 方案C双智能体架构
**架构设计**
- ToolAgentReActAgent专门处理工具调用
- ChatAgent现有架构处理普通对话
- 路由逻辑判断使用哪个智能体
**优势**:职责分离,灵活性高
**劣势**:架构复杂,维护成本高,上下文共享困难
---
### 方案D轻量级工具注入
**架构设计**
- 预处理:分析消息,预先调用工具
- 将工具结果注入到提示词
- 正常调用 `llm.achat()`
**优势**:改动极小,性能影响最小
**劣势**AI 无法主动决定使用工具,智能程度低
---
## 最终推荐方案方案AReActAgent 架构)
### 实施要点
#### 1. 在 WPSConfigSystem.py 添加方法
```python
def get_all_usernames(self) -> List[Dict[str, Any]]:
"""获取所有用户名列表"""
cursor = get_db().conn.cursor()
cursor.execute(
"SELECT user_id, username FROM user_info WHERE username != '' AND username IS NOT NULL"
)
rows = cursor.fetchall()
return [{"user_id": row["user_id"], "username": row["username"]} for row in rows]
```
#### 2. 改造 ChatAI.py
**导入新增**
```python
from llama_index.core.tools import FunctionTool
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.agent.workflow.react_agent import ReActAgent
from PWF.Convention.Runtime.Architecture import Architecture
```
**添加 `_initialize_agent()` 方法**
- 获取 WPSConfigAPI 实例
- 创建三个工具函数
- 初始化 ReActAgent
- 创建 AgentWorkflow
**工具函数**
1. `get_username_by_id(user_id: int) -> str`
2. `get_userid_by_name(username: str) -> str`
3. `list_all_usernames() -> str`
**修改 `chat()` 方法**
- 将历史消息格式化为提示词
- 使用 `await self.workflow.run(user_msg=...)`
- 保留历史消息管理逻辑
- 添加错误处理
**更新系统提示词**
- 告知 AI 可用的三个工具
- 说明工具使用场景和时机
- 保持原有的 @用户 格式规则
### 技术挑战与解决方案
1. **历史消息管理**:保留数组结构,调用时动态转换
2. **WPSConfigAPI 访问**:在 `_initialize_agent()` 中通过 Architecture.Get() 获取
3. **错误处理**:工具调用失败时返回友好提示,不中断对话
4. **模型兼容性**:测试 qwen3:0.6b 的工具调用能力,必要时提供降级方案
# 当前执行步骤:"4. 执行完成,等待测试"
# 任务进度
## [2025-11-20_15:25:43]
- 状态:研究完成
- 已完成:
- 分析 WPSConfigSystem.py 现有功能
- 分析 ChatAI.py 当前架构
- 研究 NewsReport.py 工具调用实现
- 创建任务文件
- 下一步:进入 INNOVATE 模式
## [2025-11-20_15:30:00]
- 状态:创新完成
- 已完成:
- 探索了 4 种技术方案ReActAgent、混合架构、双智能体、轻量级注入
- 评估各方案优劣势和技术风险
- 确定推荐方案:完全 ReActAgent 架构
- 更新任务文件,记录方案分析
- 下一步:进入 PLAN 模式
## [2025-11-20_15:35:00]
- 状态:规划完成
- 已完成:
- 制定详细的代码修改规格
- 设计完整的系统提示词(包含工具使用说明、示例、错误处理)
- 明确每个函数的签名和实现细节
- 创建8项实施清单
- 添加降级机制设计
- 下一步:进入 EXECUTE 模式
## [2025-11-20_15:38:25]
- 状态:执行完成
- 已修改文件:
- Plugins/WPSConfigSystem.py添加 get_all_usernames() 方法)
- Plugins/Others/ChatAI.py完全重构为 ReActAgent 架构)
- 已完成修改:
1. ✅ WPSConfigSystem.py 添加 get_all_usernames() 方法
2. ✅ ChatAI.py 添加导入FunctionTool, AgentWorkflow, ReActAgent
3. ✅ ChatAIAgent.__init__() 添加 workflow 和 config_api 属性
4. ✅ ChatAIAgent.__init__() 添加 _initialize_agent() 调用
5. ✅ 实现 _initialize_agent() 方法(包含三个工具函数)
6. ✅ 实现 _build_system_prompt() 方法详细提示词约200行
7. ✅ 重构 chat() 方法使用 workflow.run()
8. ✅ 添加 _fallback_chat() 降级方法
- Linter 检查:✅ 无错误
- 更改摘要:
- 新增方法get_all_usernames() (WPSConfigSystem)
- 新增方法_initialize_agent() (ChatAI)
- 新增方法_build_system_prompt() (ChatAI)
- 新增方法_fallback_chat() (ChatAI)
- 重构方法chat() (ChatAI)
- 新增工具函数get_username_by_id, get_userid_by_name, list_all_usernames
- 新增导入FunctionTool, AgentWorkflow, ReActAgent
- 下一步:用户测试工具调用功能
# 详细实施规格PLAN 阶段输出)
## 实施清单
1. 在 WPSConfigSystem.py 添加 get_all_usernames() 方法
2. 在 ChatAI.py 添加新的导入语句
3. 在 ChatAIAgent.__init__() 添加 workflow 和 config_api 属性
4. 在 ChatAIAgent.__init__() 添加 _initialize_agent() 调用
5. 添加 _initialize_agent() 方法(创建三个工具)
6. 添加 _build_system_prompt() 方法(详细提示词)
7. 重构 chat() 方法使用 workflow.run()
8. 添加 _fallback_chat() 降级方法
## 关键设计
### 系统提示词特点
- 详细说明三个工具的用途、参数、返回值
- 提供大量使用示例5个场景
- 明确工具使用原则(何时用/何时不用)
- 包含错误处理指导
- 强调 <at> 标签使用规则
### 三个工具函数
1. get_username_by_id(user_id: int) -> str
2. get_userid_by_name(username: str) -> str
3. list_all_usernames() -> str
### 错误处理
- Workflow 初始化失败 → 使用 _fallback_chat()
- 工具调用失败 → 返回友好错误信息
- 整体异常 → 降级到简单对话模式
# 最终审查
(待完成后填写)

784
Plugins/Others/ChatAI.py Normal file
View File

@@ -0,0 +1,784 @@
from Plugins.WPSAPI import *
from datetime import datetime
from llama_index.llms.ollama import Ollama
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import FunctionTool
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.agent.workflow.react_agent import ReActAgent
from typing import List, Dict, Optional, Any
logger: ProjectConfig = Architecture.Get(ProjectConfig)
OLLAMA_URL = logger.FindItem("ollama_url", "http://ollama.liubai.site")
OLLAMA_MODEL = logger.FindItem("ollama_model", "qwen3:0.6b")
MAX_HISTORY = logger.FindItem("chat_ai_max_history", 100)
logger.SaveProperties()
class ChatAIAgent:
"""AI 对话智能体 - 维护全局会话历史"""
_instance: Optional['ChatAIAgent'] = None
def __init__(self, ollama_url: str = OLLAMA_URL, max_history: int = MAX_HISTORY):
"""初始化 AI 智能体
Args:
ollama_url: Ollama 服务地址
max_history: 最大历史消息数量
"""
self.ollama_url = ollama_url
self.max_history = max_history
self.llm = Ollama(model=OLLAMA_MODEL, base_url=ollama_url, request_timeout=600.0)
# 全局消息历史列表
self.global_history: List[Dict[str, any]] = []
# Agent 工作流和配置API
self.workflow: Optional[AgentWorkflow] = None
self.config_api = None # 延迟加载,避免循环依赖
# 初始化 Agent 和工具
self._initialize_agent()
logger.Log("Info", f"{ConsoleFrontColor.GREEN}ChatAIAgent 初始化完成{ConsoleFrontColor.RESET}")
logger.Log("Info", f"模型: {OLLAMA_MODEL}, 最大历史: {max_history}")
@classmethod
def get_instance(cls) -> 'ChatAIAgent':
"""获取单例实例"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def _initialize_agent(self) -> None:
"""初始化 ReActAgent 和工具函数"""
try:
# 延迟导入避免循环依赖
from Plugins.WPSConfigSystem import WPSConfigAPI
from PWF.Convention.Runtime.Architecture import Architecture
# 获取 WPSConfigAPI 实例
try:
self.config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
except Exception as e:
logger.Log("Warning", f"无法获取 WPSConfigAPI: {e}")
self.config_api = None
# 创建工具函数
tools = []
if self.config_api:
# 工具1: 根据用户ID查询用户名
def get_username_by_id(user_id: int) -> str:
"""根据用户ID查询用户名
Args:
user_id: 用户ID整数
Returns:
用户名字符串。如果用户未设置用户名,返回 'user_用户ID' 格式
Examples:
get_username_by_id(12345) -> "张三"
get_username_by_id(99999) -> "user_99999"
"""
try:
username = self.config_api.get_user_name(user_id)
return username if username else f"user_{user_id}"
except Exception as e:
logger.Log("Error", f"查询用户名失败 user_id={user_id}: {e}")
return f"user_{user_id}"
# 工具2: 根据用户名查询用户ID
def get_userid_by_name(username: str) -> str:
"""根据用户名查询用户ID
Args:
username: 用户名(字符串)
Returns:
如果找到用户返回用户ID字符串格式
如果未找到,返回 "未找到用户名为'{username}'的用户"
Examples:
get_userid_by_name("张三") -> "12345"
get_userid_by_name("不存在的用户") -> "未找到用户名为'不存在的用户'的用户"
"""
try:
user_id = self.config_api.find_user_id_by_username(username)
if user_id is not None:
return str(user_id)
return f"未找到用户名为'{username}'的用户"
except Exception as e:
logger.Log("Error", f"查询用户ID失败 username={username}: {e}")
return f"查询失败: {str(e)}"
# 工具3: 获取所有用户名列表
def list_all_usernames() -> str:
"""获取系统中所有已设置用户名的用户列表
Returns:
格式化的用户列表字符串,每行一个用户
格式: "用户ID: 用户名"
如果没有用户,返回 "当前系统中没有设置用户名的用户"
Examples:
返回示例:
"12345: 张三
67890: 李四
11111: 王五"
"""
try:
users = self.config_api.get_all_usernames()
if not users:
return "当前系统中没有设置用户名的用户"
lines = [f"{user['user_id']}: {user['username']}" for user in users]
return "\n".join(lines)
except Exception as e:
logger.Log("Error", f"获取用户列表失败: {e}")
return f"获取用户列表失败: {str(e)}"
# 注册工具
tools = [
FunctionTool.from_defaults(
fn=get_username_by_id,
name="get_username_by_id",
description="根据用户ID整数查询对应的用户名。用于当你知道用户ID需要知道该用户的名字时。"
),
FunctionTool.from_defaults(
fn=get_userid_by_name,
name="get_userid_by_name",
description="根据用户名字符串查询对应的用户ID。用于当你知道用户名需要知道该用户的ID时。"
),
FunctionTool.from_defaults(
fn=list_all_usernames,
name="list_all_usernames",
description="获取系统中所有已设置用户名的用户列表。用于当你需要了解系统中有哪些用户,或者用户询问'有哪些用户''谁在线'等问题时。"
),
]
# 创建系统提示词
system_prompt = self._build_system_prompt()
# 创建 ReActAgent
agent = ReActAgent(
llm=self.llm,
tools=tools,
verbose=True,
system_prompt=system_prompt,
)
# 创建 AgentWorkflow
self.workflow = AgentWorkflow(
agents=[agent],
timeout=600.0,
)
logger.Log("Info", f"{ConsoleFrontColor.GREEN}Agent 工具初始化完成,共 {len(tools)} 个工具{ConsoleFrontColor.RESET}")
except Exception as e:
logger.Log("Error", f"初始化 Agent 失败: {e}")
import traceback
logger.Log("Error", traceback.format_exc())
self.workflow = None
def _build_system_prompt(self) -> str:
"""构建详细的系统提示词"""
return """你是一个友好且智能的 AI 助手,能够同时在多个群聊中与不同用户对话。
# 核心能力
## 1. 对话历史理解
- 你能看到所有群聊的对话历史
- 每条消息都标注了:[时间] [群聊ID] [用户ID]
- 你需要根据上下文理解对话,区分不同群聊和用户
## 2. 用户信息工具 🔧
你拥有三个强大的工具来查询用户信息:
### 工具 1: get_username_by_id(user_id: int)
**用途**根据用户ID查询用户名
**参数**user_id整数类型
**返回**:用户名字符串
**使用场景**
- 当你看到对话历史中有用户ID想知道这个用户叫什么名字时
- 当用户问"用户12345是谁"
- 当你需要用更友好的称呼来提及某个用户时
**示例**
```
用户问:"用户12345是谁"
你的思考我需要查询用户ID 12345 的用户名
你的动作:调用 get_username_by_id(12345)
工具返回:"张三"
你的回复:"用户12345是张三。"
```
### 工具 2: get_userid_by_name(username: str)
**用途**根据用户名查询用户ID
**参数**username字符串类型
**返回**用户ID字符串或未找到的提示
**使用场景**
- 当用户提到某个人名你需要知道对应的用户ID时
- 当用户问"张三的ID是多少"
- 当你需要用 <at> 标签提及某个用户,但只知道名字时
**示例**
```
用户问:"张三的用户ID是多少"
你的思考:我需要查询名为"张三"的用户ID
你的动作:调用 get_userid_by_name("张三")
工具返回:"12345"
你的回复:"张三的用户ID是12345。"
```
### 工具 3: list_all_usernames()
**用途**:获取所有已设置用户名的用户列表
**参数**:无
**返回**:格式化的用户列表
**使用场景**
- 当用户问"系统里有哪些用户"
- 当用户问"谁在这里""有哪些人"
- 当你需要了解系统中的用户概况时
**示例**
```
用户问:"系统里有哪些用户?"
你的思考:我需要获取所有用户列表
你的动作:调用 list_all_usernames()
工具返回:
"12345: 张三
67890: 李四
11111: 王五"
你的回复:"系统中目前有以下用户:
- 张三ID: 12345
- 李四ID: 67890
- 王五ID: 11111"
```
## 3. 工具使用原则 ⚠️
**何时应该使用工具**
✅ 用户明确询问用户信息("XX是谁""XX的ID是多少"
✅ 对话中提到不认识的人名,需要确认身份
✅ 需要列举系统中的用户
✅ 需要将用户名转换为ID或ID转换为用户名
**何时不需要使用工具**
❌ 对话历史中已经明确显示了用户ID和相关信息
❌ 用户只是在普通聊天,没有涉及用户查询
❌ 你已经在同一轮对话中查询过相同的信息
**工具使用建议**
1. 优先检查对话历史,看是否已有所需信息
2. 一次对话中避免重复查询相同信息
3. 查询失败时,友好地告知用户
4. 工具返回结果后,用自然语言整理给用户
## 4. 用户提及规则 📢
**重要**:当你需要特指某个用户时,必须使用 `<at>` 标签!
**正确格式**
- `<at user_id="用户ID"></at>`
- `<at user_id="用户ID">用户名</at>`
**错误格式**(禁止):
- ❌ 直接输出数字ID"用户12345"
- ❌ 不使用标签:"@张三"
**示例**
```
用户问:"我和小明谁比较忙?"
你的思考:用户提到"小明"我需要查询小明的ID然后用<at>标签回复
步骤1调用 get_userid_by_name("小明")
步骤2假设返回 "12345"
你的回复:"我觉得<at user_id="12345">小明</at>可能比较忙,不过这只是我的猜测。"
```
## 5. 对话示例 💬
### 示例 1查询用户信息
```
[2025-11-20 10:00:00] [群聊#12345] [用户#11111]: 用户99999是谁
AI思考用户询问用户99999的身份我需要查询
AI动作调用 get_username_by_id(99999)
AI回复用户99999是王小明。
```
### 示例 2根据名字查ID
```
[2025-11-20 10:01:00] [群聊#12345] [用户#11111]: 李华的ID是多少
AI思考用户询问李华的ID
AI动作调用 get_userid_by_name("李华")
工具返回:"88888"
AI回复李华的用户ID是88888。
```
### 示例 3列出所有用户
```
[2025-11-20 10:02:00] [群聊#12345] [用户#11111]: 系统里有哪些用户?
AI思考用户想知道所有用户列表
AI动作调用 list_all_usernames()
工具返回:"12345: 张三\n67890: 李四"
AI回复系统中目前有以下用户
- 张三ID: 12345
- 李四ID: 67890
```
### 示例 4智能对话中使用工具
```
[2025-11-20 10:03:00] [群聊#12345] [用户#11111]: 帮我问问小红今天有空吗?
AI思考用户提到"小红",我需要用<at>标签提及她所以要先查询ID
AI动作调用 get_userid_by_name("小红")
工具返回:"55555"
AI回复<at user_id="55555">小红</at> 今天有空吗?用户<at user_id="11111"></at>想知道。
```
### 示例 5普通对话不需要工具
```
[2025-11-20 10:04:00] [群聊#12345] [用户#11111]: 今天天气真好
AI思考这是普通聊天不涉及用户查询不需要使用工具
AI回复是的今天天气很不错呢适合出去走走。
```
## 6. 错误处理 🔧
- 如果工具调用失败,友好地告知用户
- 如果查询不到用户,明确说明"未找到该用户"
- 如果不确定是否需要使用工具,优先使用工具确保准确性
## 7. 总结
你的职责:
1. 理解对话上下文,区分不同群聊和用户
2. 在需要时主动使用工具查询用户信息
3. 始终使用 <at> 标签提及用户
4. 提供友好、准确、有帮助的回复
记住:工具是帮助你更好地服务用户的,当不确定时,优先使用工具确保信息准确!"""
def _add_message(self, chat_id: int, user_id: int, role: str, content: str) -> None:
"""添加消息到全局历史
Args:
chat_id: 群聊ID
user_id: 用户ID
role: 角色 (user/assistant)
content: 消息内容
"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = {
"timestamp": timestamp,
"chat_id": chat_id,
"user_id": user_id,
"role": role,
"content": content
}
self.global_history.append(message)
self._trim_history()
def _trim_history(self) -> None:
"""裁剪历史消息到最大长度"""
if len(self.global_history) > self.max_history:
# 保留最新的 max_history 条消息
self.global_history = self.global_history[-self.max_history:]
logger.Log("Info", f"历史消息已裁剪到 {self.max_history}")
def clear_history(self) -> bool:
"""清空全局历史
Returns:
是否成功
"""
try:
old_count = len(self.global_history)
self.global_history.clear()
logger.Log("Info", f"已清空全局历史,共删除 {old_count} 条消息")
return True
except Exception as e:
logger.Log("Error", f"清空历史失败: {e}")
return False
def _format_message_with_context(self, msg: Dict[str, any]) -> str:
"""格式化单条消息,包含上下文信息
Args:
msg: 消息字典
Returns:
格式化后的消息字符串
"""
timestamp = msg.get("timestamp", "")
chat_id = msg.get("chat_id", "")
user_id = msg.get("user_id", "")
role = msg.get("role", "")
content = msg.get("content", "")
if role == "user":
return f"[{timestamp}] [群聊#{chat_id}] [用户#{user_id}]: {content}"
else: # assistant
return f"[{timestamp}] [AI助手]: {content}"
def _format_history_for_llm(self) -> str:
"""格式化全局历史为 LLM 输入
Returns:
格式化后的历史字符串
"""
if not self.global_history:
return ""
formatted_messages = [
self._format_message_with_context(msg)
for msg in self.global_history
]
return "\n".join(formatted_messages)
async def chat(self, message: str, chat_id: int, user_id: int) -> str:
"""处理对话
Args:
message: 用户消息
chat_id: 群聊ID
user_id: 用户ID
Returns:
AI 回复
"""
try:
# 添加用户消息到历史
self._add_message(chat_id, user_id, "user", message)
# 检查是否成功初始化 workflow
if self.workflow is None:
logger.Log("Warning", "Workflow 未初始化,使用降级模式")
return await self._fallback_chat(message, chat_id, user_id)
# 格式化历史消息(不包括刚添加的用户消息)
history_without_last = self.global_history[:-1] if len(self.global_history) > 1 else []
history_text = "\n".join([
self._format_message_with_context(msg)
for msg in history_without_last
]) if history_without_last else "(这是第一条对话)"
# 构建当前用户消息的格式化版本
current_msg = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [群聊#{chat_id}] [用户#{user_id}]: {message}"
# 构建完整的用户查询,包含历史上下文
full_query = f"""# 历史对话记录
{history_text}
# 当前消息
{current_msg}
# 任务
请作为 AI 助手回复当前用户。记住:
1. 如果需要查询用户信息,主动使用工具
2. 使用 <at user_id="用户ID"></at> 格式提及用户
3. 根据对话历史和当前消息,给出有意义的回复"""
logger.Log("Info", f"处理对话 - 群聊#{chat_id} 用户#{user_id}")
logger.Log("Info", f"当前历史消息数: {len(self.global_history)}")
# 使用 workflow 运行 agent
result = await self.workflow.run(user_msg=full_query)
# 提取回答
if hasattr(result, 'response'):
answer = str(result.response)
elif hasattr(result, 'message'):
answer = str(result.message)
else:
answer = str(result)
# 添加 AI 回复到历史
self._add_message(chat_id, user_id, "assistant", answer)
logger.Log("Info", f"AI 回复长度: {len(answer)} 字符")
return answer
except Exception as e:
logger.Log("Error", f"对话处理失败: {e}")
import traceback
error_trace = traceback.format_exc()
logger.Log("Error", f"详细错误:\n{error_trace}")
# 尝试降级处理
try:
return await self._fallback_chat(message, chat_id, user_id)
except:
return f"处理对话时出错: {str(e)}"
async def _fallback_chat(self, message: str, chat_id: int, user_id: int) -> str:
"""降级对话处理(当 workflow 不可用时)
Args:
message: 用户消息
chat_id: 群聊ID
user_id: 用户ID
Returns:
AI 回复
"""
logger.Log("Info", "使用降级模式处理对话")
# 格式化历史消息(不包括刚添加的用户消息)
history_without_last = self.global_history[:-1] if len(self.global_history) > 1 else []
history_text = "\n".join([
self._format_message_with_context(msg)
for msg in history_without_last
]) if history_without_last else "(这是第一条对话)"
# 构建当前用户消息的格式化版本
current_msg = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [群聊#{chat_id}] [用户#{user_id}]: {message}"
# 构建对话提示
conversation_prompt = f"""以下是历史对话记录:
{history_text}
当前消息:
{current_msg}
请作为AI助手回复当前用户。"""
# 使用简单的系统提示词
simple_prompt = """你是一个友好的AI助手。
- 使用 <at user_id="用户ID"></at> 格式来提及用户
- 根据对话历史和当前消息,给出有意义的回复"""
# 使用 achat 方法进行对话
messages = [
ChatMessage(role="system", content=simple_prompt),
ChatMessage(role="user", content=conversation_prompt)
]
response = await self.llm.achat(messages)
answer = str(response.message.content)
return answer
class ChatAIPlugin(WPSAPI):
"""AI 对话插件"""
def __init__(self):
super().__init__()
self.ai_agent = ChatAIAgent.get_instance()
@override
def dependencies(self) -> List[Type]:
return [WPSAPI]
@override
def is_enable_plugin(self) -> bool:
return True
def get_guide_title(self) -> str:
return "AI 智能对话"
def get_guide_subtitle(self) -> str:
return "基于 LlamaIndex + Ollama 的多群聊 AI 对话系统"
def get_guide_metadata(self) -> Dict[str, str]:
return {
"AI模型": OLLAMA_MODEL,
"最大历史": str(MAX_HISTORY),
"功能": "跨群聊智能对话",
}
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "AI对话",
"identifier": "ai_chat",
"description": "与AI助手对话AI能看到所有群聊的历史消息并智能回复。",
"metadata": {"模型": OLLAMA_MODEL},
"icon": "🤖",
"badge": "AI",
"details": [
{
"type": "list",
"items": [
"AI 维护全局会话历史,能看到所有群聊的对话",
"每条消息包含时间戳、群聊ID、用户ID",
f"最多保留最近 {MAX_HISTORY} 条消息",
"AI 会使用 @用户 格式回复",
"示例ai_chat 你好",
"示例ai_chat 刚才其他群聊说了什么?",
]
}
]
},
)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "全局会话",
"description": (
"AI 维护一个全局的对话历史,能够跨群聊、跨用户理解上下文。"
"所有群聊的对话都在同一个历史中AI 能够关联不同群聊的信息。"
),
"icon": "🌐",
},
{
"title": "智能回复",
"description": (
"AI 能够识别消息来源(群聊、用户、时间),并使用 @ 格式回复特定用户。"
),
"icon": "💬",
},
{
"title": "历史管理",
"description": (
f"自动保留最近 {MAX_HISTORY} 条消息,超出部分自动清理。"
"可使用 ai_chat_clear 指令手动清空所有历史。"
),
"icon": "📝",
},
)
@override
def wake_up(self) -> None:
logger.Log("Info", f"{ConsoleFrontColor.GREEN}ChatAIPlugin AI对话插件已加载{ConsoleFrontColor.RESET}")
self.register_plugin("ai_chat")
self.register_plugin("default")
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
"""处理用户对话"""
try:
if not message or message.strip() == "":
help_text = f"""# 🤖 AI 智能对话使用帮助
**直接发送消息即可与 AI 对话**
**特性:**
- ✨ AI 能看到所有群聊的对话历史
- 🌐 跨群聊智能理解上下文
- 💬 自动使用 @ 格式回复用户
- 📝 保留最近 {MAX_HISTORY} 条消息
**示例:**
- `ai_chat 你好`
- `ai_chat 今天天气怎么样?`
- `ai_chat 刚才其他群聊说了什么?`
- `ai_chat 请总结一下最近的对话`
**清空历史:**
使用 `ai_chat_clear` 指令清空所有历史消息
**技术:**
基于 LlamaIndex + Ollama ({OLLAMA_MODEL})"""
return await self.send_markdown_message(help_text, chat_id, user_id)
# 使用智能体处理对话
answer = await self.ai_agent.chat(message, chat_id, user_id)
# 格式化返回结果
formatted_answer = answer
return await self.send_markdown_message(formatted_answer, chat_id, user_id)
except Exception as e:
logger.Log("Error", f"AI对话异常: {e}")
import traceback
error_detail = traceback.format_exc()
logger.Log("Error", f"详细错误: {error_detail}")
error_msg = f"""❌ **处理对话时出错**
错误信息:{str(e)}
请稍后重试或联系管理员。"""
return await self.send_markdown_message(error_msg, chat_id, user_id)
class ChatAIClearPlugin(WPSAPI):
"""清空 AI 对话历史插件"""
def __init__(self):
super().__init__()
@override
def dependencies(self) -> List[Type]:
return [ChatAIPlugin]
@override
def is_enable_plugin(self) -> bool:
return True
def get_guide_title(self) -> str:
return "清空 AI 对话历史"
def get_guide_subtitle(self) -> str:
return "清空全局 AI 对话历史记录"
def get_guide_metadata(self) -> Dict[str, str]:
return {
"功能": "历史管理",
"作用范围": "全局",
}
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "清空历史",
"identifier": "ai_chat_clear",
"description": "清空 AI 的全局对话历史记录。",
"icon": "🗑️",
"badge": "管理",
"details": [
{
"type": "list",
"items": [
"清空所有群聊的对话历史",
"AI 将不再记得之前的对话",
"不影响 AI 的基础功能",
"示例ai_chat_clear",
]
}
]
},
)
@override
def wake_up(self) -> None:
logger.Log("Info", f"{ConsoleFrontColor.GREEN}ChatAIClearPlugin 清空历史插件已加载{ConsoleFrontColor.RESET}")
self.register_plugin("ai_chat_clear")
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
"""清空对话历史"""
try:
# 获取 AI Agent 实例
ai_agent = ChatAIAgent.get_instance()
# 清空历史
success = ai_agent.clear_history()
if success:
result_msg = """✅ **历史已清空**
AI 的全局对话历史已成功清空。
AI 将不再记得之前的对话内容。"""
else:
result_msg = """❌ **清空失败**
清空历史时出现错误,请稍后重试。"""
return await self.send_markdown_message(result_msg, chat_id, user_id)
except Exception as e:
logger.Log("Error", f"清空历史异常: {e}")
error_msg = f"""❌ **清空历史时出错**
错误信息:{str(e)}
请稍后重试或联系管理员。"""
return await self.send_markdown_message(error_msg, chat_id, user_id)

View File

@@ -0,0 +1,415 @@
from Plugins.WPSAPI import *
import httpx
import re
from datetime import datetime, timedelta
from llama_index.core.tools import FunctionTool
from llama_index.llms.ollama import Ollama
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.agent.workflow.react_agent import ReActAgent
from bs4 import BeautifulSoup
import requests
logger: ProjectConfig = Architecture.Get(ProjectConfig)
OLLAMA_URL = logger.FindItem("ollama_url", "http://ollama.liubai.site")
OLLAMA_MODEL = logger.FindItem("ollama_model", "qwen2.5:7b")
logger.SaveProperties()
def get_target_web_page_url(year:str, month:str, day:str) -> str:
return fr"http://mrxwlb.com/{year}/{month}/{day}/{year}{month}{day}日新闻联播文字版/"
class NewsAIAgent:
"""新闻AI智能体 - 基于LlamaIndex和Ollama的工具调用智能体"""
def __init__(self, ollama_url: str = OLLAMA_URL):
self.ollama_url = ollama_url
self.client: Optional[httpx.AsyncClient] = None
self.llm = Ollama(model=OLLAMA_MODEL, base_url=ollama_url, request_timeout=600.0)
self.workflow: Optional[AgentWorkflow] = None
self._initialize_agent()
def _initialize_agent(self):
"""初始化智能体和工具"""
# 创建工具函数
tools = [
FunctionTool.from_defaults(
fn=self._get_current_date,
name="get_current_date",
description="获取当前日期,返回格式为'年-月-日',例如'2025-11-19'"
),
#FunctionTool.from_defaults(
# fn=self._get_yesterday_date,
# name="get_yesterday_date",
# description="获取昨天的日期,返回格式为'年-月-日',例如'2025-11-18'"
#),
FunctionTool.from_defaults(
fn=self._get_news_content,
name="get_news_content",
description="获取指定日期的新闻联播文字内容。参数date格式为'年-月-日',例如'2025-11-19'。返回该日期的新闻内容文本。"
),
#FunctionTool.from_defaults(
# fn=self._parse_date_from_text,
# name="parse_date_from_text",
# description="从文本中解析日期。支持格式:'2025年11月19日'、'2025-11-19'、'2025/11/19'、'今天'、'昨天'等。返回格式为'年-月-日'"
#),
]
# 系统提示词 - 更清晰明确的指令
system_prompt = """你是一个新闻分析助手,专门回答关于新闻联播的问题。
【重要】你必须按照以下步骤使用工具:
步骤1: 确定日期
- 如果用户问"今天""今日"或者是与今日日期相关的问题,调用 get_current_date 获取今日日期
步骤2: 获取新闻内容
- 拿到日期后,必须调用 get_news_content(date="YYYY-MM-DD") 获取新闻
- 注意date参数格式必须是 "年-月-日",例如 "2025-11-19"
步骤3: 回答问题
- 仔细阅读获取到的新闻内容
- 基于新闻内容准确回答用户的问题
- 如果新闻中没有相关信息,明确告知用户
【禁止】直接回答问题而不调用工具!你必须先获取新闻内容才能回答。"""
# 创建ReActAgent
agent = ReActAgent(
llm=self.llm,
tools=tools,
verbose=True,
system_prompt=system_prompt,
)
# 创建AgentWorkflow
self.workflow = AgentWorkflow(
agents=[agent],
timeout=600.0,
)
async def _get_client(self) -> httpx.AsyncClient:
"""获取HTTP客户端懒加载"""
if self.client is None:
self.client = httpx.AsyncClient(timeout=120.0)
return self.client
def _get_current_date(self) -> str:
"""获取当前日期
Returns:
当前日期字符串,格式:年-月-日
"""
now = datetime.now()
return f"{now.year}-{str(now.month).zfill(2)}-{str(now.day).zfill(2)}"
def _get_yesterday_date(self) -> str:
"""获取昨天日期
Returns:
昨天日期字符串,格式:年-月-日
"""
yesterday = datetime.now() - timedelta(days=1)
return f"{yesterday.year}-{str(yesterday.month).zfill(2)}-{str(yesterday.day).zfill(2)}"
def _parse_date_from_text(self, text: str) -> str:
"""从文本中解析日期
Args:
text: 包含日期信息的文本
Returns:
日期字符串,格式:年-月-日
"""
# 尝试匹配日期格式
date_patterns = [
r'(\d{4})年(\d{1,2})月(\d{1,2})日',
r'(\d{4})-(\d{1,2})-(\d{1,2})',
r'(\d{4})/(\d{1,2})/(\d{1,2})',
]
for pattern in date_patterns:
match = re.search(pattern, text)
if match:
year, month, day = match.groups()
return f"{year}-{month.zfill(2)}-{day.zfill(2)}"
# 检查相对日期
if "今天" in text or "今日" in text:
return self._get_current_date()
elif "昨天" in text or "昨日" in text:
return self._get_yesterday_date()
# 默认返回今天
return self._get_current_date()
def _get_news_content(self, date: str) -> str:
"""获取指定日期的新闻内容(带缓存)
Args:
date: 日期字符串,格式:年-月-日,例如'2025-11-19'
Returns:
新闻文字内容
"""
try:
# 解析日期
parts = date.split('-')
if len(parts) != 3:
return f"日期格式错误,请使用'年-月-日'格式,例如'2025-11-19'"
year, month, day = parts
# 去掉前导零某些网站URL格式要求
month = str(int(month))
day = str(int(day))
# 检查缓存
cache_key = f"news_cache/{year}/{month}/{day}"
cached_file = ProjectConfig().GetFile(cache_key, False)
if cached_file.Exists():
cached_content = cached_file.LoadAsText()
logger.Log("Info", f"从缓存加载新闻: {date}")
return cached_content
# 如果没有缓存,则抓取网页
logger.Log("Info", f"从网页抓取新闻: {date}")
# url = get_target_web_page_url(year, month, day)
url = ToolURL(get_target_web_page_url(year, month, day))
# 添加浏览器请求头,模拟真实浏览器访问
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
logger.Log("Info", f"请求URL: {url}")
response = requests.get(url, headers=headers, timeout=30, allow_redirects=True)
response.raise_for_status()
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(response.text, 'html.parser')
# 移除script和style标签
for script in soup(["script", "style"]):
script.decompose()
# 获取文本内容
content = soup.get_text()
# 清理空白字符
lines = (line.strip() for line in content.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
content = ' '.join(chunk for chunk in chunks if chunk)
if not content or len(content) < 100:
return f"未能获取到{year}{month}{day}日的新闻内容"
# 保存到缓存
ProjectConfig().GetFile(cache_key, True).SaveAsText(content)
logger.Log("Info", f"新闻内容已缓存: {date}")
return content
except Exception as e:
logger.Log("Error", f"获取新闻失败: {e}")
return f"获取新闻时出错: {str(e)}"
async def answer_question(self, query: str) -> str:
"""根据问题回答新闻内容
Args:
query: 用户问题
Returns:
AI生成的回答
"""
try:
logger.Log("Info", f"="*50)
logger.Log("Info", f"用户提问: {query}")
logger.Log("Info", f"使用模型: {self.llm.model}")
logger.Log("Info", f"="*50)
# 使用workflow运行agent注意参数是user_msg
result = await self.workflow.run(user_msg=query)
logger.Log("Info", f"Agent执行完成结果类型: {type(result)}")
# 提取回答内容
if hasattr(result, 'response'):
answer = str(result.response)
logger.Log("Info", f"从result.response提取答案")
elif hasattr(result, 'message'):
answer = str(result.message)
logger.Log("Info", f"从result.message提取答案")
else:
answer = str(result)
logger.Log("Info", f"直接转换result为字符串")
logger.Log("Info", f"最终答案长度: {len(answer)} 字符")
return answer
except Exception as e:
logger.Log("Error", f"回答问题失败: {e}")
import traceback
error_trace = traceback.format_exc()
logger.Log("Error", f"详细错误:\n{error_trace}")
return f"处理问题时出错: {str(e)}"
async def close(self):
"""关闭客户端"""
if self.client:
await self.client.aclose()
self.client = None
class NewsAIPlugin(WPSAPI):
"""新闻AI智能问答插件"""
def __init__(self):
super().__init__()
self.ai_agent = NewsAIAgent(OLLAMA_URL)
@override
def dependencies(self) -> List[Type]:
return [WPSAPI]
@override
def is_enable_plugin(self) -> bool:
return True
def get_guide_title(self) -> str:
return "新闻AI智能问答"
def get_guide_subtitle(self) -> str:
return "基于AI的新闻联播智能问答系统"
def get_guide_metadata(self) -> Dict[str, str]:
return {
"AI模型": "qwen3:0.6b",
"数据源": "每日新闻联播",
"功能": "智能问答",
}
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "新闻",
"identifier": "ask_news",
"description": "询问新闻内容,支持自动识别日期或查询今天/昨天的新闻。",
"metadata": {"别名": "news"},
"icon": "🤖",
"badge": "AI",
"details": [
{
"type": "list",
"items": [
"支持日期格式2024年11月19日、2024-11-19、2024/11/19",
"支持相对日期:今天、昨天、今日、昨日",
"自动使用AI分析新闻内容并回答问题",
"示例:问 今天有什么重要新闻?",
"示例:问 2024年11月19日 经济相关的新闻有哪些?",
]
}
]
},
#{
# "title": "新闻摘要",
# "identifier": "news_summary",
# "description": "获取指定日期新闻的AI摘要。",
# "metadata": {"别名": "摘要"},
# "icon": "📝",
# "badge": "AI",
#},
)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "智能问答",
"description": (
"使用AI理解用户问题从新闻内容中提取相关信息进行回答。"
"支持自然语言提问,可以询问特定主题、人物、事件等。"
),
"icon": "💬",
},
{
"title": "日期识别",
"description": (
"自动从问题中识别日期,支持多种格式。"
"如果未指定日期,默认查询当天新闻。"
),
"icon": "📅",
},
{
"title": "内容抓取",
"description": (
"自动从新闻网站抓取指定日期的新闻联播文字版内容。"
),
"icon": "🌐",
},
)
@override
def wake_up(self) -> None:
logger.Log("Info", f"{ConsoleFrontColor.GREEN}NewsAIPlugin 新闻AI智能问答插件已加载{ConsoleFrontColor.RESET}")
self.register_plugin("news")
self.register_plugin("新闻")
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
"""处理用户问题"""
try:
if not message or message.strip() == "":
help_text = """# 📰 新闻AI智能问答使用帮助
**直接提问即可,智能体会自动:**
1. 识别你想查询的日期
2. 获取对应日期的新闻内容
3. 基于新闻内容回答你的问题
**支持的日期格式:**
- 今天、昨天、今日、昨日
- 2025年11月19日
- 2025-11-19
- 2025/11/19
**示例问题:**
- `今天有什么重要新闻?`
- `2025年11月17日有什么新闻`
- `昨天的新闻中有关于经济的内容吗?`
- `今天习近平主席有什么活动?`
- `请总结今天新闻联播的主要内容`
**特性:**
✨ 智能理解问题意图
🤖 自动调用工具获取信息
💾 新闻内容自动缓存
🚀 基于LlamaIndex + Ollama"""
return await self.send_markdown_message(help_text, chat_id, user_id)
# 使用智能体回答问题
answer = await self.ai_agent.answer_question(message)
# 格式化返回结果
formatted_answer = f"""📰 **新闻AI智能问答**
{answer}
---
*由 LlamaIndex + Ollama 驱动*"""
return await self.send_markdown_message(formatted_answer, chat_id, user_id)
except Exception as e:
logger.Log("Error", f"新闻AI问答异常: {e}")
import traceback
error_detail = traceback.format_exc()
logger.Log("Error", f"详细错误: {error_detail}")
error_msg = f"""❌ **处理问题时出错**
错误信息:{str(e)}
请稍后重试或联系管理员。"""
return await self.send_markdown_message(error_msg, chat_id, user_id)

View File

View File

@@ -7,7 +7,6 @@ from PWF.Convention.Runtime.Web import ToolURL
from PWF.Convention.Runtime.String import LimitStringLength
from fastapi.responses import HTMLResponse
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Sequence, TypedDict, override, Union
import httpx
import re

View File

@@ -389,7 +389,7 @@ class WPSCombatBase(WPSAPI):
# 5.1. 注册冒险种子到菜园系统
self._register_adventure_seeds_to_garden()
self._register_legendary_alchemy_recipes()
self._register_alchemy_recipes()
# 6. 恢复过期任务和超时战斗
try:
@@ -606,31 +606,38 @@ class WPSCombatBase(WPSAPI):
f"{ConsoleFrontColor.YELLOW}注册冒险种子到菜园系统时出错: {e}{ConsoleFrontColor.RESET}"
)
def _register_legendary_alchemy_recipes(self) -> None:
"""注册传说装备的炼金链条"""
def _register_alchemy_recipes(self) -> None:
"""注册炼金配方"""
alchemy: WPSAlchemyGame = Architecture.Get(WPSAlchemyGame)
recipe_definitions = (
# 物品
(("alchemy_slag", "alchemy_slag", "alchemy_slag"), "combat_material_ore", "alchemy_slag", 0.34),
(("combat_potion_hp_small","combat_potion_hp_small","combat_potion_hp_small"), "combat_potion_hp_medium", "alchemy_slag", 0.75),
(("combat_potion_hp_medium","combat_potion_hp_medium","combat_potion_hp_medium"), "combat_potion_hp_large", "alchemy_slag", 0.75),
(("combat_material_ore","combat_material_ore","combat_material_gem"),"combat_material_crystal", "alchemy_ash", 0.75),
(("combat_material_gem","combat_material_gem","combat_material_crystal"),"combat_material_essence","alchemy_ash",0.75),
(("combat_material_crystal","combat_potion_atk","combat_potion_def"),"combat_material_essence","alchemy_ash",0.7),
# 护甲链
(("combat_material_ore", "garden_wood_maple", "combat_armor_chain"), "combat_armor_plate", 0.70),
(("combat_material_gem", "combat_material_crystal", "combat_armor_plate"), "combat_armor_sentinel", 0.50),
(("combat_armor_sentinel", "garden_wood_sakura", "combat_material_crystal"), "combat_armor_dragonheart", 0.30),
(("combat_armor_dragonheart", "combat_material_essence", "combat_armor_plate"), "combat_armor_guardian", 0.10),
(("combat_material_ore", "garden_wood_maple", "combat_armor_chain"), "combat_armor_plate", SPARK_DUST_ITEM_ID, 0.70),
(("combat_material_gem", "combat_material_crystal", "combat_armor_plate"), "combat_armor_sentinel", SPARK_DUST_ITEM_ID, 0.50),
(("combat_armor_sentinel", "garden_wood_sakura", "combat_material_crystal"), "combat_armor_dragonheart", SPARK_DUST_ITEM_ID, 0.30),
(("combat_armor_dragonheart", "combat_material_essence", "combat_armor_plate"), "combat_armor_guardian", SPARK_DUST_ITEM_ID, 0.10),
# 鞋子链
(("combat_material_ore", "garden_wood_ginkgo", "combat_boots_leather"), "combat_boots_rapid", 0.70),
(("combat_boots_rapid", "combat_material_gem", "combat_material_crystal"), "combat_boots_wind", 0.50),
(("combat_boots_wind", "combat_material_crystal", "garden_wood_ginkgo"), "combat_boots_tempest", 0.30),
(("combat_boots_tempest", "combat_material_essence", "garden_wine_maple"), "combat_boots_starlight", 0.10),
(("combat_material_ore", "garden_wood_ginkgo", "combat_boots_leather"), "combat_boots_rapid", SPARK_DUST_ITEM_ID, 0.70),
(("combat_boots_rapid", "combat_material_gem", "combat_material_crystal"), "combat_boots_wind", SPARK_DUST_ITEM_ID, 0.50),
(("combat_boots_wind", "combat_material_crystal", "garden_wood_ginkgo"), "combat_boots_tempest", SPARK_DUST_ITEM_ID, 0.30),
(("combat_boots_tempest", "combat_material_essence", "garden_wine_maple"), "combat_boots_starlight", SPARK_DUST_ITEM_ID, 0.10),
# 饰品链
(("combat_accessory_ring_str", "combat_material_gem", "garden_wood_sakura"), "combat_accessory_barrier", 0.70),
(("combat_accessory_barrier", "combat_material_crystal", "garden_wood_sakura"), "combat_accessory_amulet", 0.50),
(("combat_accessory_amulet", "combat_material_crystal", "garden_wine_sakura"), "combat_accessory_sanctum", 0.30),
(("combat_accessory_sanctum", "combat_material_essence", "combat_souvenir_relic"), "combat_accessory_aegis", 0.10),
(("combat_accessory_ring_str", "combat_material_gem", "garden_wood_sakura"), "combat_accessory_barrier", SPARK_DUST_ITEM_ID, 0.70),
(("combat_accessory_barrier", "combat_material_crystal", "garden_wood_sakura"), "combat_accessory_amulet", SPARK_DUST_ITEM_ID, 0.50),
(("combat_accessory_amulet", "combat_material_crystal", "garden_wine_sakura"), "combat_accessory_sanctum", SPARK_DUST_ITEM_ID, 0.30),
(("combat_accessory_sanctum", "combat_material_essence", "combat_souvenir_relic"), "combat_accessory_aegis", SPARK_DUST_ITEM_ID, 0.10),
)
for materials, success_item_id, success_rate in recipe_definitions:
for materials, success_item_id, fail_item_id, success_rate in recipe_definitions:
try:
alchemy.register_recipe(materials, success_item_id, SPARK_DUST_ITEM_ID, success_rate)
alchemy.register_recipe(materials, success_item_id, fail_item_id, success_rate)
except Exception as exc:
logger.Log(
"Warning",

View File

@@ -4,6 +4,7 @@ from PWF.Convention.Runtime.Config import *
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ProjectConfig
from datetime import datetime
import re
from PWF.CoreModules.plugin_interface import DatabaseModel, get_db
@@ -71,6 +72,7 @@ class WPSConfigAPI(WPSAPI):
@override
def wake_up(self) -> None:
self._sanitize_usernames()
logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSConfigAPI 插件已加载{ConsoleFrontColor.RESET}")
self.register_plugin("config")
self.register_plugin("cfg")
@@ -203,6 +205,36 @@ class WPSConfigAPI(WPSAPI):
value = record.get("username")
return str(value) if value else f"user_{user_id}"
def _sanitize_usernames(self) -> None:
cursor = get_db().conn.cursor()
try:
cursor.execute("SELECT user_id, username FROM user_info")
rows = cursor.fetchall()
except Exception as exc:
logger.Log("Error", f"查询 user_info 失败: {exc}")
return
updated = 0
for row in rows:
user_id = row["user_id"]
username = row["username"] or ""
if not username or not re.search(r"\s", username):
continue
sanitized = re.sub(r"\s+", "_", username.strip())
if sanitized == username:
continue
try:
cursor.execute(
"UPDATE user_info SET username = ? WHERE user_id = ?",
(sanitized, user_id),
)
updated += 1
except Exception as exc:
logger.Log("Error", f"更新 user_id={user_id} 的用户名失败: {exc}")
if updated:
get_db().conn.commit()
logger.Log("Info", f"已替换 {updated} 条含空白字符的用户名")
def find_user_id_by_username(self, username: str) -> Optional[int]:
text = (username or "").strip()
if not text:
@@ -215,6 +247,20 @@ class WPSConfigAPI(WPSAPI):
row = cursor.fetchone()
return int(row["user_id"]) if row else None
def get_all_usernames(self) -> List[Dict[str, Any]]:
"""获取所有已设置用户名的用户列表
Returns:
包含 user_id 和 username 的字典列表
示例: [{"user_id": 123, "username": "张三"}, ...]
"""
cursor = get_db().conn.cursor()
cursor.execute(
"SELECT user_id, username FROM user_info WHERE username != '' AND username IS NOT NULL ORDER BY user_id ASC"
)
rows = cursor.fetchall()
return [{"user_id": row["user_id"], "username": row["username"]} for row in rows]
def get_user_url(self, user_id: int) -> Optional[str]:
record = self._get_user_record(user_id)
if not record:

View File

@@ -0,0 +1,85 @@
from __future__ import annotations
from typing import Optional
from PWF.Convention.Runtime.Architecture import Architecture
from Plugins.WPSAPI import GuideEntry
from Plugins.WPSConfigSystem import WPSConfigAPI
from .WPSRedPacketBase import WPSRedPacketBase
class WPSExclusiveRedPacket(WPSRedPacketBase):
"""专属红包入口插件"""
def get_guide_subtitle(self) -> str:
return "发送给指定用户的专属红包"
def collect_command_entries(self):
return (
GuideEntry(
title="专属红包",
identifier="专属红包 <金额> <user_id|用户名>",
description="只允许指定用户领取,该用户也需要使用 `抢红包` 指令领取。",
),
)
def wake_up(self) -> None:
self.register_plugin("专属红包")
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
tokens = [token.strip() for token in payload.split() if token.strip()]
if len(tokens) < 2:
return await self.send_markdown_message(
self.format_error("指令格式:`专属红包 <金额> <用户ID|用户名>`"),
chat_id,
user_id,
)
service = self.red_packet_service()
amount_token = tokens[0]
target_token = tokens[1:]
try:
if not amount_token.isdigit():
raise ValueError("金额需要是正整数")
amount = int(amount_token)
target_identifier = " ".join(target_token).strip()
if not target_identifier:
raise ValueError("需要指定目标用户")
target_user_id = self._resolve_target_user(target_identifier)
if target_user_id is None:
raise ValueError("未找到目标用户")
target_username = self.config_api().get_user_name(target_user_id)
self.validate_amount_and_slots(amount, 1)
self.ensure_sufficient_points(user_id, amount)
packet = service.create_exclusive_packet(
chat_id,
user_id,
amount,
target_user_id,
target_username,
)
hint = f"- 专属对象:{target_username}(仅该用户可抢)"
return await self.send_markdown_message(
self.format_packet_message(packet, hint),
chat_id,
user_id,
)
except ValueError as exc:
return await self.send_markdown_message(self.format_error(str(exc)), chat_id, user_id)
def _resolve_target_user(self, identifier: str) -> Optional[int]:
text = identifier.strip()
if not text:
return None
if text.isdigit():
return int(text)
config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
return config_api.find_user_id_by_username(text)
__all__ = ["WPSExclusiveRedPacket"]

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from typing import Optional
from Plugins.WPSAPI import GuideEntry
from .WPSRedPacketBase import WPSRedPacketBase
class WPSPasswordRedPacket(WPSRedPacketBase):
"""口令红包入口插件"""
def get_guide_subtitle(self) -> str:
return "发布口令红包,需要准确口令才能领取"
def collect_command_entries(self):
return (
GuideEntry(
title="口令红包",
identifier="口令红包 <金额> <人数> <口令>",
description="设置好口令后,领取者需在抢红包时附带同样的口令。",
),
)
def wake_up(self) -> None:
self.register_plugin("口令红包")
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
tokens = [token.strip() for token in payload.split() if token.strip()]
service = self.red_packet_service()
try:
amount, slots, remaining = self.parse_amount_and_slots(tokens)
if not remaining:
raise ValueError("需要提供口令参数")
password = " ".join(remaining)
self.validate_amount_and_slots(amount, slots)
self.ensure_sufficient_points(user_id, amount)
packet = service.create_password_packet(chat_id, user_id, amount, slots, password)
hint = (
f"- 提示:抢红包时使用 ``抢红包 {packet.packet_id} {password}`` "
"(口令必须完全一致)。"
)
return await self.send_markdown_message(
self.format_packet_message(packet, hint),
chat_id,
user_id,
)
except ValueError as exc:
return await self.send_markdown_message(self.format_error(str(exc)), chat_id, user_id)
__all__ = ["WPSPasswordRedPacket"]

View File

@@ -0,0 +1,50 @@
from __future__ import annotations
from typing import Optional
from Plugins.WPSAPI import GuideEntry
from .WPSRedPacketBase import WPSRedPacketBase
class WPSRandomRedPacket(WPSRedPacketBase):
"""手气红包入口插件"""
def get_guide_subtitle(self) -> str:
return "发起手气红包,随机拆分积分给指定人数"
def collect_command_entries(self):
return (
GuideEntry(
title="红包",
identifier="红包 <金额> <人数>",
description="按照随机手气拆分积分,缺省金额/人数使用配置项。",
metadata={"别名": "手气红包"},
),
)
def wake_up(self) -> None:
self.register_plugin("红包")
self.register_plugin("手气红包")
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
tokens = [token.strip() for token in payload.split() if token.strip()]
service = self.red_packet_service()
try:
amount, slots, _ = self.parse_amount_and_slots(tokens)
self.validate_amount_and_slots(amount, slots)
self.ensure_sufficient_points(user_id, amount)
packet = service.create_random_packet(chat_id, user_id, amount, slots)
hint = f"- 提示:使用 `抢红包 {packet.packet_id}` 抢夺份额。"
return await self.send_markdown_message(
self.format_packet_message(packet, hint),
chat_id,
user_id,
)
except ValueError as exc:
return await self.send_markdown_message(self.format_error(str(exc)), chat_id, user_id)
__all__ = ["WPSRandomRedPacket"]

106
Plugins/WPSRedPacketBase.py Normal file
View File

@@ -0,0 +1,106 @@
from __future__ import annotations
from PWF.Convention.Runtime.Architecture import *
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from Plugins.WPSAPI import GuideEntry, WPSAPI
from Plugins.WPSConfigSystem import WPSConfigAPI
from .WPSRedPacketService import RedPacket, RedPacketType, WPSRedPacketService
logger: ProjectConfig = Architecture.Get(ProjectConfig)
class WPSRedPacketBase(WPSAPI):
"""红包插件公共基类,封装参数解析与输出模板。"""
@override
def wake_up(self) -> None:
pass
def dependencies(self) -> List[type]:
return [WPSAPI, WPSConfigAPI, WPSRedPacketService]
def is_enable_plugin(self) -> bool:
return True
def collect_command_entries(self) -> Sequence[GuideEntry]:
return ()
# region Helpers
def red_packet_service(self) -> WPSRedPacketService:
return Architecture.Get(WPSRedPacketService)
def config_api(self) -> WPSConfigAPI:
return Architecture.Get(WPSConfigAPI)
def parse_amount_and_slots(
self,
tokens: Sequence[str],
*,
require_slots: bool = True,
) -> Tuple[int, int, List[str]]:
service = self.red_packet_service()
remaining_tokens = list(tokens)
amount = service.get_default_amount()
slots = service.get_default_slots()
if remaining_tokens:
amount_token = remaining_tokens.pop(0)
if amount_token.isdigit():
amount = int(amount_token)
else:
remaining_tokens.insert(0, amount_token)
if require_slots and remaining_tokens:
slot_token = remaining_tokens.pop(0)
if slot_token.isdigit():
slots = int(slot_token)
else:
remaining_tokens.insert(0, slot_token)
elif not require_slots:
slots = 1
return amount, slots, remaining_tokens
def validate_amount_and_slots(self, amount: int, slots: int) -> None:
if amount <= 0:
raise ValueError("红包金额必须大于0")
if slots <= 0:
raise ValueError("红包份数必须大于0")
if amount < slots:
raise ValueError("红包金额不能小于份数")
def ensure_sufficient_points(self, user_id: int, amount: int) -> None:
points = self.config_api().get_user_points(user_id)
if points < amount:
raise ValueError("积分不足,无法发放红包")
def format_packet_message(self, packet: RedPacket, extra_hint: str | None = None) -> str:
type_label = {
RedPacketType.RANDOM: "手气红包",
RedPacketType.PASSWORD: "口令红包",
RedPacketType.EXCLUSIVE: "专属红包",
}.get(packet.packet_type, packet.packet_type.value)
lines = [
"# 🎁 红包已发出",
f"- 红包ID`{packet.packet_id}`",
f"- 类型:{type_label}",
f"- 总额:{packet.total_amount}",
f"- 份数:{packet.total_slots}",
f"- 指令:`抢红包 {packet.packet_id}`",
]
if packet.packet_type == RedPacketType.EXCLUSIVE and packet.exclusive_username:
lines.append(f"- 专属对象:{packet.exclusive_username}")
if extra_hint:
lines.append(extra_hint)
return "\n".join(lines)
def format_error(self, message: str) -> str:
logger.Log("Warning", f"{ConsoleFrontColor.YELLOW}{message}{ConsoleFrontColor.RESET}")
return f"{message}"
# endregion
__all__ = ["WPSRedPacketBase"]

View File

@@ -0,0 +1,55 @@
from __future__ import annotations
from typing import Optional, Sequence
from Plugins.WPSAPI import GuideEntry
from .WPSRedPacketBase import WPSRedPacketBase
class WPSRedPacketClaim(WPSRedPacketBase):
"""抢红包入口"""
def get_guide_subtitle(self) -> str:
return "通过红包ID领取积分奖励"
def collect_command_entries(self):
return (
GuideEntry(
title="抢红包",
identifier="抢红包 <红包ID> [口令]",
description="输入红包ID后领取剩余份额口令红包需附带口令。",
),
)
def wake_up(self) -> None:
self.register_plugin("抢红包")
async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
payload = self.parse_message_after_at(message).strip()
tokens = [token.strip() for token in payload.split() if token.strip()]
if not tokens:
return await self.send_markdown_message(
self.format_error("指令格式:`抢红包 <红包ID> [口令]`"),
chat_id,
user_id,
)
packet_id = tokens[0]
extra_tokens: Sequence[str] = tokens[1:]
username = self.config_api().get_user_name(user_id)
service = self.red_packet_service()
success, message_text, _ = service.claim_packet(
chat_id,
packet_id,
user_id,
username,
extra_tokens,
)
if success:
return await self.send_markdown_message(message_text, chat_id, user_id)
return await self.send_markdown_message(message_text, chat_id, user_id)
__all__ = ["WPSRedPacketClaim"]

View File

@@ -0,0 +1,405 @@
from __future__ import annotations
import random
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional, Sequence, Tuple
from uuid import uuid4
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from PWF.CoreModules.plugin_interface import DatabaseModel
from Plugins.WPSAPI import GuideEntry, WPSAPI
from Plugins.WPSConfigSystem import WPSConfigAPI
logger: ProjectConfig = Architecture.Get(ProjectConfig)
class RedPacketType(str, Enum):
RANDOM = "random"
PASSWORD = "password"
EXCLUSIVE = "exclusive"
@dataclass(frozen=True)
class RedPacket:
packet_id: str
owner_id: int
packet_type: RedPacketType
total_amount: int
total_slots: int
remaining_amount: int
remaining_slots: int
exclusive_user_id: Optional[int] = None
exclusive_username: Optional[str] = None
password: Optional[str] = None
created_at: str = ""
@dataclass(frozen=True)
class RedPacketClaim:
packet_id: str
user_id: int
username: str
amount: int
claimed_at: str
class WPSRedPacketService(WPSAPI):
"""红包核心服务,负责数据存储与积分结算。"""
PACKET_TABLE = "red_packets"
CLAIM_TABLE = "red_packet_claims"
def __init__(self) -> None:
super().__init__()
self._default_amount: int = int(logger.FindItem("red_packet_default_amount", 100))
self._default_slots: int = int(logger.FindItem("red_packet_default_slots", 4))
logger.SaveProperties()
def get_guide_subtitle(self) -> str:
return "统一管理红包生命周期与积分结算的核心服务"
def collect_command_entries(self) -> Sequence[GuideEntry]:
return ()
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "默认配置",
"description": f"默认金额 {self._default_amount} 分,默认份数 {self._default_slots} 份,可通过 ProjectConfig 覆盖。",
},
{
"title": "积分结算",
"description": "发送红包时一次性扣除积分,领取时再发放给领取者,红包不支持退款或撤销。",
},
)
def dependencies(self) -> List[type]:
return [WPSAPI, WPSConfigAPI]
def is_enable_plugin(self) -> bool:
return True
def register_db_model(self):
return [
DatabaseModel(
table_name=self.PACKET_TABLE,
column_defs={
"packet_id": "TEXT PRIMARY KEY",
"owner_id": "INTEGER NOT NULL",
"packet_type": "TEXT NOT NULL",
"total_amount": "INTEGER NOT NULL",
"total_slots": "INTEGER NOT NULL",
"remaining_amount": "INTEGER NOT NULL",
"remaining_slots": "INTEGER NOT NULL",
"exclusive_user_id": "INTEGER",
"exclusive_username": "TEXT",
"password": "TEXT",
"created_at": "TEXT NOT NULL",
},
),
DatabaseModel(
table_name=self.CLAIM_TABLE,
column_defs={
"claim_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
"packet_id": "TEXT NOT NULL",
"user_id": "INTEGER NOT NULL",
"username": "TEXT",
"amount": "INTEGER NOT NULL",
"claimed_at": "TEXT NOT NULL",
},
),
]
def wake_up(self) -> None:
logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}WPSRedPacketService 插件已加载{ConsoleFrontColor.RESET}",
)
# region 配置 / 公共接口
def get_default_amount(self) -> int:
return max(1, self._default_amount)
def get_default_slots(self) -> int:
return max(1, self._default_slots)
def generate_packet_id(self) -> str:
return uuid4().hex[:12]
def _config_api(self) -> WPSConfigAPI:
return Architecture.Get(WPSConfigAPI)
def _now(self) -> str:
return datetime.now(timezone.utc).isoformat()
# endregion
# region 发红包
def create_random_packet(
self,
chat_id: int,
owner_id: int,
total_amount: int,
total_slots: int,
) -> RedPacket:
return self._create_packet(
chat_id=chat_id,
owner_id=owner_id,
total_amount=total_amount,
total_slots=total_slots,
packet_type=RedPacketType.RANDOM,
)
def create_password_packet(
self,
chat_id: int,
owner_id: int,
total_amount: int,
total_slots: int,
password: str,
) -> RedPacket:
if not password:
raise ValueError("口令不能为空")
return self._create_packet(
chat_id=chat_id,
owner_id=owner_id,
total_amount=total_amount,
total_slots=total_slots,
packet_type=RedPacketType.PASSWORD,
password=password,
)
def create_exclusive_packet(
self,
chat_id: int,
owner_id: int,
total_amount: int,
target_user_id: int,
target_username: str,
) -> RedPacket:
if target_user_id <= 0:
raise ValueError("目标用户ID无效")
return self._create_packet(
chat_id=chat_id,
owner_id=owner_id,
total_amount=total_amount,
total_slots=1,
packet_type=RedPacketType.EXCLUSIVE,
exclusive_user_id=target_user_id,
exclusive_username=target_username or f"user_{target_user_id}",
)
def _create_packet(
self,
*,
chat_id: int,
owner_id: int,
total_amount: int,
total_slots: int,
packet_type: RedPacketType,
password: Optional[str] = None,
exclusive_user_id: Optional[int] = None,
exclusive_username: Optional[str] = None,
) -> RedPacket:
if total_amount <= 0:
raise ValueError("红包金额必须大于0")
if total_slots <= 0:
raise ValueError("红包份数必须大于0")
if total_amount < total_slots:
raise ValueError("红包金额不能小于份数")
self._deduct_points(owner_id, total_amount, packet_type, chat_id)
packet_id = self.generate_packet_id()
created_at = self._now()
cursor = get_db().conn.cursor()
cursor.execute(
f"""
INSERT INTO {self.PACKET_TABLE} (
packet_id, owner_id, packet_type,
total_amount, total_slots,
remaining_amount, remaining_slots,
exclusive_user_id, exclusive_username,
password, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
packet_id,
owner_id,
packet_type.value,
total_amount,
total_slots,
total_amount,
total_slots,
exclusive_user_id,
exclusive_username,
password,
created_at,
),
)
get_db().conn.commit()
return RedPacket(
packet_id=packet_id,
owner_id=owner_id,
packet_type=packet_type,
total_amount=total_amount,
total_slots=total_slots,
remaining_amount=total_amount,
remaining_slots=total_slots,
exclusive_user_id=exclusive_user_id,
exclusive_username=exclusive_username,
password=password,
created_at=created_at,
)
def _deduct_points(
self,
owner_id: int,
total_amount: int,
packet_type: RedPacketType,
chat_id: int,
) -> None:
config_api = self._config_api()
current_points = config_api.get_user_points(owner_id)
if current_points < total_amount:
raise ValueError("积分不足,无法发放红包")
config_api.adjust_user_points_sync(
owner_id,
-total_amount,
reason=f"发放{packet_type.value}红包(会话{chat_id}",
)
# endregion
# region 抢红包
def claim_packet(
self,
chat_id: int,
packet_id: str,
user_id: int,
username: str,
tokens: Optional[Sequence[str]] = None,
) -> Tuple[bool, str, Optional[int]]:
if not packet_id:
return False, "❌ 红包ID不能为空", None
conn = get_db().conn
cursor = conn.cursor()
try:
cursor.execute("BEGIN IMMEDIATE")
cursor.execute(
f"""
SELECT packet_id, owner_id, packet_type, total_amount, total_slots,
remaining_amount, remaining_slots, exclusive_user_id,
exclusive_username, password
FROM {self.PACKET_TABLE}
WHERE packet_id = ?
""",
(packet_id,),
)
row = cursor.fetchone()
if not row:
conn.rollback()
return False, "❌ 红包不存在或已失效", None
packet = self._row_to_packet(row)
if packet.remaining_slots <= 0 or packet.remaining_amount <= 0:
conn.rollback()
return False, "⚠️ 红包已经被抢完了", None
cursor.execute(
f"""
SELECT 1 FROM {self.CLAIM_TABLE}
WHERE packet_id = ? AND user_id = ?
""",
(packet_id, user_id),
)
if cursor.fetchone():
conn.rollback()
return False, "⚠️ 你已经抢过这个红包了", None
if packet.packet_type == RedPacketType.EXCLUSIVE:
if packet.exclusive_user_id != user_id:
conn.rollback()
return False, "❌ 这是专属红包,只有目标用户可以领取", None
if packet.packet_type == RedPacketType.PASSWORD:
provided = " ".join(tokens or []).strip()
if not provided:
conn.rollback()
return False, "❌ 这是口令红包,需要附带口令", None
if provided != (packet.password or ""):
conn.rollback()
return False, "❌ 口令不正确", None
grant_amount = self._allocate_amount(packet.remaining_amount, packet.remaining_slots)
cursor.execute(
f"""
UPDATE {self.PACKET_TABLE}
SET remaining_amount = remaining_amount - ?,
remaining_slots = remaining_slots - 1
WHERE packet_id = ?
""",
(grant_amount, packet_id),
)
cursor.execute(
f"""
INSERT INTO {self.CLAIM_TABLE} (packet_id, user_id, username, amount, claimed_at)
VALUES (?, ?, ?, ?, ?)
""",
(packet_id, user_id, username, grant_amount, self._now()),
)
conn.commit()
except Exception as exc:
conn.rollback()
logger.Log("Error", f"{ConsoleFrontColor.RED}领取红包失败: {exc}{ConsoleFrontColor.RESET}")
return False, "❌ 领取红包时出现错误,请稍后再试", None
config_api = self._config_api()
config_api.adjust_user_points_sync(
user_id,
grant_amount,
reason=f"抢到红包 {packet_id}(会话{chat_id}",
)
return True, f"✅ 领取成功,获得 {grant_amount} 分!", grant_amount
def _allocate_amount(self, remaining_amount: int, remaining_slots: int) -> int:
if remaining_slots <= 1:
return remaining_amount
min_per_slot = 1
max_available = remaining_amount - (remaining_slots - 1) * min_per_slot
avg = remaining_amount // remaining_slots
upper = max(min_per_slot, min(max_available, max(min_per_slot, avg * 2)))
return random.randint(min_per_slot, upper)
def _row_to_packet(self, row) -> RedPacket:
return RedPacket(
packet_id=row["packet_id"],
owner_id=row["owner_id"],
packet_type=RedPacketType(row["packet_type"]),
total_amount=row["total_amount"],
total_slots=row["total_slots"],
remaining_amount=row["remaining_amount"],
remaining_slots=row["remaining_slots"],
exclusive_user_id=row["exclusive_user_id"],
exclusive_username=row["exclusive_username"],
password=row["password"],
created_at="",
)
# endregion
__all__ = ["WPSRedPacketService", "RedPacketType", "RedPacket", "RedPacketClaim"]

18
requirements.txt Normal file
View File

@@ -0,0 +1,18 @@
# Web框架和服务器
fastapi
uvicorn
# HTTP客户端
httpx
requests
# 数据验证
pydantic
# HTML解析
beautifulsoup4
# AI框架 (LlamaIndex)
llama-index-core
llama-index-llms-ollama