Files
NewWPSBot/Plugins/WPSRedPacketService.py

406 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import random
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import List, Optional, Sequence, Tuple
from uuid import uuid4
from PWF.Convention.Runtime.Architecture import Architecture
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from PWF.CoreModules.database import get_db
from PWF.CoreModules.plugin_interface import DatabaseModel
from Plugins.WPSAPI import GuideEntry, WPSAPI
from Plugins.WPSConfigSystem import WPSConfigAPI
# Shared ProjectConfig instance resolved through the Architecture registry.
# This module uses it both for configuration lookups (FindItem / SaveProperties)
# and for log output (Log).
logger: ProjectConfig = Architecture.Get(ProjectConfig)
class RedPacketType(str, Enum):
    """Kinds of red packet; each member's value is the string stored in the ``packet_type`` column."""

    # Share sizes are drawn at random; no extra claim condition.
    RANDOM = "random"
    # Claimer must supply the matching password.
    PASSWORD = "password"
    # Only one designated user may claim.
    EXCLUSIVE = "exclusive"
@dataclass(frozen=True)
class RedPacket:
    """Immutable snapshot of one red packet record."""

    # --- identity ---
    packet_id: str
    owner_id: int
    packet_type: RedPacketType

    # --- size at creation time ---
    total_amount: int
    total_slots: int

    # --- what is still unclaimed ---
    remaining_amount: int
    remaining_slots: int

    # --- set only for exclusive packets ---
    exclusive_user_id: Optional[int] = None
    exclusive_username: Optional[str] = None

    # --- set only for password packets ---
    password: Optional[str] = None

    # Creation timestamp as text; empty when not supplied.
    created_at: str = ""
@dataclass(frozen=True)
class RedPacketClaim:
    """Immutable record of a single claim made against a red packet."""

    # Packet the claim belongs to.
    packet_id: str
    # Claimer's id and display name.
    user_id: int
    username: str
    # Points received by the claimer.
    amount: int
    # Claim timestamp as text.
    claimed_at: str
class WPSRedPacketService(WPSAPI):
"""Core red-packet service responsible for data storage and points settlement."""
# Table holding one row per red packet (keyed by packet_id).
PACKET_TABLE = "red_packets"
# Table holding one row per successful claim of a packet.
CLAIM_TABLE = "red_packet_claims"
def __init__(self) -> None:
    """Initialise the plugin base and load the default red-packet settings.

    Reads ``red_packet_default_amount`` (fallback 100) and
    ``red_packet_default_slots`` (fallback 4) through ``logger.FindItem``,
    coerces both to ``int``, then calls ``logger.SaveProperties()``.
    """
    super().__init__()

    configured_amount = logger.FindItem("red_packet_default_amount", 100)
    self._default_amount: int = int(configured_amount)

    configured_slots = logger.FindItem("red_packet_default_slots", 4)
    self._default_slots: int = int(configured_slots)

    logger.SaveProperties()
def get_guide_subtitle(self) -> str:
    """Return the one-line subtitle describing this plugin in the guide."""
    subtitle = "统一管理红包生命周期与积分结算的核心服务"
    return subtitle
def collect_command_entries(self) -> Sequence[GuideEntry]:
    """Return the command entries for the guide; this service contributes none."""
    return tuple()
def collect_guide_entries(self) -> Sequence[GuideEntry]:
    """Return the two guide entries for this service.

    The first entry reports the configured default amount and slot count;
    the second is a fixed description of how points are settled.
    """
    defaults_entry = {
        "title": "默认配置",
        "description": f"默认金额 {self._default_amount} 分,默认份数 {self._default_slots} 份,可通过 ProjectConfig 覆盖。",
    }
    settlement_entry = {
        "title": "积分结算",
        "description": "发送红包时一次性扣除积分,领取时再发放给领取者,红包不支持退款或撤销。",
    }
    return (defaults_entry, settlement_entry)
def dependencies(self) -> List[type]:
    """Return the plugin classes this service depends on, in order: WPSAPI, WPSConfigAPI."""
    required: List[type] = []
    required.append(WPSAPI)
    required.append(WPSConfigAPI)
    return required
def is_enable_plugin(self) -> bool:
    """Report whether the plugin is enabled; this service is always enabled."""
    enabled = True
    return enabled
