Files
WPSBot/core/database.py

1094 lines
37 KiB
Python
Raw Normal View History

2025-10-28 13:00:35 +08:00
"""SQLite数据库操作模块 - 使用标准库sqlite3"""
import sqlite3
import json
import time
import logging
from typing import Optional, Dict, Any, List
from pathlib import Path
from config import DATABASE_PATH
logger = logging.getLogger(__name__)
class Database:
"""数据库管理类"""
def __init__(self, db_path: str = DATABASE_PATH):
"""初始化数据库连接
Args:
db_path: 数据库文件路径
"""
self.db_path = db_path
self._conn: Optional[sqlite3.Connection] = None
self._ensure_db_exists()
self.init_tables()
def _ensure_db_exists(self):
"""确保数据库目录存在"""
db_dir = Path(self.db_path).parent
db_dir.mkdir(parents=True, exist_ok=True)
@property
def conn(self) -> sqlite3.Connection:
"""获取数据库连接(懒加载)"""
if self._conn is None:
2025-10-29 12:06:18 +08:00
try:
self._conn = sqlite3.connect(
self.db_path,
check_same_thread=False, # 允许多线程访问
isolation_level=None, # 自动提交
timeout=30.0 # 增加超时时间
)
self._conn.row_factory = sqlite3.Row # 支持字典式访问
# 启用WAL模式以提高并发性能
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA synchronous=NORMAL")
self._conn.execute("PRAGMA cache_size=1000")
self._conn.execute("PRAGMA temp_store=MEMORY")
logger.info(f"数据库连接成功: {self.db_path}")
except Exception as e:
logger.error(f"数据库连接失败: {e}", exc_info=True)
raise
2025-10-28 13:00:35 +08:00
return self._conn
def _column_exists(self, table_name: str, column_name: str) -> bool:
"""检查表中列是否存在
Args:
table_name: 表名
column_name: 列名
Returns:
是否存在
"""
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()]
return column_name in columns
def _add_column_if_not_exists(self, table_name: str, column_name: str, column_def: str):
"""安全地添加列(如果不存在)
Args:
table_name: 表名
column_name: 列名
column_def: 列定义 "INTEGER" "TEXT DEFAULT ''"
"""
if not self._column_exists(table_name, column_name):
try:
cursor = self.conn.cursor()
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}")
logger.info(f"为表 {table_name} 添加列 {column_name}")
except Exception as e:
logger.warning(f"添加列失败: {e}")
2025-10-28 13:00:35 +08:00
def init_tables(self):
"""初始化数据库表"""
cursor = self.conn.cursor()
# 用户表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
created_at INTEGER NOT NULL,
last_active INTEGER NOT NULL
)
""")
# 游戏状态表(新增 confirmed 列用于标记可回收状态)
2025-10-28 13:00:35 +08:00
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
state_data TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
confirmed INTEGER DEFAULT 0,
2025-10-28 13:00:35 +08:00
UNIQUE(chat_id, user_id, game_type)
)
""")
# 幂等迁移:为已有表增加 confirmed 列
try:
cursor.execute("ALTER TABLE game_states ADD COLUMN confirmed INTEGER DEFAULT 0")
logger.info("为 game_states 增加 confirmed 列")
except Exception:
# 列已存在或不可变更时忽略
pass
2025-10-28 13:00:35 +08:00
# 创建索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_chat_user
ON game_states(chat_id, user_id)
""")
2025-10-29 15:49:39 +08:00
# 创建用户名索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_username
ON users(username)
""")
2025-10-28 13:00:35 +08:00
# 游戏统计表
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
wins INTEGER DEFAULT 0,
losses INTEGER DEFAULT 0,
draws INTEGER DEFAULT 0,
total_plays INTEGER DEFAULT 0,
UNIQUE(user_id, game_type)
)
""")
# 用户积分表 - 简化版本,只保留必要字段
2025-10-29 11:32:43 +08:00
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_points (
user_id INTEGER PRIMARY KEY,
points INTEGER DEFAULT 0,
last_checkin_date TEXT,
2025-10-29 11:32:43 +08:00
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
2025-10-30 16:11:50 +08:00
# 赌场下注记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS casino_bets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
user_id INTEGER NOT NULL,
bet_type TEXT NOT NULL,
amount INTEGER NOT NULL,
multiplier REAL NOT NULL,
status TEXT DEFAULT 'pending',
result TEXT,
win_amount INTEGER,
created_at INTEGER NOT NULL,
settled_at INTEGER,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 赌场游戏会话表
cursor.execute("""
CREATE TABLE IF NOT EXISTS casino_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
banker_id INTEGER NOT NULL,
min_bet INTEGER NOT NULL,
max_bet INTEGER NOT NULL,
multiplier REAL NOT NULL,
house_fee REAL DEFAULT 0.05,
status TEXT DEFAULT 'open',
created_at INTEGER NOT NULL,
settled_at INTEGER,
UNIQUE(chat_id, game_type, status)
)
""")
# 21点手牌表
cursor.execute("""
CREATE TABLE IF NOT EXISTS casino_blackjack_hands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
hand_data TEXT NOT NULL,
hand_status TEXT DEFAULT 'playing',
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (session_id) REFERENCES casino_sessions(id)
)
""")
2025-10-30 16:11:50 +08:00
# 创建索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_casino_bets
ON casino_bets(chat_id, game_type, status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_casino_sessions
ON casino_sessions(chat_id, game_type, status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_casino_blackjack_hands
ON casino_blackjack_hands(session_id, user_id)
""")
# 兼容性检查为casino_sessions表添加新字段
self._add_column_if_not_exists('casino_sessions', 'current_phase', "TEXT DEFAULT 'betting'")
self._add_column_if_not_exists('casino_sessions', 'blackjack_multiplier', 'REAL DEFAULT 1.5')
# 兼容性检查为casino_bets表添加新字段轮盘和21点专用
self._add_column_if_not_exists('casino_bets', 'bet_category', "TEXT")
self._add_column_if_not_exists('casino_bets', 'bet_number', 'INTEGER')
self._add_column_if_not_exists('casino_bets', 'bet_value', "TEXT")
self._add_column_if_not_exists('casino_bets', 'hand_status', "TEXT")
2025-10-28 13:00:35 +08:00
logger.info("数据库表初始化完成")
# ===== 用户相关操作 =====
def get_or_create_user(self, user_id: int, username: str = None) -> Dict:
"""获取或创建用户
Args:
user_id: 用户ID
username: 用户名
Returns:
用户信息字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
# 尝试获取用户
cursor.execute(
"SELECT * FROM users WHERE user_id = ?",
(user_id,)
)
user = cursor.fetchone()
if user:
# 更新最后活跃时间
cursor.execute(
"UPDATE users SET last_active = ? WHERE user_id = ?",
(current_time, user_id)
)
return dict(user)
else:
# 创建新用户
cursor.execute(
"INSERT INTO users (user_id, username, created_at, last_active) VALUES (?, ?, ?, ?)",
(user_id, username, current_time, current_time)
)
return {
'user_id': user_id,
'username': username,
'created_at': current_time,
'last_active': current_time
}
2025-10-29 15:49:39 +08:00
def get_user_by_name(self, username: str) -> Optional[Dict]:
"""根据用户名查找用户
Args:
username: 用户名
Returns:
用户信息字典如果不存在返回None
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM users WHERE username = ?",
(username,)
)
row = cursor.fetchone()
if row:
return dict(row)
return None
def update_user_name(self, user_id: int, username: str) -> bool:
"""更新用户名称
Args:
user_id: 用户ID
username: 新用户名
Returns:
是否成功
"""
try:
# 确保用户存在
self.get_or_create_user(user_id)
cursor = self.conn.cursor()
cursor.execute(
"UPDATE users SET username = ? WHERE user_id = ?",
(username, user_id)
)
logger.info(f"用户 {user_id} 更新名称为: {username}")
return True
except Exception as e:
logger.error(f"更新用户名失败: user_id={user_id}, username={username}, error={e}", exc_info=True)
return False
def get_user_display_name(self, user_id: int) -> str:
"""获取用户显示名称
如果用户已注册username不为None返回用户名否则返回"用户{user_id}"
Args:
user_id: 用户ID
Returns:
用户显示名称用户名或"用户{user_id}"
"""
user_dict = self.get_or_create_user(user_id)
username = user_dict.get('username')
if username:
return username
else:
return f"用户{user_id}"
2025-10-28 13:00:35 +08:00
# ===== 游戏状态相关操作 =====
def get_game_state(self, chat_id: int, user_id: int, game_type: str) -> Optional[Dict]:
"""获取游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
Returns:
游戏状态字典如果不存在返回None
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(chat_id, user_id, game_type)
)
row = cursor.fetchone()
if row:
state = dict(row)
# 解析JSON数据
if state.get('state_data'):
state['state_data'] = json.loads(state['state_data'])
return state
return None
def save_game_state(self, chat_id: int, user_id: int, game_type: str, state_data: Dict):
"""保存游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
state_data: 状态数据字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
state_json = json.dumps(state_data, ensure_ascii=False)
cursor.execute("""
INSERT INTO game_states (chat_id, user_id, game_type, state_data, created_at, updated_at, confirmed)
VALUES (?, ?, ?, ?, ?, ?, 0)
2025-10-28 13:00:35 +08:00
ON CONFLICT(chat_id, user_id, game_type)
DO UPDATE SET state_data = excluded.state_data,
updated_at = excluded.updated_at,
confirmed = 0
""", (chat_id, user_id, game_type, state_json, current_time, current_time))
2025-10-28 13:00:35 +08:00
logger.debug(f"保存游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def confirm_game_state(self, chat_id: int, user_id: int, game_type: str):
"""标记游戏状态为已确认(可由清理器删除)
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
"""
cursor = self.conn.cursor()
current_time = int(time.time())
cursor.execute(
"UPDATE game_states SET confirmed = 1, updated_at = ? WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(current_time, chat_id, user_id, game_type)
)
logger.debug(f"确认游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
2025-10-28 13:00:35 +08:00
def delete_game_state(self, chat_id: int, user_id: int, game_type: str):
"""删除游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
"""
cursor = self.conn.cursor()
cursor.execute(
"DELETE FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(chat_id, user_id, game_type)
)
logger.debug(f"删除游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def cleanup_old_sessions(self, timeout: int = 1800):
"""清理过期的游戏会话
Args:
timeout: 超时时间
"""
cursor = self.conn.cursor()
cutoff_time = int(time.time()) - timeout
# 仅删除已确认confirmed=1的旧会话避免误删进行中的长时任务
2025-10-28 13:00:35 +08:00
cursor.execute(
"DELETE FROM game_states WHERE confirmed = 1 AND updated_at < ?",
2025-10-28 13:00:35 +08:00
(cutoff_time,)
)
deleted = cursor.rowcount
if deleted > 0:
logger.info(f"清理了 {deleted} 个过期游戏会话")
# ===== 游戏统计相关操作 =====
def get_game_stats(self, user_id: int, game_type: str) -> Dict:
"""获取游戏统计
Args:
user_id: 用户ID
game_type: 游戏类型
Returns:
统计数据字典
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM game_stats WHERE user_id = ? AND game_type = ?",
(user_id, game_type)
)
row = cursor.fetchone()
if row:
return dict(row)
else:
# 返回默认值
return {
'user_id': user_id,
'game_type': game_type,
'wins': 0,
'losses': 0,
'draws': 0,
'total_plays': 0
}
def update_game_stats(self, user_id: int, game_type: str,
win: bool = False, loss: bool = False, draw: bool = False):
"""更新游戏统计
Args:
user_id: 用户ID
game_type: 游戏类型
win: 是否获胜
loss: 是否失败
draw: 是否平局
"""
cursor = self.conn.cursor()
# 使用UPSERT语法
cursor.execute("""
INSERT INTO game_stats (user_id, game_type, wins, losses, draws, total_plays)
VALUES (?, ?, ?, ?, ?, 1)
ON CONFLICT(user_id, game_type)
DO UPDATE SET
wins = wins + ?,
losses = losses + ?,
draws = draws + ?,
total_plays = total_plays + 1
""", (user_id, game_type, int(win), int(loss), int(draw),
int(win), int(loss), int(draw)))
logger.debug(f"更新游戏统计: user_id={user_id}, game_type={game_type}")
2025-10-29 11:32:43 +08:00
# ===== 积分相关操作 =====
def get_user_points(self, user_id: int) -> Dict:
"""获取用户积分信息
Args:
user_id: 用户ID
Returns:
积分信息字典
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if row:
return dict(row)
else:
# 创建新用户积分记录
current_time = int(time.time())
cursor.execute(
"INSERT INTO user_points (user_id, points, created_at, updated_at) VALUES (?, 0, ?, ?)",
2025-10-29 11:32:43 +08:00
(user_id, current_time, current_time)
)
return {
'user_id': user_id,
'points': 0,
'last_checkin_date': None,
2025-10-29 11:32:43 +08:00
'created_at': current_time,
'updated_at': current_time
}
def add_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""添加积分
Args:
user_id: 用户ID
points: 积分数量
source: 积分来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
2025-10-29 12:06:18 +08:00
logger.warning(f"积分数量无效: {points}")
2025-10-29 11:32:43 +08:00
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
2025-10-29 12:06:18 +08:00
# 确保用户存在
self.get_or_create_user(user_id)
# 更新用户积分(不触碰 last_checkin_date
# 若不存在则插入;存在则 points += ? 且更新 updated_at
2025-10-29 11:32:43 +08:00
cursor.execute("""
INSERT INTO user_points (user_id, points, created_at, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(user_id) DO UPDATE SET
points = user_points.points + excluded.points,
updated_at = excluded.updated_at
""", (user_id, points, current_time, current_time))
2025-10-29 11:32:43 +08:00
logger.info(f"用户 {user_id} 成功获得 {points} 积分,来源:{source}")
2025-10-29 11:32:43 +08:00
return True
except Exception as e:
2025-10-29 12:06:18 +08:00
logger.error(f"添加积分失败: user_id={user_id}, points={points}, error={e}", exc_info=True)
2025-10-29 11:32:43 +08:00
return False
def consume_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""消费积分
Args:
user_id: 用户ID
points: 积分数量
source: 消费来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 检查积分是否足够
cursor.execute(
"SELECT points FROM user_points WHERE user_id = ?",
2025-10-29 11:32:43 +08:00
(user_id,)
)
row = cursor.fetchone()
if not row or row[0] < points:
logger.warning(f"用户 {user_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}")
return False
# 消费积分
cursor.execute(
"UPDATE user_points SET points = points - ?, updated_at = ? WHERE user_id = ?",
2025-10-29 11:32:43 +08:00
(points, current_time, user_id)
)
logger.debug(f"用户 {user_id} 消费 {points} 积分,来源:{source}")
return True
except Exception as e:
logger.error(f"消费积分失败: {e}")
return False
def check_daily_checkin(self, user_id: int, date: str) -> bool:
"""检查用户是否已签到
Args:
user_id: 用户ID
date: 日期字符串 (YYYY-MM-DD)
Returns:
是否已签到
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT last_checkin_date FROM user_points WHERE user_id = ?",
(user_id,)
2025-10-29 11:32:43 +08:00
)
row = cursor.fetchone()
return row and row[0] == date
2025-10-29 11:32:43 +08:00
def daily_checkin(self, user_id: int, points: int) -> bool:
"""每日签到
Args:
user_id: 用户ID
points: 签到积分
Returns:
是否成功
"""
from datetime import datetime
today = datetime.now().strftime('%Y-%m-%d')
if self.check_daily_checkin(user_id, today):
logger.warning(f"用户 {user_id} 今日已签到")
2025-10-29 11:32:43 +08:00
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 确保用户存在
self.get_or_create_user(user_id)
2025-10-29 12:06:18 +08:00
# 获取签到前积分
points_before = self.get_user_points(user_id)
logger.info(f"用户 {user_id} 签到前积分: {points_before['points']}")
2025-10-29 11:32:43 +08:00
# 更新积分和签到日期
cursor.execute("""
2025-10-29 12:36:20 +08:00
INSERT OR REPLACE INTO user_points (user_id, points, last_checkin_date, created_at, updated_at)
VALUES (?, COALESCE((SELECT points FROM user_points WHERE user_id = ?), 0) + ?, ?, ?, ?)
""", (user_id, user_id, points, today, current_time, current_time))
2025-10-29 12:06:18 +08:00
# 验证积分是否真的增加了
points_after = self.get_user_points(user_id)
logger.info(f"用户 {user_id} 签到后积分: {points_after['points']}")
2025-10-29 12:06:18 +08:00
if points_after['points'] > points_before['points']:
logger.info(f"用户 {user_id} 签到成功,积分增加: {points_after['points'] - points_before['points']}")
2025-10-29 12:06:18 +08:00
return True
else:
logger.error(f"用户 {user_id} 签到失败,积分未增加")
return False
2025-10-29 11:32:43 +08:00
except Exception as e:
logger.error(f"每日签到失败: {e}", exc_info=True)
2025-10-29 11:32:43 +08:00
return False
def get_points_leaderboard(self, limit: int = 10) -> List[Dict]:
"""获取积分排行榜
Args:
limit: 限制数量
Returns:
排行榜列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT u.user_id, u.username, up.points
2025-10-29 11:32:43 +08:00
FROM users u
LEFT JOIN user_points up ON u.user_id = up.user_id
ORDER BY COALESCE(up.points, 0) DESC
2025-10-29 11:32:43 +08:00
LIMIT ?
""", (limit,))
rows = cursor.fetchall()
return [dict(row) for row in rows]
2025-10-30 16:11:50 +08:00
# ===== 赌场相关操作 =====
def get_any_active_casino_session(self, chat_id: int) -> Optional[Dict]:
"""获取任意活跃的游戏会话(用于单场限制检查)
Args:
chat_id: 会话ID
Returns:
会话信息字典或None
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_sessions
WHERE chat_id = ? AND status = 'open'
ORDER BY id DESC LIMIT 1
""", (chat_id,))
row = cursor.fetchone()
if row:
return dict(row)
return None
2025-10-30 16:11:50 +08:00
def create_casino_session(self, chat_id: int, game_type: str, banker_id: int,
min_bet: int, max_bet: int, multiplier: float,
house_fee: float = 0.05, current_phase: str = 'betting',
blackjack_multiplier: float = 1.5) -> int:
2025-10-30 16:11:50 +08:00
"""创建新的赌场游戏会话
Args:
chat_id: 会话ID
game_type: 游戏类型
banker_id: 庄家ID
min_bet: 最小下注金额
max_bet: 最大下注金额
multiplier: 赔率
house_fee: 抽水率
current_phase: 当前阶段默认'betting'
blackjack_multiplier: 21点黑杰克倍数默认1.5
2025-10-30 16:11:50 +08:00
Returns:
session_id
"""
cursor = self.conn.cursor()
current_time = int(time.time())
# 检查是否已有活跃的会话单场限制同一chat_id只能有一个活跃游戏
existing = self.get_any_active_casino_session(chat_id)
if existing:
# 如果已有活跃游戏返回其ID保持向后兼容但实际应该在应用层阻止
return existing['id']
# 检查是否已有相同game_type的活跃会话向后兼容
2025-10-30 16:11:50 +08:00
cursor.execute("""
SELECT id FROM casino_sessions
WHERE chat_id = ? AND game_type = ? AND status = 'open'
""", (chat_id, game_type))
existing_same_type = cursor.fetchone()
if existing_same_type:
return existing_same_type['id']
2025-10-30 16:11:50 +08:00
cursor.execute("""
INSERT INTO casino_sessions
(chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee,
status, created_at, current_phase, blackjack_multiplier)
VALUES (?, ?, ?, ?, ?, ?, ?, 'open', ?, ?, ?)
""", (chat_id, game_type, banker_id, min_bet, max_bet, multiplier, house_fee,
current_time, current_phase, blackjack_multiplier))
2025-10-30 16:11:50 +08:00
return cursor.lastrowid
def get_active_casino_session(self, chat_id: int, game_type: str) -> Optional[Dict]:
"""获取活跃的游戏会话
Args:
chat_id: 会话ID
game_type: 游戏类型
Returns:
会话信息字典或None
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_sessions
WHERE chat_id = ? AND game_type = ? AND status = 'open'
ORDER BY id DESC LIMIT 1
""", (chat_id, game_type))
row = cursor.fetchone()
if row:
return dict(row)
return None
def create_casino_bet(self, chat_id: int, game_type: str, user_id: int,
bet_type: str, amount: int, multiplier: float,
bet_category: str = None, bet_number: int = None,
bet_value: str = None, hand_status: str = None) -> int:
2025-10-30 16:11:50 +08:00
"""创建下注记录
Args:
chat_id: 会话ID
game_type: 游戏类型
user_id: 用户ID
bet_type: 下注类型大小游戏""/""21"标准"
2025-10-30 16:11:50 +08:00
amount: 下注金额
multiplier: 赔率
bet_category: 下注类别轮盘游戏["数字"/"颜色"/"奇偶"/"大小"/"区间"]
bet_number: 数字下注轮盘游戏0-36
bet_value: 下注值轮盘游戏"红色""奇数""1-12"
hand_status: 手牌状态21点游戏["playing"/"stood"/"busted"/"blackjack"]
2025-10-30 16:11:50 +08:00
Returns:
bet_id
"""
cursor = self.conn.cursor()
current_time = int(time.time())
cursor.execute("""
INSERT INTO casino_bets
(chat_id, game_type, user_id, bet_type, amount, multiplier, status, created_at,
bet_category, bet_number, bet_value, hand_status)
VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, ?, ?, ?, ?)
""", (chat_id, game_type, user_id, bet_type, amount, multiplier, current_time,
bet_category, bet_number, bet_value, hand_status))
2025-10-30 16:11:50 +08:00
return cursor.lastrowid
def get_pending_bets(self, chat_id: int, game_type: str) -> List[Dict]:
"""获取待结算的下注列表
Args:
chat_id: 会话ID
game_type: 游戏类型
Returns:
下注列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_bets
WHERE chat_id = ? AND game_type = ? AND status = 'pending'
ORDER BY created_at ASC
""", (chat_id, game_type))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def settle_casino_bets(self, chat_id: int, game_type: str, result: str,
banker_id: int) -> Dict:
"""结算所有下注
Args:
chat_id: 会话ID
game_type: 游戏类型
result: 游戏结果'' ''
banker_id: 庄家ID
Returns:
结算详情字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
# 获取活跃会话
session = self.get_active_casino_session(chat_id, game_type)
if not session:
raise ValueError("没有活跃的游戏会话")
if session['banker_id'] != banker_id:
raise ValueError("只有庄家可以结算游戏")
# 获取所有待结算下注
bets = self.get_pending_bets(chat_id, game_type)
winners = []
losers = []
total_win = 0
# 计算输赢
for bet in bets:
is_win = (bet['bet_type'] == result)
if is_win:
# 计算赢得金额
win_amount = int(bet['amount'] * bet['multiplier'])
# 扣除抽水
house_fee = session['house_fee']
actual_win = int(win_amount * (1 - house_fee))
winners.append({
'user_id': bet['user_id'],
'amount': bet['amount'],
'win_amount': actual_win,
'bet_id': bet['id']
})
total_win += actual_win
else:
losers.append({
'user_id': bet['user_id'],
'amount': bet['amount'],
'bet_id': bet['id']
})
# 使用事务确保原子性
try:
# 更新下注状态
for bet in bets:
is_win = (bet['bet_type'] == result)
if is_win:
win_amount = int(bet['amount'] * bet['multiplier'])
actual_win = int(win_amount * (1 - session['house_fee']))
# 发放奖励
self.add_points(bet['user_id'], actual_win, 'casino_win',
f"赌场游戏{game_type}赢得")
cursor.execute("""
UPDATE casino_bets
SET status = 'settled', result = ?, win_amount = ?, settled_at = ?
WHERE id = ?
""", (result, actual_win, current_time, bet['id']))
else:
cursor.execute("""
UPDATE casino_bets
SET status = 'settled', result = ?, settled_at = ?
WHERE id = ?
""", (result, current_time, bet['id']))
# 关闭会话
cursor.execute("""
UPDATE casino_sessions
SET status = 'closed', settled_at = ?
WHERE id = ?
""", (current_time, session['id']))
return {
'winners': winners,
'losers': losers,
'total_win': total_win,
'result': result
}
except Exception as e:
logger.error(f"结算失败: {e}", exc_info=True)
raise
def close_casino_session(self, chat_id: int, game_type: str):
"""关闭游戏会话
Args:
chat_id: 会话ID
game_type: 游戏类型
"""
cursor = self.conn.cursor()
current_time = int(time.time())
cursor.execute("""
UPDATE casino_sessions
SET status = 'closed', settled_at = ?
WHERE chat_id = ? AND game_type = ? AND status = 'open'
""", (current_time, chat_id, game_type))
# ===== 21点手牌管理 =====
def create_blackjack_hand(self, session_id: int, user_id: int,
hand_data: List[int], hand_status: str = 'playing') -> int:
"""创建21点手牌记录
Args:
session_id: 会话ID
user_id: 用户ID0表示庄家
hand_data: 手牌列表
hand_status: 手牌状态
Returns:
hand_id
"""
cursor = self.conn.cursor()
current_time = int(time.time())
hand_data_json = json.dumps(hand_data)
cursor.execute("""
INSERT INTO casino_blackjack_hands
(session_id, user_id, hand_data, hand_status, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (session_id, user_id, hand_data_json, hand_status, current_time, current_time))
return cursor.lastrowid
def get_blackjack_hand(self, session_id: int, user_id: int) -> Optional[Dict]:
"""获取21点手牌
Args:
session_id: 会话ID
user_id: 用户ID
Returns:
手牌信息字典或None
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_blackjack_hands
WHERE session_id = ? AND user_id = ?
ORDER BY id DESC LIMIT 1
""", (session_id, user_id))
row = cursor.fetchone()
if row:
hand_dict = dict(row)
hand_dict['hand_data'] = json.loads(hand_dict['hand_data'])
return hand_dict
return None
def update_blackjack_hand(self, session_id: int, user_id: int,
hand_data: List[int], hand_status: str):
"""更新21点手牌
Args:
session_id: 会话ID
user_id: 用户ID
hand_data: 手牌列表
hand_status: 手牌状态
"""
cursor = self.conn.cursor()
current_time = int(time.time())
hand_data_json = json.dumps(hand_data)
cursor.execute("""
UPDATE casino_blackjack_hands
SET hand_data = ?, hand_status = ?, updated_at = ?
WHERE session_id = ? AND user_id = ?
""", (hand_data_json, hand_status, current_time, session_id, user_id))
def get_all_blackjack_hands(self, session_id: int) -> Dict[int, Dict]:
"""获取所有21点手牌用于结算
Args:
session_id: 会话ID
Returns:
手牌字典 {user_id: {'cards': [...], 'status': ...}, ...}
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM casino_blackjack_hands
WHERE session_id = ?
ORDER BY user_id, id DESC
""", (session_id,))
rows = cursor.fetchall()
hands_dict = {}
for row in rows:
hand_dict = dict(row)
user_id = hand_dict['user_id']
# 只保留最新的手牌(每个用户)
if user_id not in hands_dict:
hands_dict[user_id] = {
'cards': json.loads(hand_dict['hand_data']),
'status': hand_dict['hand_status']
}
return hands_dict
2025-10-28 13:00:35 +08:00
def close(self):
"""关闭数据库连接"""
if self._conn:
self._conn.close()
self._conn = None
logger.info("数据库连接已关闭")
# 全局数据库实例
_db_instance: Optional[Database] = None
def get_db() -> Database:
"""获取全局数据库实例(单例模式)"""
global _db_instance
if _db_instance is None:
_db_instance = Database()
return _db_instance