263 lines
10 KiB
Python
263 lines
10 KiB
Python
from __future__ import annotations
|
|
|
|
from PWF.Convention.Runtime.Config import *
|
|
from PWF.Convention.Runtime.Architecture import Architecture
|
|
from PWF.Convention.Runtime.GlobalConfig import ProjectConfig
|
|
from datetime import datetime
|
|
|
|
from PWF.CoreModules.plugin_interface import DatabaseModel, get_db
|
|
|
|
from .WPSAPI import WPSAPI
|
|
|
|
# Module-level project configuration handle; the plugins below use it for
# logging (config.Log) and for reading configurable values (config.FindItem).
config = ProjectConfig()
|
|
# Points granted for one successful daily check-in. Read from the
# "checkin_points" config item; 100 is passed as the second argument
# (presumably the fallback default -- confirm against ProjectConfig.FindItem).
CHECKIN_POINTS = config.FindItem("checkin_points", 100)
|
|
|
|
class WPSConfigAPI(WPSAPI):
    """Plugin that stores per-user profile settings and points in ``user_info``.

    It registers itself under the ``config`` and ``cfg`` commands and supports
    ``set`` / ``get`` sub-commands. It also exposes helper methods
    (``get_user_name``, ``get_user_url``, ``get_user_points``,
    ``adjust_user_points``) for use by other plugins.
    """

    # Column order of the SELECT issued by ``_get_user_record``; keep in sync
    # with that query, because rows are mapped to dicts positionally.
    _USER_COLUMNS = ("user_id", "username", "userurl", "userpoint")

    # Columns that may be interpolated into SQL by ``_update_user_field``.
    _UPDATABLE_COLUMNS = frozenset({"username", "userurl", "userpoint"})

    # User-facing config key -> ``user_info`` column holding its value.
    _KEY_TO_COLUMN = {"user.name": "username", "user.url": "userurl"}

    # Longest accepted value for ``user.name``.
    _MAX_USERNAME_LENGTH = 40

    @override
    def dependencies(self) -> List[Type]:
        """Return the plugin types this plugin depends on."""
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        """Report that this plugin is always enabled."""
        return True

    @override
    def register_db_model(self) -> DatabaseModel:
        """Describe the ``user_info`` table used by this plugin."""
        return DatabaseModel(
            table_name="user_info",
            column_defs={
                "user_id": "INTEGER PRIMARY KEY",
                "username": "TEXT DEFAULT ''",
                "userurl": "TEXT DEFAULT ''",
                "userpoint": "INTEGER DEFAULT 0",
            },
        )

    @override
    def wake_up(self) -> None:
        """Log that the plugin was loaded and register its command names."""
        config.Log("Info", f"{ConsoleFrontColor.GREEN}WPSConfigAPI 插件已加载{ConsoleFrontColor.RESET}")
        self.register_plugin("config")
        self.register_plugin("cfg")

    async def do_callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Parse a ``set``/``get`` command and return the reply text.

        Args:
            message: Command text with the plugin name already removed.
            chat_id: Chat the command came from.
            user_id: User issuing the command.

        Returns:
            The reply text; the help text when the command is not recognized
            or lacks arguments.
        """
        # str.split() without arguments already strips and drops empty tokens.
        tokens = message.split()
        if not tokens:
            return self._help_message()

        action = tokens[0].lower()
        if action == "set" and len(tokens) >= 3:
            # Everything after the key is the value, so values may hold spaces.
            value = " ".join(tokens[2:]).strip()
            return await self._handle_set(chat_id, user_id, tokens[1].lower(), value)
        if action == "get" and len(tokens) >= 2:
            return await self._handle_get(chat_id, user_id, tokens[1].lower())

        return self._help_message()

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Handle an incoming message and send the reply as markdown."""
        command = self.parse_message_after_at(message)
        reply = await self.do_callback(command, chat_id, user_id)
        return await self.send_markdown_message(reply, chat_id, user_id)

    async def _handle_set(self, chat_id: int, user_id: int, key: str, value: str) -> str:
        """Validate and store ``value`` for ``key``; return a status message."""
        if key == "user.name":
            if not value:
                return "❌ 用户名不能为空"
            if len(value) > self._MAX_USERNAME_LENGTH:
                return "❌ 用户名长度不能超过40个字符"
            await self._set_value(chat_id, user_id, key, value)
            return f"✅ 已设置用户名为 {value}"

        if key == "user.url":
            if not value:
                return "❌ 个人URL不能为空"
            if not value.lower().startswith(("http://", "https://")):
                return "❌ 个人URL 必须以 http:// 或 https:// 开头"
            await self._set_value(chat_id, user_id, key, value)
            return "✅ 已更新个人URL"

        return "❌ 未识别的配置键,请使用 user.name / user.url"

    async def _handle_get(self, chat_id: int, user_id: int, key: str) -> str:
        """Return a message showing the stored value for ``key``."""
        # Reject unknown keys before touching the database.
        if key != "user.point" and key not in self._KEY_TO_COLUMN:
            return "❌ 未识别的配置键,请使用 user.name / user.url / user.point"

        record = self._get_user_record(user_id)
        if key == "user.point":
            points = record.get("userpoint") if record else 0
            return f"当前积分: {self._coerce_int(points)}"

        value = record.get(self._KEY_TO_COLUMN[key]) if record else None
        if value is None or value == "":
            return f"⚠️ 尚未设置 {key}"
        return f"当前 {key} = {value}"

    async def _handle_adjust(self, chat_id: int, user_id: int, key: str, delta: int, reason: str) -> str:
        """Adjust ``user.point`` by ``delta`` and return a status message."""
        if key != "user.point":
            return "❌ 仅支持调整 user.point"

        new_value = await self._adjust_points(chat_id, user_id, delta, reason)
        if delta == 0:
            detail = "未变动"
        else:
            prefix = "增加" if delta > 0 else "扣除"
            detail = f"{prefix} {abs(delta)}"
        return f"✅ {detail},当前积分: {new_value}"

    async def _set_value(self, chat_id: int, user_id: int, key: str, value: Any) -> None:
        """Persist ``value`` for a known config key; ignore unknown keys."""
        column = self._KEY_TO_COLUMN.get(key)
        if column is not None:
            self._update_user_field(user_id, column, value)

    async def _adjust_points(self, chat_id: int, user_id: int, delta: int, reason: str) -> int:
        """Add ``delta`` to the user's points and return the new total.

        ``chat_id`` and ``reason`` are accepted but not used here.
        """
        return self._adjust_db_points(user_id, delta)

    def _get_user_record(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Fetch the ``user_info`` row for ``user_id`` as a dict, or ``None``."""
        cursor = get_db().conn.cursor()
        cursor.execute(
            "SELECT user_id, username, userurl, userpoint FROM user_info WHERE user_id = ?",
            (user_id,),
        )
        row = cursor.fetchone()
        if not row:
            return None
        # Map positionally so this works for plain tuple rows as well as
        # mapping-style rows (``dict(row)`` fails on tuples).
        return dict(zip(self._USER_COLUMNS, row))

    def _ensure_user_row(self, user_id: int) -> None:
        """Insert a default row for ``user_id`` if none exists.

        Does not commit; callers commit after their own write.
        """
        cursor = get_db().conn.cursor()
        cursor.execute(
            "INSERT INTO user_info (user_id) VALUES (?) ON CONFLICT(user_id) DO NOTHING",
            (user_id,),
        )

    def _update_user_field(self, user_id: int, column: str, value: Any) -> None:
        """Upsert a single ``user_info`` column for ``user_id`` and commit.

        Raises:
            ValueError: If ``column`` is not a known ``user_info`` column.
        """
        # The column name is interpolated into the SQL text, so it must come
        # from a fixed whitelist; only the values can be bound as parameters.
        if column not in self._UPDATABLE_COLUMNS:
            raise ValueError(f"unsupported user_info column: {column!r}")

        conn = get_db().conn
        # The upsert creates the row when missing, so no separate
        # _ensure_user_row call is needed.
        conn.cursor().execute(
            f"""
            INSERT INTO user_info (user_id, {column}) VALUES (?, ?)
            ON CONFLICT(user_id) DO UPDATE SET {column} = excluded.{column}
            """,
            (user_id, value),
        )
        conn.commit()

    def _adjust_db_points(self, user_id: int, delta: int) -> int:
        """Add ``delta`` to the stored points, commit, and return the new total."""
        self._ensure_user_row(user_id)
        conn = get_db().conn
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE user_info SET userpoint = COALESCE(userpoint, 0) + ? WHERE user_id = ?",
            (delta, user_id),
        )
        # Persist the row creation and the update together.
        conn.commit()
        cursor.execute("SELECT userpoint FROM user_info WHERE user_id = ?", (user_id,))
        row = cursor.fetchone()
        return self._coerce_int(row[0] if row else 0)

    def _coerce_int(self, value: Any) -> int:
        """Convert ``value`` to ``int``, returning 0 when it cannot be converted."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    def get_user_name(self, chat_id: int, user_id: int) -> Optional[str]:
        """Return the stored user name, or ``user_<id>`` when none is set."""
        record = self._get_user_record(user_id)
        name = record.get("username") if record else None
        return str(name) if name else f"user_{user_id}"

    def get_user_url(self, chat_id: int, user_id: int) -> Optional[str]:
        """Return the stored user URL, or ``None`` when none is set."""
        record = self._get_user_record(user_id)
        url = record.get("userurl") if record else None
        return str(url) if url else None

    def get_user_points(self, chat_id: int, user_id: int) -> int:
        """Return the user's points, or 0 when the user has no row."""
        record = self._get_user_record(user_id)
        if not record:
            return 0
        return self._coerce_int(record.get("userpoint", 0))

    async def adjust_user_points(self, chat_id: int, user_id: int, delta: int, reason: str = "") -> int:
        """Add ``delta`` to the user's points and return the new total."""
        return await self._adjust_points(chat_id, user_id, delta, reason)

    def _help_message(self) -> str:
        """Return the markdown help text for the config command."""
        return (
            "# 🛠️ Config 命令帮助\n"
            "- config set user.name <用户名>\n"
            "- config set user.url <URL>\n"
            "- config get user.name|user.url|user.point\n"
        )
|
|
|
|
|
|
class WPSDailyAPI(WPSAPI):
    """Plugin implementing the ``daily`` command (daily check-in for points).

    The most recent check-in date of each user is kept in ``daily_checkin``,
    one row per user. Points are awarded through ``WPSConfigAPI``.
    """

    @override
    def dependencies(self) -> List[Type]:
        """Return the plugin types this plugin depends on."""
        return [WPSAPI]

    @override
    def is_enable_plugin(self) -> bool:
        """Report that this plugin is always enabled."""
        return True

    @override
    def wake_up(self) -> None:
        """Log that the plugin was loaded and register its command name."""
        config.Log("Info", f"{ConsoleFrontColor.GREEN}WPSDailyAPI 插件已加载{ConsoleFrontColor.RESET}")
        self.register_plugin("daily")

    @override
    def register_db_model(self) -> DatabaseModel:
        """Describe the ``daily_checkin`` table used by this plugin."""
        return DatabaseModel(
            table_name="daily_checkin",
            column_defs={
                "user_id": "INTEGER PRIMARY KEY",
                "checkin_date": "TEXT",
            },
        )

    @staticmethod
    def _today() -> str:
        """Return the current local date formatted as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    def _get_today_checkin_status(self, user_id: int) -> bool:
        """Return ``True`` when the stored check-in date of the user is today."""
        cursor = get_db().conn.cursor()
        cursor.execute("SELECT checkin_date FROM daily_checkin WHERE user_id = ?", (user_id,))
        row = cursor.fetchone()
        return row is not None and row[0] == self._today()

    def _set_today_checkin_status(self, user_id: int) -> None:
        """Record today as the user's latest check-in date and commit."""
        conn = get_db().conn
        # user_id is the primary key, so a returning user already has a row
        # from an earlier day; upsert instead of a plain INSERT, which would
        # fail with a uniqueness violation on every check-in after the first.
        conn.cursor().execute(
            "INSERT INTO daily_checkin (user_id, checkin_date) VALUES (?, ?) "
            "ON CONFLICT(user_id) DO UPDATE SET checkin_date = excluded.checkin_date",
            (user_id, self._today()),
        )
        conn.commit()

    async def do_callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Dispatch the ``daily`` sub-command and return the reply text.

        Args:
            message: Command text with the plugin name already removed.
            chat_id: Chat the command came from.
            user_id: User issuing the command.

        Returns:
            The check-in result for ``checkin``; the help text otherwise.
        """
        # str.split() without arguments already strips and drops empty tokens.
        tokens = message.split()
        if tokens and tokens[0].lower() == "checkin":
            return await self._handle_checkin(chat_id, user_id)
        # The help text needs both ids; the unknown-command path used to call
        # it without arguments and raised TypeError.
        return self._help_message(chat_id, user_id)

    async def _handle_checkin(self, chat_id: int, user_id: int) -> str:
        """Check the user in for today and award ``CHECKIN_POINTS`` once per day."""
        wps_config_api: "WPSConfigAPI" = Architecture.Get(WPSConfigAPI)
        if self._get_today_checkin_status(user_id):
            return "今日已签到"

        self._set_today_checkin_status(user_id)
        total = await wps_config_api.adjust_user_points(chat_id, user_id, CHECKIN_POINTS, "每日签到")
        # Commit again so the points change is persisted along with the
        # check-in, whether or not the points helper commits on its own.
        get_db().conn.commit()
        # Build the reply from a local: reusing double quotes inside a
        # double-quoted f-string is a syntax error before Python 3.12.
        return f"签到成功, 当前分数: {total}"

    @override
    async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]:
        """Handle an incoming message and send the reply as markdown."""
        command = self.parse_message_after_at(message)
        reply = await self.do_callback(command, chat_id, user_id)
        return await self.send_markdown_message(reply, chat_id, user_id)

    def _help_message(self, chat_id: int, user_id: int) -> str:
        """Return the markdown help text with the user's points and today's status."""
        wps_config_api: "WPSConfigAPI" = Architecture.Get(WPSConfigAPI)
        points = wps_config_api.get_user_points(chat_id, user_id)
        status = "已签到" if self._get_today_checkin_status(user_id) else "未签到"
        return (
            "# 📅 Daily 命令帮助\n"
            "- daily checkin: 签到\n"
            "---\n"
            f"- 当前分数: {points}\n"
            f"- 今日签到状态: {status}\n"
        )
|
|
|
|
# Runs at import time, after the config lookups above.
# NOTE(review): presumably writes the configuration back to storage so that
# defaults requested via FindItem are saved -- confirm against ProjectConfig.
config.SaveProperties()
|
|
|
|
# Public names exported by `from <module> import *`.
# NOTE(review): WPSDailyAPI is defined in this module but not listed here --
# confirm whether that omission is intentional.
__all__ = ["WPSConfigAPI"]
|
|
|