def register_db_model(self):
    """Declare the tables this service persists to.

    Returns:
        A list with the packet-table model first and the claim-table model second.
    """
    # One row per red packet; remaining_* columns are decremented as shares are claimed.
    packet_columns = {
        "packet_id": "TEXT PRIMARY KEY",
        "owner_id": "INTEGER NOT NULL",
        "packet_type": "TEXT NOT NULL",
        "total_amount": "INTEGER NOT NULL",
        "total_slots": "INTEGER NOT NULL",
        "remaining_amount": "INTEGER NOT NULL",
        "remaining_slots": "INTEGER NOT NULL",
        "exclusive_user_id": "INTEGER",
        "exclusive_username": "TEXT",
        "password": "TEXT",
        "created_at": "TEXT NOT NULL",
    }
    # One row per claim.
    claim_columns = {
        "claim_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
        "packet_id": "TEXT NOT NULL",
        "user_id": "INTEGER NOT NULL",
        "username": "TEXT",
        "amount": "INTEGER NOT NULL",
        "claimed_at": "TEXT NOT NULL",
    }

    packet_model = DatabaseModel(table_name=self.PACKET_TABLE, column_defs=packet_columns)
    claim_model = DatabaseModel(table_name=self.CLAIM_TABLE, column_defs=claim_columns)
    return [packet_model, claim_model]
def wake_up(self) -> None:
    """Log an Info-level, green-coloured message saying the plugin has loaded."""
    banner = f"{ConsoleFrontColor.GREEN}WPSRedPacketService 插件已加载{ConsoleFrontColor.RESET}"
    logger.Log("Info", banner)
# region 配置 / 公共接口
def get_default_amount(self) -> int:
    """Return the configured default packet amount, never less than 1."""
    configured = self._default_amount
    return configured if configured > 1 else 1
def get_default_slots(self) -> int:
    """Return the configured default slot count, never less than 1."""
    configured = self._default_slots
    return configured if configured > 1 else 1
def generate_packet_id(self) -> str:
    """Return a new packet id: the first 12 hex characters of a random UUID4."""
    hex_digits = uuid4().hex
    return hex_digits[:12]
def _config_api(self) -> WPSConfigAPI:
    """Look up the WPSConfigAPI instance through the Architecture registry."""
    config_api = Architecture.Get(WPSConfigAPI)
    return config_api
def _now(self) -> str:
return datetime.now(timezone.utc).isoformat()
# endregion
# region 发红包
def create_random_packet(
    self,
    chat_id: int,
    owner_id: int,
    total_amount: int,
    total_slots: int,
) -> RedPacket:
    """Create a RANDOM-type red packet.

    Thin wrapper that forwards to ``_create_packet`` with
    ``packet_type=RedPacketType.RANDOM``; validation and point deduction
    happen there.

    Returns:
        The ``RedPacket`` produced by ``_create_packet``.
    """
    packet = self._create_packet(
        packet_type=RedPacketType.RANDOM,
        chat_id=chat_id,
        owner_id=owner_id,
        total_amount=total_amount,
        total_slots=total_slots,
    )
    return packet
def create_password_packet(
    self,
    chat_id: int,
    owner_id: int,
    total_amount: int,
    total_slots: int,
    password: str,
) -> RedPacket:
    """Create a PASSWORD-type red packet.

    The password is stripped of surrounding whitespace before it is validated
    and stored. ``claim_packet`` strips the password supplied by the claimer
    before comparing, so a stored password with leading/trailing whitespace
    (or one made only of whitespace) could never be matched.

    Args:
        chat_id: Conversation the packet is sent in.
        owner_id: User paying for the packet.
        total_amount: Total points put into the packet.
        total_slots: Number of shares.
        password: Password claimers must supply.

    Returns:
        The ``RedPacket`` produced by ``_create_packet``.

    Raises:
        ValueError: If the password is empty after stripping, or if
            ``_create_packet`` rejects the amount/slots/points.
    """
    normalized_password = (password or "").strip()
    if not normalized_password:
        raise ValueError("口令不能为空")
    return self._create_packet(
        chat_id=chat_id,
        owner_id=owner_id,
        total_amount=total_amount,
        total_slots=total_slots,
        packet_type=RedPacketType.PASSWORD,
        password=normalized_password,
    )
def create_exclusive_packet(
    self,
    chat_id: int,
    owner_id: int,
    total_amount: int,
    target_user_id: int,
    target_username: str,
) -> RedPacket:
    """Create an EXCLUSIVE-type red packet with a single share.

    Forwards to ``_create_packet`` with ``total_slots=1``. When
    ``target_username`` is empty, ``user_<target_user_id>`` is stored as the
    display name instead.

    Raises:
        ValueError: If ``target_user_id`` is not positive, or if
            ``_create_packet`` rejects the request.
    """
    if target_user_id <= 0:
        raise ValueError("目标用户ID无效")

    display_name = target_username if target_username else f"user_{target_user_id}"
    packet = self._create_packet(
        packet_type=RedPacketType.EXCLUSIVE,
        chat_id=chat_id,
        owner_id=owner_id,
        total_amount=total_amount,
        total_slots=1,
        exclusive_user_id=target_user_id,
        exclusive_username=display_name,
    )
    return packet
