"""SQLite persistence layer built on the standard-library ``sqlite3`` module.

Stores users, per-chat game state, game statistics, the points ledger,
daily check-ins, alchemy (lottery) draws and point gifts.
"""

import json
import logging
import sqlite3
import threading
import time
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional

from config import DATABASE_PATH

logger = logging.getLogger(__name__)


class _RollbackTransaction(Exception):
    """Internal signal used to abort and roll back the current transaction."""


class Database:
    """Data-access object wrapping a single lazily opened SQLite connection.

    The connection is opened in autocommit mode (``isolation_level=None``), so
    every multi-statement write goes through :meth:`_transaction`, which issues
    an explicit ``BEGIN IMMEDIATE`` / ``COMMIT`` and rolls back on any error.
    Because the connection is shared between threads
    (``check_same_thread=False``), writers are serialized with a re-entrant
    lock so that one thread's statements cannot end up inside another
    thread's transaction.
    """

    # NOTE: the FOREIGN KEY clauses below are declarative only. SQLite does not
    # enforce foreign keys unless ``PRAGMA foreign_keys=ON`` is issued on the
    # connection, which this class does not do.
    _SCHEMA_STATEMENTS = (
        # Users
        """
        CREATE TABLE IF NOT EXISTS users (
            user_id INTEGER PRIMARY KEY,
            username TEXT,
            created_at INTEGER NOT NULL,
            last_active INTEGER NOT NULL
        )
        """,
        # Game state, one row per (chat, user, game type)
        """
        CREATE TABLE IF NOT EXISTS game_states (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            chat_id INTEGER NOT NULL,
            user_id INTEGER NOT NULL,
            game_type TEXT NOT NULL,
            state_data TEXT,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL,
            UNIQUE(chat_id, user_id, game_type)
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_chat_user
        ON game_states(chat_id, user_id)
        """,
        # Game statistics, one row per (user, game type)
        """
        CREATE TABLE IF NOT EXISTS game_stats (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            game_type TEXT NOT NULL,
            wins INTEGER DEFAULT 0,
            losses INTEGER DEFAULT 0,
            draws INTEGER DEFAULT 0,
            total_plays INTEGER DEFAULT 0,
            UNIQUE(user_id, game_type)
        )
        """,
        # Point balances
        """
        CREATE TABLE IF NOT EXISTS user_points (
            user_id INTEGER PRIMARY KEY,
            total_points INTEGER DEFAULT 0,
            available_points INTEGER DEFAULT 0,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL,
            FOREIGN KEY (user_id) REFERENCES users (user_id)
        )
        """,
        # Points ledger (positive = earned, negative = spent)
        """
        CREATE TABLE IF NOT EXISTS points_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            points INTEGER NOT NULL,
            source TEXT NOT NULL,
            description TEXT,
            created_at INTEGER NOT NULL,
            FOREIGN KEY (user_id) REFERENCES users (user_id)
        )
        """,
        # Daily check-ins, at most one per user per date
        """
        CREATE TABLE IF NOT EXISTS daily_checkins (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            checkin_date TEXT NOT NULL,
            points_earned INTEGER NOT NULL,
            created_at INTEGER NOT NULL,
            UNIQUE(user_id, checkin_date)
        )
        """,
        # Alchemy (lottery) draws
        """
        CREATE TABLE IF NOT EXISTS alchemy_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER NOT NULL,
            cost_points INTEGER NOT NULL,
            reward_type TEXT NOT NULL,
            reward_value INTEGER NOT NULL,
            reward_description TEXT,
            created_at INTEGER NOT NULL,
            FOREIGN KEY (user_id) REFERENCES users (user_id)
        )
        """,
        # Point gifts between users
        """
        CREATE TABLE IF NOT EXISTS gift_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sender_id INTEGER NOT NULL,
            receiver_id INTEGER NOT NULL,
            points INTEGER NOT NULL,
            message TEXT,
            created_at INTEGER NOT NULL,
            FOREIGN KEY (sender_id) REFERENCES users (user_id),
            FOREIGN KEY (receiver_id) REFERENCES users (user_id)
        )
        """,
        # Indexes for the points-related tables
        """
        CREATE INDEX IF NOT EXISTS idx_points_records_user
        ON points_records(user_id, created_at)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_checkins_user_date
        ON daily_checkins(user_id, checkin_date)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_alchemy_records_user
        ON alchemy_records(user_id, created_at)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_gift_records_sender
        ON gift_records(sender_id, created_at)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_gift_records_receiver
        ON gift_records(receiver_id, created_at)
        """,
    )

    def __init__(self, db_path: str = DATABASE_PATH):
        """Create the database directory if needed and initialize the schema.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self._conn: Optional[sqlite3.Connection] = None
        # Re-entrant so that a write method may call another write method
        # (e.g. daily_checkin -> add_points) inside one transaction.
        self._lock = threading.RLock()
        self._tx_depth = 0
        self._ensure_db_exists()
        self.init_tables()

    def _ensure_db_exists(self):
        """Create the parent directory of the database file if it is missing."""
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

    def _open_connection(self) -> sqlite3.Connection:
        """Open and fully configure a new connection.

        The connection is closed again if any configuration step fails, so a
        half-configured handle is never cached by :attr:`conn`.

        Raises:
            Exception: Whatever ``sqlite3`` raised; it is logged and re-raised.
        """
        connection = None
        try:
            connection = sqlite3.connect(
                self.db_path,
                check_same_thread=False,  # the connection is shared across threads
                isolation_level=None,  # autocommit; transactions are explicit
                timeout=30.0,  # wait up to 30 s on a locked database
            )
            connection.row_factory = sqlite3.Row  # rows support access by column name
            # WAL mode improves concurrency between readers and a writer.
            connection.execute("PRAGMA journal_mode=WAL")
            connection.execute("PRAGMA synchronous=NORMAL")
            connection.execute("PRAGMA cache_size=1000")
            connection.execute("PRAGMA temp_store=MEMORY")
        except Exception as e:
            logger.error(f"数据库连接失败: {e}", exc_info=True)
            if connection is not None:
                connection.close()
            raise
        logger.info(f"数据库连接成功: {self.db_path}")
        return connection

    @property
    def conn(self) -> sqlite3.Connection:
        """Return the shared connection, opening it on first access."""
        with self._lock:
            if self._conn is None:
                self._conn = self._open_connection()
            return self._conn

    @contextmanager
    def _transaction(self) -> Iterator[sqlite3.Cursor]:
        """Run the enclosed statements atomically and yield a fresh cursor.

        The outermost use issues ``BEGIN IMMEDIATE`` and commits on normal
        exit or rolls back if an exception escapes the block. Nested uses
        (same thread) simply join the transaction that is already open. The
        instance lock is held for the whole block.
        """
        with self._lock:
            connection = self.conn
            outermost = self._tx_depth == 0
            if outermost:
                # IMMEDIATE takes the write lock up front, so a read followed
                # by a write inside the block cannot be invalidated by
                # another connection.
                connection.execute("BEGIN IMMEDIATE")
            self._tx_depth += 1
            try:
                yield connection.cursor()
            except BaseException:
                # Some errors make SQLite roll back on its own; only issue
                # ROLLBACK while a transaction is actually still open.
                if outermost and connection.in_transaction:
                    connection.execute("ROLLBACK")
                raise
            else:
                if outermost:
                    try:
                        connection.execute("COMMIT")
                    except sqlite3.Error:
                        if connection.in_transaction:
                            connection.execute("ROLLBACK")
                        raise
            finally:
                self._tx_depth -= 1

    def init_tables(self):
        """Create all tables and indexes that do not exist yet (atomically)."""
        with self._transaction() as cursor:
            for statement in self._SCHEMA_STATEMENTS:
                cursor.execute(statement)
        logger.info("数据库表初始化完成")

    # ===== Users =====

    def get_or_create_user(self, user_id: int, username: Optional[str] = None) -> Dict:
        """Fetch a user, creating the row on first sight.

        For an existing user ``last_active`` is refreshed, and the stored
        username is replaced when a different non-None ``username`` is given
        (users created implicitly, e.g. by :meth:`add_points`, start with a
        NULL username).

        Args:
            user_id: User ID.
            username: Display name, if known.

        Returns:
            Dict with ``user_id``, ``username``, ``created_at`` and
            ``last_active``. For an existing user ``last_active`` is the value
            stored *before* this call refreshed it.
        """
        current_time = int(time.time())
        with self._transaction() as cursor:
            row = cursor.execute(
                "SELECT * FROM users WHERE user_id = ?",
                (user_id,),
            ).fetchone()

            if row is None:
                cursor.execute(
                    "INSERT INTO users (user_id, username, created_at, last_active) VALUES (?, ?, ?, ?)",
                    (user_id, username, current_time, current_time),
                )
                return {
                    'user_id': user_id,
                    'username': username,
                    'created_at': current_time,
                    'last_active': current_time,
                }

            user = dict(row)
            if username is not None and username != user['username']:
                cursor.execute(
                    "UPDATE users SET username = ?, last_active = ? WHERE user_id = ?",
                    (username, current_time, user_id),
                )
                user['username'] = username
            else:
                cursor.execute(
                    "UPDATE users SET last_active = ? WHERE user_id = ?",
                    (current_time, user_id),
                )
            return user

    # ===== Game state =====

    def get_game_state(self, chat_id: int, user_id: int, game_type: str) -> Optional[Dict]:
        """Load the saved state of one game.

        Args:
            chat_id: Chat ID.
            user_id: User ID.
            game_type: Game type.

        Returns:
            The row as a dict with ``state_data`` decoded from JSON (when it
            is non-empty), or None if no state is stored.
        """
        row = self.conn.execute(
            "SELECT * FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
            (chat_id, user_id, game_type),
        ).fetchone()
        if row is None:
            return None

        state = dict(row)
        if state.get('state_data'):
            state['state_data'] = json.loads(state['state_data'])
        return state

    def save_game_state(self, chat_id: int, user_id: int, game_type: str, state_data: Dict):
        """Insert or overwrite the saved state of one game.

        Args:
            chat_id: Chat ID.
            user_id: User ID.
            game_type: Game type.
            state_data: JSON-serializable state dictionary.
        """
        current_time = int(time.time())
        state_json = json.dumps(state_data, ensure_ascii=False)

        with self._transaction() as cursor:
            cursor.execute(
                """
                INSERT INTO game_states (chat_id, user_id, game_type, state_data, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?)
                ON CONFLICT(chat_id, user_id, game_type)
                DO UPDATE SET state_data = excluded.state_data, updated_at = excluded.updated_at
                """,
                (chat_id, user_id, game_type, state_json, current_time, current_time),
            )

        logger.debug(f"保存游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")

    def delete_game_state(self, chat_id: int, user_id: int, game_type: str):
        """Delete the saved state of one game (no-op if none exists).

        Args:
            chat_id: Chat ID.
            user_id: User ID.
            game_type: Game type.
        """
        with self._transaction() as cursor:
            cursor.execute(
                "DELETE FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
                (chat_id, user_id, game_type),
            )

        logger.debug(f"删除游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")

    def cleanup_old_sessions(self, timeout: int = 1800):
        """Delete game states that have not been updated within ``timeout``.

        Args:
            timeout: Maximum age in seconds, measured from ``updated_at``.
        """
        cutoff_time = int(time.time()) - timeout
        with self._transaction() as cursor:
            cursor.execute(
                "DELETE FROM game_states WHERE updated_at < ?",
                (cutoff_time,),
            )
            deleted = cursor.rowcount

        if deleted > 0:
            logger.info(f"清理了 {deleted} 个过期游戏会话")

    # ===== Game statistics =====

    def get_game_stats(self, user_id: int, game_type: str) -> Dict:
        """Return the win/loss/draw counters of a user for one game type.

        Args:
            user_id: User ID.
            game_type: Game type.

        Returns:
            The stored row as a dict, or an all-zero dict (without ``id``)
            when the user has no statistics for this game yet.
        """
        row = self.conn.execute(
            "SELECT * FROM game_stats WHERE user_id = ? AND game_type = ?",
            (user_id, game_type),
        ).fetchone()
        if row is not None:
            return dict(row)

        return {
            'user_id': user_id,
            'game_type': game_type,
            'wins': 0,
            'losses': 0,
            'draws': 0,
            'total_plays': 0,
        }

    def update_game_stats(self, user_id: int, game_type: str,
                          win: bool = False, loss: bool = False, draw: bool = False):
        """Record one finished game, incrementing the matching counters.

        ``total_plays`` is always incremented by one.

        Args:
            user_id: User ID.
            game_type: Game type.
            win: Whether the game was won.
            loss: Whether the game was lost.
            draw: Whether the game was a draw.
        """
        with self._transaction() as cursor:
            cursor.execute(
                """
                INSERT INTO game_stats (user_id, game_type, wins, losses, draws, total_plays)
                VALUES (?, ?, ?, ?, ?, 1)
                ON CONFLICT(user_id, game_type) DO UPDATE SET
                    wins = wins + excluded.wins,
                    losses = losses + excluded.losses,
                    draws = draws + excluded.draws,
                    total_plays = total_plays + 1
                """,
                (user_id, game_type, int(win), int(loss), int(draw)),
            )

        logger.debug(f"更新游戏统计: user_id={user_id}, game_type={game_type}")

    # ===== Points =====

    def get_user_points(self, user_id: int) -> Dict:
        """Return the point balance of a user, creating a zero balance if missing.

        Args:
            user_id: User ID.

        Returns:
            Dict with ``user_id``, ``total_points``, ``available_points``,
            ``created_at`` and ``updated_at``.
        """
        select_sql = "SELECT * FROM user_points WHERE user_id = ?"
        row = self.conn.execute(select_sql, (user_id,)).fetchone()
        if row is None:
            current_time = int(time.time())
            with self._transaction() as cursor:
                # OR IGNORE: another writer may have created the row between
                # the read above and this insert.
                cursor.execute(
                    "INSERT OR IGNORE INTO user_points (user_id, total_points, available_points, created_at, updated_at) VALUES (?, 0, 0, ?, ?)",
                    (user_id, current_time, current_time),
                )
                row = cursor.execute(select_sql, (user_id,)).fetchone()
        return dict(row)

    def add_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
        """Credit points to a user and write a ledger entry, atomically.

        Args:
            user_id: User ID.
            points: Number of points to add; must be positive.
            source: Identifier of where the points come from.
            description: Optional free-text description.

        Returns:
            True on success, False if ``points`` is not positive or the
            database operation failed (nothing is persisted in that case).
        """
        if points <= 0:
            logger.warning(f"积分数量无效: {points}")
            return False

        current_time = int(time.time())
        try:
            with self._transaction() as cursor:
                # Make sure a users row exists for this ID.
                self.get_or_create_user(user_id)

                cursor.execute(
                    """
                    INSERT INTO user_points (user_id, total_points, available_points, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?)
                    ON CONFLICT(user_id) DO UPDATE SET
                        total_points = total_points + excluded.total_points,
                        available_points = available_points + excluded.available_points,
                        updated_at = excluded.updated_at
                    """,
                    (user_id, points, points, current_time, current_time),
                )
                # The upsert above guarantees the row exists.
                balance = cursor.execute(
                    "SELECT available_points FROM user_points WHERE user_id = ?",
                    (user_id,),
                ).fetchone()[0]

                cursor.execute(
                    "INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
                    (user_id, points, source, description, current_time),
                )
        except Exception as e:
            logger.error(f"添加积分失败: user_id={user_id}, points={points}, error={e}", exc_info=True)
            return False

        logger.info(f"用户 {user_id} 成功获得 {points} 积分,来源:{source},当前积分:{balance}")
        return True

    def consume_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
        """Spend available points and write a negative ledger entry, atomically.

        ``total_points`` is left untouched; only ``available_points`` drops.

        Args:
            user_id: User ID.
            points: Number of points to spend; must be positive.
            source: Identifier of what the points are spent on.
            description: Optional free-text description.

        Returns:
            True on success, False if ``points`` is not positive, the balance
            is insufficient, or the database operation failed.
        """
        if points <= 0:
            return False

        current_time = int(time.time())
        try:
            with self._transaction() as cursor:
                # Check and deduct in a single statement so the balance can
                # never go negative, even with concurrent spenders.
                cursor.execute(
                    "UPDATE user_points SET available_points = available_points - ?, updated_at = ? "
                    "WHERE user_id = ? AND available_points >= ?",
                    (points, current_time, user_id, points),
                )
                if cursor.rowcount == 0:
                    row = cursor.execute(
                        "SELECT available_points FROM user_points WHERE user_id = ?",
                        (user_id,),
                    ).fetchone()
                    logger.warning(f"用户 {user_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}")
                    return False

                cursor.execute(
                    "INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
                    (user_id, -points, source, description, current_time),
                )
        except Exception as e:
            logger.error(f"消费积分失败: {e}", exc_info=True)
            return False

        logger.debug(f"用户 {user_id} 消费 {points} 积分,来源:{source}")
        return True

    def get_points_records(self, user_id: int, limit: int = 20) -> List[Dict]:
        """Return the most recent ledger entries of a user, newest first.

        Args:
            user_id: User ID.
            limit: Maximum number of entries.

        Returns:
            List of ledger rows as dicts.
        """
        rows = self.conn.execute(
            "SELECT * FROM points_records WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
            (user_id, limit),
        ).fetchall()
        return [dict(row) for row in rows]

    def check_daily_checkin(self, user_id: int, date: str) -> bool:
        """Tell whether the user already checked in on ``date``.

        Args:
            user_id: User ID.
            date: Date string in ``YYYY-MM-DD`` form.

        Returns:
            True if a check-in row exists for that user and date.
        """
        row = self.conn.execute(
            "SELECT 1 FROM daily_checkins WHERE user_id = ? AND checkin_date = ?",
            (user_id, date),
        ).fetchone()
        return row is not None

    def daily_checkin(self, user_id: int, points: int) -> bool:
        """Record today's check-in and credit the reward, atomically.

        "Today" is the local date of the running process. If crediting the
        reward fails, the check-in row is rolled back as well, so the user
        can retry the same day.

        Args:
            user_id: User ID.
            points: Reward points for the check-in.

        Returns:
            True if the check-in was recorded and the points credited, False
            if the user already checked in today or anything failed.
        """
        today = datetime.now().strftime('%Y-%m-%d')
        current_time = int(time.time())

        try:
            with self._transaction() as cursor:
                # Checked inside the transaction so two concurrent requests
                # cannot both pass.
                if self.check_daily_checkin(user_id, today):
                    logger.warning(f"用户 {user_id} 今日已签到")
                    return False

                self.get_or_create_user(user_id)
                cursor.execute(
                    "INSERT INTO daily_checkins (user_id, checkin_date, points_earned, created_at) VALUES (?, ?, ?, ?)",
                    (user_id, today, points, current_time),
                )
                if not self.add_points(user_id, points, "daily_checkin", "每日签到奖励"):
                    # Undo the check-in row: no reward means no check-in.
                    raise _RollbackTransaction()
        except _RollbackTransaction:
            logger.error(f"用户 {user_id} 签到失败,积分未增加")
            return False
        except Exception as e:
            logger.error(f"每日签到失败: {e}", exc_info=True)
            return False

        logger.info(f"用户 {user_id} 签到成功,积分增加: {points}")
        return True

    def get_points_leaderboard(self, limit: int = 10) -> List[Dict]:
        """Return users ranked by lifetime points, highest first.

        Users without a balance row are included with 0 points.

        Args:
            limit: Maximum number of entries.

        Returns:
            List of dicts with ``user_id``, ``username``, ``total_points``
            and ``available_points``.
        """
        rows = self.conn.execute(
            """
            SELECT u.user_id,
                   u.username,
                   COALESCE(up.total_points, 0) AS total_points,
                   COALESCE(up.available_points, 0) AS available_points
            FROM users u
            LEFT JOIN user_points up ON u.user_id = up.user_id
            ORDER BY COALESCE(up.total_points, 0) DESC, u.user_id ASC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
        return [dict(row) for row in rows]

    def add_alchemy_record(self, user_id: int, cost_points: int, reward_type: str,
                           reward_value: int, reward_description: str = None) -> bool:
        """Store the outcome of one alchemy (lottery) draw.

        Args:
            user_id: User ID.
            cost_points: Points spent on the draw.
            reward_type: Kind of reward.
            reward_value: Numeric value of the reward.
            reward_description: Optional description of the reward.

        Returns:
            True on success, False if the insert failed.
        """
        current_time = int(time.time())
        try:
            with self._transaction() as cursor:
                cursor.execute(
                    "INSERT INTO alchemy_records (user_id, cost_points, reward_type, reward_value, reward_description, created_at) VALUES (?, ?, ?, ?, ?, ?)",
                    (user_id, cost_points, reward_type, reward_value, reward_description, current_time),
                )
        except Exception as e:
            logger.error(f"添加炼金记录失败: {e}", exc_info=True)
            return False

        logger.debug(f"添加炼金记录: user_id={user_id}, cost={cost_points}, reward={reward_type}={reward_value}")
        return True

    def get_alchemy_records(self, user_id: int, limit: int = 20) -> List[Dict]:
        """Return the most recent alchemy draws of a user, newest first.

        Args:
            user_id: User ID.
            limit: Maximum number of entries.

        Returns:
            List of alchemy rows as dicts.
        """
        rows = self.conn.execute(
            "SELECT * FROM alchemy_records WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
            (user_id, limit),
        ).fetchall()
        return [dict(row) for row in rows]

    def get_alchemy_stats(self, user_id: int) -> Dict:
        """Summarize a user's alchemy draws.

        Args:
            user_id: User ID.

        Returns:
            Dict with ``total_draws``, ``total_cost``, ``total_points_gained``
            (sum of ``reward_value`` over draws whose ``reward_type`` is
            ``'points'``) and ``net_points`` (gained minus cost).
        """
        # One query so that all figures come from the same snapshot.
        total_draws, total_cost, total_points_gained = self.conn.execute(
            """
            SELECT COUNT(*),
                   COALESCE(SUM(cost_points), 0),
                   COALESCE(SUM(CASE WHEN reward_type = 'points' THEN reward_value ELSE 0 END), 0)
            FROM alchemy_records
            WHERE user_id = ?
            """,
            (user_id,),
        ).fetchone()

        return {
            'total_draws': total_draws,
            'total_cost': total_cost,
            'total_points_gained': total_points_gained,
            'net_points': total_points_gained - total_cost,
        }

    # ===== Point gifts =====

    def send_gift(self, sender_id: int, receiver_id: int, points: int, message: str = None) -> bool:
        """Transfer points from one user to another, atomically.

        The sender's ``available_points`` drop; the receiver's
        ``total_points`` and ``available_points`` both rise. A gift record
        and two ledger entries are written. Either all of this is persisted
        or none of it.

        Args:
            sender_id: ID of the giving user.
            receiver_id: ID of the receiving user.
            points: Number of points to transfer; must be positive.
            message: Optional message attached to the gift.

        Returns:
            True on success, False if ``points`` is not positive, sender and
            receiver are the same, the sender's balance is insufficient, or
            the database operation failed.
        """
        if points <= 0 or sender_id == receiver_id:
            return False

        current_time = int(time.time())
        try:
            with self._transaction() as cursor:
                # Check and deduct in a single statement so the sender can
                # never overspend.
                cursor.execute(
                    "UPDATE user_points SET available_points = available_points - ?, updated_at = ? "
                    "WHERE user_id = ? AND available_points >= ?",
                    (points, current_time, sender_id, points),
                )
                if cursor.rowcount == 0:
                    row = cursor.execute(
                        "SELECT available_points FROM user_points WHERE user_id = ?",
                        (sender_id,),
                    ).fetchone()
                    logger.warning(f"用户 {sender_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}")
                    return False

                # Make sure a users row exists for the receiver.
                self.get_or_create_user(receiver_id)

                cursor.execute(
                    """
                    INSERT INTO user_points (user_id, total_points, available_points, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?)
                    ON CONFLICT(user_id) DO UPDATE SET
                        total_points = total_points + excluded.total_points,
                        available_points = available_points + excluded.available_points,
                        updated_at = excluded.updated_at
                    """,
                    (receiver_id, points, points, current_time, current_time),
                )

                cursor.execute(
                    "INSERT INTO gift_records (sender_id, receiver_id, points, message, created_at) VALUES (?, ?, ?, ?, ?)",
                    (sender_id, receiver_id, points, message, current_time),
                )

                cursor.executemany(
                    "INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
                    [
                        (sender_id, -points, "gift_send", f"赠送积分给用户{receiver_id}", current_time),
                        (receiver_id, points, "gift_receive", f"收到用户{sender_id}的积分赠送", current_time),
                    ],
                )
        except Exception as e:
            logger.error(f"赠送积分失败: {e}", exc_info=True)
            return False

        logger.debug(f"用户 {sender_id} 赠送 {points} 积分给用户 {receiver_id}")
        return True

    def get_gift_records_sent(self, user_id: int, limit: int = 20) -> List[Dict]:
        """Return the most recent gifts sent by a user, newest first.

        Args:
            user_id: User ID of the sender.
            limit: Maximum number of entries.

        Returns:
            List of gift rows as dicts, each with an extra ``receiver_name``.
        """
        rows = self.conn.execute(
            """
            SELECT gr.*, u.username as receiver_name
            FROM gift_records gr
            LEFT JOIN users u ON gr.receiver_id = u.user_id
            WHERE gr.sender_id = ?
            ORDER BY gr.created_at DESC
            LIMIT ?
            """,
            (user_id, limit),
        ).fetchall()
        return [dict(row) for row in rows]

    def get_gift_records_received(self, user_id: int, limit: int = 20) -> List[Dict]:
        """Return the most recent gifts received by a user, newest first.

        Args:
            user_id: User ID of the receiver.
            limit: Maximum number of entries.

        Returns:
            List of gift rows as dicts, each with an extra ``sender_name``.
        """
        rows = self.conn.execute(
            """
            SELECT gr.*, u.username as sender_name
            FROM gift_records gr
            LEFT JOIN users u ON gr.sender_id = u.user_id
            WHERE gr.receiver_id = ?
            ORDER BY gr.created_at DESC
            LIMIT ?
            """,
            (user_id, limit),
        ).fetchall()
        return [dict(row) for row in rows]

    def get_gift_stats(self, user_id: int) -> Dict:
        """Summarize the gifts a user has sent and received.

        Args:
            user_id: User ID.

        Returns:
            Dict with ``total_sent``, ``total_received``, ``sent_count``,
            ``received_count`` and ``net_gift`` (received minus sent).
        """
        connection = self.conn
        sent_count, total_sent = connection.execute(
            "SELECT COUNT(*), COALESCE(SUM(points), 0) FROM gift_records WHERE sender_id = ?",
            (user_id,),
        ).fetchone()
        received_count, total_received = connection.execute(
            "SELECT COUNT(*), COALESCE(SUM(points), 0) FROM gift_records WHERE receiver_id = ?",
            (user_id,),
        ).fetchone()

        return {
            'total_sent': total_sent,
            'total_received': total_received,
            'sent_count': sent_count,
            'received_count': received_count,
            'net_gift': total_received - total_sent,
        }

    def close(self):
        """Close the connection if it is open; it is reopened on next use."""
        with self._lock:
            if self._conn:
                self._conn.close()
                self._conn = None
                logger.info("数据库连接已关闭")


# Process-wide database instance
_db_instance: Optional[Database] = None


def get_db() -> Database:
    """Return the process-wide :class:`Database`, creating it on first call."""
    global _db_instance
    if _db_instance is None:
        _db_instance = Database()
    return _db_instance