Files
NewWPSBot/Plugins/WPSFortuneSystem.py
2025-11-12 22:58:36 +08:00

147 lines
5.1 KiB
Python

from __future__ import annotations
import hashlib
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, List, Tuple, Type, override
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from .WPSAPI import WPSAPI
logger: ProjectConfig = Architecture.Get(ProjectConfig)
_HASH_BYTES = 16
_HASH_MAX = (1 << (_HASH_BYTES * 8)) - 1
_FORTUNE_STAGE_TABLE: List[Tuple[float, str]] = [
(-0.9, "厄运深谷"),
(-0.7, "凶多吉少"),
(-0.5, "多有波折"),
(-0.3, "略显低迷"),
(-0.1, "风平浪静"),
(0.1, "小有起色"),
(0.3, "渐入佳境"),
(0.5, "好运上扬"),
(0.7, "顺风顺水"),
(0.9, "鸿运当头"),
(1.01, "天命所归"),
]
class WPSFortuneSystem(WPSAPI):
"""基于整点哈希的运势系统,可供其他模块复用"""
def get_guide_subtitle(self) -> str:
return "提供整点运势值及阶段信息的公共组件"
def collect_command_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "fortune",
"identifier": "fortune",
"description": "查询当前整点的运势值与阶段文本。",
"metadata": {"别名": "运势"},
},
)
def collect_guide_entries(self) -> Sequence[GuideEntry]:
return (
{
"title": "运势算法",
"description": "基于用户ID与整点时间的 SHA-256 哈希映射为 [-0.9999, 0.9999] 区间。",
},
{
"title": "阶段划分",
"description": "通过 `_FORTUNE_STAGE_TABLE` 匹配阶段标签,供冒险等系统引用。",
},
{
"title": "复用接口",
"description": "`get_fortune_value / get_fortune_info` 可被其他插件同步或异步调用。",
},
)
@override
def dependencies(self) -> List[Type]:
return [WPSAPI]
@override
def is_enable_plugin(self) -> bool:
return True
@override
def wake_up(self) -> None:
logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSFortuneSystem 插件已加载{ConsoleFrontColor.RESET}")
self.register_plugin("运势")
self.register_plugin("fortune")
@override
async def callback(self, message: str, chat_id: int, user_id: int) -> str | None:
fortune_message = self._format_fortune_message(user_id)
return await self.send_markdown_message(fortune_message, chat_id, user_id)
def get_fortune_value(self, user_id: int, dt: datetime | None = None) -> float:
hour_dt, hour_key = self._resolve_hour(dt)
return self._compute_fortune_value(user_id, hour_key)
def get_fortune_stage(self, user_id: int, dt: datetime | None = None) -> str:
value = self.get_fortune_value(user_id, dt)
return self._match_stage(value)
def get_fortune_info(self, user_id: int, dt: datetime | None = None) -> Dict[str, Any]:
hour_dt, hour_key = self._resolve_hour(dt)
value = self._compute_fortune_value(user_id, hour_key)
stage = self._match_stage(value)
return {
"value": value,
"stage": stage,
"hour_key": hour_key,
"hour_label": hour_dt.strftime("%Y-%m-%d %H:00"),
"timestamp": hour_dt.isoformat(),
}
def _resolve_hour(self, dt: datetime | None) -> Tuple[datetime, str]:
target_dt = dt or datetime.now()
hour_dt = target_dt.replace(minute=0, second=0, microsecond=0)
return hour_dt, hour_dt.isoformat()
def _format_fortune_message(self, user_id: int) -> str:
info = self.get_fortune_info(user_id)
value_display = f"{info['value']:.4f}"
return (
"# 🎲 运势占卜\n"
f"- 运势值:`{value_display}`\n"
f"- 运势阶段:{info['stage']}\n"
f"- 基准整点:{info['hour_label']}\n"
"> 运势每整点刷新,允许运气加成的系统会复用同一结果。"
)
def _match_stage(self, value: float) -> str:
for upper_bound, label in _FORTUNE_STAGE_TABLE:
if value < upper_bound:
return label
return _FORTUNE_STAGE_TABLE[-1][1]
@staticmethod
@lru_cache(maxsize=2048)
def _cached_hash_value(user_id: int, hour_key: str) -> float:
payload = f"{user_id}:{hour_key}".encode("utf-8")
digest = hashlib.sha256(payload).digest()[:_HASH_BYTES]
integer = int.from_bytes(digest, "big")
normalized = integer / _HASH_MAX if _HASH_MAX else 0.0
mapped = normalized * 2 - 1
return max(-0.9999, min(0.9999, mapped))
def _compute_fortune_value(self, user_id: int, hour_key: str) -> float:
try:
return self._cached_hash_value(user_id, hour_key)
except Exception as exc:
logger.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}计算运势时出现异常: {exc}{ConsoleFrontColor.RESET}",
)
return 0.0
__all__ = ["WPSFortuneSystem"]