from __future__ import annotations from .WPSAPI import * from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Sequence import asyncio import copy import json import re import time import httpx logger: ProjectConfig = Architecture.Get(ProjectConfig) OLLAMA_URL = logger.FindItem("ollama_url", "http://localhost:11434") MODEL_NAME = logger.FindItem("model_name", "deepseek-r1:8b") OLLAMA_TIMEOUT = float(logger.FindItem("ollama_timeout", 120.0)) logger.SaveProperties() _CHAT_API_PATH = "/api/chat" _JSON_BLOCK_PATTERN = re.compile(r"\{[\s\S]*\}") @dataclass class HosterGameData: title: str soup: str truth: str clues: List[str] guidelines: List[str] intro: str created_at: float = field(default_factory=lambda: time.time()) creator_id: Optional[int] = None question_log: List[Dict[str, str]] = field(default_factory=list) class HosterState: _data: Optional[HosterGameData] = None _lock: asyncio.Lock = asyncio.Lock() @classmethod async def start_new(cls, data: HosterGameData) -> None: async with cls._lock: cls._data = data @classmethod async def clear(cls) -> None: async with cls._lock: cls._data = None @classmethod async def snapshot(cls) -> Optional[HosterGameData]: async with cls._lock: if cls._data is None: return None return copy.deepcopy(cls._data) @classmethod async def record_question(cls, question: str, answer: str) -> None: async with cls._lock: if cls._data is None: return cls._data.question_log.append({"question": question, "answer": answer}) @classmethod async def is_active(cls) -> bool: async with cls._lock: return cls._data is not None _http_client: Optional[httpx.AsyncClient] = None _http_lock: asyncio.Lock = asyncio.Lock() async def _get_http_client() -> httpx.AsyncClient: global _http_client async with _http_lock: if _http_client is None: _http_client = httpx.AsyncClient(timeout=httpx.Timeout(OLLAMA_TIMEOUT)) return _http_client def _ensure_json_block(text: str) -> Dict[str, Any]: stripped = text.strip() if stripped.startswith("```"): stripped = stripped.strip("`") if stripped.startswith("json"): stripped = stripped[4:].strip() if stripped.startswith("{") and stripped.endswith("}"): candidate = stripped else: match = _JSON_BLOCK_PATTERN.search(stripped) if not match: raise ValueError("未找到有效的JSON内容") candidate = match.group(0) return json.loads(candidate) async def _call_ollama(messages: Sequence[Dict[str, str]], *, temperature: float = 0.7) -> str: base_url = OLLAMA_URL.rstrip("/") payload = { "model": MODEL_NAME, "stream": False, "options": {"temperature": temperature}, "messages": list(messages), } client = await _get_http_client() response = await client.post(f"{base_url}{_CHAT_API_PATH}", json=payload) response.raise_for_status() data = response.json() message = data.get("message", {}) content = message.get("content", "").strip() if not content: raise ValueError("模型未返回内容") return content def _normalize_list(value: Any) -> List[str]: if isinstance(value, list): return [str(item).strip() for item in value if str(item).strip()] if isinstance(value, str): parts = [part.strip() for part in re.split(r"[\n,;]", value) if part.strip()] return parts return [] def _render_question_log(logs: Sequence[Dict[str, str]]) -> str: if not logs: return "暂无问答记录。" lines = ["问答记录:"] for idx, item in enumerate(logs, start=1): question = item.get("question", "") answer = item.get("answer", "") lines.append(f"{idx}. Q: {question}\n A: {answer}") return "\n".join(lines) STARTER_SYSTEM_PROMPT = ( "You are an experienced host of a lateral thinking puzzle game called 'Turtle Soup'. " "Design mysteries that are solvable, coherent, and fair. Do not reveal the hidden truth " "in outputs meant for players, but provide structured data for the host to manage the game." ) STARTER_USER_TEMPLATE = ( "请根据以下说明创作一个新的海龟汤谜题。务必输出严格的JSON对象,键名如下:\n" "title: 故事标题;\n" "intro: 1~2句的主持开场白;\n" "soup: 玩家可见的汤面描述;\n" "truth: 隐藏的真实全貌;\n" "clues: 3~5条关键线索字符串数组,用于主持人掌握纠偏要点;\n" "guidelines: 2~4条问答裁决提示数组,帮助主持人判断是/否;\n" "请保持故事逻辑自洽、可逆推、具有惊喜感,可以包含恐怖和魔法元素,虽然游戏名字叫做海龟汤但是主题不是海龟而是围绕人物的故事。若玩家附加需求:{extra_hint}\n" ) QNA_SYSTEM_PROMPT = ( "You are the impartial judge for a Turtle Soup puzzle. You know the hidden truth, but " "must respond to players strictly with '是' or '否'. If the question cannot be answered " "with yes/no, reply with '否'. Never reveal the truth or additional hints." ) QNA_USER_TEMPLATE = ( "汤面:{soup}\n隐藏真相:{truth}\n关键线索:{clues}\n主持指引:{guidelines}\n" "玩家提问:{question}\n" "如果能够回答, 请仅输出一个汉字:是 或 否。否则请输出: '与问题无关'。当玩家询问汤面时可以重新输出汤面。" ) REVEAL_TEMPLATE = ( "# 谜底揭晓\n\n" "**汤面回顾**\n{summary}\n\n" "**谜底真相**\n{truth}\n\n" "**关键线索**\n{clues}\n\n" "{log}\n" "游戏已结束,感谢参与。" ) async def _generate_story(extra_hint: str, user_id: int) -> HosterGameData: content = await _call_ollama( ( {"role": "system", "content": STARTER_SYSTEM_PROMPT}, {"role": "user", "content": STARTER_USER_TEMPLATE.format(extra_hint=extra_hint or "无附加需求")}, ), temperature=0.65, ) payload = _ensure_json_block(content) title = str(payload.get("title", "海龟汤谜题")).strip() or "海龟汤谜题" intro = str(payload.get("intro", "大家准备好脑洞了吗?")).strip() soup = str(payload.get("soup", "")).strip() truth = str(payload.get("truth", "")).strip() clues = _normalize_list(payload.get("clues", [])) guidelines = _normalize_list(payload.get("guidelines", [])) if not soup or not truth: raise ValueError("生成内容缺少必要字段") return HosterGameData( title=title, intro=intro, soup=soup, truth=truth, clues=clues, guidelines=guidelines, creator_id=user_id, ) async def _judge_question(story: HosterGameData, question: str) -> str: content = await _call_ollama( ( {"role": "system", "content": QNA_SYSTEM_PROMPT}, { "role": "user", "content": QNA_USER_TEMPLATE.format( soup=story.soup, truth=story.truth, clues="; ".join(story.clues) if story.clues else "无", guidelines="; ".join(story.guidelines) if story.guidelines else "保持公平", question=question, ), }, ), temperature=0.1, ) answer = content.strip() return answer class HosterSoupStarter(WPSAPI): @override def is_enable_plugin(self) -> bool: return True @override def wake_up(self) -> None: self.register_plugin("汤面") @override def get_guide_title(self) -> str: return "海龟汤主持人 · 汤面" @override def get_guide_subtitle(self) -> str: return "生成全新的海龟汤故事,开场并引导玩家提问" @override def get_guide_metadata(self) -> Dict[str, str]: return { "使用模型": MODEL_NAME, "Ollama 服务": OLLAMA_URL, } @override def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "汤面", "identifier": "汤面", "description": "调用大模型生成全新海龟汤故事,重置并开局。", "metadata": { "回答方式": "主持人自动播报汤面", "失败处理": "捕获异常并提示稍后重试", }, "icon": "🥣", "badge": "入口", }, ) @override def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "生成流程", "description": "依据预设提示词向 Ollama 请求故事,解析 JSON 字段并缓存为当前对局。", "details": [ "包含标题、开场白、汤面描述、隐藏真相、关键线索、裁决指引", "重复调用会覆盖旧故事,适用于强制重开", ], "icon": "🧠", }, { "title": "主持提示", "description": "生成成功后自动推送 Markdown,提醒玩家开始以是/否提问。", "details": [ "仅在缓存成功后才播报", "失败时返回错误信息,避免进入半成品状态", ], "icon": "📣", }, ) @override def collect_additional_sections(self) -> Sequence[GuideSection]: return ( GuideSection( title="注意事项", description="保持 Ollama 可用,必要时检查模型或网络。", entries=( { "title": "模型依赖", "description": "需提前在 Ollama 拉取对应模型,保证 /api/chat 可用。", "icon": "⚙️", }, { "title": "故事质量", "description": "若剧情不佳,可再次发送“汤面”刷新,旧记录会被覆盖。", "icon": "🔄", }, ), ), ) @override async def callback(self, message: str, chat_id: int, user_id: int) -> str | None: story = await _generate_story(message, user_id) if get_internal_verbose(): logger.Log("Info", f"新的海龟汤故事已生成: {story.title}\n{story.intro}\n{story.soup}\n{story.truth}\n{story.clues}\n{story.guidelines}") await HosterState.start_new(story) return await self.send_markdown_message( f"# {story.title}\n" f"{story.intro}\n\n" f"**汤面**:{story.soup}\n\n" f"现在可以开始提问啦!我只能回答`是`或`否`,使用其他指令会被忽略。" , chat_id, user_id) class HosterSoupQnA(WPSAPI): @override def is_enable_plugin(self) -> bool: return True @override def wake_up(self) -> None: self.register_plugin("default") @override def get_guide_title(self) -> str: return "海龟汤主持人 · 问答裁决" @override def get_guide_subtitle(self) -> str: return "监听玩家提问,只返回是或否" @override def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "默认问答", "identifier": "default", "description": "在游戏进行中处理玩家消息,调用模型裁定并返回是/否。", "metadata": { "无游戏": "提示玩家先使用汤面开启", "记录": "自动存档问答日志", }, "icon": "❓", "badge": "默认", }, ) @override def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "裁决机制", "description": "根据隐藏真相和主持指引向模型提问,只允许返回是或否,异常情况判定为否。", "icon": "⚖️", "details": [ "模型输出非规定内容时回退为“否”", "记录每次提问与回答以便揭晓阶段回顾", ], }, { "title": "玩家引导", "description": "若玩家发送空消息或未开局,则反馈提示语,避免无效提问。", "icon": "💬", }, ) @override def collect_additional_sections(self) -> Sequence[GuideSection]: return ( GuideSection( title="进阶运维", entries=( { "title": "提示词调优", "description": "可根据需求调整问答提示模板,提升准确性。", "icon": "🛠️", }, { "title": "负载注意", "description": "高频问答会占用模型资源,可在外层加限流。", "icon": "📊", }, ), ), ) @override async def callback(self, message: str, chat_id: int, user_id: int) -> str | None: story = await HosterState.snapshot() if story is None: return await self.send_markdown_message("目前没有正在进行的海龟汤,请先发送指令“汤面”开启新故事。") question = message.strip() if not question: return await self.send_markdown_message("请提出一个可用“是/否”回答的问题。") answer = await _judge_question(story, question) await HosterState.record_question(question, answer) return await self.send_markdown_message(answer, chat_id, user_id) class HosterSoupReveal(WPSAPI): @override def is_enable_plugin(self) -> bool: return True @override def wake_up(self) -> None: self.register_plugin("汤底") @override def get_guide_title(self) -> str: return "海龟汤主持人 · 汤底" @override def get_guide_subtitle(self) -> str: return "揭示谜底并清空当前游戏" @override def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "汤底", "identifier": "汤底", "description": "输出汤面回顾、隐藏真相、关键线索与问答记录,然后清除状态。", "metadata": { "无游戏": "提示玩家先开局", "重置": "揭晓后自动清空缓存", }, "icon": "🧾", "badge": "结局", }, ) @override def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "揭晓输出", "description": "以 Markdown 形式汇总剧情,方便玩家快速回顾。", "icon": "📜", "details": [ "包含汤面回顾、真实故事、关键线索", "附带问答记录,复盘互动过程", ], }, { "title": "状态管理", "description": "揭晓后调用 clear() 重置状态,避免旧故事残留。", "icon": "🧹", }, ) @override def collect_additional_sections(self) -> Sequence[GuideSection]: return ( GuideSection( title="主持建议", entries=( { "title": "复盘对话", "description": "揭晓后可引导玩家讨论推理过程,提高参与感。", "icon": "🤝", }, { "title": "二次开局", "description": "若需要继续游戏,直接使用“汤面”生成新故事。", "icon": "🚀", }, ), ), ) @override async def callback(self, message: str, chat_id: int, user_id: int) -> str | None: story = await HosterState.snapshot() if story is None: return await self.send_markdown_message("还没有汤面呢,发送“汤面”来创作新故事吧。") clues_block = "\n".join(f"- {item}" for item in story.clues) if story.clues else "- 暂无记录" log_block = _render_question_log(story.question_log) await HosterState.clear() logger.Log("Info", "海龟汤故事已揭晓并重置状态") return await self.send_markdown_message(REVEAL_TEMPLATE.format( summary=story.soup, truth=story.truth, clues=clues_block, log=log_block, ), chat_id, user_id)