def _create_packet(
self,
*,
chat_id: int,
owner_id: int,
total_amount: int,
total_slots: int,
packet_type: RedPacketType,
password: Optional[str] = None,
exclusive_user_id: Optional[int] = None,
exclusive_username: Optional[str] = None,
) -> RedPacket:
if total_amount <= 0:
raise ValueError("红包金额必须大于0")
if total_slots <= 0:
raise ValueError("红包份数必须大于0")
if total_amount < total_slots:
raise ValueError("红包金额不能小于份数")
self._deduct_points(owner_id, total_amount, packet_type, chat_id)
packet_id = self.generate_packet_id()
created_at = self._now()
cursor = get_db().conn.cursor()
cursor.execute(
f"""
INSERT INTO {self.PACKET_TABLE} (
packet_id, owner_id, packet_type,
total_amount, total_slots,
remaining_amount, remaining_slots,
exclusive_user_id, exclusive_username,
password, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
packet_id,
owner_id,
packet_type.value,
total_amount,
total_slots,
total_amount,
total_slots,
exclusive_user_id,
exclusive_username,
password,
created_at,
),
)
get_db().conn.commit()
return RedPacket(
packet_id=packet_id,
owner_id=owner_id,
packet_type=packet_type,
total_amount=total_amount,
total_slots=total_slots,
remaining_amount=total_amount,
remaining_slots=total_slots,
exclusive_user_id=exclusive_user_id,
exclusive_username=exclusive_username,
password=password,
created_at=created_at,
)
def _deduct_points(
self,
owner_id: int,
total_amount: int,
packet_type: RedPacketType,
chat_id: int,
) -> None:
config_api = self._config_api()
current_points = config_api.get_user_points(owner_id)
if current_points < total_amount:
raise ValueError("积分不足,无法发放红包")
config_api.adjust_user_points_sync(
owner_id,
-total_amount,
reason=f"发放{packet_type.value}红包(会话{chat_id}",
)
# endregion
# region 抢红包
def claim_packet(
    self,
    chat_id: int,
    packet_id: str,
    user_id: int,
    username: str,
    tokens: Optional[Sequence[str]] = None,
) -> Tuple[bool, str, Optional[int]]:
    """Try to claim one share of a red packet for ``user_id``.

    The eligibility checks, the packet update and the claim insert run
    inside a single ``BEGIN IMMEDIATE`` transaction. Points are credited
    through the config API only after that transaction is committed. If
    crediting fails, the committed claim is reverted (claim row deleted,
    share returned to the packet) so the share is not lost and the user can
    try again.

    Args:
        chat_id: Conversation the claim happens in (used in the points reason text).
        packet_id: Id of the packet to claim.
        user_id: Claiming user.
        username: Claiming user's display name, stored with the claim.
        tokens: Extra words sent with the claim; joined with single spaces
            and stripped to form the password for password packets.

    Returns:
        ``(success, message, amount)``. ``amount`` is the number of points
        granted on success and ``None`` otherwise.
    """
    if not packet_id:
        return False, "❌ 红包ID不能为空", None

    conn = get_db().conn
    cursor = conn.cursor()
    try:
        cursor.execute("BEGIN IMMEDIATE")
        cursor.execute(
            f"""
            SELECT packet_id, owner_id, packet_type, total_amount, total_slots,
                   remaining_amount, remaining_slots, exclusive_user_id,
                   exclusive_username, password
            FROM {self.PACKET_TABLE}
            WHERE packet_id = ?
            """,
            (packet_id,),
        )
        row = cursor.fetchone()
        if not row:
            conn.rollback()
            return False, "❌ 红包不存在或已失效", None

        packet = self._row_to_packet(row)
        if packet.remaining_slots <= 0 or packet.remaining_amount <= 0:
            conn.rollback()
            return False, "⚠️ 红包已经被抢完了", None

        # Each user may claim a given packet only once.
        cursor.execute(
            f"""
            SELECT 1 FROM {self.CLAIM_TABLE}
            WHERE packet_id = ? AND user_id = ?
            """,
            (packet_id, user_id),
        )
        if cursor.fetchone():
            conn.rollback()
            return False, "⚠️ 你已经抢过这个红包了", None

        if packet.packet_type == RedPacketType.EXCLUSIVE:
            if packet.exclusive_user_id != user_id:
                conn.rollback()
                return False, "❌ 这是专属红包,只有目标用户可以领取", None

        if packet.packet_type == RedPacketType.PASSWORD:
            provided = " ".join(tokens or []).strip()
            if not provided:
                conn.rollback()
                return False, "❌ 这是口令红包,需要附带口令", None
            if provided != (packet.password or ""):
                conn.rollback()
                return False, "❌ 口令不正确", None

        grant_amount = self._allocate_amount(packet.remaining_amount, packet.remaining_slots)
        cursor.execute(
            f"""
            UPDATE {self.PACKET_TABLE}
            SET remaining_amount = remaining_amount - ?,
                remaining_slots = remaining_slots - 1
            WHERE packet_id = ?
            """,
            (grant_amount, packet_id),
        )
        cursor.execute(
            f"""
            INSERT INTO {self.CLAIM_TABLE} (packet_id, user_id, username, amount, claimed_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            (packet_id, user_id, username, grant_amount, self._now()),
        )
        conn.commit()
    except Exception as exc:
        conn.rollback()
        logger.Log("Error", f"{ConsoleFrontColor.RED}领取红包失败: {exc}{ConsoleFrontColor.RESET}")
        return False, "❌ 领取红包时出现错误,请稍后再试", None

    try:
        self._config_api().adjust_user_points_sync(
            user_id,
            grant_amount,
            # The closing parenthesis was missing from the original reason text.
            reason=f"抢到红包 {packet_id}(会话{chat_id})",
        )
    except Exception as credit_exc:
        logger.Log(
            "Error",
            f"{ConsoleFrontColor.RED}红包积分发放失败,正在回滚领取记录: {credit_exc}{ConsoleFrontColor.RESET}",
        )
        # The claim is already committed but the user received nothing:
        # remove the claim row and return the share to the packet.
        try:
            cursor.execute("BEGIN IMMEDIATE")
            cursor.execute(
                f"""
                DELETE FROM {self.CLAIM_TABLE}
                WHERE packet_id = ? AND user_id = ?
                """,
                (packet_id, user_id),
            )
            cursor.execute(
                f"""
                UPDATE {self.PACKET_TABLE}
                SET remaining_amount = remaining_amount + ?,
                    remaining_slots = remaining_slots + 1
                WHERE packet_id = ?
                """,
                (grant_amount, packet_id),
            )
            conn.commit()
        except Exception as revert_exc:
            conn.rollback()
            logger.Log(
                "Error",
                f"{ConsoleFrontColor.RED}回滚领取记录失败: {revert_exc}{ConsoleFrontColor.RESET}",
            )
        return False, "❌ 领取红包时出现错误,请稍后再试", None

    return True, f"✅ 领取成功,获得 {grant_amount} 分!", grant_amount
def _allocate_amount(self, remaining_amount: int, remaining_slots: int) -> int:
if remaining_slots <= 1:
return remaining_amount
min_per_slot = 1
max_available = remaining_amount - (remaining_slots - 1) * min_per_slot
avg = remaining_amount // remaining_slots
upper = max(min_per_slot, min(max_available, max(min_per_slot, avg * 2)))
return random.randint(min_per_slot, upper)
def _row_to_packet(self, row) -> RedPacket:
    """Convert a packet-table row into a ``RedPacket``.

    ``row`` must support lookup by column name. ``created_at`` is taken from
    the row when that column was selected and is non-null; otherwise it
    falls back to an empty string, so queries that omit the column keep
    working.

    Raises:
        ValueError: If ``packet_type`` holds a value that is not a
            ``RedPacketType`` member.
    """
    try:
        created_at = row["created_at"] or ""
    except (KeyError, IndexError):
        # Mapping rows raise KeyError for an absent column; sqlite3.Row raises IndexError.
        created_at = ""

    return RedPacket(
        packet_id=row["packet_id"],
        owner_id=row["owner_id"],
        packet_type=RedPacketType(row["packet_type"]),
        total_amount=row["total_amount"],
        total_slots=row["total_slots"],
        remaining_amount=row["remaining_amount"],
        remaining_slots=row["remaining_slots"],
        exclusive_user_id=row["exclusive_user_id"],
        exclusive_username=row["exclusive_username"],
        password=row["password"],
        created_at=created_at,
    )
# endregion
# Names exported by `from <module> import *`: the service plus its value types.
__all__ = ["WPSRedPacketService", "RedPacketType", "RedPacket", "RedPacketClaim"]