from __future__ import annotations import random from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum from typing import List, Optional, Sequence, Tuple from uuid import uuid4 from PWF.Convention.Runtime.Architecture import Architecture from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig from PWF.CoreModules.database import get_db from PWF.CoreModules.plugin_interface import DatabaseModel from Plugins.WPSAPI import GuideEntry, WPSAPI from Plugins.WPSConfigSystem import WPSConfigAPI logger: ProjectConfig = Architecture.Get(ProjectConfig) class RedPacketType(str, Enum): RANDOM = "random" PASSWORD = "password" EXCLUSIVE = "exclusive" @dataclass(frozen=True) class RedPacket: packet_id: str owner_id: int packet_type: RedPacketType total_amount: int total_slots: int remaining_amount: int remaining_slots: int exclusive_user_id: Optional[int] = None exclusive_username: Optional[str] = None password: Optional[str] = None created_at: str = "" @dataclass(frozen=True) class RedPacketClaim: packet_id: str user_id: int username: str amount: int claimed_at: str class WPSRedPacketService(WPSAPI): """红包核心服务,负责数据存储与积分结算。""" PACKET_TABLE = "red_packets" CLAIM_TABLE = "red_packet_claims" def __init__(self) -> None: super().__init__() self._default_amount: int = int(logger.FindItem("red_packet_default_amount", 100)) self._default_slots: int = int(logger.FindItem("red_packet_default_slots", 4)) logger.SaveProperties() def get_guide_subtitle(self) -> str: return "统一管理红包生命周期与积分结算的核心服务" def collect_command_entries(self) -> Sequence[GuideEntry]: return () def collect_guide_entries(self) -> Sequence[GuideEntry]: return ( { "title": "默认配置", "description": f"默认金额 {self._default_amount} 分,默认份数 {self._default_slots} 份,可通过 ProjectConfig 覆盖。", }, { "title": "积分结算", "description": "发送红包时一次性扣除积分,领取时再发放给领取者,红包不支持退款或撤销。", }, ) def dependencies(self) -> List[type]: return [WPSAPI, WPSConfigAPI] def is_enable_plugin(self) -> bool: return True def register_db_model(self): return [ DatabaseModel( table_name=self.PACKET_TABLE, column_defs={ "packet_id": "TEXT PRIMARY KEY", "owner_id": "INTEGER NOT NULL", "packet_type": "TEXT NOT NULL", "total_amount": "INTEGER NOT NULL", "total_slots": "INTEGER NOT NULL", "remaining_amount": "INTEGER NOT NULL", "remaining_slots": "INTEGER NOT NULL", "exclusive_user_id": "INTEGER", "exclusive_username": "TEXT", "password": "TEXT", "created_at": "TEXT NOT NULL", }, ), DatabaseModel( table_name=self.CLAIM_TABLE, column_defs={ "claim_id": "INTEGER PRIMARY KEY AUTOINCREMENT", "packet_id": "TEXT NOT NULL", "user_id": "INTEGER NOT NULL", "username": "TEXT", "amount": "INTEGER NOT NULL", "claimed_at": "TEXT NOT NULL", }, ), ] def wake_up(self) -> None: logger.Log( "Info", f"{ConsoleFrontColor.GREEN}WPSRedPacketService 插件已加载{ConsoleFrontColor.RESET}", ) # region 配置 / 公共接口 def get_default_amount(self) -> int: return max(1, self._default_amount) def get_default_slots(self) -> int: return max(1, self._default_slots) def generate_packet_id(self) -> str: return uuid4().hex[:12] def _config_api(self) -> WPSConfigAPI: return Architecture.Get(WPSConfigAPI) def _now(self) -> str: return datetime.now(timezone.utc).isoformat() # endregion # region 发红包 def create_random_packet( self, chat_id: int, owner_id: int, total_amount: int, total_slots: int, ) -> RedPacket: return self._create_packet( chat_id=chat_id, owner_id=owner_id, total_amount=total_amount, total_slots=total_slots, packet_type=RedPacketType.RANDOM, ) def create_password_packet( self, chat_id: int, owner_id: int, total_amount: int, total_slots: int, password: str, ) -> RedPacket: if not password: raise ValueError("口令不能为空") return self._create_packet( chat_id=chat_id, owner_id=owner_id, total_amount=total_amount, total_slots=total_slots, packet_type=RedPacketType.PASSWORD, password=password, ) def create_exclusive_packet( self, chat_id: int, owner_id: int, total_amount: int, target_user_id: int, target_username: str, ) -> RedPacket: if target_user_id <= 0: raise ValueError("目标用户ID无效") return self._create_packet( chat_id=chat_id, owner_id=owner_id, total_amount=total_amount, total_slots=1, packet_type=RedPacketType.EXCLUSIVE, exclusive_user_id=target_user_id, exclusive_username=target_username or f"user_{target_user_id}", ) def _create_packet( self, *, chat_id: int, owner_id: int, total_amount: int, total_slots: int, packet_type: RedPacketType, password: Optional[str] = None, exclusive_user_id: Optional[int] = None, exclusive_username: Optional[str] = None, ) -> RedPacket: if total_amount <= 0: raise ValueError("红包金额必须大于0") if total_slots <= 0: raise ValueError("红包份数必须大于0") if total_amount < total_slots: raise ValueError("红包金额不能小于份数") self._deduct_points(owner_id, total_amount, packet_type, chat_id) packet_id = self.generate_packet_id() created_at = self._now() cursor = get_db().conn.cursor() cursor.execute( f""" INSERT INTO {self.PACKET_TABLE} ( packet_id, owner_id, packet_type, total_amount, total_slots, remaining_amount, remaining_slots, exclusive_user_id, exclusive_username, password, created_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( packet_id, owner_id, packet_type.value, total_amount, total_slots, total_amount, total_slots, exclusive_user_id, exclusive_username, password, created_at, ), ) get_db().conn.commit() return RedPacket( packet_id=packet_id, owner_id=owner_id, packet_type=packet_type, total_amount=total_amount, total_slots=total_slots, remaining_amount=total_amount, remaining_slots=total_slots, exclusive_user_id=exclusive_user_id, exclusive_username=exclusive_username, password=password, created_at=created_at, ) def _deduct_points( self, owner_id: int, total_amount: int, packet_type: RedPacketType, chat_id: int, ) -> None: config_api = self._config_api() current_points = config_api.get_user_points(owner_id) if current_points < total_amount: raise ValueError("积分不足,无法发放红包") config_api.adjust_user_points_sync( owner_id, -total_amount, reason=f"发放{packet_type.value}红包(会话{chat_id})", ) # endregion # region 抢红包 def claim_packet( self, chat_id: int, packet_id: str, user_id: int, username: str, tokens: Optional[Sequence[str]] = None, ) -> Tuple[bool, str, Optional[int]]: if not packet_id: return False, "❌ 红包ID不能为空", None conn = get_db().conn cursor = conn.cursor() try: cursor.execute("BEGIN IMMEDIATE") cursor.execute( f""" SELECT packet_id, owner_id, packet_type, total_amount, total_slots, remaining_amount, remaining_slots, exclusive_user_id, exclusive_username, password FROM {self.PACKET_TABLE} WHERE packet_id = ? """, (packet_id,), ) row = cursor.fetchone() if not row: conn.rollback() return False, "❌ 红包不存在或已失效", None packet = self._row_to_packet(row) if packet.remaining_slots <= 0 or packet.remaining_amount <= 0: conn.rollback() return False, "⚠️ 红包已经被抢完了", None cursor.execute( f""" SELECT 1 FROM {self.CLAIM_TABLE} WHERE packet_id = ? AND user_id = ? """, (packet_id, user_id), ) if cursor.fetchone(): conn.rollback() return False, "⚠️ 你已经抢过这个红包了", None if packet.packet_type == RedPacketType.EXCLUSIVE: if packet.exclusive_user_id != user_id: conn.rollback() return False, "❌ 这是专属红包,只有目标用户可以领取", None if packet.packet_type == RedPacketType.PASSWORD: provided = " ".join(tokens or []).strip() if not provided: conn.rollback() return False, "❌ 这是口令红包,需要附带口令", None if provided != (packet.password or ""): conn.rollback() return False, "❌ 口令不正确", None grant_amount = self._allocate_amount(packet.remaining_amount, packet.remaining_slots) cursor.execute( f""" UPDATE {self.PACKET_TABLE} SET remaining_amount = remaining_amount - ?, remaining_slots = remaining_slots - 1 WHERE packet_id = ? """, (grant_amount, packet_id), ) cursor.execute( f""" INSERT INTO {self.CLAIM_TABLE} (packet_id, user_id, username, amount, claimed_at) VALUES (?, ?, ?, ?, ?) """, (packet_id, user_id, username, grant_amount, self._now()), ) conn.commit() except Exception as exc: conn.rollback() logger.Log("Error", f"{ConsoleFrontColor.RED}领取红包失败: {exc}{ConsoleFrontColor.RESET}") return False, "❌ 领取红包时出现错误,请稍后再试", None config_api = self._config_api() config_api.adjust_user_points_sync( user_id, grant_amount, reason=f"抢到红包 {packet_id}(会话{chat_id})", ) return True, f"✅ 领取成功,获得 {grant_amount} 分!", grant_amount def _allocate_amount(self, remaining_amount: int, remaining_slots: int) -> int: if remaining_slots <= 1: return remaining_amount min_per_slot = 1 max_available = remaining_amount - (remaining_slots - 1) * min_per_slot avg = remaining_amount // remaining_slots upper = max(min_per_slot, min(max_available, max(min_per_slot, avg * 2))) return random.randint(min_per_slot, upper) def _row_to_packet(self, row) -> RedPacket: return RedPacket( packet_id=row["packet_id"], owner_id=row["owner_id"], packet_type=RedPacketType(row["packet_type"]), total_amount=row["total_amount"], total_slots=row["total_slots"], remaining_amount=row["remaining_amount"], remaining_slots=row["remaining_slots"], exclusive_user_id=row["exclusive_user_id"], exclusive_username=row["exclusive_username"], password=row["password"], created_at="", ) # endregion __all__ = ["WPSRedPacketService", "RedPacketType", "RedPacket", "RedPacketClaim"]