Files
NewWPSBot/Plugins/Others/ChatAI.py
2025-11-20 15:40:03 +08:00

785 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from Plugins.WPSAPI import *
from datetime import datetime
from llama_index.llms.ollama import Ollama
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import FunctionTool
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.agent.workflow.react_agent import ReActAgent
from typing import List, Dict, Optional, Any
logger: ProjectConfig = Architecture.Get(ProjectConfig)
OLLAMA_URL = logger.FindItem("ollama_url", "http://ollama.liubai.site")
OLLAMA_MODEL = logger.FindItem("ollama_model", "qwen3:0.6b")
MAX_HISTORY = logger.FindItem("chat_ai_max_history", 100)
logger.SaveProperties()
class ChatAIAgent:
"""AI 对话智能体 - 维护全局会话历史"""
_instance: Optional['ChatAIAgent'] = None
def __init__(self, ollama_url: str = OLLAMA_URL, max_history: int = MAX_HISTORY):
"""初始化 AI 智能体
Args:
ollama_url: Ollama 服务地址
max_history: 最大历史消息数量
"""
self.ollama_url = ollama_url
self.max_history = max_history
self.llm = Ollama(model=OLLAMA_MODEL, base_url=ollama_url, request_timeout=600.0)
# 全局消息历史列表
self.global_history: List[Dict[str, any]] = []
# Agent 工作流和配置API
self.workflow: Optional[AgentWorkflow] = None
self.config_api = None # 延迟加载,避免循环依赖
# 初始化 Agent 和工具
self._initialize_agent()
logger.Log("Info", f"{ConsoleFrontColor.GREEN}ChatAIAgent 初始化完成{ConsoleFrontColor.RESET}")
logger.Log("Info", f"模型: {OLLAMA_MODEL}, 最大历史: {max_history}")
@classmethod
def get_instance(cls) -> 'ChatAIAgent':
"""获取单例实例"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def _initialize_agent(self) -> None:
"""初始化 ReActAgent 和工具函数"""
try:
# 延迟导入避免循环依赖
from Plugins.WPSConfigSystem import WPSConfigAPI
from PWF.Convention.Runtime.Architecture import Architecture
# 获取 WPSConfigAPI 实例
try:
self.config_api: WPSConfigAPI = Architecture.Get(WPSConfigAPI)
except Exception as e:
logger.Log("Warning", f"无法获取 WPSConfigAPI: {e}")
self.config_api = None
# 创建工具函数
tools = []
if self.config_api:
# 工具1: 根据用户ID查询用户名
def get_username_by_id(user_id: int) -> str:
"""根据用户ID查询用户名
Args:
user_id: 用户ID整数
Returns:
用户名字符串。如果用户未设置用户名,返回 'user_用户ID' 格式
Examples:
get_username_by_id(12345) -> "张三"
get_username_by_id(99999) -> "user_99999"
"""
try:
username = self.config_api.get_user_name(user_id)
return username if username else f"user_{user_id}"
except Exception as e:
logger.Log("Error", f"查询用户名失败 user_id={user_id}: {e}")
return f"user_{user_id}"
# 工具2: 根据用户名查询用户ID
def get_userid_by_name(username: str) -> str:
"""根据用户名查询用户ID
Args:
username: 用户名(字符串)
Returns:
如果找到用户返回用户ID字符串格式
如果未找到,返回 "未找到用户名为'{username}'的用户"
Examples:
get_userid_by_name("张三") -> "12345"
get_userid_by_name("不存在的用户") -> "未找到用户名为'不存在的用户'的用户"
"""
try:
user_id = self.config_api.find_user_id_by_username(username)
if user_id is not None:
return str(user_id)
return f"未找到用户名为'{username}'的用户"
except Exception as e:
logger.Log("Error", f"查询用户ID失败 username={username}: {e}")
return f"查询失败: {str(e)}"
# 工具3: 获取所有用户名列表
def list_all_usernames() -> str:
"""获取系统中所有已设置用户名的用户列表
Returns:
格式化的用户列表字符串,每行一个用户
格式: "用户ID: 用户名"
如果没有用户,返回 "当前系统中没有设置用户名的用户"
Examples:
返回示例:
"12345: 张三
67890: 李四
11111: 王五"
"""
try:
users = self.config_api.get_all_usernames()
if not users:
return "当前系统中没有设置用户名的用户"
lines = [f"{user['user_id']}: {user['username']}" for user in users]
return "\n".join(lines)
except Exception as e:
logger.Log("Error", f"获取用户列表失败: {e}")
return f"获取用户列表失败: {str(e)}"
# 注册工具
tools = [
FunctionTool.from_defaults(
fn=get_username_by_id,
name="get_username_by_id",
description="根据用户ID整数查询对应的用户名。用于当你知道用户ID需要知道该用户的名字时。"
),
FunctionTool.from_defaults(
fn=get_userid_by_name,
name="get_userid_by_name",
description="根据用户名字符串查询对应的用户ID。用于当你知道用户名需要知道该用户的ID时。"
),
FunctionTool.from_defaults(
fn=list_all_usernames,
name="list_all_usernames",
description="获取系统中所有已设置用户名的用户列表。用于当你需要了解系统中有哪些用户,或者用户询问'有哪些用户''谁在线'等问题时。"
),
]
# 创建系统提示词
system_prompt = self._build_system_prompt()
# 创建 ReActAgent
agent = ReActAgent(
llm=self.llm,
tools=tools,
verbose=True,
system_prompt=system_prompt,
)
# 创建 AgentWorkflow
self.workflow = AgentWorkflow(
agents=[agent],
timeout=600.0,
)
logger.Log("Info", f"{ConsoleFrontColor.GREEN}Agent 工具初始化完成,共 {len(tools)} 个工具{ConsoleFrontColor.RESET}")
except Exception as e:
logger.Log("Error", f"初始化 Agent 失败: {e}")
import traceback
logger.Log("Error", traceback.format_exc())
self.workflow = None
def _build_system_prompt(self) -> str:
"""构建详细的系统提示词"""
return """你是一个友好且智能的 AI 助手,能够同时在多个群聊中与不同用户对话。
# 核心能力
## 1. 对话历史理解
- 你能看到所有群聊的对话历史
- 每条消息都标注了:[时间] [群聊ID] [用户ID]
- 你需要根据上下文理解对话,区分不同群聊和用户
## 2. 用户信息工具 🔧
你拥有三个强大的工具来查询用户信息:
### 工具 1: get_username_by_id(user_id: int)
**用途**根据用户ID查询用户名
**参数**user_id整数类型
**返回**:用户名字符串
**使用场景**
- 当你看到对话历史中有用户ID想知道这个用户叫什么名字时
- 当用户问"用户12345是谁"
- 当你需要用更友好的称呼来提及某个用户时
**示例**
```
用户问:"用户12345是谁"
你的思考我需要查询用户ID 12345 的用户名
你的动作:调用 get_username_by_id(12345)
工具返回:"张三"
你的回复:"用户12345是张三。"
```
### 工具 2: get_userid_by_name(username: str)
**用途**根据用户名查询用户ID
**参数**username字符串类型
**返回**用户ID字符串或未找到的提示
**使用场景**
- 当用户提到某个人名你需要知道对应的用户ID时
- 当用户问"张三的ID是多少"
- 当你需要用 <at> 标签提及某个用户,但只知道名字时
**示例**
```
用户问:"张三的用户ID是多少"
你的思考:我需要查询名为"张三"的用户ID
你的动作:调用 get_userid_by_name("张三")
工具返回:"12345"
你的回复:"张三的用户ID是12345。"
```
### 工具 3: list_all_usernames()
**用途**:获取所有已设置用户名的用户列表
**参数**:无
**返回**:格式化的用户列表
**使用场景**
- 当用户问"系统里有哪些用户"
- 当用户问"谁在这里""有哪些人"
- 当你需要了解系统中的用户概况时
**示例**
```
用户问:"系统里有哪些用户?"
你的思考:我需要获取所有用户列表
你的动作:调用 list_all_usernames()
工具返回:
"12345: 张三
67890: 李四
11111: 王五"
你的回复:"系统中目前有以下用户:
- 张三ID: 12345
- 李四ID: 67890
- 王五ID: 11111"
```
## 3. 工具使用原则 ⚠️
**何时应该使用工具**
✅ 用户明确询问用户信息("XX是谁""XX的ID是多少"
✅ 对话中提到不认识的人名,需要确认身份
✅ 需要列举系统中的用户
✅ 需要将用户名转换为ID或ID转换为用户名
**何时不需要使用工具**
❌ 对话历史中已经明确显示了用户ID和相关信息
❌ 用户只是在普通聊天,没有涉及用户查询
❌ 你已经在同一轮对话中查询过相同的信息
**工具使用建议**
1. 优先检查对话历史,看是否已有所需信息
2. 一次对话中避免重复查询相同信息
3. 查询失败时,友好地告知用户
4. 工具返回结果后,用自然语言整理给用户
## 4. 用户提及规则 📢
**重要**:当你需要特指某个用户时,必须使用 `<at>` 标签!
**正确格式**
- `<at user_id="用户ID"></at>`
- `<at user_id="用户ID">用户名</at>`
**错误格式**(禁止):
- ❌ 直接输出数字ID"用户12345"
- ❌ 不使用标签:"@张三"
**示例**
```
用户问:"我和小明谁比较忙?"
你的思考:用户提到"小明"我需要查询小明的ID然后用<at>标签回复
步骤1调用 get_userid_by_name("小明")
步骤2假设返回 "12345"
你的回复:"我觉得<at user_id="12345">小明</at>可能比较忙,不过这只是我的猜测。"
```
## 5. 对话示例 💬
### 示例 1查询用户信息
```
[2025-11-20 10:00:00] [群聊#12345] [用户#11111]: 用户99999是谁
AI思考用户询问用户99999的身份我需要查询
AI动作调用 get_username_by_id(99999)
AI回复用户99999是王小明。
```
### 示例 2根据名字查ID
```
[2025-11-20 10:01:00] [群聊#12345] [用户#11111]: 李华的ID是多少
AI思考用户询问李华的ID
AI动作调用 get_userid_by_name("李华")
工具返回:"88888"
AI回复李华的用户ID是88888。
```
### 示例 3列出所有用户
```
[2025-11-20 10:02:00] [群聊#12345] [用户#11111]: 系统里有哪些用户?
AI思考用户想知道所有用户列表
AI动作调用 list_all_usernames()
工具返回:"12345: 张三\n67890: 李四"
AI回复系统中目前有以下用户
- 张三ID: 12345
- 李四ID: 67890
```
### 示例 4智能对话中使用工具
```
[2025-11-20 10:03:00] [群聊#12345] [用户#11111]: 帮我问问小红今天有空吗?
AI思考用户提到"小红",我需要用<at>标签提及她所以要先查询ID
AI动作调用 get_userid_by_name("小红")
工具返回:"55555"
AI回复<at user_id="55555">小红</at> 今天有空吗?用户<at user_id="11111"></at>想知道。
```
### 示例 5普通对话不需要工具
```
[2025-11-20 10:04:00] [群聊#12345] [用户#11111]: 今天天气真好
AI思考这是普通聊天不涉及用户查询不需要使用工具
AI回复是的今天天气很不错呢适合出去走走。
```
## 6. 错误处理 🔧
- 如果工具调用失败,友好地告知用户
- 如果查询不到用户,明确说明"未找到该用户"
- 如果不确定是否需要使用工具,优先使用工具确保准确性
## 7. 总结
你的职责:
1. 理解对话上下文,区分不同群聊和用户
2. 在需要时主动使用工具查询用户信息
3. 始终使用 <at> 标签提及用户
4. 提供友好、准确、有帮助的回复
记住:工具是帮助你更好地服务用户的,当不确定时,优先使用工具确保信息准确!"""
def _add_message(self, chat_id: int, user_id: int, role: str, content: str) -> None:
"""添加消息到全局历史
Args:
chat_id: 群聊ID
user_id: 用户ID
role: 角色 (user/assistant)
content: 消息内容
"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = {
"timestamp": timestamp,
"chat_id": chat_id,
"user_id": user_id,
"role": role,
"content": content
}
self.global_history.append(message)
self._trim_history()
def _trim_history(self) -> None:
"""裁剪历史消息到最大长度"""
if len(self.global_history) > self.max_history:
# 保留最新的 max_history 条消息
self.global_history = self.global_history[-self.max_history:]
logger.Log("Info", f"历史消息已裁剪到 {self.max_history}")
def clear_history(self) -> bool:
"""清空全局历史
Returns:
是否成功
"""
try:
old_count = len(self.global_history)
self.global_history.clear()
logger.Log("Info", f"已清空全局历史,共删除 {old_count} 条消息")
return True
except Exception as e:
logger.Log("Error", f"清空历史失败: {e}")
return False
def _format_message_with_context(self, msg: Dict[str, any]) -> str:
"""格式化单条消息,包含上下文信息
Args:
msg: 消息字典
Returns:
格式化后的消息字符串
"""
timestamp = msg.get("timestamp", "")
chat_id = msg.get("chat_id", "")
user_id = msg.get("user_id", "")
role = msg.get("role", "")
content = msg.get("content", "")
if role == "user":
return f"[{timestamp}] [群聊#{chat_id}] [用户#{user_id}]: {content}"
else: # assistant
return f"[{timestamp}] [AI助手]: {content}"
def _format_history_for_llm(self) -> str:
"""格式化全局历史为 LLM 输入
Returns:
格式化后的历史字符串
"""
if not self.global_history:
return ""
formatted_messages = [
self._format_message_with_context(msg)
for msg in self.global_history
]
return "\n".join(formatted_messages)
async def chat(self, message: str, chat_id: int, user_id: int) -> str:
"""处理对话
Args:
message: 用户消息
chat_id: 群聊ID
user_id: 用户ID
Returns:
AI 回复
"""
try:
# 添加用户消息到历史
self._add_message(chat_id, user_id, "user", message)
# 检查是否成功初始化 workflow
if self.workflow is None:
logger.Log("Warning", "Workflow 未初始化,使用降级模式")
return await self._fallback_chat(message, chat_id, user_id)
# 格式化历史消息(不包括刚添加的用户消息)
history_without_last = self.global_history[:-1] if len(self.global_history) > 1 else []
history_text = "\n".join([
self._format_message_with_context(msg)
for msg in history_without_last
]) if history_without_last else "(这是第一条对话)"
# 构建当前用户消息的格式化版本
current_msg = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [群聊#{chat_id}] [用户#{user_id}]: {message}"
# 构建完整的用户查询,包含历史上下文
full_query = f"""# 历史对话记录
{history_text}
# 当前消息
{current_msg}
# 任务
请作为 AI 助手回复当前用户。记住:
1. 如果需要查询用户信息,主动使用工具
2. 使用 <at user_id="用户ID"></at> 格式提及用户
3. 根据对话历史和当前消息,给出有意义的回复"""
logger.Log("Info", f"处理对话 - 群聊#{chat_id} 用户#{user_id}")
logger.Log("Info", f"当前历史消息数: {len(self.global_history)}")
# 使用 workflow 运行 agent
result = await self.workflow.run(user_msg=full_query)
# 提取回答
if hasattr(result, 'response'):
answer = str(result.response)
elif hasattr(result, 'message'):
answer = str(result.message)
else:
answer = str(result)
# 添加 AI 回复到历史
self._add_message(chat_id, user_id, "assistant", answer)
logger.Log("Info", f"AI 回复长度: {len(answer)} 字符")
return answer
except Exception as e:
logger.Log("Error", f"对话处理失败: {e}")
import traceback
error_trace = traceback.format_exc()
logger.Log("Error", f"详细错误:\n{error_trace}")
# 尝试降级处理
try:
return await self._fallback_chat(message, chat_id, user_id)
except:
return f"处理对话时出错: {str(e)}"
async def _fallback_chat(self, message: str, chat_id: int, user_id: int) -> str:
"""降级对话处理(当 workflow 不可用时)
Args:
message: 用户消息
chat_id: 群聊ID
user_id: 用户ID
Returns:
AI 回复
"""
logger.Log("Info", "使用降级模式处理对话")
# 格式化历史消息(不包括刚添加的用户消息)
history_without_last = self.global_history[:-1] if len(self.global_history) > 1 else []
history_text = "\n".join([
self._format_message_with_context(msg)
for msg in history_without_last
]) if history_without_last else "(这是第一条对话)"
# 构建当前用户消息的格式化版本
current_msg = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [群聊#{chat_id}] [用户#{user_id}]: {message}"
# 构建对话提示
conversation_prompt = f"""以下是历史对话记录:
{history_text}
当前消息:
{current_msg}
请作为AI助手回复当前用户。"""
# 使用简单的系统提示词
simple_prompt = """你是一个友好的AI助手。
- 使用 <at user_id="用户ID"></at> 格式来提及用户
- 根据对话历史和当前消息,给出有意义的回复"""
# 使用 achat 方法进行对话
messages = [
ChatMessage(role="system", content=simple_prompt),
ChatMessage(role="user", content=conversation_prompt)
]
response = await self.llm.achat(messages)
answer = str(response.message.content)
return answer
class ChatAIPlugin(WPSAPI):
"""AI 对话插件"""
def __init__(self):
super().__init__()
self.ai_agent = ChatAIAgent.get_instance()
@override
def dependencies(self) -> List[Type]:
return [WPSAPI]
@override
def is_enable_plugin(self) -> bool:
return True
def get_guide_title(self) -> str:
return "AI 智能对话"
def get_guide_subtitle(self) -> str:
return "基于 LlamaIndex + Ollama 的多群聊 AI 对话系统"
def get_guide_metadata(self) -> Dict[str, str]:
return {
"AI模型": OLLAMA_MODEL,
"最大历史": str(MAX_HISTORY),
"功能": "跨群聊智能对话",
}
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "AI对话",
"identifier": "ai_chat",
"description": "与AI助手对话AI能看到所有群聊的历史消息并智能回复。",
"metadata": {"模型": OLLAMA_MODEL},
"icon": "🤖",
"badge": "AI",
"details": [
{
"type": "list",
"items": [
"AI 维护全局会话历史,能看到所有群聊的对话",
"每条消息包含时间戳、群聊ID、用户ID",
f"最多保留最近 {MAX_HISTORY} 条消息",
"AI 会使用 @用户 格式回复",
"示例ai_chat 你好",
"示例ai_chat 刚才其他群聊说了什么?",
]
}
]
},
)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "全局会话",
"description": (
"AI 维护一个全局的对话历史,能够跨群聊、跨用户理解上下文。"
"所有群聊的对话都在同一个历史中AI 能够关联不同群聊的信息。"
),
"icon": "🌐",
},
{
"title": "智能回复",
"description": (
"AI 能够识别消息来源(群聊、用户、时间),并使用 @ 格式回复特定用户。"
),
"icon": "💬",
},
{
"title": "历史管理",
"description": (
f"自动保留最近 {MAX_HISTORY} 条消息,超出部分自动清理。"
"可使用 ai_chat_clear 指令手动清空所有历史。"
),
"icon": "📝",
},
)
@override
def wake_up(self) -> None:
logger.Log("Info", f"{ConsoleFrontColor.GREEN}ChatAIPlugin AI对话插件已加载{ConsoleFrontColor.RESET}")
self.register_plugin("ai_chat")
self.register_plugin("default")
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
"""处理用户对话"""
try:
if not message or message.strip() == "":
help_text = f"""# 🤖 AI 智能对话使用帮助
**直接发送消息即可与 AI 对话**
**特性:**
- ✨ AI 能看到所有群聊的对话历史
- 🌐 跨群聊智能理解上下文
- 💬 自动使用 @ 格式回复用户
- 📝 保留最近 {MAX_HISTORY} 条消息
**示例:**
- `ai_chat 你好`
- `ai_chat 今天天气怎么样?`
- `ai_chat 刚才其他群聊说了什么?`
- `ai_chat 请总结一下最近的对话`
**清空历史:**
使用 `ai_chat_clear` 指令清空所有历史消息
**技术:**
基于 LlamaIndex + Ollama ({OLLAMA_MODEL})"""
return await self.send_markdown_message(help_text, chat_id, user_id)
# 使用智能体处理对话
answer = await self.ai_agent.chat(message, chat_id, user_id)
# 格式化返回结果
formatted_answer = answer
return await self.send_markdown_message(formatted_answer, chat_id, user_id)
except Exception as e:
logger.Log("Error", f"AI对话异常: {e}")
import traceback
error_detail = traceback.format_exc()
logger.Log("Error", f"详细错误: {error_detail}")
error_msg = f"""❌ **处理对话时出错**
错误信息:{str(e)}
请稍后重试或联系管理员。"""
return await self.send_markdown_message(error_msg, chat_id, user_id)
class ChatAIClearPlugin(WPSAPI):
"""清空 AI 对话历史插件"""
def __init__(self):
super().__init__()
@override
def dependencies(self) -> List[Type]:
return [ChatAIPlugin]
@override
def is_enable_plugin(self) -> bool:
return True
def get_guide_title(self) -> str:
return "清空 AI 对话历史"
def get_guide_subtitle(self) -> str:
return "清空全局 AI 对话历史记录"
def get_guide_metadata(self) -> Dict[str, str]:
return {
"功能": "历史管理",
"作用范围": "全局",
}
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "清空历史",
"identifier": "ai_chat_clear",
"description": "清空 AI 的全局对话历史记录。",
"icon": "🗑️",
"badge": "管理",
"details": [
{
"type": "list",
"items": [
"清空所有群聊的对话历史",
"AI 将不再记得之前的对话",
"不影响 AI 的基础功能",
"示例ai_chat_clear",
]
}
]
},
)
@override
def wake_up(self) -> None:
logger.Log("Info", f"{ConsoleFrontColor.GREEN}ChatAIClearPlugin 清空历史插件已加载{ConsoleFrontColor.RESET}")
self.register_plugin("ai_chat_clear")
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
"""清空对话历史"""
try:
# 获取 AI Agent 实例
ai_agent = ChatAIAgent.get_instance()
# 清空历史
success = ai_agent.clear_history()
if success:
result_msg = """✅ **历史已清空**
AI 的全局对话历史已成功清空。
AI 将不再记得之前的对话内容。"""
else:
result_msg = """❌ **清空失败**
清空历史时出现错误,请稍后重试。"""
return await self.send_markdown_message(result_msg, chat_id, user_id)
except Exception as e:
logger.Log("Error", f"清空历史异常: {e}")
error_msg = f"""❌ **清空历史时出错**
错误信息:{str(e)}
请稍后重试或联系管理员。"""
return await self.send_markdown_message(error_msg, chat_id, user_id)