Files
WPSHaiGuiTangBot/Plugins/HosterAPI.py
2025-11-13 17:15:45 +08:00

503 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from .WPSAPI import *
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Sequence
import asyncio
import copy
import json
import re
import time
import httpx
logger: ProjectConfig = Architecture.Get(ProjectConfig)
OLLAMA_URL = logger.FindItem("ollama_url", "http://localhost:11434")
MODEL_NAME = logger.FindItem("model_name", "deepseek-r1:8b")
OLLAMA_TIMEOUT = float(logger.FindItem("ollama_timeout", 120.0))
logger.SaveProperties()
_CHAT_API_PATH = "/api/chat"
_JSON_BLOCK_PATTERN = re.compile(r"\{[\s\S]*\}")
@dataclass
class HosterGameData:
title: str
soup: str
truth: str
clues: List[str]
guidelines: List[str]
intro: str
created_at: float = field(default_factory=lambda: time.time())
creator_id: Optional[int] = None
question_log: List[Dict[str, str]] = field(default_factory=list)
class HosterState:
_data: Optional[HosterGameData] = None
_lock: asyncio.Lock = asyncio.Lock()
@classmethod
async def start_new(cls, data: HosterGameData) -> None:
async with cls._lock:
cls._data = data
@classmethod
async def clear(cls) -> None:
async with cls._lock:
cls._data = None
@classmethod
async def snapshot(cls) -> Optional[HosterGameData]:
async with cls._lock:
if cls._data is None:
return None
return copy.deepcopy(cls._data)
@classmethod
async def record_question(cls, question: str, answer: str) -> None:
async with cls._lock:
if cls._data is None:
return
cls._data.question_log.append({"question": question, "answer": answer})
@classmethod
async def is_active(cls) -> bool:
async with cls._lock:
return cls._data is not None
_http_client: Optional[httpx.AsyncClient] = None
_http_lock: asyncio.Lock = asyncio.Lock()
async def _get_http_client() -> httpx.AsyncClient:
global _http_client
async with _http_lock:
if _http_client is None:
_http_client = httpx.AsyncClient(timeout=httpx.Timeout(OLLAMA_TIMEOUT))
return _http_client
def _ensure_json_block(text: str) -> Dict[str, Any]:
stripped = text.strip()
if stripped.startswith("```"):
stripped = stripped.strip("`")
if stripped.startswith("json"):
stripped = stripped[4:].strip()
if stripped.startswith("{") and stripped.endswith("}"):
candidate = stripped
else:
match = _JSON_BLOCK_PATTERN.search(stripped)
if not match:
raise ValueError("未找到有效的JSON内容")
candidate = match.group(0)
return json.loads(candidate)
async def _call_ollama(messages: Sequence[Dict[str, str]], *, temperature: float = 0.7) -> str:
base_url = OLLAMA_URL.rstrip("/")
payload = {
"model": MODEL_NAME,
"stream": False,
"options": {"temperature": temperature},
"messages": list(messages),
}
client = await _get_http_client()
response = await client.post(f"{base_url}{_CHAT_API_PATH}", json=payload)
response.raise_for_status()
data = response.json()
message = data.get("message", {})
content = message.get("content", "").strip()
if not content:
raise ValueError("模型未返回内容")
return content
def _normalize_list(value: Any) -> List[str]:
if isinstance(value, list):
return [str(item).strip() for item in value if str(item).strip()]
if isinstance(value, str):
parts = [part.strip() for part in re.split(r"[\n,;]", value) if part.strip()]
return parts
return []
def _render_question_log(logs: Sequence[Dict[str, str]]) -> str:
if not logs:
return "暂无问答记录。"
lines = ["问答记录:"]
for idx, item in enumerate(logs, start=1):
question = item.get("question", "")
answer = item.get("answer", "")
lines.append(f"{idx}. Q: {question}\n A: {answer}")
return "\n".join(lines)
STARTER_SYSTEM_PROMPT = (
"You are an experienced host of a lateral thinking puzzle game called 'Turtle Soup'. "
"Design mysteries that are solvable, coherent, and fair. Do not reveal the hidden truth "
"in outputs meant for players, but provide structured data for the host to manage the game."
)
STARTER_USER_TEMPLATE = (
"请根据以下说明创作一个新的海龟汤谜题。务必输出严格的JSON对象键名如下\n"
"title: 故事标题;\n"
"intro: 1~2句的主持开场白\n"
"soup: 玩家可见的汤面描述;\n"
"truth: 隐藏的真实全貌;\n"
"clues: 3~5条关键线索字符串数组用于主持人掌握纠偏要点\n"
"guidelines: 2~4条问答裁决提示数组帮助主持人判断是/否;\n"
"请保持故事逻辑自洽、可逆推、具有惊喜感。若玩家附加需求:{extra_hint}\n"
)
QNA_SYSTEM_PROMPT = (
"You are the impartial judge for a Turtle Soup puzzle. You know the hidden truth, but "
"must respond to players strictly with '' or ''. If the question cannot be answered "
"with yes/no, reply with ''. Never reveal the truth or additional hints."
)
QNA_USER_TEMPLATE = (
"汤面:{soup}\n隐藏真相:{truth}\n关键线索:{clues}\n主持指引:{guidelines}\n"
"玩家提问:{question}\n"
"请仅输出一个汉字:是 或 否。"
)
REVEAL_TEMPLATE = (
"# 谜底揭晓\n\n"
"**汤面回顾**\n{summary}\n\n"
"**谜底真相**\n{truth}\n\n"
"**关键线索**\n{clues}\n\n"
"{log}\n"
"游戏已结束,感谢参与。"
)
async def _generate_story(extra_hint: str, user_id: int) -> HosterGameData:
content = await _call_ollama(
(
{"role": "system", "content": STARTER_SYSTEM_PROMPT},
{"role": "user", "content": STARTER_USER_TEMPLATE.format(extra_hint=extra_hint or "无附加需求")},
),
temperature=0.65,
)
payload = _ensure_json_block(content)
title = str(payload.get("title", "海龟汤谜题")).strip() or "海龟汤谜题"
intro = str(payload.get("intro", "大家准备好脑洞了吗?")).strip()
soup = str(payload.get("soup", "")).strip()
truth = str(payload.get("truth", "")).strip()
clues = _normalize_list(payload.get("clues", []))
guidelines = _normalize_list(payload.get("guidelines", []))
if not soup or not truth:
raise ValueError("生成内容缺少必要字段")
return HosterGameData(
title=title,
intro=intro,
soup=soup,
truth=truth,
clues=clues,
guidelines=guidelines,
creator_id=user_id,
)
async def _judge_question(story: HosterGameData, question: str) -> str:
content = await _call_ollama(
(
{"role": "system", "content": QNA_SYSTEM_PROMPT},
{
"role": "user",
"content": QNA_USER_TEMPLATE.format(
soup=story.soup,
truth=story.truth,
clues="; ".join(story.clues) if story.clues else "",
guidelines="; ".join(story.guidelines) if story.guidelines else "保持公平",
question=question,
),
},
),
temperature=0.1,
)
answer = content.strip()
return answer
class HosterSoupStarter(WPSAPI):
@override
def is_enable_plugin(self) -> bool:
return True
@override
def wake_up(self) -> None:
self.register_plugin("汤面")
@override
def get_guide_title(self) -> str:
return "海龟汤主持人 · 汤面"
@override
def get_guide_subtitle(self) -> str:
return "生成全新的海龟汤故事,开场并引导玩家提问"
@override
def get_guide_metadata(self) -> Dict[str, str]:
return {
"使用模型": MODEL_NAME,
"Ollama 服务": OLLAMA_URL,
}
@override
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "汤面",
"identifier": "汤面",
"description": "调用大模型生成全新海龟汤故事,重置并开局。",
"metadata": {
"回答方式": "主持人自动播报汤面",
"失败处理": "捕获异常并提示稍后重试",
},
"icon": "🥣",
"badge": "入口",
},
)
@override
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "生成流程",
"description": "依据预设提示词向 Ollama 请求故事,解析 JSON 字段并缓存为当前对局。",
"details": [
"包含标题、开场白、汤面描述、隐藏真相、关键线索、裁决指引",
"重复调用会覆盖旧故事,适用于强制重开",
],
"icon": "🧠",
},
{
"title": "主持提示",
"description": "生成成功后自动推送 Markdown提醒玩家开始以是/否提问。",
"details": [
"仅在缓存成功后才播报",
"失败时返回错误信息,避免进入半成品状态",
],
"icon": "📣",
},
)
@override
def collect_additional_sections(self) -> Sequence[GuideSection]:
return (
GuideSection(
title="注意事项",
description="保持 Ollama 可用,必要时检查模型或网络。",
entries=(
{
"title": "模型依赖",
"description": "需提前在 Ollama 拉取对应模型,保证 /api/chat 可用。",
"icon": "⚙️",
},
{
"title": "故事质量",
"description": "若剧情不佳,可再次发送“汤面”刷新,旧记录会被覆盖。",
"icon": "🔄",
},
),
),
)
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str | None:
story = await _generate_story(message, user_id)
if get_internal_verbose():
logger.Log("Info", f"新的海龟汤故事已生成: {story.title}\n{story.intro}\n{story.soup}\n{story.truth}\n{story.clues}\n{story.guidelines}")
await HosterState.start_new(story)
return await self.send_markdown_message(
f"# {story.title}\n"
f"{story.intro}\n\n"
f"**汤面**{story.soup}\n\n"
f"现在可以开始提问啦!我只能回答`是`或`否`,使用其他指令会被忽略。"
, chat_id, user_id)
class HosterSoupQnA(WPSAPI):
@override
def is_enable_plugin(self) -> bool:
return True
@override
def wake_up(self) -> None:
self.register_plugin("default")
@override
def get_guide_title(self) -> str:
return "海龟汤主持人 · 问答裁决"
@override
def get_guide_subtitle(self) -> str:
return "监听玩家提问,只返回是或否"
@override
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "默认问答",
"identifier": "default",
"description": "在游戏进行中处理玩家消息,调用模型裁定并返回是/否。",
"metadata": {
"无游戏": "提示玩家先使用汤面开启",
"记录": "自动存档问答日志",
},
"icon": "",
"badge": "默认",
},
)
@override
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "裁决机制",
"description": "根据隐藏真相和主持指引向模型提问,只允许返回是或否,异常情况判定为否。",
"icon": "⚖️",
"details": [
"模型输出非规定内容时回退为“否”",
"记录每次提问与回答以便揭晓阶段回顾",
],
},
{
"title": "玩家引导",
"description": "若玩家发送空消息或未开局,则反馈提示语,避免无效提问。",
"icon": "💬",
},
)
@override
def collect_additional_sections(self) -> Sequence[GuideSection]:
return (
GuideSection(
title="进阶运维",
entries=(
{
"title": "提示词调优",
"description": "可根据需求调整问答提示模板,提升准确性。",
"icon": "🛠️",
},
{
"title": "负载注意",
"description": "高频问答会占用模型资源,可在外层加限流。",
"icon": "📊",
},
),
),
)
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str | None:
story = await HosterState.snapshot()
if story is None:
return await self.send_markdown_message("目前没有正在进行的海龟汤,请先发送指令“汤面”开启新故事。")
question = message.strip()
if not question:
return await self.send_markdown_message("请提出一个可用“是/否”回答的问题。")
answer = await _judge_question(story, question)
await HosterState.record_question(question, answer)
return await self.send_markdown_message(answer, chat_id, user_id)
class HosterSoupReveal(WPSAPI):
@override
def is_enable_plugin(self) -> bool:
return True
@override
def wake_up(self) -> None:
self.register_plugin("汤底")
@override
def get_guide_title(self) -> str:
return "海龟汤主持人 · 汤底"
@override
def get_guide_subtitle(self) -> str:
return "揭示谜底并清空当前游戏"
@override
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "汤底",
"identifier": "汤底",
"description": "输出汤面回顾、隐藏真相、关键线索与问答记录,然后清除状态。",
"metadata": {
"无游戏": "提示玩家先开局",
"重置": "揭晓后自动清空缓存",
},
"icon": "🧾",
"badge": "结局",
},
)
@override
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "揭晓输出",
"description": "以 Markdown 形式汇总剧情,方便玩家快速回顾。",
"icon": "📜",
"details": [
"包含汤面回顾、真实故事、关键线索",
"附带问答记录,复盘互动过程",
],
},
{
"title": "状态管理",
"description": "揭晓后调用 clear() 重置状态,避免旧故事残留。",
"icon": "🧹",
},
)
@override
def collect_additional_sections(self) -> Sequence[GuideSection]:
return (
GuideSection(
title="主持建议",
entries=(
{
"title": "复盘对话",
"description": "揭晓后可引导玩家讨论推理过程,提高参与感。",
"icon": "🤝",
},
{
"title": "二次开局",
"description": "若需要继续游戏,直接使用“汤面”生成新故事。",
"icon": "🚀",
},
),
),
)
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str | None:
story = await HosterState.snapshot()
if story is None:
return await self.send_markdown_message("还没有汤面呢,发送“汤面”来创作新故事吧。")
clues_block = "\n".join(f"- {item}" for item in story.clues) if story.clues else "- 暂无记录"
log_block = _render_question_log(story.question_log)
await HosterState.clear()
logger.Log("Info", "海龟汤故事已揭晓并重置状态")
return await self.send_markdown_message(REVEAL_TEMPLATE.format(
summary=story.soup,
truth=story.truth,
clues=clues_block,
log=log_block,
), chat_id, user_id)