from __future__ import annotations from PWF.Convention.Runtime.Config import * from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ProjectConfig from datetime import datetime from PWF.CoreModules.plugin_interface import DatabaseModel, get_db from .WPSAPI import * logger = ProjectConfig() CHECKIN_POINTS = logger.FindItem("checkin_points", 100) logger.SaveProperties() class WPSConfigAPI(WPSAPI): def get_guide_subtitle(self) -> str: return "用户基础资料与积分配置接口" def get_guide_metadata(self) -> Dict[str, str]: return { "数据表": "user_info", "每日签到积分": str(CHECKIN_POINTS), } def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "config", "identifier": "config", "description": "配置与查询用户昵称、URL、积分等信息。", "metadata": {"别名": "cfg"}, }, ) def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "设置指令", "description": "`config set user.name <昵称>` / `config set user.url `", }, { "title": "查询指令", "description": "`config get user.name|user.url|user.point` 返回当前资料或积分。", }, { "title": "数据校验", "description": "内部自动确保用户记录存在,并限制昵称长度与 URL 前缀。", }, ) @override def dependencies(self) -> List[Type]: return [WPSAPI] @override def is_enable_plugin(self) -> bool: return True @override def register_db_model(self) -> DatabaseModel: return DatabaseModel( table_name="user_info", column_defs={ "user_id": "INTEGER PRIMARY KEY", "username": "TEXT DEFAULT ''", "userurl": "TEXT DEFAULT ''", "userpoint": "INTEGER DEFAULT 0" } ) @override def wake_up(self) -> None: logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSConfigAPI 插件已加载{ConsoleFrontColor.RESET}") self.register_plugin("config") self.register_plugin("cfg") BasicWPSInterface.user_id_to_username = self.get_user_name async def do_callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: tokens = [token.strip() for token in message.strip().split() if token.strip()] if not tokens: return self._help_message() action = tokens[0].lower() if action == "set" and len(tokens) >= 3: key = tokens[1].lower() value = " ".join(tokens[2:]).strip() return await self._handle_set(chat_id, user_id, key, value) if action == "get" and len(tokens) >= 2: key = tokens[1].lower() return await self._handle_get(chat_id, user_id, key) return self._help_message() @override async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: message = self.parse_message_after_at(message) message = await self.do_callback(message, chat_id, user_id) return await self.send_markdown_message(message, chat_id, user_id) async def _handle_set(self, chat_id: int, user_id: int, key: str, value: str) -> str: if key == "user.name": if not value: return "❌ 用户名不能为空" if len(value) > 40: return "❌ 用户名长度不能超过40个字符" await self._set_value(chat_id, user_id, key, value) return f"✅ 已设置用户名为 {value}" if key == "user.url": if not value: return "❌ 个人URL不能为空" lowered = value.lower() if not (lowered.startswith("http://") or lowered.startswith("https://")): return "❌ 个人URL 必须以 http:// 或 https:// 开头" await self._set_value(chat_id, user_id, key, value) return "✅ 已更新个人URL" return "❌ 未识别的配置键,请使用 user.name / user.url" async def _handle_get(self, chat_id: int, user_id: int, key: str) -> str: record = self._get_user_record(user_id) if key == "user.point": points = record.get("userpoint") if record else 0 return f"当前积分: {self._coerce_int(points)}" if key == "user.name": value = record.get("username") if record else None elif key == "user.url": value = record.get("userurl") if record else None else: return "❌ 未识别的配置键,请使用 user.name / user.url" if value is None or value == "": return f"⚠️ 尚未设置 {key}" return f"当前 {key} = {value}" async def _handle_adjust(self, chat_id: int, user_id: int, key: str, delta: int, reason: str) -> str: if key != "user.point": return "❌ 仅支持调整 user.point" new_value = await self._adjust_points(chat_id, user_id, delta, reason) prefix = "增加" if delta >= 0 else "扣除" detail = f"{prefix} {abs(delta)}" if delta != 0 else "未变动" return f"✅ {detail},当前积分: {new_value}" async def _set_value(self, chat_id: int, user_id: int, key: str, value: Any) -> None: if key == "user.name": self._update_user_field(user_id, "username", value) elif key == "user.url": self._update_user_field(user_id, "userurl", value) async def _adjust_points(self, chat_id: int, user_id: int, delta: int, reason: str) -> int: return self._adjust_db_points(user_id, delta) def _get_user_record(self, user_id: int) -> Optional[Dict[str, Any]]: cursor = get_db().conn.cursor() cursor.execute( "SELECT user_id, username, userurl, userpoint FROM user_info WHERE user_id = ?", (user_id,) ) row = cursor.fetchone() return dict(row) if row else None def _ensure_user_row(self, user_id: int) -> None: cursor = get_db().conn.cursor() cursor.execute( "INSERT INTO user_info (user_id) VALUES (?) ON CONFLICT(user_id) DO NOTHING", (user_id,) ) def _update_user_field(self, user_id: int, column: str, value: Any) -> None: self._ensure_user_row(user_id) cursor = get_db().conn.cursor() cursor.execute( f""" INSERT INTO user_info (user_id, {column}) VALUES (?, ?) ON CONFLICT(user_id) DO UPDATE SET {column} = excluded.{column} """, (user_id, value) ) def _adjust_db_points(self, user_id: int, delta: int) -> int: self._ensure_user_row(user_id) cursor = get_db().conn.cursor() cursor.execute( "UPDATE user_info SET userpoint = COALESCE(userpoint, 0) + ? WHERE user_id = ?", (delta, user_id) ) cursor.execute("SELECT userpoint FROM user_info WHERE user_id = ?", (user_id,)) row = cursor.fetchone() return self._coerce_int(row[0] if row else 0) def _coerce_int(self, value: Any) -> int: try: return int(value) except (TypeError, ValueError): return 0 def get_user_name(self, user_id: int) -> Optional[str]: record = self._get_user_record(user_id) if not record: return f"user_{user_id}" value = record.get("username") return str(value) if value else f"user_{user_id}" def find_user_id_by_username(self, username: str) -> Optional[int]: text = (username or "").strip() if not text: return None cursor = get_db().conn.cursor() cursor.execute( "SELECT user_id FROM user_info WHERE username = ? COLLATE NOCASE", (text,), ) row = cursor.fetchone() return int(row["user_id"]) if row else None def get_user_url(self, user_id: int) -> Optional[str]: record = self._get_user_record(user_id) if not record: return None value = record.get("userurl") return str(value) if value else None def get_user_points(self, user_id: int) -> int: record = self._get_user_record(user_id) if not record: return 0 return self._coerce_int(record.get("userpoint", 0)) async def adjust_user_points(self, chat_id: int, user_id: int, delta: int, reason: str = "") -> int: return await self._adjust_points(chat_id, user_id, delta, reason) def adjust_user_points_sync(self, user_id: int, delta: int, reason: str = "") -> int: """同步调整积分,供非异步流程使用""" return self._adjust_db_points(user_id, delta) def _help_message(self) -> str: return '''# 🛠️ Config 命令帮助 - config set user.name <用户名> - config set user.url - config get user.name|user.url|user.point ''' class WPSCheckinAPI(WPSAPI): def get_guide_subtitle(self) -> str: return "每日签到并发放积分的快捷指令" def get_guide_metadata(self) -> Dict[str, str]: return { "数据表": "daily_checkin", "签到积分": str(CHECKIN_POINTS), } def collect_command_entries(self) -> Sequence[GuideEntry]: return ( { "title": "checkin", "identifier": "checkin", "description": "执行签到流程,发放积分并反馈今日进度。", "metadata": {"别名": "签到 / 积分"}, }, ) def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "签到逻辑", "description": "同一日多次调用仅第一次成功,并记录在 `daily_checkin` 表内。", }, { "title": "积分结算", "description": "成功签到将通过 `WPSConfigAPI.adjust_user_points` 增加积分。", }, ) @override def dependencies(self) -> List[Type]: return [WPSAPI] @override def is_enable_plugin(self) -> bool: return True @override def wake_up(self) -> None: logger.Log("Info", f"{ConsoleFrontColor.GREEN}WPSCheckinAPI 插件已加载{ConsoleFrontColor.RESET}") self.register_plugin("checkin") self.register_plugin("签到") self.register_plugin("积分") @override def register_db_model(self) -> DatabaseModel: return DatabaseModel( table_name="daily_checkin", column_defs={ "user_id": "INTEGER PRIMARY KEY", "checkin_date": "TEXT", } ) def _get_today_checkin_status(self, user_id: int) -> bool: cursor = get_db().conn.cursor() cursor.execute("SELECT checkin_date FROM daily_checkin WHERE user_id = ?", (user_id,)) row = cursor.fetchone() return row is not None and row[0] == datetime.datetime.now().strftime("%Y-%m-%d") def _set_today_checkin_status(self, user_id: int) -> None: cursor = get_db().conn.cursor() cursor.execute( """ INSERT INTO daily_checkin (user_id, checkin_date) VALUES (?, ?) ON CONFLICT(user_id) DO UPDATE SET checkin_date = excluded.checkin_date """, (user_id, datetime.datetime.now().strftime("%Y-%m-%d")), ) get_db().conn.commit() async def do_callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: checkin_message = await self._handle_checkin(chat_id, user_id) return f"""{checkin_message} --- {self._help_message(chat_id, user_id)} """ # tokens = [token.strip() for token in message.strip().split() if token.strip()] # if not tokens: # return self._help_message(chat_id, user_id) # action = tokens[0].lower() # if action == "checkin": # return await self._handle_checkin(chat_id, user_id) # return self._help_message() async def _handle_checkin(self, chat_id: int, user_id: int) -> str: wps_config_api: "WPSConfigAPI" = Architecture.Get(WPSConfigAPI) if self._get_today_checkin_status(user_id): return "今日已签到" self._set_today_checkin_status(user_id) return f"签到成功, 当前分数: {await wps_config_api.adjust_user_points(chat_id, user_id, CHECKIN_POINTS, "每日签到")}" @override async def callback(self, message: str, chat_id: int, user_id: int) -> Optional[str]: message = self.parse_message_after_at(message) message = await self.do_callback(message, chat_id, user_id) return await self.send_markdown_message(message, chat_id, user_id) def _help_message(self, chat_id: int, user_id: int) -> str: wps_config_api: "WPSConfigAPI" = Architecture.Get(WPSConfigAPI) return f'''# 📅 Daily 命令帮助 - checkin: 签到 --- - 当前分数: {wps_config_api.get_user_points(user_id)} - 今日签到状态: {"已签到" if self._get_today_checkin_status(user_id) else "未签到"} ''' __all__ = ["WPSConfigAPI"]