406 lines
13 KiB
Python
406 lines
13 KiB
Python
from __future__ import annotations
|
||
|
||
import random
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timezone
|
||
from enum import Enum
|
||
from typing import List, Optional, Sequence, Tuple
|
||
from uuid import uuid4
|
||
|
||
from PWF.Convention.Runtime.Architecture import Architecture
|
||
from PWF.Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
|
||
from PWF.CoreModules.database import get_db
|
||
from PWF.CoreModules.plugin_interface import DatabaseModel
|
||
|
||
from Plugins.WPSAPI import GuideEntry, WPSAPI
|
||
from Plugins.WPSConfigSystem import WPSConfigAPI
|
||
|
||
logger: ProjectConfig = Architecture.Get(ProjectConfig)
|
||
|
||
|
||
class RedPacketType(str, Enum):
|
||
RANDOM = "random"
|
||
PASSWORD = "password"
|
||
EXCLUSIVE = "exclusive"
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class RedPacket:
|
||
packet_id: str
|
||
owner_id: int
|
||
packet_type: RedPacketType
|
||
total_amount: int
|
||
total_slots: int
|
||
remaining_amount: int
|
||
remaining_slots: int
|
||
exclusive_user_id: Optional[int] = None
|
||
exclusive_username: Optional[str] = None
|
||
password: Optional[str] = None
|
||
created_at: str = ""
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class RedPacketClaim:
|
||
packet_id: str
|
||
user_id: int
|
||
username: str
|
||
amount: int
|
||
claimed_at: str
|
||
|
||
|
||
class WPSRedPacketService(WPSAPI):
|
||
"""红包核心服务,负责数据存储与积分结算。"""
|
||
|
||
PACKET_TABLE = "red_packets"
|
||
CLAIM_TABLE = "red_packet_claims"
|
||
|
||
def __init__(self) -> None:
|
||
super().__init__()
|
||
self._default_amount: int = int(logger.FindItem("red_packet_default_amount", 100))
|
||
self._default_slots: int = int(logger.FindItem("red_packet_default_slots", 4))
|
||
logger.SaveProperties()
|
||
|
||
def get_guide_subtitle(self) -> str:
|
||
return "统一管理红包生命周期与积分结算的核心服务"
|
||
|
||
def collect_command_entries(self) -> Sequence[GuideEntry]:
|
||
return ()
|
||
|
||
def collect_guide_entries(self) -> Sequence[GuideEntry]:
|
||
return (
|
||
{
|
||
"title": "默认配置",
|
||
"description": f"默认金额 {self._default_amount} 分,默认份数 {self._default_slots} 份,可通过 ProjectConfig 覆盖。",
|
||
},
|
||
{
|
||
"title": "积分结算",
|
||
"description": "发送红包时一次性扣除积分,领取时再发放给领取者,红包不支持退款或撤销。",
|
||
},
|
||
)
|
||
|
||
def dependencies(self) -> List[type]:
|
||
return [WPSAPI, WPSConfigAPI]
|
||
|
||
def is_enable_plugin(self) -> bool:
|
||
return True
|
||
|
||
def register_db_model(self):
|
||
return [
|
||
DatabaseModel(
|
||
table_name=self.PACKET_TABLE,
|
||
column_defs={
|
||
"packet_id": "TEXT PRIMARY KEY",
|
||
"owner_id": "INTEGER NOT NULL",
|
||
"packet_type": "TEXT NOT NULL",
|
||
"total_amount": "INTEGER NOT NULL",
|
||
"total_slots": "INTEGER NOT NULL",
|
||
"remaining_amount": "INTEGER NOT NULL",
|
||
"remaining_slots": "INTEGER NOT NULL",
|
||
"exclusive_user_id": "INTEGER",
|
||
"exclusive_username": "TEXT",
|
||
"password": "TEXT",
|
||
"created_at": "TEXT NOT NULL",
|
||
},
|
||
),
|
||
DatabaseModel(
|
||
table_name=self.CLAIM_TABLE,
|
||
column_defs={
|
||
"claim_id": "INTEGER PRIMARY KEY AUTOINCREMENT",
|
||
"packet_id": "TEXT NOT NULL",
|
||
"user_id": "INTEGER NOT NULL",
|
||
"username": "TEXT",
|
||
"amount": "INTEGER NOT NULL",
|
||
"claimed_at": "TEXT NOT NULL",
|
||
},
|
||
),
|
||
]
|
||
|
||
def wake_up(self) -> None:
|
||
logger.Log(
|
||
"Info",
|
||
f"{ConsoleFrontColor.GREEN}WPSRedPacketService 插件已加载{ConsoleFrontColor.RESET}",
|
||
)
|
||
|
||
# region 配置 / 公共接口
|
||
|
||
def get_default_amount(self) -> int:
|
||
return max(1, self._default_amount)
|
||
|
||
def get_default_slots(self) -> int:
|
||
return max(1, self._default_slots)
|
||
|
||
def generate_packet_id(self) -> str:
|
||
return uuid4().hex[:12]
|
||
|
||
def _config_api(self) -> WPSConfigAPI:
|
||
return Architecture.Get(WPSConfigAPI)
|
||
|
||
def _now(self) -> str:
|
||
return datetime.now(timezone.utc).isoformat()
|
||
|
||
# endregion
|
||
|
||
# region 发红包
|
||
|
||
def create_random_packet(
|
||
self,
|
||
chat_id: int,
|
||
owner_id: int,
|
||
total_amount: int,
|
||
total_slots: int,
|
||
) -> RedPacket:
|
||
return self._create_packet(
|
||
chat_id=chat_id,
|
||
owner_id=owner_id,
|
||
total_amount=total_amount,
|
||
total_slots=total_slots,
|
||
packet_type=RedPacketType.RANDOM,
|
||
)
|
||
|
||
def create_password_packet(
|
||
self,
|
||
chat_id: int,
|
||
owner_id: int,
|
||
total_amount: int,
|
||
total_slots: int,
|
||
password: str,
|
||
) -> RedPacket:
|
||
if not password:
|
||
raise ValueError("口令不能为空")
|
||
return self._create_packet(
|
||
chat_id=chat_id,
|
||
owner_id=owner_id,
|
||
total_amount=total_amount,
|
||
total_slots=total_slots,
|
||
packet_type=RedPacketType.PASSWORD,
|
||
password=password,
|
||
)
|
||
|
||
def create_exclusive_packet(
|
||
self,
|
||
chat_id: int,
|
||
owner_id: int,
|
||
total_amount: int,
|
||
target_user_id: int,
|
||
target_username: str,
|
||
) -> RedPacket:
|
||
if target_user_id <= 0:
|
||
raise ValueError("目标用户ID无效")
|
||
return self._create_packet(
|
||
chat_id=chat_id,
|
||
owner_id=owner_id,
|
||
total_amount=total_amount,
|
||
total_slots=1,
|
||
packet_type=RedPacketType.EXCLUSIVE,
|
||
exclusive_user_id=target_user_id,
|
||
exclusive_username=target_username or f"user_{target_user_id}",
|
||
)
|
||
|
||
def _create_packet(
|
||
self,
|
||
*,
|
||
chat_id: int,
|
||
owner_id: int,
|
||
total_amount: int,
|
||
total_slots: int,
|
||
packet_type: RedPacketType,
|
||
password: Optional[str] = None,
|
||
exclusive_user_id: Optional[int] = None,
|
||
exclusive_username: Optional[str] = None,
|
||
) -> RedPacket:
|
||
if total_amount <= 0:
|
||
raise ValueError("红包金额必须大于0")
|
||
if total_slots <= 0:
|
||
raise ValueError("红包份数必须大于0")
|
||
if total_amount < total_slots:
|
||
raise ValueError("红包金额不能小于份数")
|
||
|
||
self._deduct_points(owner_id, total_amount, packet_type, chat_id)
|
||
|
||
packet_id = self.generate_packet_id()
|
||
created_at = self._now()
|
||
|
||
cursor = get_db().conn.cursor()
|
||
cursor.execute(
|
||
f"""
|
||
INSERT INTO {self.PACKET_TABLE} (
|
||
packet_id, owner_id, packet_type,
|
||
total_amount, total_slots,
|
||
remaining_amount, remaining_slots,
|
||
exclusive_user_id, exclusive_username,
|
||
password, created_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
packet_id,
|
||
owner_id,
|
||
packet_type.value,
|
||
total_amount,
|
||
total_slots,
|
||
total_amount,
|
||
total_slots,
|
||
exclusive_user_id,
|
||
exclusive_username,
|
||
password,
|
||
created_at,
|
||
),
|
||
)
|
||
get_db().conn.commit()
|
||
return RedPacket(
|
||
packet_id=packet_id,
|
||
owner_id=owner_id,
|
||
packet_type=packet_type,
|
||
total_amount=total_amount,
|
||
total_slots=total_slots,
|
||
remaining_amount=total_amount,
|
||
remaining_slots=total_slots,
|
||
exclusive_user_id=exclusive_user_id,
|
||
exclusive_username=exclusive_username,
|
||
password=password,
|
||
created_at=created_at,
|
||
)
|
||
|
||
def _deduct_points(
|
||
self,
|
||
owner_id: int,
|
||
total_amount: int,
|
||
packet_type: RedPacketType,
|
||
chat_id: int,
|
||
) -> None:
|
||
config_api = self._config_api()
|
||
current_points = config_api.get_user_points(owner_id)
|
||
if current_points < total_amount:
|
||
raise ValueError("积分不足,无法发放红包")
|
||
config_api.adjust_user_points_sync(
|
||
owner_id,
|
||
-total_amount,
|
||
reason=f"发放{packet_type.value}红包(会话{chat_id})",
|
||
)
|
||
|
||
# endregion
|
||
|
||
# region 抢红包
|
||
|
||
def claim_packet(
|
||
self,
|
||
chat_id: int,
|
||
packet_id: str,
|
||
user_id: int,
|
||
username: str,
|
||
tokens: Optional[Sequence[str]] = None,
|
||
) -> Tuple[bool, str, Optional[int]]:
|
||
if not packet_id:
|
||
return False, "❌ 红包ID不能为空", None
|
||
conn = get_db().conn
|
||
cursor = conn.cursor()
|
||
try:
|
||
cursor.execute("BEGIN IMMEDIATE")
|
||
cursor.execute(
|
||
f"""
|
||
SELECT packet_id, owner_id, packet_type, total_amount, total_slots,
|
||
remaining_amount, remaining_slots, exclusive_user_id,
|
||
exclusive_username, password
|
||
FROM {self.PACKET_TABLE}
|
||
WHERE packet_id = ?
|
||
""",
|
||
(packet_id,),
|
||
)
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
conn.rollback()
|
||
return False, "❌ 红包不存在或已失效", None
|
||
|
||
packet = self._row_to_packet(row)
|
||
|
||
if packet.remaining_slots <= 0 or packet.remaining_amount <= 0:
|
||
conn.rollback()
|
||
return False, "⚠️ 红包已经被抢完了", None
|
||
|
||
cursor.execute(
|
||
f"""
|
||
SELECT 1 FROM {self.CLAIM_TABLE}
|
||
WHERE packet_id = ? AND user_id = ?
|
||
""",
|
||
(packet_id, user_id),
|
||
)
|
||
if cursor.fetchone():
|
||
conn.rollback()
|
||
return False, "⚠️ 你已经抢过这个红包了", None
|
||
|
||
if packet.packet_type == RedPacketType.EXCLUSIVE:
|
||
if packet.exclusive_user_id != user_id:
|
||
conn.rollback()
|
||
return False, "❌ 这是专属红包,只有目标用户可以领取", None
|
||
|
||
if packet.packet_type == RedPacketType.PASSWORD:
|
||
provided = " ".join(tokens or []).strip()
|
||
if not provided:
|
||
conn.rollback()
|
||
return False, "❌ 这是口令红包,需要附带口令", None
|
||
if provided != (packet.password or ""):
|
||
conn.rollback()
|
||
return False, "❌ 口令不正确", None
|
||
|
||
grant_amount = self._allocate_amount(packet.remaining_amount, packet.remaining_slots)
|
||
|
||
cursor.execute(
|
||
f"""
|
||
UPDATE {self.PACKET_TABLE}
|
||
SET remaining_amount = remaining_amount - ?,
|
||
remaining_slots = remaining_slots - 1
|
||
WHERE packet_id = ?
|
||
""",
|
||
(grant_amount, packet_id),
|
||
)
|
||
cursor.execute(
|
||
f"""
|
||
INSERT INTO {self.CLAIM_TABLE} (packet_id, user_id, username, amount, claimed_at)
|
||
VALUES (?, ?, ?, ?, ?)
|
||
""",
|
||
(packet_id, user_id, username, grant_amount, self._now()),
|
||
)
|
||
conn.commit()
|
||
except Exception as exc:
|
||
conn.rollback()
|
||
logger.Log("Error", f"{ConsoleFrontColor.RED}领取红包失败: {exc}{ConsoleFrontColor.RESET}")
|
||
return False, "❌ 领取红包时出现错误,请稍后再试", None
|
||
|
||
config_api = self._config_api()
|
||
config_api.adjust_user_points_sync(
|
||
user_id,
|
||
grant_amount,
|
||
reason=f"抢到红包 {packet_id}(会话{chat_id})",
|
||
)
|
||
|
||
return True, f"✅ 领取成功,获得 {grant_amount} 分!", grant_amount
|
||
|
||
def _allocate_amount(self, remaining_amount: int, remaining_slots: int) -> int:
|
||
if remaining_slots <= 1:
|
||
return remaining_amount
|
||
min_per_slot = 1
|
||
max_available = remaining_amount - (remaining_slots - 1) * min_per_slot
|
||
avg = remaining_amount // remaining_slots
|
||
upper = max(min_per_slot, min(max_available, max(min_per_slot, avg * 2)))
|
||
return random.randint(min_per_slot, upper)
|
||
|
||
def _row_to_packet(self, row) -> RedPacket:
|
||
return RedPacket(
|
||
packet_id=row["packet_id"],
|
||
owner_id=row["owner_id"],
|
||
packet_type=RedPacketType(row["packet_type"]),
|
||
total_amount=row["total_amount"],
|
||
total_slots=row["total_slots"],
|
||
remaining_amount=row["remaining_amount"],
|
||
remaining_slots=row["remaining_slots"],
|
||
exclusive_user_id=row["exclusive_user_id"],
|
||
exclusive_username=row["exclusive_username"],
|
||
password=row["password"],
|
||
created_at="",
|
||
)
|
||
|
||
# endregion
|
||
|
||
|
||
__all__ = ["WPSRedPacketService", "RedPacketType", "RedPacket", "RedPacketClaim"]
|
||
|