"""SQLite数据库操作模块 - 使用标准库sqlite3""" import sqlite3 import json import time import logging from typing import Optional, Dict, Any, List from pathlib import Path from config import DATABASE_PATH logger = logging.getLogger(__name__) class Database: """数据库管理类""" def __init__(self, db_path: str = DATABASE_PATH): """初始化数据库连接 Args: db_path: 数据库文件路径 """ self.db_path = db_path self._conn: Optional[sqlite3.Connection] = None self._ensure_db_exists() self.init_tables() def _ensure_db_exists(self): """确保数据库目录存在""" db_dir = Path(self.db_path).parent db_dir.mkdir(parents=True, exist_ok=True) @property def conn(self) -> sqlite3.Connection: """获取数据库连接(懒加载)""" if self._conn is None: try: self._conn = sqlite3.connect( self.db_path, check_same_thread=False, # 允许多线程访问 isolation_level=None, # 自动提交 timeout=30.0 # 增加超时时间 ) self._conn.row_factory = sqlite3.Row # 支持字典式访问 # 启用WAL模式以提高并发性能 self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA synchronous=NORMAL") self._conn.execute("PRAGMA cache_size=1000") self._conn.execute("PRAGMA temp_store=MEMORY") logger.info(f"数据库连接成功: {self.db_path}") except Exception as e: logger.error(f"数据库连接失败: {e}", exc_info=True) raise return self._conn def _column_exists(self, table_name: str, column_name: str) -> bool: """检查表中列是否存在 Args: table_name: 表名 column_name: 列名 Returns: 是否存在 """ cursor = self.conn.cursor() cursor.execute(f"PRAGMA table_info({table_name})") columns = [row[1] for row in cursor.fetchall()] return column_name in columns def _add_column_if_not_exists(self, table_name: str, column_name: str, column_def: str): """安全地添加列(如果不存在) Args: table_name: 表名 column_name: 列名 column_def: 列定义(如 "INTEGER" 或 "TEXT DEFAULT ''") """ if not self._column_exists(table_name, column_name): try: cursor = self.conn.cursor() cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}") logger.info(f"为表 {table_name} 添加列 {column_name}") except Exception as e: logger.warning(f"添加列失败: {e}") def init_tables(self): """初始化数据库表""" cursor = self.conn.cursor() # 用户表 cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY, username TEXT, created_at INTEGER NOT NULL, last_active INTEGER NOT NULL ) """) # 游戏状态表(新增 confirmed 列用于标记可回收状态) cursor.execute(""" CREATE TABLE IF NOT EXISTS game_states ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER NOT NULL, user_id INTEGER NOT NULL, game_type TEXT NOT NULL, state_data TEXT, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, confirmed INTEGER DEFAULT 0, UNIQUE(chat_id, user_id, game_type) ) """) # 幂等迁移:为已有表增加 confirmed 列 try: cursor.execute("ALTER TABLE game_states ADD COLUMN confirmed INTEGER DEFAULT 0") logger.info("为 game_states 增加 confirmed 列") except Exception: # 列已存在或不可变更时忽略 pass # 创建索引 cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_chat_user ON game_states(chat_id, user_id) """) # 创建用户名索引 cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_username ON users(username) """) # 游戏统计表 cursor.execute(""" CREATE TABLE IF NOT EXISTS game_stats ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, game_type TEXT NOT NULL, wins INTEGER DEFAULT 0, losses INTEGER DEFAULT 0, draws INTEGER DEFAULT 0, total_plays INTEGER DEFAULT 0, UNIQUE(user_id, game_type) ) """) # 用户积分表 - 简化版本,只保留必要字段 cursor.execute(""" CREATE TABLE IF NOT EXISTS user_points ( user_id INTEGER PRIMARY KEY, points INTEGER DEFAULT 0, last_checkin_date TEXT, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, FOREIGN KEY (user_id) REFERENCES users (user_id) ) """) # 赌场下注记录表 cursor.execute(""" CREATE TABLE IF NOT EXISTS casino_bets ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER NOT NULL, game_type TEXT NOT NULL, user_id INTEGER NOT NULL, bet_type TEXT NOT NULL, amount INTEGER NOT NULL, multiplier REAL NOT NULL, status TEXT DEFAULT 'pending', result TEXT, win_amount INTEGER, created_at INTEGER NOT NULL, settled_at INTEGER, FOREIGN KEY (user_id) REFERENCES users (user_id) ) """) # 赌场游戏会话表 # 注意:移除了UNIQUE(chat_id, game_type, status)约束 # 因为status='closed'时需要允许多条历史记录 # 单场限制通过应用层逻辑(get_any_active_casino_session)保证 cursor.execute(""" CREATE TABLE IF NOT EXISTS casino_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER NOT NULL, game_type TEXT NOT NULL, banker_id INTEGER NOT NULL, min_bet INTEGER NOT NULL, max_bet INTEGER NOT NULL, multiplier REAL NOT NULL, house_fee REAL DEFAULT 0.05, status TEXT DEFAULT 'open', created_at INTEGER NOT NULL, settled_at INTEGER ) """) # 迁移:如果表已存在且有UNIQUE约束,需要重建表 # 检查是否已有旧表(通过检查是否有UNIQUE约束的索引) try: # 尝试查询表结构,检查是否有UNIQUE约束相关的索引 cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='casino_sessions'") old_sql = cursor.fetchone() if old_sql and 'UNIQUE(chat_id, game_type, status)' in old_sql[0]: # 需要重建表以移除UNIQUE约束 logger.info("检测到旧版本的casino_sessions表,需要重建以移除UNIQUE约束") # 禁用外键检查(SQLite默认可能未启用,但为了安全) cursor.execute("PRAGMA foreign_keys=OFF") # 创建临时表 cursor.execute(""" CREATE TABLE IF NOT EXISTS casino_sessions_new ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER NOT NULL, game_type TEXT NOT NULL, banker_id INTEGER NOT NULL, min_bet INTEGER NOT NULL, max_bet INTEGER NOT NULL, multiplier REAL NOT NULL, house_fee REAL DEFAULT 0.05, status TEXT DEFAULT 'open', created_at INTEGER NOT NULL, settled_at INTEGER, current_phase TEXT DEFAULT 'betting', blackjack_multiplier REAL DEFAULT 1.5 ) """) # 复制所有数据(包括历史记录) # 检查旧表是否有新字段 old_columns = [row[1] for row in cursor.execute("PRAGMA table_info(casino_sessions)").fetchall()] has_current_phase = 'current_phase' in old_columns has_blackjack_multiplier = 'blackjack_multiplier' in old_columns if has_current_phase and has_blackjack_multiplier: # 旧表已有新字段,直接复制所有列 cursor.execute(""" INSERT INTO casino_sessions_new SELECT id, chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee, status, created_at, settled_at, current_phase, blackjack_multiplier FROM casino_sessions """) elif has_current_phase: # 只有current_phase字段 cursor.execute(""" INSERT INTO casino_sessions_new SELECT id, chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee, status, created_at, settled_at, current_phase, 1.5 as blackjack_multiplier FROM casino_sessions """) elif has_blackjack_multiplier: # 只有blackjack_multiplier字段 cursor.execute(""" INSERT INTO casino_sessions_new SELECT id, chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee, status, created_at, settled_at, 'betting' as current_phase, blackjack_multiplier FROM casino_sessions """) else: # 都没有新字段,使用默认值 cursor.execute(""" INSERT INTO casino_sessions_new SELECT id, chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee, status, created_at, settled_at, 'betting' as current_phase, 1.5 as blackjack_multiplier FROM casino_sessions """) # 删除旧表 cursor.execute("DROP TABLE casino_sessions") # 重命名新表 cursor.execute("ALTER TABLE casino_sessions_new RENAME TO casino_sessions") # 重新启用外键检查 cursor.execute("PRAGMA foreign_keys=ON") logger.info("成功重建casino_sessions表,移除UNIQUE约束") except Exception as e: # 如果迁移失败,记录日志但不影响正常运行 logger.warning(f"迁移casino_sessions表时出现错误(可能表结构已更新): {e}") # 21点手牌表 cursor.execute(""" CREATE TABLE IF NOT EXISTS casino_blackjack_hands ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id INTEGER NOT NULL, user_id INTEGER NOT NULL, hand_data TEXT NOT NULL, hand_status TEXT DEFAULT 'playing', created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, FOREIGN KEY (session_id) REFERENCES casino_sessions(id) ) """) # 创建索引 cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_casino_bets ON casino_bets(chat_id, game_type, status) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_casino_sessions ON casino_sessions(chat_id, game_type, status) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_casino_blackjack_hands ON casino_blackjack_hands(session_id, user_id) """) # 兼容性检查:为casino_sessions表添加新字段 self._add_column_if_not_exists('casino_sessions', 'current_phase', "TEXT DEFAULT 'betting'") self._add_column_if_not_exists('casino_sessions', 'blackjack_multiplier', 'REAL DEFAULT 1.5') # 兼容性检查:为casino_bets表添加新字段(轮盘和21点专用) self._add_column_if_not_exists('casino_bets', 'bet_category', "TEXT") self._add_column_if_not_exists('casino_bets', 'bet_number', 'INTEGER') self._add_column_if_not_exists('casino_bets', 'bet_value', "TEXT") self._add_column_if_not_exists('casino_bets', 'hand_status', "TEXT") # 兼容性检查:为users表添加webhook_url字段 self._add_column_if_not_exists('users', 'webhook_url', 'TEXT') logger.info("数据库表初始化完成") # ===== 用户相关操作 ===== def get_or_create_user(self, user_id: int, username: str = None) -> Dict: """获取或创建用户 Args: user_id: 用户ID username: 用户名 Returns: 用户信息字典 """ cursor = self.conn.cursor() current_time = int(time.time()) # 尝试获取用户 cursor.execute( "SELECT * FROM users WHERE user_id = ?", (user_id,) ) user = cursor.fetchone() if user: # 更新最后活跃时间 cursor.execute( "UPDATE users SET last_active = ? WHERE user_id = ?", (current_time, user_id) ) return dict(user) else: # 创建新用户 cursor.execute( "INSERT INTO users (user_id, username, created_at, last_active) VALUES (?, ?, ?, ?)", (user_id, username, current_time, current_time) ) return { 'user_id': user_id, 'username': username, 'created_at': current_time, 'last_active': current_time } def get_user_by_name(self, username: str) -> Optional[Dict]: """根据用户名查找用户 Args: username: 用户名 Returns: 用户信息字典,如果不存在返回None """ cursor = self.conn.cursor() cursor.execute( "SELECT * FROM users WHERE username = ?", (username,) ) row = cursor.fetchone() if row: return dict(row) return None def update_user_name(self, user_id: int, username: str) -> bool: """更新用户名称 Args: user_id: 用户ID username: 新用户名 Returns: 是否成功 """ try: # 确保用户存在 self.get_or_create_user(user_id) cursor = self.conn.cursor() cursor.execute( "UPDATE users SET username = ? WHERE user_id = ?", (username, user_id) ) logger.info(f"用户 {user_id} 更新名称为: {username}") return True except Exception as e: logger.error(f"更新用户名失败: user_id={user_id}, username={username}, error={e}", exc_info=True) return False def set_user_webhook_url(self, user_id: int, webhook_url: str) -> bool: """设置用户webhook URL Args: user_id: 用户ID webhook_url: Webhook URL Returns: 是否成功 """ try: # 确保用户存在 self.get_or_create_user(user_id) cursor = self.conn.cursor() cursor.execute( "UPDATE users SET webhook_url = ? WHERE user_id = ?", (webhook_url, user_id) ) logger.info(f"用户 {user_id} 设置webhook URL: {webhook_url}") return True except Exception as e: logger.error(f"设置用户webhook URL失败: user_id={user_id}, error={e}", exc_info=True) return False def get_user_webhook_url(self, user_id: int) -> Optional[str]: """获取用户webhook URL Args: user_id: 用户ID Returns: Webhook URL,如果不存在返回None """ cursor = self.conn.cursor() cursor.execute( "SELECT webhook_url FROM users WHERE user_id = ?", (user_id,) ) row = cursor.fetchone() if not row: return None webhook_url = row[0] if not webhook_url or webhook_url.strip() == '': return None return webhook_url def has_webhook_url(self, user_id: int) -> bool: """检查用户是否有个人webhook URL Args: user_id: 用户ID Returns: 是否有个人URL """ webhook_url = self.get_user_webhook_url(user_id) return webhook_url is not None def check_users_webhook_urls(self, user_ids: List[int]) -> Dict[int, bool]: """批量检查用户是否有个人webhook URL Args: user_ids: 用户ID列表 Returns: 字典 {user_id: has_url} """ if not user_ids: return {} # 初始化结果字典,所有用户默认为False results = {user_id: False for user_id in user_ids} cursor = self.conn.cursor() # 使用IN子句查询 placeholders = ','.join('?' * len(user_ids)) cursor.execute( f"SELECT user_id, webhook_url FROM users WHERE user_id IN ({placeholders})", user_ids ) rows = cursor.fetchall() for row in rows: user_id = row[0] webhook_url = row[1] # 如果webhook_url不为None且不为空字符串,则设为True if webhook_url and webhook_url.strip() != '': results[user_id] = True return results def get_user_display_name(self, user_id: int) -> str: """获取用户显示名称 如果用户已注册(username不为None),返回用户名;否则返回"用户{user_id}" Args: user_id: 用户ID Returns: 用户显示名称(用户名或"用户{user_id}") """ user_dict = self.get_or_create_user(user_id) username = user_dict.get('username') if username: return username else: return f"用户{user_id}" # ===== 游戏状态相关操作 ===== def get_game_state(self, chat_id: int, user_id: int, game_type: str) -> Optional[Dict]: """获取游戏状态 Args: chat_id: 会话ID user_id: 用户ID game_type: 游戏类型 Returns: 游戏状态字典,如果不存在返回None """ cursor = self.conn.cursor() cursor.execute( "SELECT * FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?", (chat_id, user_id, game_type) ) row = cursor.fetchone() if row: state = dict(row) # 解析JSON数据 if state.get('state_data'): state['state_data'] = json.loads(state['state_data']) return state return None def save_game_state(self, chat_id: int, user_id: int, game_type: str, state_data: Dict): """保存游戏状态 Args: chat_id: 会话ID user_id: 用户ID game_type: 游戏类型 state_data: 状态数据字典 """ cursor = self.conn.cursor() current_time = int(time.time()) state_json = json.dumps(state_data, ensure_ascii=False) cursor.execute(""" INSERT INTO game_states (chat_id, user_id, game_type, state_data, created_at, updated_at, confirmed) VALUES (?, ?, ?, ?, ?, ?, 0) ON CONFLICT(chat_id, user_id, game_type) DO UPDATE SET state_data = excluded.state_data, updated_at = excluded.updated_at, confirmed = 0 """, (chat_id, user_id, game_type, state_json, current_time, current_time)) logger.debug(f"保存游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}") def confirm_game_state(self, chat_id: int, user_id: int, game_type: str): """标记游戏状态为已确认(可由清理器删除) Args: chat_id: 会话ID user_id: 用户ID game_type: 游戏类型 """ cursor = self.conn.cursor() current_time = int(time.time()) cursor.execute( "UPDATE game_states SET confirmed = 1, updated_at = ? WHERE chat_id = ? AND user_id = ? AND game_type = ?", (current_time, chat_id, user_id, game_type) ) logger.debug(f"确认游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}") def delete_game_state(self, chat_id: int, user_id: int, game_type: str): """删除游戏状态 Args: chat_id: 会话ID user_id: 用户ID game_type: 游戏类型 """ cursor = self.conn.cursor() cursor.execute( "DELETE FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?", (chat_id, user_id, game_type) ) logger.debug(f"删除游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}") def cleanup_old_sessions(self, timeout: int = 1800): """清理过期的游戏会话 Args: timeout: 超时时间(秒) """ cursor = self.conn.cursor() cutoff_time = int(time.time()) - timeout # 仅删除已确认(confirmed=1)的旧会话,避免误删进行中的长时任务 cursor.execute( "DELETE FROM game_states WHERE confirmed = 1 AND updated_at < ?", (cutoff_time,) ) deleted = cursor.rowcount if deleted > 0: logger.info(f"清理了 {deleted} 个过期游戏会话") # ===== 游戏统计相关操作 ===== def get_game_stats(self, user_id: int, game_type: str) -> Dict: """获取游戏统计 Args: user_id: 用户ID game_type: 游戏类型 Returns: 统计数据字典 """ cursor = self.conn.cursor() cursor.execute( "SELECT * FROM game_stats WHERE user_id = ? AND game_type = ?", (user_id, game_type) ) row = cursor.fetchone() if row: return dict(row) else: # 返回默认值 return { 'user_id': user_id, 'game_type': game_type, 'wins': 0, 'losses': 0, 'draws': 0, 'total_plays': 0 } def update_game_stats(self, user_id: int, game_type: str, win: bool = False, loss: bool = False, draw: bool = False): """更新游戏统计 Args: user_id: 用户ID game_type: 游戏类型 win: 是否获胜 loss: 是否失败 draw: 是否平局 """ cursor = self.conn.cursor() # 使用UPSERT语法 cursor.execute(""" INSERT INTO game_stats (user_id, game_type, wins, losses, draws, total_plays) VALUES (?, ?, ?, ?, ?, 1) ON CONFLICT(user_id, game_type) DO UPDATE SET wins = wins + ?, losses = losses + ?, draws = draws + ?, total_plays = total_plays + 1 """, (user_id, game_type, int(win), int(loss), int(draw), int(win), int(loss), int(draw))) logger.debug(f"更新游戏统计: user_id={user_id}, game_type={game_type}") # ===== 积分相关操作 ===== def get_user_points(self, user_id: int) -> Dict: """获取用户积分信息 Args: user_id: 用户ID Returns: 积分信息字典 """ cursor = self.conn.cursor() cursor.execute( "SELECT * FROM user_points WHERE user_id = ?", (user_id,) ) row = cursor.fetchone() if row: return dict(row) else: # 创建新用户积分记录 current_time = int(time.time()) cursor.execute( "INSERT INTO user_points (user_id, points, created_at, updated_at) VALUES (?, 0, ?, ?)", (user_id, current_time, current_time) ) return { 'user_id': user_id, 'points': 0, 'last_checkin_date': None, 'created_at': current_time, 'updated_at': current_time } def add_points(self, user_id: int, points: int, source: str, description: str = None) -> bool: """添加积分 Args: user_id: 用户ID points: 积分数量 source: 积分来源 description: 描述 Returns: 是否成功 """ if points <= 0: logger.warning(f"积分数量无效: {points}") return False cursor = self.conn.cursor() current_time = int(time.time()) try: # 确保用户存在 self.get_or_create_user(user_id) # 更新用户积分(不触碰 last_checkin_date): # 若不存在则插入;存在则 points += ? 且更新 updated_at cursor.execute(""" INSERT INTO user_points (user_id, points, created_at, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(user_id) DO UPDATE SET points = user_points.points + excluded.points, updated_at = excluded.updated_at """, (user_id, points, current_time, current_time)) logger.info(f"用户 {user_id} 成功获得 {points} 积分,来源:{source}") return True except Exception as e: logger.error(f"添加积分失败: user_id={user_id}, points={points}, error={e}", exc_info=True) return False def consume_points(self, user_id: int, points: int, source: str, description: str = None) -> bool: """消费积分 Args: user_id: 用户ID points: 积分数量 source: 消费来源 description: 描述 Returns: 是否成功 """ if points <= 0: return False cursor = self.conn.cursor() current_time = int(time.time()) try: # 检查积分是否足够 cursor.execute( "SELECT points FROM user_points WHERE user_id = ?", (user_id,) ) row = cursor.fetchone() if not row or row[0] < points: logger.warning(f"用户 {user_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}") return False # 消费积分 cursor.execute( "UPDATE user_points SET points = points - ?, updated_at = ? WHERE user_id = ?", (points, current_time, user_id) ) logger.debug(f"用户 {user_id} 消费 {points} 积分,来源:{source}") return True except Exception as e: logger.error(f"消费积分失败: {e}") return False def check_daily_checkin(self, user_id: int, date: str) -> bool: """检查用户是否已签到 Args: user_id: 用户ID date: 日期字符串 (YYYY-MM-DD) Returns: 是否已签到 """ cursor = self.conn.cursor() cursor.execute( "SELECT last_checkin_date FROM user_points WHERE user_id = ?", (user_id,) ) row = cursor.fetchone() return row and row[0] == date def daily_checkin(self, user_id: int, points: int) -> bool: """每日签到 Args: user_id: 用户ID points: 签到积分 Returns: 是否成功 """ from datetime import datetime today = datetime.now().strftime('%Y-%m-%d') if self.check_daily_checkin(user_id, today): logger.warning(f"用户 {user_id} 今日已签到") return False cursor = self.conn.cursor() current_time = int(time.time()) try: # 确保用户存在 self.get_or_create_user(user_id) # 获取签到前积分 points_before = self.get_user_points(user_id) logger.info(f"用户 {user_id} 签到前积分: {points_before['points']}") # 更新积分和签到日期 cursor.execute(""" INSERT OR REPLACE INTO user_points (user_id, points, last_checkin_date, created_at, updated_at) VALUES (?, COALESCE((SELECT points FROM user_points WHERE user_id = ?), 0) + ?, ?, ?, ?) """, (user_id, user_id, points, today, current_time, current_time)) # 验证积分是否真的增加了 points_after = self.get_user_points(user_id) logger.info(f"用户 {user_id} 签到后积分: {points_after['points']}") if points_after['points'] > points_before['points']: logger.info(f"用户 {user_id} 签到成功,积分增加: {points_after['points'] - points_before['points']}") return True else: logger.error(f"用户 {user_id} 签到失败,积分未增加") return False except Exception as e: logger.error(f"每日签到失败: {e}", exc_info=True) return False def get_points_leaderboard(self, limit: int = 10) -> List[Dict]: """获取积分排行榜 Args: limit: 限制数量 Returns: 排行榜列表 """ cursor = self.conn.cursor() cursor.execute(""" SELECT u.user_id, u.username, up.points FROM users u LEFT JOIN user_points up ON u.user_id = up.user_id ORDER BY COALESCE(up.points, 0) DESC LIMIT ? """, (limit,)) rows = cursor.fetchall() return [dict(row) for row in rows] # ===== 赌场相关操作 ===== def get_any_active_casino_session(self, chat_id: int) -> Optional[Dict]: """获取任意活跃的游戏会话(用于单场限制检查) Args: chat_id: 会话ID Returns: 会话信息字典或None """ cursor = self.conn.cursor() cursor.execute(""" SELECT * FROM casino_sessions WHERE chat_id = ? AND status = 'open' ORDER BY id DESC LIMIT 1 """, (chat_id,)) row = cursor.fetchone() if row: return dict(row) return None def create_casino_session(self, chat_id: int, game_type: str, banker_id: int, min_bet: int, max_bet: int, multiplier: float, house_fee: float = 0.05, current_phase: str = 'betting', blackjack_multiplier: float = 1.5) -> int: """创建新的赌场游戏会话 Args: chat_id: 会话ID game_type: 游戏类型 banker_id: 庄家ID min_bet: 最小下注金额 max_bet: 最大下注金额 multiplier: 赔率 house_fee: 抽水率 current_phase: 当前阶段(默认'betting') blackjack_multiplier: 21点黑杰克倍数(默认1.5) Returns: session_id """ cursor = self.conn.cursor() current_time = int(time.time()) # 检查是否已有活跃的会话(单场限制:同一chat_id只能有一个活跃游戏) existing = self.get_any_active_casino_session(chat_id) if existing: # 如果已有活跃游戏,返回其ID(保持向后兼容,但实际应该在应用层阻止) return existing['id'] # 检查是否已有相同game_type的活跃会话(向后兼容) cursor.execute(""" SELECT id FROM casino_sessions WHERE chat_id = ? AND game_type = ? AND status = 'open' """, (chat_id, game_type)) existing_same_type = cursor.fetchone() if existing_same_type: return existing_same_type['id'] cursor.execute(""" INSERT INTO casino_sessions (chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee, status, created_at, current_phase, blackjack_multiplier) VALUES (?, ?, ?, ?, ?, ?, ?, 'open', ?, ?, ?) """, (chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee, current_time, current_phase, blackjack_multiplier)) return cursor.lastrowid def get_active_casino_session(self, chat_id: int, game_type: str) -> Optional[Dict]: """获取活跃的游戏会话 Args: chat_id: 会话ID game_type: 游戏类型 Returns: 会话信息字典或None """ cursor = self.conn.cursor() cursor.execute(""" SELECT * FROM casino_sessions WHERE chat_id = ? AND game_type = ? AND status = 'open' ORDER BY id DESC LIMIT 1 """, (chat_id, game_type)) row = cursor.fetchone() if row: return dict(row) return None def create_casino_bet(self, chat_id: int, game_type: str, user_id: int, bet_type: str, amount: int, multiplier: float, bet_category: str = None, bet_number: int = None, bet_value: str = None, hand_status: str = None) -> int: """创建下注记录 Args: chat_id: 会话ID game_type: 游戏类型 user_id: 用户ID bet_type: 下注类型(大小游戏:"大"/"小";21点:"标准"等) amount: 下注金额 multiplier: 赔率 bet_category: 下注类别(轮盘游戏:["数字"/"颜色"/"奇偶"/"大小"/"区间"]) bet_number: 数字下注(轮盘游戏:0-36) bet_value: 下注值(轮盘游戏:如"红色"、"奇数"、"1-12"等) hand_status: 手牌状态(21点游戏:["playing"/"stood"/"busted"/"blackjack"]) Returns: bet_id """ cursor = self.conn.cursor() current_time = int(time.time()) cursor.execute(""" INSERT INTO casino_bets (chat_id, game_type, user_id, bet_type, amount, multiplier, status, created_at, bet_category, bet_number, bet_value, hand_status) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?) """, (chat_id, game_type, user_id, bet_type, amount, multiplier, current_time, bet_category, bet_number, bet_value, hand_status)) return cursor.lastrowid def get_pending_bets(self, chat_id: int, game_type: str) -> List[Dict]: """获取待结算的下注列表 Args: chat_id: 会话ID game_type: 游戏类型 Returns: 下注列表 """ cursor = self.conn.cursor() cursor.execute(""" SELECT * FROM casino_bets WHERE chat_id = ? AND game_type = ? AND status = 'pending' ORDER BY created_at ASC """, (chat_id, game_type)) rows = cursor.fetchall() return [dict(row) for row in rows] def settle_casino_bets(self, chat_id: int, game_type: str, result: str, banker_id: int, **kwargs) -> Dict: """结算所有下注(根据游戏类型分发到不同结算方法) Args: chat_id: 会话ID game_type: 游戏类型 result: 游戏结果(大小游戏:"大"/"小";轮盘:数字字符串;21点:特殊格式) banker_id: 庄家ID **kwargs: 其他参数(轮盘:result_number;21点:hands_dict等) Returns: 结算详情字典 """ if game_type == '大小': return self._settle_bigsmall_bets(chat_id, game_type, result, banker_id) elif game_type == '轮盘': result_number = kwargs.get('result_number', int(result) if result.isdigit() else 0) return self._settle_roulette_bets(chat_id, game_type, result_number, banker_id) elif game_type == '21点': hands_dict = kwargs.get('hands_dict', {}) return self._settle_blackjack_bets(chat_id, game_type, hands_dict, banker_id) else: # 兼容旧的大小游戏逻辑 return self._settle_bigsmall_bets(chat_id, game_type, result, banker_id) def _settle_bigsmall_bets(self, chat_id: int, game_type: str, result: str, banker_id: int) -> Dict: """结算大小游戏下注 Args: chat_id: 会话ID game_type: 游戏类型 result: 游戏结果("大" 或 "小") banker_id: 庄家ID Returns: 结算详情字典 """ cursor = self.conn.cursor() current_time = int(time.time()) # 获取活跃会话 session = self.get_active_casino_session(chat_id, game_type) if not session: raise ValueError("没有活跃的游戏会话") if session['banker_id'] != banker_id: raise ValueError("只有庄家可以结算游戏") # 获取所有待结算下注 bets = self.get_pending_bets(chat_id, game_type) winners = [] losers = [] total_win = 0 # 计算输赢 for bet in bets: is_win = (bet['bet_type'] == result) if is_win: # 计算赢得金额 win_amount = int(bet['amount'] * bet['multiplier']) # 扣除抽水 house_fee = session['house_fee'] actual_win = int(win_amount * (1 - house_fee)) winners.append({ 'user_id': bet['user_id'], 'amount': bet['amount'], 'win_amount': actual_win, 'bet_id': bet['id'] }) total_win += actual_win else: losers.append({ 'user_id': bet['user_id'], 'amount': bet['amount'], 'bet_id': bet['id'] }) # 使用事务确保原子性 try: # 更新下注状态 for bet in bets: is_win = (bet['bet_type'] == result) if is_win: win_amount = int(bet['amount'] * bet['multiplier']) actual_win = int(win_amount * (1 - session['house_fee'])) # 发放奖励 self.add_points(bet['user_id'], actual_win, 'casino_win', f"赌场游戏{game_type}赢得") cursor.execute(""" UPDATE casino_bets SET status = 'settled', result = ?, win_amount = ?, settled_at = ? WHERE id = ? """, (result, actual_win, current_time, bet['id'])) else: cursor.execute(""" UPDATE casino_bets SET status = 'settled', result = ?, settled_at = ? WHERE id = ? """, (result, current_time, bet['id'])) # 关闭会话 cursor.execute(""" UPDATE casino_sessions SET status = 'closed', settled_at = ? WHERE id = ? """, (current_time, session['id'])) return { 'winners': winners, 'losers': losers, 'total_win': total_win, 'result': result } except Exception as e: logger.error(f"结算失败: {e}", exc_info=True) raise def _settle_roulette_bets(self, chat_id: int, game_type: str, result_number: int, banker_id: int) -> Dict: """结算轮盘游戏下注 Args: chat_id: 会话ID game_type: 游戏类型 result_number: 结果数字(0-36) banker_id: 庄家ID Returns: 结算详情字典 """ cursor = self.conn.cursor() current_time = int(time.time()) session = self.get_active_casino_session(chat_id, game_type) if not session: raise ValueError("没有活跃的游戏会话") if session['banker_id'] != banker_id: raise ValueError("只有庄家可以结算游戏") bets = self.get_pending_bets(chat_id, game_type) winners = [] losers = [] total_win = 0 # 轮盘数字对应的颜色(欧式轮盘,0为绿色) roulette_red = {1, 3, 5, 7, 9, 12, 14, 16, 18, 19, 21, 23, 25, 27, 30, 32, 34, 36} result_color = '绿色' if result_number == 0 else ('红色' if result_number in roulette_red else '黑色') result_odd_even = None if result_number == 0 else ('奇数' if result_number % 2 == 1 else '偶数') result_big_small = None if result_number == 0 else ('小' if 1 <= result_number <= 18 else '大') result_range = None if 1 <= result_number <= 12: result_range = '1-12' elif 13 <= result_number <= 24: result_range = '13-24' elif 25 <= result_number <= 36: result_range = '25-36' for bet in bets: is_win = False multiplier = bet['multiplier'] bet_category = bet.get('bet_category') if bet_category == '数字': if bet.get('bet_number') == result_number: is_win = True elif bet_category == '颜色': if bet.get('bet_value') == result_color: is_win = True elif bet_category == '奇偶': if result_odd_even and bet.get('bet_value') == result_odd_even: is_win = True elif bet_category == '大小': if result_big_small and bet.get('bet_value') == result_big_small: is_win = True elif bet_category == '区间': if result_range and bet.get('bet_value') == result_range: is_win = True if is_win: win_amount = int(bet['amount'] * multiplier) house_fee = session['house_fee'] actual_win = int(win_amount * (1 - house_fee)) winners.append({ 'user_id': bet['user_id'], 'amount': bet['amount'], 'win_amount': actual_win, 'bet_id': bet['id'] }) total_win += actual_win else: losers.append({ 'user_id': bet['user_id'], 'amount': bet['amount'], 'bet_id': bet['id'] }) try: result_str = str(result_number) for bet in bets: is_win = False multiplier = bet['multiplier'] bet_category = bet.get('bet_category') if bet_category == '数字': if bet.get('bet_number') == result_number: is_win = True elif bet_category == '颜色': if bet.get('bet_value') == result_color: is_win = True elif bet_category == '奇偶': if result_odd_even and bet.get('bet_value') == result_odd_even: is_win = True elif bet_category == '大小': if result_big_small and bet.get('bet_value') == result_big_small: is_win = True elif bet_category == '区间': if result_range and bet.get('bet_value') == result_range: is_win = True if is_win: win_amount = int(bet['amount'] * multiplier) actual_win = int(win_amount * (1 - session['house_fee'])) self.add_points(bet['user_id'], actual_win, 'casino_win', f"赌场游戏{game_type}赢得") cursor.execute(""" UPDATE casino_bets SET status = 'settled', result = ?, win_amount = ?, settled_at = ? WHERE id = ? """, (result_str, actual_win, current_time, bet['id'])) else: cursor.execute(""" UPDATE casino_bets SET status = 'settled', result = ?, settled_at = ? WHERE id = ? """, (result_str, current_time, bet['id'])) cursor.execute(""" UPDATE casino_sessions SET status = 'closed', settled_at = ? WHERE id = ? """, (current_time, session['id'])) return { 'winners': winners, 'losers': losers, 'total_win': total_win, 'result': result_str, 'result_number': result_number } except Exception as e: logger.error(f"结算失败: {e}", exc_info=True) raise def _settle_blackjack_bets(self, chat_id: int, game_type: str, hands_dict: Dict, banker_id: int) -> Dict: """结算21点游戏下注 Args: chat_id: 会话ID game_type: 游戏类型 hands_dict: 手牌字典 {user_id: {'cards': [2,3,4], 'status': 'stood'}, ...} banker_id: 庄家ID Returns: 结算详情字典 """ cursor = self.conn.cursor() current_time = int(time.time()) session = self.get_active_casino_session(chat_id, game_type) if not session: raise ValueError("没有活跃的游戏会话") if session['banker_id'] != banker_id: raise ValueError("只有庄家可以结算游戏") bets = self.get_pending_bets(chat_id, game_type) banker_hand = hands_dict.get(0, {}) # 0表示庄家 banker_cards = banker_hand.get('cards', []) banker_status = banker_hand.get('status', 'stood') banker_points = self._calculate_blackjack_points(banker_cards) banker_is_busted = banker_status == 'busted' banker_is_blackjack = banker_status == 'blackjack' winners = [] losers = [] total_win = 0 for bet in bets: user_id = bet['user_id'] player_hand = hands_dict.get(user_id, {}) player_cards = player_hand.get('cards', []) player_status = player_hand.get('status', 'stood') player_points = self._calculate_blackjack_points(player_cards) player_is_busted = player_status == 'busted' player_is_blackjack = player_status == 'blackjack' is_win = False multiplier = bet['multiplier'] if player_is_busted: is_win = False elif player_is_blackjack: if banker_is_blackjack: # 双方都是黑杰克,平局(返还下注) is_win = False self.add_points(user_id, bet['amount'], 'casino_blackjack_push', "21点游戏平局,返还下注") else: # 玩家黑杰克,赢得1.5倍 is_win = True multiplier = session.get('blackjack_multiplier', 1.5) elif banker_is_busted: is_win = True elif player_points > banker_points: is_win = True elif player_points == banker_points: # 平局,返还下注 is_win = False self.add_points(user_id, bet['amount'], 'casino_blackjack_push', "21点游戏平局,返还下注") if is_win: win_amount = int(bet['amount'] * multiplier) house_fee = session['house_fee'] actual_win = int(win_amount * (1 - house_fee)) winners.append({ 'user_id': user_id, 'amount': bet['amount'], 'win_amount': actual_win, 'bet_id': bet['id'] }) total_win += actual_win elif player_points != banker_points: # 输家:包括爆牌或点数小于庄家 losers.append({ 'user_id': user_id, 'amount': bet['amount'], 'bet_id': bet['id'] }) try: for bet in bets: user_id = bet['user_id'] player_hand = hands_dict.get(user_id, {}) player_cards = player_hand.get('cards', []) player_status = player_hand.get('status', 'stood') player_points = self._calculate_blackjack_points(player_cards) player_is_busted = player_status == 'busted' player_is_blackjack = player_status == 'blackjack' is_win = False multiplier = bet['multiplier'] if player_is_busted: is_win = False elif player_is_blackjack: if banker_is_blackjack: is_win = False self.add_points(user_id, bet['amount'], 'casino_blackjack_push', "21点游戏平局,返还下注") else: is_win = True multiplier = session.get('blackjack_multiplier', 1.5) elif banker_is_busted: is_win = True elif player_points > banker_points: is_win = True elif player_points == banker_points: is_win = False self.add_points(user_id, bet['amount'], 'casino_blackjack_push', "21点游戏平局,返还下注") # 生成结果字符串,包含玩家状态信息 if player_is_busted: player_desc = "爆牌" elif player_is_blackjack: player_desc = f"{player_points}点(黑杰克)" else: player_desc = f"{player_points}点" banker_desc = "爆牌" if banker_is_busted else (f"{banker_points}点(黑杰克)" if banker_is_blackjack else f"{banker_points}点") result_str = f"庄家{banker_desc} vs 玩家{player_desc}" if is_win: win_amount = int(bet['amount'] * multiplier) actual_win = int(win_amount * (1 - session['house_fee'])) self.add_points(user_id, actual_win, 'casino_win', f"赌场游戏{game_type}赢得") cursor.execute(""" UPDATE casino_bets SET status = 'settled', result = ?, win_amount = ?, settled_at = ? WHERE id = ? """, (result_str, actual_win, current_time, bet['id'])) elif player_points == banker_points: # 平局,已返还下注(在上面的逻辑中已处理) cursor.execute(""" UPDATE casino_bets SET status = 'settled', result = ?, settled_at = ? WHERE id = ? """, (result_str, current_time, bet['id'])) else: # 输家:包括爆牌或点数小于庄家 cursor.execute(""" UPDATE casino_bets SET status = 'settled', result = ?, settled_at = ? WHERE id = ? """, (result_str, current_time, bet['id'])) cursor.execute(""" UPDATE casino_sessions SET status = 'closed', settled_at = ? WHERE id = ? """, (current_time, session['id'])) return { 'winners': winners, 'losers': losers, 'total_win': total_win, 'result': f"庄家{banker_points}点" } except Exception as e: logger.error(f"结算失败: {e}", exc_info=True) raise def _calculate_blackjack_points(self, cards: List[int]) -> int: """计算21点手牌点数 Args: cards: 手牌列表,A=1,J/Q/K=10,其他为本身值(1-13,其中11=J, 12=Q, 13=K) Returns: 点数 """ points = 0 ace_count = 0 for card in cards: if card == 1: ace_count += 1 points += 11 elif card >= 11: points += 10 else: points += card # 处理A的1/11选择 while points > 21 and ace_count > 0: points -= 10 ace_count -= 1 return points def close_casino_session(self, chat_id: int, game_type: str): """关闭游戏会话 Args: chat_id: 会话ID game_type: 游戏类型 """ cursor = self.conn.cursor() current_time = int(time.time()) cursor.execute(""" UPDATE casino_sessions SET status = 'closed', settled_at = ? WHERE chat_id = ? AND game_type = ? AND status = 'open' """, (current_time, chat_id, game_type)) # ===== 21点手牌管理 ===== def create_blackjack_hand(self, session_id: int, user_id: int, hand_data: List[int], hand_status: str = 'playing') -> int: """创建21点手牌记录 Args: session_id: 会话ID user_id: 用户ID(0表示庄家) hand_data: 手牌列表 hand_status: 手牌状态 Returns: hand_id """ cursor = self.conn.cursor() current_time = int(time.time()) hand_data_json = json.dumps(hand_data) cursor.execute(""" INSERT INTO casino_blackjack_hands (session_id, user_id, hand_data, hand_status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?) """, (session_id, user_id, hand_data_json, hand_status, current_time, current_time)) return cursor.lastrowid def get_blackjack_hand(self, session_id: int, user_id: int) -> Optional[Dict]: """获取21点手牌 Args: session_id: 会话ID user_id: 用户ID Returns: 手牌信息字典或None """ cursor = self.conn.cursor() cursor.execute(""" SELECT * FROM casino_blackjack_hands WHERE session_id = ? AND user_id = ? ORDER BY id DESC LIMIT 1 """, (session_id, user_id)) row = cursor.fetchone() if row: hand_dict = dict(row) hand_dict['hand_data'] = json.loads(hand_dict['hand_data']) return hand_dict return None def update_blackjack_hand(self, session_id: int, user_id: int, hand_data: List[int], hand_status: str): """更新21点手牌 Args: session_id: 会话ID user_id: 用户ID hand_data: 手牌列表 hand_status: 手牌状态 """ cursor = self.conn.cursor() current_time = int(time.time()) hand_data_json = json.dumps(hand_data) cursor.execute(""" UPDATE casino_blackjack_hands SET hand_data = ?, hand_status = ?, updated_at = ? WHERE session_id = ? AND user_id = ? """, (hand_data_json, hand_status, current_time, session_id, user_id)) def get_all_blackjack_hands(self, session_id: int) -> Dict[int, Dict]: """获取所有21点手牌(用于结算) Args: session_id: 会话ID Returns: 手牌字典 {user_id: {'cards': [...], 'status': ...}, ...} """ cursor = self.conn.cursor() cursor.execute(""" SELECT * FROM casino_blackjack_hands WHERE session_id = ? ORDER BY user_id, id DESC """, (session_id,)) rows = cursor.fetchall() hands_dict = {} for row in rows: hand_dict = dict(row) user_id = hand_dict['user_id'] # 只保留最新的手牌(每个用户) if user_id not in hands_dict: hands_dict[user_id] = { 'cards': json.loads(hand_dict['hand_data']), 'status': hand_dict['hand_status'] } return hands_dict def close(self): """关闭数据库连接""" if self._conn: self._conn.close() self._conn = None logger.info("数据库连接已关闭") # 全局数据库实例 _db_instance: Optional[Database] = None def get_db() -> Database: """获取全局数据库实例(单例模式)""" global _db_instance if _db_instance is None: _db_instance = Database() return _db_instance