Files
WPSBot/core/database.py

905 lines
30 KiB
Python
Raw Normal View History

2025-10-28 13:00:35 +08:00
"""SQLite数据库操作模块 - 使用标准库sqlite3"""
import sqlite3
import json
import time
import logging
from typing import Optional, Dict, Any, List
from pathlib import Path
from config import DATABASE_PATH
logger = logging.getLogger(__name__)
class Database:
"""数据库管理类"""
def __init__(self, db_path: str = DATABASE_PATH):
"""初始化数据库连接
Args:
db_path: 数据库文件路径
"""
self.db_path = db_path
self._conn: Optional[sqlite3.Connection] = None
self._ensure_db_exists()
self.init_tables()
def _ensure_db_exists(self):
"""确保数据库目录存在"""
db_dir = Path(self.db_path).parent
db_dir.mkdir(parents=True, exist_ok=True)
@property
def conn(self) -> sqlite3.Connection:
"""获取数据库连接(懒加载)"""
if self._conn is None:
2025-10-29 12:06:18 +08:00
try:
self._conn = sqlite3.connect(
self.db_path,
check_same_thread=False, # 允许多线程访问
isolation_level=None, # 自动提交
timeout=30.0 # 增加超时时间
)
self._conn.row_factory = sqlite3.Row # 支持字典式访问
# 启用WAL模式以提高并发性能
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA synchronous=NORMAL")
self._conn.execute("PRAGMA cache_size=1000")
self._conn.execute("PRAGMA temp_store=MEMORY")
logger.info(f"数据库连接成功: {self.db_path}")
except Exception as e:
logger.error(f"数据库连接失败: {e}", exc_info=True)
raise
2025-10-28 13:00:35 +08:00
return self._conn
def init_tables(self):
"""初始化数据库表"""
cursor = self.conn.cursor()
# 用户表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
created_at INTEGER NOT NULL,
last_active INTEGER NOT NULL
)
""")
# 游戏状态表
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
state_data TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
UNIQUE(chat_id, user_id, game_type)
)
""")
# 创建索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_chat_user
ON game_states(chat_id, user_id)
""")
# 游戏统计表
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
wins INTEGER DEFAULT 0,
losses INTEGER DEFAULT 0,
draws INTEGER DEFAULT 0,
total_plays INTEGER DEFAULT 0,
UNIQUE(user_id, game_type)
)
""")
2025-10-29 11:32:43 +08:00
# 用户积分表
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_points (
user_id INTEGER PRIMARY KEY,
total_points INTEGER DEFAULT 0,
available_points INTEGER DEFAULT 0,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 积分记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS points_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
points INTEGER NOT NULL,
source TEXT NOT NULL,
description TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 每日签到记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS daily_checkins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
checkin_date TEXT NOT NULL,
points_earned INTEGER NOT NULL,
created_at INTEGER NOT NULL,
UNIQUE(user_id, checkin_date)
)
""")
# 炼金抽奖记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS alchemy_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
cost_points INTEGER NOT NULL,
reward_type TEXT NOT NULL,
reward_value INTEGER NOT NULL,
reward_description TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 积分赠送记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS gift_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_id INTEGER NOT NULL,
receiver_id INTEGER NOT NULL,
points INTEGER NOT NULL,
message TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY (sender_id) REFERENCES users (user_id),
FOREIGN KEY (receiver_id) REFERENCES users (user_id)
)
""")
# 创建积分相关索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_points_records_user
ON points_records(user_id, created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_checkins_user_date
ON daily_checkins(user_id, checkin_date)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alchemy_records_user
ON alchemy_records(user_id, created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_gift_records_sender
ON gift_records(sender_id, created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_gift_records_receiver
ON gift_records(receiver_id, created_at)
""")
2025-10-28 13:00:35 +08:00
logger.info("数据库表初始化完成")
# ===== 用户相关操作 =====
def get_or_create_user(self, user_id: int, username: str = None) -> Dict:
"""获取或创建用户
Args:
user_id: 用户ID
username: 用户名
Returns:
用户信息字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
# 尝试获取用户
cursor.execute(
"SELECT * FROM users WHERE user_id = ?",
(user_id,)
)
user = cursor.fetchone()
if user:
# 更新最后活跃时间
cursor.execute(
"UPDATE users SET last_active = ? WHERE user_id = ?",
(current_time, user_id)
)
return dict(user)
else:
# 创建新用户
cursor.execute(
"INSERT INTO users (user_id, username, created_at, last_active) VALUES (?, ?, ?, ?)",
(user_id, username, current_time, current_time)
)
return {
'user_id': user_id,
'username': username,
'created_at': current_time,
'last_active': current_time
}
# ===== 游戏状态相关操作 =====
def get_game_state(self, chat_id: int, user_id: int, game_type: str) -> Optional[Dict]:
"""获取游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
Returns:
游戏状态字典如果不存在返回None
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(chat_id, user_id, game_type)
)
row = cursor.fetchone()
if row:
state = dict(row)
# 解析JSON数据
if state.get('state_data'):
state['state_data'] = json.loads(state['state_data'])
return state
return None
def save_game_state(self, chat_id: int, user_id: int, game_type: str, state_data: Dict):
"""保存游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
state_data: 状态数据字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
state_json = json.dumps(state_data, ensure_ascii=False)
cursor.execute("""
INSERT INTO game_states (chat_id, user_id, game_type, state_data, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(chat_id, user_id, game_type)
DO UPDATE SET state_data = ?, updated_at = ?
""", (chat_id, user_id, game_type, state_json, current_time, current_time,
state_json, current_time))
logger.debug(f"保存游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def delete_game_state(self, chat_id: int, user_id: int, game_type: str):
"""删除游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
"""
cursor = self.conn.cursor()
cursor.execute(
"DELETE FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(chat_id, user_id, game_type)
)
logger.debug(f"删除游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def cleanup_old_sessions(self, timeout: int = 1800):
"""清理过期的游戏会话
Args:
timeout: 超时时间
"""
cursor = self.conn.cursor()
cutoff_time = int(time.time()) - timeout
cursor.execute(
"DELETE FROM game_states WHERE updated_at < ?",
(cutoff_time,)
)
deleted = cursor.rowcount
if deleted > 0:
logger.info(f"清理了 {deleted} 个过期游戏会话")
# ===== 游戏统计相关操作 =====
def get_game_stats(self, user_id: int, game_type: str) -> Dict:
"""获取游戏统计
Args:
user_id: 用户ID
game_type: 游戏类型
Returns:
统计数据字典
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM game_stats WHERE user_id = ? AND game_type = ?",
(user_id, game_type)
)
row = cursor.fetchone()
if row:
return dict(row)
else:
# 返回默认值
return {
'user_id': user_id,
'game_type': game_type,
'wins': 0,
'losses': 0,
'draws': 0,
'total_plays': 0
}
def update_game_stats(self, user_id: int, game_type: str,
win: bool = False, loss: bool = False, draw: bool = False):
"""更新游戏统计
Args:
user_id: 用户ID
game_type: 游戏类型
win: 是否获胜
loss: 是否失败
draw: 是否平局
"""
cursor = self.conn.cursor()
# 使用UPSERT语法
cursor.execute("""
INSERT INTO game_stats (user_id, game_type, wins, losses, draws, total_plays)
VALUES (?, ?, ?, ?, ?, 1)
ON CONFLICT(user_id, game_type)
DO UPDATE SET
wins = wins + ?,
losses = losses + ?,
draws = draws + ?,
total_plays = total_plays + 1
""", (user_id, game_type, int(win), int(loss), int(draw),
int(win), int(loss), int(draw)))
logger.debug(f"更新游戏统计: user_id={user_id}, game_type={game_type}")
2025-10-29 11:32:43 +08:00
# ===== 积分相关操作 =====
def get_user_points(self, user_id: int) -> Dict:
"""获取用户积分信息
Args:
user_id: 用户ID
Returns:
积分信息字典
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if row:
return dict(row)
else:
# 创建新用户积分记录
current_time = int(time.time())
cursor.execute(
"INSERT INTO user_points (user_id, total_points, available_points, created_at, updated_at) VALUES (?, 0, 0, ?, ?)",
(user_id, current_time, current_time)
)
return {
'user_id': user_id,
'total_points': 0,
'available_points': 0,
'created_at': current_time,
'updated_at': current_time
}
def add_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""添加积分
Args:
user_id: 用户ID
points: 积分数量
source: 积分来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
2025-10-29 12:06:18 +08:00
logger.warning(f"积分数量无效: {points}")
2025-10-29 11:32:43 +08:00
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
2025-10-29 12:06:18 +08:00
# 确保用户存在
self.get_or_create_user(user_id)
2025-10-29 11:32:43 +08:00
# 更新用户积分
cursor.execute("""
INSERT INTO user_points (user_id, total_points, available_points, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(user_id)
DO UPDATE SET
total_points = total_points + ?,
available_points = available_points + ?,
updated_at = ?
""", (user_id, points, points, current_time, current_time, points, points, current_time))
2025-10-29 12:06:18 +08:00
# 验证积分是否真的更新了
cursor.execute("SELECT available_points FROM user_points WHERE user_id = ?", (user_id,))
updated_points = cursor.fetchone()
if not updated_points:
logger.error(f"积分更新后查询失败: user_id={user_id}")
return False
2025-10-29 11:32:43 +08:00
# 记录积分变动
cursor.execute(
"INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
(user_id, points, source, description, current_time)
)
2025-10-29 12:06:18 +08:00
logger.info(f"用户 {user_id} 成功获得 {points} 积分,来源:{source},当前积分:{updated_points[0]}")
2025-10-29 11:32:43 +08:00
return True
except Exception as e:
2025-10-29 12:06:18 +08:00
logger.error(f"添加积分失败: user_id={user_id}, points={points}, error={e}", exc_info=True)
2025-10-29 11:32:43 +08:00
return False
def consume_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""消费积分
Args:
user_id: 用户ID
points: 积分数量
source: 消费来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 检查积分是否足够
cursor.execute(
"SELECT available_points FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if not row or row[0] < points:
logger.warning(f"用户 {user_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}")
return False
# 消费积分
cursor.execute(
"UPDATE user_points SET available_points = available_points - ?, updated_at = ? WHERE user_id = ?",
(points, current_time, user_id)
)
# 记录积分变动
cursor.execute(
"INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
(user_id, -points, source, description, current_time)
)
logger.debug(f"用户 {user_id} 消费 {points} 积分,来源:{source}")
return True
except Exception as e:
logger.error(f"消费积分失败: {e}")
return False
def get_points_records(self, user_id: int, limit: int = 20) -> List[Dict]:
"""获取用户积分记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
积分记录列表
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM points_records WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
(user_id, limit)
)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def check_daily_checkin(self, user_id: int, date: str) -> bool:
"""检查用户是否已签到
Args:
user_id: 用户ID
date: 日期字符串 (YYYY-MM-DD)
Returns:
是否已签到
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT 1 FROM daily_checkins WHERE user_id = ? AND checkin_date = ?",
(user_id, date)
)
return cursor.fetchone() is not None
def daily_checkin(self, user_id: int, points: int) -> bool:
"""每日签到
Args:
user_id: 用户ID
points: 签到积分
Returns:
是否成功
"""
from datetime import datetime
today = datetime.now().strftime('%Y-%m-%d')
if self.check_daily_checkin(user_id, today):
logger.warning(f"用户 {user_id} 今日已签到")
2025-10-29 11:32:43 +08:00
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 确保用户存在
self.get_or_create_user(user_id)
2025-10-29 12:06:18 +08:00
# 获取签到前积分
points_before = self.get_user_points(user_id)
logger.info(f"用户 {user_id} 签到前积分: {points_before['available_points']}")
2025-10-29 11:32:43 +08:00
# 记录签到
cursor.execute(
"INSERT INTO daily_checkins (user_id, checkin_date, points_earned, created_at) VALUES (?, ?, ?, ?)",
(user_id, today, points, current_time)
)
# 添加积分
result = self.add_points(user_id, points, "daily_checkin", f"每日签到奖励")
2025-10-29 12:06:18 +08:00
# 验证积分是否真的增加了
points_after = self.get_user_points(user_id)
logger.info(f"用户 {user_id} 签到后积分: {points_after['available_points']}")
if points_after['available_points'] > points_before['available_points']:
logger.info(f"用户 {user_id} 签到成功,积分增加: {points_after['available_points'] - points_before['available_points']}")
return True
else:
logger.error(f"用户 {user_id} 签到失败,积分未增加")
return False
2025-10-29 11:32:43 +08:00
except Exception as e:
logger.error(f"每日签到失败: {e}", exc_info=True)
2025-10-29 11:32:43 +08:00
return False
def get_points_leaderboard(self, limit: int = 10) -> List[Dict]:
"""获取积分排行榜
Args:
limit: 限制数量
Returns:
排行榜列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT u.user_id, u.username, up.total_points, up.available_points
FROM users u
LEFT JOIN user_points up ON u.user_id = up.user_id
ORDER BY COALESCE(up.total_points, 0) DESC
LIMIT ?
""", (limit,))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def add_alchemy_record(self, user_id: int, cost_points: int, reward_type: str,
reward_value: int, reward_description: str = None) -> bool:
"""添加炼金抽奖记录
Args:
user_id: 用户ID
cost_points: 消耗积分
reward_type: 奖励类型
reward_value: 奖励数值
reward_description: 奖励描述
Returns:
是否成功
"""
cursor = self.conn.cursor()
current_time = int(time.time())
try:
cursor.execute(
"INSERT INTO alchemy_records (user_id, cost_points, reward_type, reward_value, reward_description, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(user_id, cost_points, reward_type, reward_value, reward_description, current_time)
)
logger.debug(f"添加炼金记录: user_id={user_id}, cost={cost_points}, reward={reward_type}={reward_value}")
return True
except Exception as e:
logger.error(f"添加炼金记录失败: {e}")
return False
def get_alchemy_records(self, user_id: int, limit: int = 20) -> List[Dict]:
"""获取用户炼金记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
炼金记录列表
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM alchemy_records WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
(user_id, limit)
)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_alchemy_stats(self, user_id: int) -> Dict:
"""获取用户炼金统计
Args:
user_id: 用户ID
Returns:
炼金统计信息
"""
cursor = self.conn.cursor()
# 总抽奖次数
cursor.execute(
"SELECT COUNT(*) FROM alchemy_records WHERE user_id = ?",
(user_id,)
)
total_draws = cursor.fetchone()[0]
# 总消耗积分
cursor.execute(
"SELECT SUM(cost_points) FROM alchemy_records WHERE user_id = ?",
(user_id,)
)
total_cost = cursor.fetchone()[0] or 0
# 总获得积分
cursor.execute(
"SELECT SUM(reward_value) FROM alchemy_records WHERE user_id = ? AND reward_type = 'points'",
(user_id,)
)
total_points_gained = cursor.fetchone()[0] or 0
return {
'total_draws': total_draws,
'total_cost': total_cost,
'total_points_gained': total_points_gained,
'net_points': total_points_gained - total_cost
}
# ===== 积分赠送相关操作 =====
def send_gift(self, sender_id: int, receiver_id: int, points: int, message: str = None) -> bool:
"""赠送积分
Args:
sender_id: 赠送者ID
receiver_id: 接收者ID
points: 赠送积分数量
message: 附赠消息
Returns:
是否成功
"""
if points <= 0:
return False
if sender_id == receiver_id:
return False # 不能赠送给自己
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 检查赠送者积分是否足够
cursor.execute(
"SELECT available_points FROM user_points WHERE user_id = ?",
(sender_id,)
)
row = cursor.fetchone()
if not row or row[0] < points:
logger.warning(f"用户 {sender_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}")
return False
# 确保接收者存在
self.get_or_create_user(receiver_id)
# 赠送者扣分
cursor.execute(
"UPDATE user_points SET available_points = available_points - ?, updated_at = ? WHERE user_id = ?",
(points, current_time, sender_id)
)
# 接收者加分
cursor.execute("""
INSERT INTO user_points (user_id, total_points, available_points, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(user_id)
DO UPDATE SET
total_points = total_points + ?,
available_points = available_points + ?,
updated_at = ?
""", (receiver_id, points, points, current_time, current_time, points, points, current_time))
# 记录赠送
cursor.execute(
"INSERT INTO gift_records (sender_id, receiver_id, points, message, created_at) VALUES (?, ?, ?, ?, ?)",
(sender_id, receiver_id, points, message, current_time)
)
# 记录积分变动
cursor.execute(
"INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
(sender_id, -points, "gift_send", f"赠送积分给用户{receiver_id}", current_time)
)
cursor.execute(
"INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
(receiver_id, points, "gift_receive", f"收到用户{sender_id}的积分赠送", current_time)
)
logger.debug(f"用户 {sender_id} 赠送 {points} 积分给用户 {receiver_id}")
return True
except Exception as e:
logger.error(f"赠送积分失败: {e}")
return False
def get_gift_records_sent(self, user_id: int, limit: int = 20) -> List[Dict]:
"""获取用户发送的赠送记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
赠送记录列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT gr.*, u.username as receiver_name
FROM gift_records gr
LEFT JOIN users u ON gr.receiver_id = u.user_id
WHERE gr.sender_id = ?
ORDER BY gr.created_at DESC
LIMIT ?
""", (user_id, limit))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_gift_records_received(self, user_id: int, limit: int = 20) -> List[Dict]:
"""获取用户接收的赠送记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
接收记录列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT gr.*, u.username as sender_name
FROM gift_records gr
LEFT JOIN users u ON gr.sender_id = u.user_id
WHERE gr.receiver_id = ?
ORDER BY gr.created_at DESC
LIMIT ?
""", (user_id, limit))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_gift_stats(self, user_id: int) -> Dict:
"""获取用户赠送统计
Args:
user_id: 用户ID
Returns:
赠送统计信息
"""
cursor = self.conn.cursor()
# 总发送积分
cursor.execute(
"SELECT SUM(points) FROM gift_records WHERE sender_id = ?",
(user_id,)
)
total_sent = cursor.fetchone()[0] or 0
# 总接收积分
cursor.execute(
"SELECT SUM(points) FROM gift_records WHERE receiver_id = ?",
(user_id,)
)
total_received = cursor.fetchone()[0] or 0
# 发送次数
cursor.execute(
"SELECT COUNT(*) FROM gift_records WHERE sender_id = ?",
(user_id,)
)
sent_count = cursor.fetchone()[0]
# 接收次数
cursor.execute(
"SELECT COUNT(*) FROM gift_records WHERE receiver_id = ?",
(user_id,)
)
received_count = cursor.fetchone()[0]
return {
'total_sent': total_sent,
'total_received': total_received,
'sent_count': sent_count,
'received_count': received_count,
'net_gift': total_received - total_sent
}
2025-10-28 13:00:35 +08:00
def close(self):
"""关闭数据库连接"""
if self._conn:
self._conn.close()
self._conn = None
logger.info("数据库连接已关闭")
# 全局数据库实例
_db_instance: Optional[Database] = None
def get_db() -> Database:
"""获取全局数据库实例(单例模式)"""
global _db_instance
if _db_instance is None:
_db_instance = Database()
return _db_instance