Files
WPSBot/core/database.py
2025-10-31 11:58:35 +08:00

1447 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SQLite数据库操作模块 - 使用标准库sqlite3"""
import sqlite3
import json
import time
import logging
from typing import Optional, Dict, Any, List
from pathlib import Path
from config import DATABASE_PATH
logger = logging.getLogger(__name__)
class Database:
"""数据库管理类"""
def __init__(self, db_path: str = DATABASE_PATH):
"""初始化数据库连接
Args:
db_path: 数据库文件路径
"""
self.db_path = db_path
self._conn: Optional[sqlite3.Connection] = None
self._ensure_db_exists()
self.init_tables()
def _ensure_db_exists(self):
"""确保数据库目录存在"""
db_dir = Path(self.db_path).parent
db_dir.mkdir(parents=True, exist_ok=True)
@property
def conn(self) -> sqlite3.Connection:
"""获取数据库连接(懒加载)"""
if self._conn is None:
try:
self._conn = sqlite3.connect(
self.db_path,
check_same_thread=False, # 允许多线程访问
isolation_level=None, # 自动提交
timeout=30.0 # 增加超时时间
)
self._conn.row_factory = sqlite3.Row # 支持字典式访问
# 启用WAL模式以提高并发性能
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA synchronous=NORMAL")
self._conn.execute("PRAGMA cache_size=1000")
self._conn.execute("PRAGMA temp_store=MEMORY")
logger.info(f"数据库连接成功: {self.db_path}")
except Exception as e:
logger.error(f"数据库连接失败: {e}", exc_info=True)
raise
return self._conn
def _column_exists(self, table_name: str, column_name: str) -> bool:
"""检查表中列是否存在
Args:
table_name: 表名
column_name: 列名
Returns:
是否存在
"""
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()]
return column_name in columns
def _add_column_if_not_exists(self, table_name: str, column_name: str, column_def: str):
"""安全地添加列(如果不存在)
Args:
table_name: 表名
column_name: 列名
column_def: 列定义(如 "INTEGER""TEXT DEFAULT ''"
"""
if not self._column_exists(table_name, column_name):
try:
cursor = self.conn.cursor()
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}")
logger.info(f"为表 {table_name} 添加列 {column_name}")
except Exception as e:
logger.warning(f"添加列失败: {e}")
def init_tables(self):
"""初始化数据库表"""
cursor = self.conn.cursor()
# 用户表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
created_at INTEGER NOT NULL,
last_active INTEGER NOT NULL
)
""")
# 游戏状态表(新增 confirmed 列用于标记可回收状态)
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
state_data TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
confirmed INTEGER DEFAULT 0,
UNIQUE(chat_id, user_id, game_type)
)
""")
# 幂等迁移:为已有表增加 confirmed 列
try:
cursor.execute("ALTER TABLE game_states ADD COLUMN confirmed INTEGER DEFAULT 0")
logger.info("为 game_states 增加 confirmed 列")
except Exception:
# 列已存在或不可变更时忽略
pass
# 创建索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_chat_user
ON game_states(chat_id, user_id)
""")
# 创建用户名索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_username
ON users(username)
""")
# 游戏统计表
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
wins INTEGER DEFAULT 0,
losses INTEGER DEFAULT 0,
draws INTEGER DEFAULT 0,
total_plays INTEGER DEFAULT 0,
UNIQUE(user_id, game_type)
)
""")
# 用户积分表 - 简化版本,只保留必要字段
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_points (
user_id INTEGER PRIMARY KEY,
points INTEGER DEFAULT 0,
last_checkin_date TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 赌场下注记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS casino_bets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
user_id INTEGER NOT NULL,
bet_type TEXT NOT NULL,
amount INTEGER NOT NULL,
multiplier REAL NOT NULL,
status TEXT DEFAULT 'pending',
result TEXT,
win_amount INTEGER,
created_at INTEGER NOT NULL,
settled_at INTEGER,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 赌场游戏会话表
cursor.execute("""
CREATE TABLE IF NOT EXISTS casino_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
banker_id INTEGER NOT NULL,
min_bet INTEGER NOT NULL,
max_bet INTEGER NOT NULL,
multiplier REAL NOT NULL,
house_fee REAL DEFAULT 0.05,
status TEXT DEFAULT 'open',
created_at INTEGER NOT NULL,
settled_at INTEGER,
UNIQUE(chat_id, game_type, status)
)
""")
# 21点手牌表
cursor.execute("""
CREATE TABLE IF NOT EXISTS casino_blackjack_hands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
hand_data TEXT NOT NULL,
hand_status TEXT DEFAULT 'playing',
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (session_id) REFERENCES casino_sessions(id)
)
""")
# 创建索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_casino_bets
ON casino_bets(chat_id, game_type, status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_casino_sessions
ON casino_sessions(chat_id, game_type, status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_casino_blackjack_hands
ON casino_blackjack_hands(session_id, user_id)
""")
# 兼容性检查为casino_sessions表添加新字段
self._add_column_if_not_exists('casino_sessions', 'current_phase', "TEXT DEFAULT 'betting'")
self._add_column_if_not_exists('casino_sessions', 'blackjack_multiplier', 'REAL DEFAULT 1.5')
# 兼容性检查为casino_bets表添加新字段轮盘和21点专用
self._add_column_if_not_exists('casino_bets', 'bet_category', "TEXT")
self._add_column_if_not_exists('casino_bets', 'bet_number', 'INTEGER')
self._add_column_if_not_exists('casino_bets', 'bet_value', "TEXT")
self._add_column_if_not_exists('casino_bets', 'hand_status', "TEXT")
logger.info("数据库表初始化完成")
# ===== 用户相关操作 =====
def get_or_create_user(self, user_id: int, username: str = None) -> Dict:
"""获取或创建用户
Args:
user_id: 用户ID
username: 用户名
Returns:
用户信息字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
# 尝试获取用户
cursor.execute(
"SELECT * FROM users WHERE user_id = ?",
(user_id,)
)
user = cursor.fetchone()
if user:
# 更新最后活跃时间
cursor.execute(
"UPDATE users SET last_active = ? WHERE user_id = ?",
(current_time, user_id)
)
return dict(user)
else:
# 创建新用户
cursor.execute(
"INSERT INTO users (user_id, username, created_at, last_active) VALUES (?, ?, ?, ?)",
(user_id, username, current_time, current_time)
)
return {
'user_id': user_id,
'username': username,
'created_at': current_time,
'last_active': current_time
}
def get_user_by_name(self, username: str) -> Optional[Dict]:
"""根据用户名查找用户
Args:
username: 用户名
Returns:
用户信息字典如果不存在返回None
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM users WHERE username = ?",
(username,)
)
row = cursor.fetchone()
if row:
return dict(row)
return None
def update_user_name(self, user_id: int, username: str) -> bool:
"""更新用户名称
Args:
user_id: 用户ID
username: 新用户名
Returns:
是否成功
"""
try:
# 确保用户存在
self.get_or_create_user(user_id)
cursor = self.conn.cursor()
cursor.execute(
"UPDATE users SET username = ? WHERE user_id = ?",
(username, user_id)
)
logger.info(f"用户 {user_id} 更新名称为: {username}")
return True
except Exception as e:
logger.error(f"更新用户名失败: user_id={user_id}, username={username}, error={e}", exc_info=True)
return False
def get_user_display_name(self, user_id: int) -> str:
"""获取用户显示名称
如果用户已注册username不为None返回用户名否则返回"用户{user_id}"
Args:
user_id: 用户ID
Returns:
用户显示名称(用户名或"用户{user_id}"
"""
user_dict = self.get_or_create_user(user_id)
username = user_dict.get('username')
if username:
return username
else:
return f"用户{user_id}"
# ===== 游戏状态相关操作 =====
def get_game_state(self, chat_id: int, user_id: int, game_type: str) -> Optional[Dict]:
"""获取游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
Returns:
游戏状态字典如果不存在返回None
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(chat_id, user_id, game_type)
)
row = cursor.fetchone()
if row:
state = dict(row)
# 解析JSON数据
if state.get('state_data'):
state['state_data'] = json.loads(state['state_data'])
return state
return None
def save_game_state(self, chat_id: int, user_id: int, game_type: str, state_data: Dict):
"""保存游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
state_data: 状态数据字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
state_json = json.dumps(state_data, ensure_ascii=False)
cursor.execute("""
INSERT INTO game_states (chat_id, user_id, game_type, state_data, created_at, updated_at, confirmed)
VALUES (?, ?, ?, ?, ?, ?, 0)
ON CONFLICT(chat_id, user_id, game_type)
DO UPDATE SET state_data = excluded.state_data,
updated_at = excluded.updated_at,
confirmed = 0
""", (chat_id, user_id, game_type, state_json, current_time, current_time))
logger.debug(f"保存游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def confirm_game_state(self, chat_id: int, user_id: int, game_type: str):
"""标记游戏状态为已确认(可由清理器删除)
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
"""
cursor = self.conn.cursor()
current_time = int(time.time())
cursor.execute(
"UPDATE game_states SET confirmed = 1, updated_at = ? WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(current_time, chat_id, user_id, game_type)
)
logger.debug(f"确认游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def delete_game_state(self, chat_id: int, user_id: int, game_type: str):
"""删除游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
"""
cursor = self.conn.cursor()
cursor.execute(
"DELETE FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(chat_id, user_id, game_type)
)
logger.debug(f"删除游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def cleanup_old_sessions(self, timeout: int = 1800):
"""清理过期的游戏会话
Args:
timeout: 超时时间(秒)
"""
cursor = self.conn.cursor()
cutoff_time = int(time.time()) - timeout
# 仅删除已确认confirmed=1的旧会话避免误删进行中的长时任务
cursor.execute(
"DELETE FROM game_states WHERE confirmed = 1 AND updated_at < ?",
(cutoff_time,)
)
deleted = cursor.rowcount
if deleted > 0:
logger.info(f"清理了 {deleted} 个过期游戏会话")
# ===== 游戏统计相关操作 =====
def get_game_stats(self, user_id: int, game_type: str) -> Dict:
"""获取游戏统计
Args:
user_id: 用户ID
game_type: 游戏类型
Returns:
统计数据字典
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM game_stats WHERE user_id = ? AND game_type = ?",
(user_id, game_type)
)
row = cursor.fetchone()
if row:
return dict(row)
else:
# 返回默认值
return {
'user_id': user_id,
'game_type': game_type,
'wins': 0,
'losses': 0,
'draws': 0,
'total_plays': 0
}
def update_game_stats(self, user_id: int, game_type: str,
win: bool = False, loss: bool = False, draw: bool = False):
"""更新游戏统计
Args:
user_id: 用户ID
game_type: 游戏类型
win: 是否获胜
loss: 是否失败
draw: 是否平局
"""
cursor = self.conn.cursor()
# 使用UPSERT语法
cursor.execute("""
INSERT INTO game_stats (user_id, game_type, wins, losses, draws, total_plays)
VALUES (?, ?, ?, ?, ?, 1)
ON CONFLICT(user_id, game_type)
DO UPDATE SET
wins = wins + ?,
losses = losses + ?,
draws = draws + ?,
total_plays = total_plays + 1
""", (user_id, game_type, int(win), int(loss), int(draw),
int(win), int(loss), int(draw)))
logger.debug(f"更新游戏统计: user_id={user_id}, game_type={game_type}")
# ===== 积分相关操作 =====
def get_user_points(self, user_id: int) -> Dict:
"""获取用户积分信息
Args:
user_id: 用户ID
Returns:
积分信息字典
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if row:
return dict(row)
else:
# 创建新用户积分记录
current_time = int(time.time())
cursor.execute(
"INSERT INTO user_points (user_id, points, created_at, updated_at) VALUES (?, 0, ?, ?)",
(user_id, current_time, current_time)
)
return {
'user_id': user_id,
'points': 0,
'last_checkin_date': None,
'created_at': current_time,
'updated_at': current_time
}
def add_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""添加积分
Args:
user_id: 用户ID
points: 积分数量
source: 积分来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
logger.warning(f"积分数量无效: {points}")
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 确保用户存在
self.get_or_create_user(user_id)
# 更新用户积分(不触碰 last_checkin_date
# 若不存在则插入;存在则 points += ? 且更新 updated_at
cursor.execute("""
INSERT INTO user_points (user_id, points, created_at, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
points = user_points.points + excluded.points,
updated_at = excluded.updated_at
""", (user_id, points, current_time, current_time))
logger.info(f"用户 {user_id} 成功获得 {points} 积分,来源:{source}")
return True
except Exception as e:
logger.error(f"添加积分失败: user_id={user_id}, points={points}, error={e}", exc_info=True)
return False
def consume_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""消费积分
Args:
user_id: 用户ID
points: 积分数量
source: 消费来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 检查积分是否足够
cursor.execute(
"SELECT points FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if not row or row[0] < points:
logger.warning(f"用户 {user_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}")
return False
# 消费积分
cursor.execute(
"UPDATE user_points SET points = points - ?, updated_at = ? WHERE user_id = ?",
(points, current_time, user_id)
)
logger.debug(f"用户 {user_id} 消费 {points} 积分,来源:{source}")
return True
except Exception as e:
logger.error(f"消费积分失败: {e}")
return False
def check_daily_checkin(self, user_id: int, date: str) -> bool:
"""检查用户是否已签到
Args:
user_id: 用户ID
date: 日期字符串 (YYYY-MM-DD)
Returns:
是否已签到
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT last_checkin_date FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
return row and row[0] == date
def daily_checkin(self, user_id: int, points: int) -> bool:
"""每日签到
Args:
user_id: 用户ID
points: 签到积分
Returns:
是否成功
"""
from datetime import datetime
today = datetime.now().strftime('%Y-%m-%d')
if self.check_daily_checkin(user_id, today):
logger.warning(f"用户 {user_id} 今日已签到")
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 确保用户存在
self.get_or_create_user(user_id)
# 获取签到前积分
points_before = self.get_user_points(user_id)
logger.info(f"用户 {user_id} 签到前积分: {points_before['points']}")
# 更新积分和签到日期
cursor.execute("""
INSERT OR REPLACE INTO user_points (user_id, points, last_checkin_date, created_at, updated_at)
VALUES (?, COALESCE((SELECT points FROM user_points WHERE user_id = ?), 0) + ?, ?, ?, ?)
""", (user_id, user_id, points, today, current_time, current_time))
# 验证积分是否真的增加了
points_after = self.get_user_points(user_id)
logger.info(f"用户 {user_id} 签到后积分: {points_after['points']}")
if points_after['points'] > points_before['points']:
logger.info(f"用户 {user_id} 签到成功,积分增加: {points_after['points'] - points_before['points']}")
return True
else:
logger.error(f"用户 {user_id} 签到失败,积分未增加")
return False
except Exception as e:
logger.error(f"每日签到失败: {e}", exc_info=True)
return False
def get_points_leaderboard(self, limit: int = 10) -> List[Dict]:
"""获取积分排行榜
Args:
limit: 限制数量
Returns:
排行榜列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT u.user_id, u.username, up.points
FROM users u
LEFT JOIN user_points up ON u.user_id = up.user_id
ORDER BY COALESCE(up.points, 0) DESC
LIMIT ?
""", (limit,))
rows = cursor.fetchall()
return [dict(row) for row in rows]
# ===== 赌场相关操作 =====
def get_any_active_casino_session(self, chat_id: int) -> Optional[Dict]:
"""获取任意活跃的游戏会话(用于单场限制检查)
Args:
chat_id: 会话ID
Returns:
会话信息字典或None
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_sessions
WHERE chat_id = ? AND status = 'open'
ORDER BY id DESC LIMIT 1
""", (chat_id,))
row = cursor.fetchone()
if row:
return dict(row)
return None
def create_casino_session(self, chat_id: int, game_type: str, banker_id: int,
min_bet: int, max_bet: int, multiplier: float,
house_fee: float = 0.05, current_phase: str = 'betting',
blackjack_multiplier: float = 1.5) -> int:
"""创建新的赌场游戏会话
Args:
chat_id: 会话ID
game_type: 游戏类型
banker_id: 庄家ID
min_bet: 最小下注金额
max_bet: 最大下注金额
multiplier: 赔率
house_fee: 抽水率
current_phase: 当前阶段(默认'betting'
blackjack_multiplier: 21点黑杰克倍数默认1.5
Returns:
session_id
"""
cursor = self.conn.cursor()
current_time = int(time.time())
# 检查是否已有活跃的会话单场限制同一chat_id只能有一个活跃游戏
existing = self.get_any_active_casino_session(chat_id)
if existing:
# 如果已有活跃游戏返回其ID保持向后兼容但实际应该在应用层阻止
return existing['id']
# 检查是否已有相同game_type的活跃会话向后兼容
cursor.execute("""
SELECT id FROM casino_sessions
WHERE chat_id = ? AND game_type = ? AND status = 'open'
""", (chat_id, game_type))
existing_same_type = cursor.fetchone()
if existing_same_type:
return existing_same_type['id']
cursor.execute("""
INSERT INTO casino_sessions
(chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee,
status, created_at, current_phase, blackjack_multiplier)
VALUES (?, ?, ?, ?, ?, ?, ?, 'open', ?, ?, ?)
""", (chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee,
current_time, current_phase, blackjack_multiplier))
return cursor.lastrowid
def get_active_casino_session(self, chat_id: int, game_type: str) -> Optional[Dict]:
"""获取活跃的游戏会话
Args:
chat_id: 会话ID
game_type: 游戏类型
Returns:
会话信息字典或None
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_sessions
WHERE chat_id = ? AND game_type = ? AND status = 'open'
ORDER BY id DESC LIMIT 1
""", (chat_id, game_type))
row = cursor.fetchone()
if row:
return dict(row)
return None
def create_casino_bet(self, chat_id: int, game_type: str, user_id: int,
bet_type: str, amount: int, multiplier: float,
bet_category: str = None, bet_number: int = None,
bet_value: str = None, hand_status: str = None) -> int:
"""创建下注记录
Args:
chat_id: 会话ID
game_type: 游戏类型
user_id: 用户ID
bet_type: 下注类型(大小游戏:""/""21点"标准"等)
amount: 下注金额
multiplier: 赔率
bet_category: 下注类别(轮盘游戏:["数字"/"颜色"/"奇偶"/"大小"/"区间"]
bet_number: 数字下注轮盘游戏0-36
bet_value: 下注值(轮盘游戏:如"红色""奇数""1-12"等)
hand_status: 手牌状态21点游戏["playing"/"stood"/"busted"/"blackjack"]
Returns:
bet_id
"""
cursor = self.conn.cursor()
current_time = int(time.time())
cursor.execute("""
INSERT INTO casino_bets
(chat_id, game_type, user_id, bet_type, amount, multiplier, status, created_at,
bet_category, bet_number, bet_value, hand_status)
VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?)
""", (chat_id, game_type, user_id, bet_type, amount, multiplier, current_time,
bet_category, bet_number, bet_value, hand_status))
return cursor.lastrowid
def get_pending_bets(self, chat_id: int, game_type: str) -> List[Dict]:
"""获取待结算的下注列表
Args:
chat_id: 会话ID
game_type: 游戏类型
Returns:
下注列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_bets
WHERE chat_id = ? AND game_type = ? AND status = 'pending'
ORDER BY created_at ASC
""", (chat_id, game_type))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def settle_casino_bets(self, chat_id: int, game_type: str, result: str,
banker_id: int, **kwargs) -> Dict:
"""结算所有下注(根据游戏类型分发到不同结算方法)
Args:
chat_id: 会话ID
game_type: 游戏类型
result: 游戏结果(大小游戏:""/""轮盘数字字符串21点特殊格式
banker_id: 庄家ID
**kwargs: 其他参数轮盘result_number21点hands_dict等
Returns:
结算详情字典
"""
if game_type == '大小':
return self._settle_bigsmall_bets(chat_id, game_type, result, banker_id)
elif game_type == '轮盘':
result_number = kwargs.get('result_number', int(result) if result.isdigit() else 0)
return self._settle_roulette_bets(chat_id, game_type, result_number, banker_id)
elif game_type == '21点':
hands_dict = kwargs.get('hands_dict', {})
return self._settle_blackjack_bets(chat_id, game_type, hands_dict, banker_id)
else:
# 兼容旧的大小游戏逻辑
return self._settle_bigsmall_bets(chat_id, game_type, result, banker_id)
def _settle_bigsmall_bets(self, chat_id: int, game_type: str, result: str,
banker_id: int) -> Dict:
"""结算大小游戏下注
Args:
chat_id: 会话ID
game_type: 游戏类型
result: 游戏结果(""""
banker_id: 庄家ID
Returns:
结算详情字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
# 获取活跃会话
session = self.get_active_casino_session(chat_id, game_type)
if not session:
raise ValueError("没有活跃的游戏会话")
if session['banker_id'] != banker_id:
raise ValueError("只有庄家可以结算游戏")
# 获取所有待结算下注
bets = self.get_pending_bets(chat_id, game_type)
winners = []
losers = []
total_win = 0
# 计算输赢
for bet in bets:
is_win = (bet['bet_type'] == result)
if is_win:
# 计算赢得金额
win_amount = int(bet['amount'] * bet['multiplier'])
# 扣除抽水
house_fee = session['house_fee']
actual_win = int(win_amount * (1 - house_fee))
winners.append({
'user_id': bet['user_id'],
'amount': bet['amount'],
'win_amount': actual_win,
'bet_id': bet['id']
})
total_win += actual_win
else:
losers.append({
'user_id': bet['user_id'],
'amount': bet['amount'],
'bet_id': bet['id']
})
# 使用事务确保原子性
try:
# 更新下注状态
for bet in bets:
is_win = (bet['bet_type'] == result)
if is_win:
win_amount = int(bet['amount'] * bet['multiplier'])
actual_win = int(win_amount * (1 - session['house_fee']))
# 发放奖励
self.add_points(bet['user_id'], actual_win, 'casino_win',
f"赌场游戏{game_type}赢得")
cursor.execute("""
UPDATE casino_bets
SET status = 'settled', result = ?, win_amount = ?, settled_at = ?
WHERE id = ?
""", (result, actual_win, current_time, bet['id']))
else:
cursor.execute("""
UPDATE casino_bets
SET status = 'settled', result = ?, settled_at = ?
WHERE id = ?
""", (result, current_time, bet['id']))
# 关闭会话
cursor.execute("""
UPDATE casino_sessions
SET status = 'closed', settled_at = ?
WHERE id = ?
""", (current_time, session['id']))
return {
'winners': winners,
'losers': losers,
'total_win': total_win,
'result': result
}
except Exception as e:
logger.error(f"结算失败: {e}", exc_info=True)
raise
def _settle_roulette_bets(self, chat_id: int, game_type: str, result_number: int,
banker_id: int) -> Dict:
"""结算轮盘游戏下注
Args:
chat_id: 会话ID
game_type: 游戏类型
result_number: 结果数字0-36
banker_id: 庄家ID
Returns:
结算详情字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
session = self.get_active_casino_session(chat_id, game_type)
if not session:
raise ValueError("没有活跃的游戏会话")
if session['banker_id'] != banker_id:
raise ValueError("只有庄家可以结算游戏")
bets = self.get_pending_bets(chat_id, game_type)
winners = []
losers = []
total_win = 0
# 轮盘数字对应的颜色欧式轮盘0为绿色
roulette_red = {1, 3, 5, 7, 9, 12, 14, 16, 18, 19, 21, 23, 25, 27, 30, 32, 34, 36}
result_color = '绿色' if result_number == 0 else ('红色' if result_number in roulette_red else '黑色')
result_odd_even = None if result_number == 0 else ('奇数' if result_number % 2 == 1 else '偶数')
result_big_small = None if result_number == 0 else ('' if 1 <= result_number <= 18 else '')
result_range = None
if 1 <= result_number <= 12:
result_range = '1-12'
elif 13 <= result_number <= 24:
result_range = '13-24'
elif 25 <= result_number <= 36:
result_range = '25-36'
for bet in bets:
is_win = False
multiplier = bet['multiplier']
bet_category = bet.get('bet_category')
if bet_category == '数字':
if bet.get('bet_number') == result_number:
is_win = True
elif bet_category == '颜色':
if bet.get('bet_value') == result_color:
is_win = True
elif bet_category == '奇偶':
if result_odd_even and bet.get('bet_value') == result_odd_even:
is_win = True
elif bet_category == '大小':
if result_big_small and bet.get('bet_value') == result_big_small:
is_win = True
elif bet_category == '区间':
if result_range and bet.get('bet_value') == result_range:
is_win = True
if is_win:
win_amount = int(bet['amount'] * multiplier)
house_fee = session['house_fee']
actual_win = int(win_amount * (1 - house_fee))
winners.append({
'user_id': bet['user_id'],
'amount': bet['amount'],
'win_amount': actual_win,
'bet_id': bet['id']
})
total_win += actual_win
else:
losers.append({
'user_id': bet['user_id'],
'amount': bet['amount'],
'bet_id': bet['id']
})
try:
result_str = str(result_number)
for bet in bets:
is_win = False
multiplier = bet['multiplier']
bet_category = bet.get('bet_category')
if bet_category == '数字':
if bet.get('bet_number') == result_number:
is_win = True
elif bet_category == '颜色':
if bet.get('bet_value') == result_color:
is_win = True
elif bet_category == '奇偶':
if result_odd_even and bet.get('bet_value') == result_odd_even:
is_win = True
elif bet_category == '大小':
if result_big_small and bet.get('bet_value') == result_big_small:
is_win = True
elif bet_category == '区间':
if result_range and bet.get('bet_value') == result_range:
is_win = True
if is_win:
win_amount = int(bet['amount'] * multiplier)
actual_win = int(win_amount * (1 - session['house_fee']))
self.add_points(bet['user_id'], actual_win, 'casino_win',
f"赌场游戏{game_type}赢得")
cursor.execute("""
UPDATE casino_bets
SET status = 'settled', result = ?, win_amount = ?, settled_at = ?
WHERE id = ?
""", (result_str, actual_win, current_time, bet['id']))
else:
cursor.execute("""
UPDATE casino_bets
SET status = 'settled', result = ?, settled_at = ?
WHERE id = ?
""", (result_str, current_time, bet['id']))
cursor.execute("""
UPDATE casino_sessions
SET status = 'closed', settled_at = ?
WHERE id = ?
""", (current_time, session['id']))
return {
'winners': winners,
'losers': losers,
'total_win': total_win,
'result': result_str,
'result_number': result_number
}
except Exception as e:
logger.error(f"结算失败: {e}", exc_info=True)
raise
def _settle_blackjack_bets(self, chat_id: int, game_type: str, hands_dict: Dict,
banker_id: int) -> Dict:
"""结算21点游戏下注
Args:
chat_id: 会话ID
game_type: 游戏类型
hands_dict: 手牌字典 {user_id: {'cards': [2,3,4], 'status': 'stood'}, ...}
banker_id: 庄家ID
Returns:
结算详情字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
session = self.get_active_casino_session(chat_id, game_type)
if not session:
raise ValueError("没有活跃的游戏会话")
if session['banker_id'] != banker_id:
raise ValueError("只有庄家可以结算游戏")
bets = self.get_pending_bets(chat_id, game_type)
banker_hand = hands_dict.get(0, {}) # 0表示庄家
banker_cards = banker_hand.get('cards', [])
banker_status = banker_hand.get('status', 'stood')
banker_points = self._calculate_blackjack_points(banker_cards)
banker_is_busted = banker_status == 'busted'
banker_is_blackjack = banker_status == 'blackjack'
winners = []
losers = []
total_win = 0
for bet in bets:
user_id = bet['user_id']
player_hand = hands_dict.get(user_id, {})
player_cards = player_hand.get('cards', [])
player_status = player_hand.get('status', 'stood')
player_points = self._calculate_blackjack_points(player_cards)
player_is_busted = player_status == 'busted'
player_is_blackjack = player_status == 'blackjack'
is_win = False
multiplier = bet['multiplier']
if player_is_busted:
is_win = False
elif player_is_blackjack:
if banker_is_blackjack:
# 双方都是黑杰克,平局(返还下注)
is_win = False
self.add_points(user_id, bet['amount'], 'casino_blackjack_push',
"21点游戏平局返还下注")
else:
# 玩家黑杰克赢得1.5倍
is_win = True
multiplier = session.get('blackjack_multiplier', 1.5)
elif banker_is_busted:
is_win = True
elif player_points > banker_points:
is_win = True
elif player_points == banker_points:
# 平局,返还下注
is_win = False
self.add_points(user_id, bet['amount'], 'casino_blackjack_push',
"21点游戏平局返还下注")
if is_win:
win_amount = int(bet['amount'] * multiplier)
house_fee = session['house_fee']
actual_win = int(win_amount * (1 - house_fee))
winners.append({
'user_id': user_id,
'amount': bet['amount'],
'win_amount': actual_win,
'bet_id': bet['id']
})
total_win += actual_win
elif not player_is_busted and player_points != banker_points:
losers.append({
'user_id': user_id,
'amount': bet['amount'],
'bet_id': bet['id']
})
try:
for bet in bets:
user_id = bet['user_id']
player_hand = hands_dict.get(user_id, {})
player_cards = player_hand.get('cards', [])
player_status = player_hand.get('status', 'stood')
player_points = self._calculate_blackjack_points(player_cards)
player_is_busted = player_status == 'busted'
player_is_blackjack = player_status == 'blackjack'
is_win = False
multiplier = bet['multiplier']
if player_is_busted:
is_win = False
elif player_is_blackjack:
if banker_is_blackjack:
is_win = False
self.add_points(user_id, bet['amount'], 'casino_blackjack_push',
"21点游戏平局返还下注")
else:
is_win = True
multiplier = session.get('blackjack_multiplier', 1.5)
elif banker_is_busted:
is_win = True
elif player_points > banker_points:
is_win = True
elif player_points == banker_points:
is_win = False
self.add_points(user_id, bet['amount'], 'casino_blackjack_push',
"21点游戏平局返还下注")
result_str = f"庄家{banker_points}点 vs 玩家{player_points}"
if is_win:
win_amount = int(bet['amount'] * multiplier)
actual_win = int(win_amount * (1 - session['house_fee']))
self.add_points(user_id, actual_win, 'casino_win',
f"赌场游戏{game_type}赢得")
cursor.execute("""
UPDATE casino_bets
SET status = 'settled', result = ?, win_amount = ?, settled_at = ?
WHERE id = ?
""", (result_str, actual_win, current_time, bet['id']))
elif not player_is_busted and player_points != banker_points:
cursor.execute("""
UPDATE casino_bets
SET status = 'settled', result = ?, settled_at = ?
WHERE id = ?
""", (result_str, current_time, bet['id']))
else:
# 平局,已返还下注
cursor.execute("""
UPDATE casino_bets
SET status = 'settled', result = ?, settled_at = ?
WHERE id = ?
""", (result_str, current_time, bet['id']))
cursor.execute("""
UPDATE casino_sessions
SET status = 'closed', settled_at = ?
WHERE id = ?
""", (current_time, session['id']))
return {
'winners': winners,
'losers': losers,
'total_win': total_win,
'result': f"庄家{banker_points}"
}
except Exception as e:
logger.error(f"结算失败: {e}", exc_info=True)
raise
def _calculate_blackjack_points(self, cards: List[int]) -> int:
"""计算21点手牌点数
Args:
cards: 手牌列表A=1J/Q/K=10其他为本身值1-13其中11=J, 12=Q, 13=K
Returns:
点数
"""
points = 0
ace_count = 0
for card in cards:
if card == 1:
ace_count += 1
points += 11
elif card >= 11:
points += 10
else:
points += card
# 处理A的1/11选择
while points > 21 and ace_count > 0:
points -= 10
ace_count -= 1
return points
def close_casino_session(self, chat_id: int, game_type: str):
"""关闭游戏会话
Args:
chat_id: 会话ID
game_type: 游戏类型
"""
cursor = self.conn.cursor()
current_time = int(time.time())
cursor.execute("""
UPDATE casino_sessions
SET status = 'closed', settled_at = ?
WHERE chat_id = ? AND game_type = ? AND status = 'open'
""", (current_time, chat_id, game_type))
# ===== 21点手牌管理 =====
def create_blackjack_hand(self, session_id: int, user_id: int,
hand_data: List[int], hand_status: str = 'playing') -> int:
"""创建21点手牌记录
Args:
session_id: 会话ID
user_id: 用户ID0表示庄家
hand_data: 手牌列表
hand_status: 手牌状态
Returns:
hand_id
"""
cursor = self.conn.cursor()
current_time = int(time.time())
hand_data_json = json.dumps(hand_data)
cursor.execute("""
INSERT INTO casino_blackjack_hands
(session_id, user_id, hand_data, hand_status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (session_id, user_id, hand_data_json, hand_status, current_time, current_time))
return cursor.lastrowid
def get_blackjack_hand(self, session_id: int, user_id: int) -> Optional[Dict]:
"""获取21点手牌
Args:
session_id: 会话ID
user_id: 用户ID
Returns:
手牌信息字典或None
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_blackjack_hands
WHERE session_id = ? AND user_id = ?
ORDER BY id DESC LIMIT 1
""", (session_id, user_id))
row = cursor.fetchone()
if row:
hand_dict = dict(row)
hand_dict['hand_data'] = json.loads(hand_dict['hand_data'])
return hand_dict
return None
def update_blackjack_hand(self, session_id: int, user_id: int,
hand_data: List[int], hand_status: str):
"""更新21点手牌
Args:
session_id: 会话ID
user_id: 用户ID
hand_data: 手牌列表
hand_status: 手牌状态
"""
cursor = self.conn.cursor()
current_time = int(time.time())
hand_data_json = json.dumps(hand_data)
cursor.execute("""
UPDATE casino_blackjack_hands
SET hand_data = ?, hand_status = ?, updated_at = ?
WHERE session_id = ? AND user_id = ?
""", (hand_data_json, hand_status, current_time, session_id, user_id))
def get_all_blackjack_hands(self, session_id: int) -> Dict[int, Dict]:
"""获取所有21点手牌用于结算
Args:
session_id: 会话ID
Returns:
手牌字典 {user_id: {'cards': [...], 'status': ...}, ...}
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_blackjack_hands
WHERE session_id = ?
ORDER BY user_id, id DESC
""", (session_id,))
rows = cursor.fetchall()
hands_dict = {}
for row in rows:
hand_dict = dict(row)
user_id = hand_dict['user_id']
# 只保留最新的手牌(每个用户)
if user_id not in hands_dict:
hands_dict[user_id] = {
'cards': json.loads(hand_dict['hand_data']),
'status': hand_dict['hand_status']
}
return hands_dict
def close(self):
"""关闭数据库连接"""
if self._conn:
self._conn.close()
self._conn = None
logger.info("数据库连接已关闭")
# 全局数据库实例
_db_instance: Optional[Database] = None
def get_db() -> Database:
"""获取全局数据库实例(单例模式)"""
global _db_instance
if _db_instance is None:
_db_instance = Database()
return _db_instance