"""AI对话游戏模块""" import json import logging import asyncio import time from pathlib import Path from typing import Optional, Dict, Any, List from games.base import BaseGame from utils.parser import CommandParser logger = logging.getLogger(__name__) # 全局字典:存储每个chat_id的延迟任务句柄 _pending_tasks: Dict[int, asyncio.Task] = {} # 全局字典:存储每个chat_id的待处理消息队列 _message_queues: Dict[int, List[Dict[str, Any]]] = {} # 全局字典:存储每个chat_id的ChatEngine实例 _chat_engines: Dict[int, Any] = {} class AIChatGame(BaseGame): """AI对话游戏""" def __init__(self): """初始化游戏""" super().__init__() self.config_file = Path(__file__).parent.parent / "data" / "ai_config.json" self.wait_window = 10 # 固定10秒等待窗口 async def handle(self, command: str, chat_id: int, user_id: int) -> str: """处理AI对话指令 Args: command: 指令,如 ".ai 问题" 或 ".aiconfig host=xxx port=xxx model=xxx" chat_id: 会话ID user_id: 用户ID Returns: 回复消息 """ try: # 提取指令和参数 cmd, args = CommandParser.extract_command_args(command) args = args.strip() # 判断是配置指令还是AI对话指令 if cmd == '.aiconfig': return await self._handle_config(args, chat_id, user_id) else: # .ai 指令 return await self._handle_ai(args, chat_id, user_id) except Exception as e: logger.error(f"处理AI对话指令错误: {e}", exc_info=True) return f"❌ 处理指令出错: {str(e)}" async def _handle_ai(self, content: str, chat_id: int, user_id: int) -> str: """处理AI对话请求 Args: content: 消息内容 chat_id: 会话ID user_id: 用户ID Returns: 回复消息 """ # 如果内容为空,返回帮助信息 if not content: return self.get_help() # 将消息加入队列 self._add_to_queue(chat_id, user_id, content) # 取消旧的延迟任务(如果存在) if chat_id in _pending_tasks: old_task = _pending_tasks[chat_id] if not old_task.done(): old_task.cancel() try: await old_task except asyncio.CancelledError: pass # 创建新的延迟任务 task = asyncio.create_task(self._delayed_response(chat_id)) _pending_tasks[chat_id] = task # 不返回确认消息,静默处理 return "" async def _handle_config(self, args: str, chat_id: int, user_id: int) -> str: """处理配置请求 Args: args: 配置参数,格式如 "host=localhost port=11434 model=llama3.1" chat_id: 会话ID user_id: 用户ID Returns: 配置确认消息 """ if not args: return "❌ 请提供配置参数\n\n格式:`.aiconfig host=xxx port=xxx model=xxx`\n\n示例:`.aiconfig host=localhost port=11434 model=llama3.1`" # 解析配置参数 config_updates = {} parts = args.split() for part in parts: if '=' in part: key, value = part.split('=', 1) key = key.strip().lower() value = value.strip() if key == 'host': config_updates['host'] = value elif key == 'port': try: config_updates['port'] = int(value) except ValueError: return f"❌ 端口号必须是数字:{value}" elif key == 'model': config_updates['model'] = value else: return f"❌ 未知的配置项:{key}\n\n支持的配置项:host, port, model" if not config_updates: return "❌ 未提供有效的配置参数" # 加载现有配置 current_config = self._load_config() # 更新配置 current_config.update(config_updates) # 保存配置 if self._save_config(current_config): # 清除所有ChatEngine缓存(配置变更需要重新创建) _chat_engines.clear() return f"✅ 配置已更新\n\n**当前配置**:\n- 地址:{current_config['host']}\n- 端口:{current_config['port']}\n- 模型:{current_config['model']}" else: return "❌ 保存配置失败,请稍后重试" def _add_to_queue(self, chat_id: int, user_id: int, content: str) -> None: """将消息加入等待队列 Args: chat_id: 会话ID user_id: 用户ID content: 消息内容 """ if chat_id not in _message_queues: _message_queues[chat_id] = [] _message_queues[chat_id].append({ "user_id": user_id, "content": content, "timestamp": int(time.time()) }) async def _delayed_response(self, chat_id: int) -> None: """延迟回答任务 Args: chat_id: 会话ID """ try: # 等待固定时间窗口 await asyncio.sleep(self.wait_window) # 检查队列中是否有消息 if chat_id in _message_queues and _message_queues[chat_id]: # 生成回答 response = await self._generate_response(chat_id) # 清空队列 _message_queues[chat_id] = [] # 发送回答 if response: from utils.message import get_message_sender sender = get_message_sender() await sender.send_text(response) # 从pending_tasks中移除任务句柄 if chat_id in _pending_tasks: del _pending_tasks[chat_id] except asyncio.CancelledError: # 任务被取消,正常情况,不需要记录错误 logger.debug(f"延迟任务被取消: chat_id={chat_id}") if chat_id in _pending_tasks: del _pending_tasks[chat_id] except Exception as e: logger.error(f"延迟回答任务错误: {e}", exc_info=True) if chat_id in _pending_tasks: del _pending_tasks[chat_id] async def _generate_response(self, chat_id: int) -> Optional[str]: """使用LLM生成回答 Args: chat_id: 会话ID Returns: 回答文本 """ try: # 获取队列消息 if chat_id not in _message_queues or not _message_queues[chat_id]: return None messages = _message_queues[chat_id].copy() # 获取ChatEngine实例 chat_engine = self._get_chat_engine(chat_id) if not chat_engine: return "❌ AI服务初始化失败,请检查配置" # 将消息按用户角色格式化并添加到ChatMemoryBuffer # 构建合并的消息内容(包含用户信息) merged_content = "" for msg in messages: user_id = msg['user_id'] role = self._get_user_role(chat_id, user_id) merged_content += f"[{role}]: {msg['content']}\n" # 去掉最后的换行 merged_content = merged_content.strip() # 调用ChatEngine生成回答 # chat_engine是一个字典,包含llm, memory, system_prompt llm = chat_engine['llm'] memory = chat_engine['memory'] system_prompt = chat_engine['system_prompt'] # 构建完整的消息(包含系统提示和历史对话) full_message = f"{system_prompt}\n\n{merged_content}" # 使用LLM生成回答(同步调用,在线程池中执行) response = await asyncio.to_thread(llm.complete, full_message) # 返回回答文本 return str(response) except Exception as e: error_msg = str(e) logger.error(f"生成AI回答错误: {e}", exc_info=True) # 获取当前配置以便在错误消息中显示 try: config = self._load_config() config_info = f"\n\n当前配置:\n- 地址: {config['host']}\n- 端口: {config['port']}\n- 模型: {config['model']}" except: config_info = "" # 提供更友好的错误消息 if "Server disconnected" in error_msg or "RemoteProtocolError" in error_msg: config = self._load_config() test_cmd = f"curl -X POST http://{config.get('host', 'localhost')}:{config.get('port', 11434)}/api/generate -d '{{\"model\": \"{config.get('model', 'qwen3:0.6b')}\", \"prompt\": \"你是谁\", \"stream\": false}}'" return f"❌ AI服务连接失败,请检查:\n1. Ollama服务是否已启动(在笔记本上)\n2. NPS端口转发是否正常工作\n3. 配置的地址是否为服务器IP(不是localhost)\n4. 模型名称是否正确\n\n测试命令(在服务器上执行):\n{test_cmd}{config_info}\n\n使用 `.aiconfig` 命令检查和修改配置" elif "ConnectionError" in error_msg or "ConnectTimeout" in error_msg or "actively refused" in error_msg.lower() or "Empty reply" in error_msg: return f"❌ 无法连接到Ollama服务(连接被拒绝)\n\n根据NPS日志,笔记本上的Ollama服务拒绝了连接。\n\n**解决方案**:\n\n1. **检查Ollama是否运行**:\n 在笔记本上运行:`tasklist | findstr ollama` 或 `Get-Process | Where-Object {{$_.ProcessName -like '*ollama*'}}`\n\n2. **设置Ollama监听地址**(重要!):\n 在Windows上,Ollama默认只监听127.0.0.1,需要设置为0.0.0.0才能被NPS访问\n \n 方法A:设置环境变量(推荐)\n 1. 打开系统环境变量设置\n 2. 添加环境变量:`OLLAMA_HOST=0.0.0.0:11434`\n 3. 重启Ollama服务或重启电脑\n \n 方法B:在命令行启动时指定\n ```powershell\n $env:OLLAMA_HOST=\"0.0.0.0:11434\"\n ollama serve\n ```\n\n3. **检查Windows防火墙**:\n 确保允许11434端口的入站连接\n\n4. **验证监听地址**:\n 在笔记本上运行:`netstat -an | findstr 11434`\n 应该看到 `0.0.0.0:11434` 而不是 `127.0.0.1:11434`\n\n5. **测试本地连接**:\n 在笔记本上运行:`curl http://localhost:11434/api/tags`\n 应该返回模型列表\n\n提示:如果使用NPS转发,配置中的host应该是服务器的IP地址,不是localhost!{config_info}\n\n使用 `.aiconfig host=服务器IP` 修改配置" elif "timeout" in error_msg.lower(): return f"❌ AI服务响应超时,请稍后重试{config_info}" else: return f"❌ 生成回答时出错: {error_msg}{config_info}" def _get_chat_engine(self, chat_id: int) -> Any: """获取或创建ChatEngine实例 Args: chat_id: 会话ID Returns: ChatEngine实例 """ # 检查是否已存在 if chat_id in _chat_engines: return _chat_engines[chat_id] try: # 加载配置 config = self._load_config() # 导入llama_index模块 from llama_index.llms.ollama import Ollama from llama_index.core.memory import ChatMemoryBuffer from llama_index.core import ChatPromptTemplate, Settings # 创建Ollama LLM实例 # 添加超时设置以避免长时间等待 llm = Ollama( model=config['model'], base_url=f"http://{config['host']}:{config['port']}", timeout=120.0 # 120秒超时 ) # 设置全局LLM Settings.llm = llm # 创建ChatMemoryBuffer(设置足够的token_limit确保保留30+轮对话) memory = ChatMemoryBuffer.from_defaults(token_limit=8000) # 系统提示 system_prompt = ( "这是一个多用户对话场景,不同用户的发言会用不同的角色标识(如'用户1'、'用户2'等)。" "你需要理解不同用户的发言内容,并根据上下文给出合适的回复。" "请用自然、友好的方式与用户交流。" ) # 创建对话引擎 # 由于llama_index的API可能在不同版本有变化,这里使用基本的chat接口 # 实际使用时可能需要根据llama_index的版本调整 chat_engine = { 'llm': llm, 'memory': memory, 'system_prompt': system_prompt } # 存储到全局字典 _chat_engines[chat_id] = chat_engine return chat_engine except ImportError as e: logger.error(f"导入llama_index模块失败: {e}") return None except Exception as e: logger.error(f"创建ChatEngine失败: {e}", exc_info=True) return None def _get_user_role(self, chat_id: int, user_id: int) -> str: """获取用户角色名称(创建或获取映射) Args: chat_id: 会话ID user_id: 用户ID Returns: 角色名称 """ # 获取现有映射 user_mapping, user_count = self._get_user_mapping(chat_id) user_id_str = str(user_id) # 如果用户已存在,返回角色名称 if user_id_str in user_mapping: return user_mapping[user_id_str] # 新用户,分配角色 user_count += 1 role_name = f"用户{user_count}" user_mapping[user_id_str] = role_name # 保存到数据库 state_data = { "user_mapping": user_mapping, "user_count": user_count } self.db.save_game_state(chat_id, 0, 'ai_chat', state_data) return role_name def _get_user_mapping(self, chat_id: int) -> tuple[Dict[str, str], int]: """获取用户角色映射和计数 Args: chat_id: 会话ID Returns: (用户映射字典, 用户计数) """ # 从数据库获取映射 state = self.db.get_game_state(chat_id, 0, 'ai_chat') if state and state.get('state_data'): user_mapping = state['state_data'].get('user_mapping', {}) user_count = state['state_data'].get('user_count', 0) else: user_mapping = {} user_count = 0 return user_mapping, user_count def _load_config(self) -> Dict[str, Any]: """从JSON文件加载配置 Returns: 配置字典 """ # 如果文件不存在,创建默认配置 if not self.config_file.exists(): default_config = { "host": "localhost", "port": 11434, "model": "llama3.1" } self._save_config(default_config) return default_config try: with open(self.config_file, 'r', encoding='utf-8') as f: config = json.load(f) # 确保所有必需的字段存在 if 'host' not in config: config['host'] = "localhost" if 'port' not in config: config['port'] = 11434 if 'model' not in config: config['model'] = "llama3.1" return config except Exception as e: logger.error(f"加载配置文件失败: {e}", exc_info=True) # 返回默认配置 return { "host": "localhost", "port": 11434, "model": "llama3.1" } def _save_config(self, config: Dict[str, Any]) -> bool: """保存配置到JSON文件 Args: config: 配置字典 Returns: 是否成功 """ try: # 确保目录存在 self.config_file.parent.mkdir(parents=True, exist_ok=True) # 写入JSON文件 with open(self.config_file, 'w', encoding='utf-8') as f: json.dump(config, f, indent=4, ensure_ascii=False) return True except Exception as e: logger.error(f"保存配置文件失败: {e}", exc_info=True) return False def get_help(self) -> str: """获取帮助信息 Returns: 帮助文本 """ return """## 🤖 AI对话系统帮助 ### 基本用法 - `.ai <问题>` - 向AI提问(支持多用户对话,等待10秒后回答) - `.aiconfig host=xxx port=xxx model=xxx` - 配置Ollama服务地址和模型 ### 配置示例 `.aiconfig host=localhost port=11434 model=llama3.1` ### 说明 - 多个用户可以在同一个会话中提问 - 系统会等待10秒,收集所有问题后统一回答 - 如果在等待期间有新消息,会重新计时 --- 💡 提示:确保Ollama服务已启动并配置正确 """