Files
WPSBot/core/database.py
2025-10-29 11:32:43 +08:00

862 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SQLite数据库操作模块 - 使用标准库sqlite3"""
import sqlite3
import json
import time
import logging
from typing import Optional, Dict, Any, List
from pathlib import Path
from config import DATABASE_PATH
logger = logging.getLogger(__name__)
class Database:
"""数据库管理类"""
def __init__(self, db_path: str = DATABASE_PATH):
"""初始化数据库连接
Args:
db_path: 数据库文件路径
"""
self.db_path = db_path
self._conn: Optional[sqlite3.Connection] = None
self._ensure_db_exists()
self.init_tables()
def _ensure_db_exists(self):
"""确保数据库目录存在"""
db_dir = Path(self.db_path).parent
db_dir.mkdir(parents=True, exist_ok=True)
@property
def conn(self) -> sqlite3.Connection:
"""获取数据库连接(懒加载)"""
if self._conn is None:
self._conn = sqlite3.connect(
self.db_path,
check_same_thread=False, # 允许多线程访问
isolation_level=None # 自动提交
)
self._conn.row_factory = sqlite3.Row # 支持字典式访问
return self._conn
def init_tables(self):
"""初始化数据库表"""
cursor = self.conn.cursor()
# 用户表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
created_at INTEGER NOT NULL,
last_active INTEGER NOT NULL
)
""")
# 游戏状态表
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
state_data TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
UNIQUE(chat_id, user_id, game_type)
)
""")
# 创建索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_chat_user
ON game_states(chat_id, user_id)
""")
# 游戏统计表
cursor.execute("""
CREATE TABLE IF NOT EXISTS game_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
game_type TEXT NOT NULL,
wins INTEGER DEFAULT 0,
losses INTEGER DEFAULT 0,
draws INTEGER DEFAULT 0,
total_plays INTEGER DEFAULT 0,
UNIQUE(user_id, game_type)
)
""")
# 用户积分表
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_points (
user_id INTEGER PRIMARY KEY,
total_points INTEGER DEFAULT 0,
available_points INTEGER DEFAULT 0,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 积分记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS points_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
points INTEGER NOT NULL,
source TEXT NOT NULL,
description TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 每日签到记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS daily_checkins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
checkin_date TEXT NOT NULL,
points_earned INTEGER NOT NULL,
created_at INTEGER NOT NULL,
UNIQUE(user_id, checkin_date)
)
""")
# 炼金抽奖记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS alchemy_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
cost_points INTEGER NOT NULL,
reward_type TEXT NOT NULL,
reward_value INTEGER NOT NULL,
reward_description TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES users (user_id)
)
""")
# 积分赠送记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS gift_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_id INTEGER NOT NULL,
receiver_id INTEGER NOT NULL,
points INTEGER NOT NULL,
message TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY (sender_id) REFERENCES users (user_id),
FOREIGN KEY (receiver_id) REFERENCES users (user_id)
)
""")
# 创建积分相关索引
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_points_records_user
ON points_records(user_id, created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_checkins_user_date
ON daily_checkins(user_id, checkin_date)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_alchemy_records_user
ON alchemy_records(user_id, created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_gift_records_sender
ON gift_records(sender_id, created_at)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_gift_records_receiver
ON gift_records(receiver_id, created_at)
""")
logger.info("数据库表初始化完成")
# ===== 用户相关操作 =====
def get_or_create_user(self, user_id: int, username: str = None) -> Dict:
"""获取或创建用户
Args:
user_id: 用户ID
username: 用户名
Returns:
用户信息字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
# 尝试获取用户
cursor.execute(
"SELECT * FROM users WHERE user_id = ?",
(user_id,)
)
user = cursor.fetchone()
if user:
# 更新最后活跃时间
cursor.execute(
"UPDATE users SET last_active = ? WHERE user_id = ?",
(current_time, user_id)
)
return dict(user)
else:
# 创建新用户
cursor.execute(
"INSERT INTO users (user_id, username, created_at, last_active) VALUES (?, ?, ?, ?)",
(user_id, username, current_time, current_time)
)
return {
'user_id': user_id,
'username': username,
'created_at': current_time,
'last_active': current_time
}
# ===== 游戏状态相关操作 =====
def get_game_state(self, chat_id: int, user_id: int, game_type: str) -> Optional[Dict]:
"""获取游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
Returns:
游戏状态字典如果不存在返回None
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(chat_id, user_id, game_type)
)
row = cursor.fetchone()
if row:
state = dict(row)
# 解析JSON数据
if state.get('state_data'):
state['state_data'] = json.loads(state['state_data'])
return state
return None
def save_game_state(self, chat_id: int, user_id: int, game_type: str, state_data: Dict):
"""保存游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
state_data: 状态数据字典
"""
cursor = self.conn.cursor()
current_time = int(time.time())
state_json = json.dumps(state_data, ensure_ascii=False)
cursor.execute("""
INSERT INTO game_states (chat_id, user_id, game_type, state_data, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(chat_id, user_id, game_type)
DO UPDATE SET state_data = ?, updated_at = ?
""", (chat_id, user_id, game_type, state_json, current_time, current_time,
state_json, current_time))
logger.debug(f"保存游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def delete_game_state(self, chat_id: int, user_id: int, game_type: str):
"""删除游戏状态
Args:
chat_id: 会话ID
user_id: 用户ID
game_type: 游戏类型
"""
cursor = self.conn.cursor()
cursor.execute(
"DELETE FROM game_states WHERE chat_id = ? AND user_id = ? AND game_type = ?",
(chat_id, user_id, game_type)
)
logger.debug(f"删除游戏状态: chat_id={chat_id}, user_id={user_id}, game_type={game_type}")
def cleanup_old_sessions(self, timeout: int = 1800):
"""清理过期的游戏会话
Args:
timeout: 超时时间(秒)
"""
cursor = self.conn.cursor()
cutoff_time = int(time.time()) - timeout
cursor.execute(
"DELETE FROM game_states WHERE updated_at < ?",
(cutoff_time,)
)
deleted = cursor.rowcount
if deleted > 0:
logger.info(f"清理了 {deleted} 个过期游戏会话")
# ===== 游戏统计相关操作 =====
def get_game_stats(self, user_id: int, game_type: str) -> Dict:
"""获取游戏统计
Args:
user_id: 用户ID
game_type: 游戏类型
Returns:
统计数据字典
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM game_stats WHERE user_id = ? AND game_type = ?",
(user_id, game_type)
)
row = cursor.fetchone()
if row:
return dict(row)
else:
# 返回默认值
return {
'user_id': user_id,
'game_type': game_type,
'wins': 0,
'losses': 0,
'draws': 0,
'total_plays': 0
}
def update_game_stats(self, user_id: int, game_type: str,
win: bool = False, loss: bool = False, draw: bool = False):
"""更新游戏统计
Args:
user_id: 用户ID
game_type: 游戏类型
win: 是否获胜
loss: 是否失败
draw: 是否平局
"""
cursor = self.conn.cursor()
# 使用UPSERT语法
cursor.execute("""
INSERT INTO game_stats (user_id, game_type, wins, losses, draws, total_plays)
VALUES (?, ?, ?, ?, ?, 1)
ON CONFLICT(user_id, game_type)
DO UPDATE SET
wins = wins + ?,
losses = losses + ?,
draws = draws + ?,
total_plays = total_plays + 1
""", (user_id, game_type, int(win), int(loss), int(draw),
int(win), int(loss), int(draw)))
logger.debug(f"更新游戏统计: user_id={user_id}, game_type={game_type}")
# ===== 积分相关操作 =====
def get_user_points(self, user_id: int) -> Dict:
"""获取用户积分信息
Args:
user_id: 用户ID
Returns:
积分信息字典
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if row:
return dict(row)
else:
# 创建新用户积分记录
current_time = int(time.time())
cursor.execute(
"INSERT INTO user_points (user_id, total_points, available_points, created_at, updated_at) VALUES (?, 0, 0, ?, ?)",
(user_id, current_time, current_time)
)
return {
'user_id': user_id,
'total_points': 0,
'available_points': 0,
'created_at': current_time,
'updated_at': current_time
}
def add_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""添加积分
Args:
user_id: 用户ID
points: 积分数量
source: 积分来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 更新用户积分
cursor.execute("""
INSERT INTO user_points (user_id, total_points, available_points, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(user_id)
DO UPDATE SET
total_points = total_points + ?,
available_points = available_points + ?,
updated_at = ?
""", (user_id, points, points, current_time, current_time, points, points, current_time))
# 记录积分变动
cursor.execute(
"INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
(user_id, points, source, description, current_time)
)
logger.debug(f"用户 {user_id} 获得 {points} 积分,来源:{source}")
return True
except Exception as e:
logger.error(f"添加积分失败: {e}")
return False
def consume_points(self, user_id: int, points: int, source: str, description: str = None) -> bool:
"""消费积分
Args:
user_id: 用户ID
points: 积分数量
source: 消费来源
description: 描述
Returns:
是否成功
"""
if points <= 0:
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 检查积分是否足够
cursor.execute(
"SELECT available_points FROM user_points WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if not row or row[0] < points:
logger.warning(f"用户 {user_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}")
return False
# 消费积分
cursor.execute(
"UPDATE user_points SET available_points = available_points - ?, updated_at = ? WHERE user_id = ?",
(points, current_time, user_id)
)
# 记录积分变动
cursor.execute(
"INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
(user_id, -points, source, description, current_time)
)
logger.debug(f"用户 {user_id} 消费 {points} 积分,来源:{source}")
return True
except Exception as e:
logger.error(f"消费积分失败: {e}")
return False
def get_points_records(self, user_id: int, limit: int = 20) -> List[Dict]:
"""获取用户积分记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
积分记录列表
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM points_records WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
(user_id, limit)
)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def check_daily_checkin(self, user_id: int, date: str) -> bool:
"""检查用户是否已签到
Args:
user_id: 用户ID
date: 日期字符串 (YYYY-MM-DD)
Returns:
是否已签到
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT 1 FROM daily_checkins WHERE user_id = ? AND checkin_date = ?",
(user_id, date)
)
return cursor.fetchone() is not None
def daily_checkin(self, user_id: int, points: int) -> bool:
"""每日签到
Args:
user_id: 用户ID
points: 签到积分
Returns:
是否成功
"""
from datetime import datetime
today = datetime.now().strftime('%Y-%m-%d')
if self.check_daily_checkin(user_id, today):
return False
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 记录签到
cursor.execute(
"INSERT INTO daily_checkins (user_id, checkin_date, points_earned, created_at) VALUES (?, ?, ?, ?)",
(user_id, today, points, current_time)
)
# 添加积分
return self.add_points(user_id, points, "daily_checkin", f"每日签到奖励")
except Exception as e:
logger.error(f"每日签到失败: {e}")
return False
def get_points_leaderboard(self, limit: int = 10) -> List[Dict]:
"""获取积分排行榜
Args:
limit: 限制数量
Returns:
排行榜列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT u.user_id, u.username, up.total_points, up.available_points
FROM users u
LEFT JOIN user_points up ON u.user_id = up.user_id
ORDER BY COALESCE(up.total_points, 0) DESC
LIMIT ?
""", (limit,))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def add_alchemy_record(self, user_id: int, cost_points: int, reward_type: str,
reward_value: int, reward_description: str = None) -> bool:
"""添加炼金抽奖记录
Args:
user_id: 用户ID
cost_points: 消耗积分
reward_type: 奖励类型
reward_value: 奖励数值
reward_description: 奖励描述
Returns:
是否成功
"""
cursor = self.conn.cursor()
current_time = int(time.time())
try:
cursor.execute(
"INSERT INTO alchemy_records (user_id, cost_points, reward_type, reward_value, reward_description, created_at) VALUES (?, ?, ?, ?, ?, ?)",
(user_id, cost_points, reward_type, reward_value, reward_description, current_time)
)
logger.debug(f"添加炼金记录: user_id={user_id}, cost={cost_points}, reward={reward_type}={reward_value}")
return True
except Exception as e:
logger.error(f"添加炼金记录失败: {e}")
return False
def get_alchemy_records(self, user_id: int, limit: int = 20) -> List[Dict]:
"""获取用户炼金记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
炼金记录列表
"""
cursor = self.conn.cursor()
cursor.execute(
"SELECT * FROM alchemy_records WHERE user_id = ? ORDER BY created_at DESC LIMIT ?",
(user_id, limit)
)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_alchemy_stats(self, user_id: int) -> Dict:
"""获取用户炼金统计
Args:
user_id: 用户ID
Returns:
炼金统计信息
"""
cursor = self.conn.cursor()
# 总抽奖次数
cursor.execute(
"SELECT COUNT(*) FROM alchemy_records WHERE user_id = ?",
(user_id,)
)
total_draws = cursor.fetchone()[0]
# 总消耗积分
cursor.execute(
"SELECT SUM(cost_points) FROM alchemy_records WHERE user_id = ?",
(user_id,)
)
total_cost = cursor.fetchone()[0] or 0
# 总获得积分
cursor.execute(
"SELECT SUM(reward_value) FROM alchemy_records WHERE user_id = ? AND reward_type = 'points'",
(user_id,)
)
total_points_gained = cursor.fetchone()[0] or 0
return {
'total_draws': total_draws,
'total_cost': total_cost,
'total_points_gained': total_points_gained,
'net_points': total_points_gained - total_cost
}
# ===== 积分赠送相关操作 =====
def send_gift(self, sender_id: int, receiver_id: int, points: int, message: str = None) -> bool:
"""赠送积分
Args:
sender_id: 赠送者ID
receiver_id: 接收者ID
points: 赠送积分数量
message: 附赠消息
Returns:
是否成功
"""
if points <= 0:
return False
if sender_id == receiver_id:
return False # 不能赠送给自己
cursor = self.conn.cursor()
current_time = int(time.time())
try:
# 检查赠送者积分是否足够
cursor.execute(
"SELECT available_points FROM user_points WHERE user_id = ?",
(sender_id,)
)
row = cursor.fetchone()
if not row or row[0] < points:
logger.warning(f"用户 {sender_id} 积分不足,需要 {points},当前可用 {row[0] if row else 0}")
return False
# 确保接收者存在
self.get_or_create_user(receiver_id)
# 赠送者扣分
cursor.execute(
"UPDATE user_points SET available_points = available_points - ?, updated_at = ? WHERE user_id = ?",
(points, current_time, sender_id)
)
# 接收者加分
cursor.execute("""
INSERT INTO user_points (user_id, total_points, available_points, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(user_id)
DO UPDATE SET
total_points = total_points + ?,
available_points = available_points + ?,
updated_at = ?
""", (receiver_id, points, points, current_time, current_time, points, points, current_time))
# 记录赠送
cursor.execute(
"INSERT INTO gift_records (sender_id, receiver_id, points, message, created_at) VALUES (?, ?, ?, ?, ?)",
(sender_id, receiver_id, points, message, current_time)
)
# 记录积分变动
cursor.execute(
"INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
(sender_id, -points, "gift_send", f"赠送积分给用户{receiver_id}", current_time)
)
cursor.execute(
"INSERT INTO points_records (user_id, points, source, description, created_at) VALUES (?, ?, ?, ?, ?)",
(receiver_id, points, "gift_receive", f"收到用户{sender_id}的积分赠送", current_time)
)
logger.debug(f"用户 {sender_id} 赠送 {points} 积分给用户 {receiver_id}")
return True
except Exception as e:
logger.error(f"赠送积分失败: {e}")
return False
def get_gift_records_sent(self, user_id: int, limit: int = 20) -> List[Dict]:
"""获取用户发送的赠送记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
赠送记录列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT gr.*, u.username as receiver_name
FROM gift_records gr
LEFT JOIN users u ON gr.receiver_id = u.user_id
WHERE gr.sender_id = ?
ORDER BY gr.created_at DESC
LIMIT ?
""", (user_id, limit))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_gift_records_received(self, user_id: int, limit: int = 20) -> List[Dict]:
"""获取用户接收的赠送记录
Args:
user_id: 用户ID
limit: 限制数量
Returns:
接收记录列表
"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT gr.*, u.username as sender_name
FROM gift_records gr
LEFT JOIN users u ON gr.sender_id = u.user_id
WHERE gr.receiver_id = ?
ORDER BY gr.created_at DESC
LIMIT ?
""", (user_id, limit))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_gift_stats(self, user_id: int) -> Dict:
"""获取用户赠送统计
Args:
user_id: 用户ID
Returns:
赠送统计信息
"""
cursor = self.conn.cursor()
# 总发送积分
cursor.execute(
"SELECT SUM(points) FROM gift_records WHERE sender_id = ?",
(user_id,)
)
total_sent = cursor.fetchone()[0] or 0
# 总接收积分
cursor.execute(
"SELECT SUM(points) FROM gift_records WHERE receiver_id = ?",
(user_id,)
)
total_received = cursor.fetchone()[0] or 0
# 发送次数
cursor.execute(
"SELECT COUNT(*) FROM gift_records WHERE sender_id = ?",
(user_id,)
)
sent_count = cursor.fetchone()[0]
# 接收次数
cursor.execute(
"SELECT COUNT(*) FROM gift_records WHERE receiver_id = ?",
(user_id,)
)
received_count = cursor.fetchone()[0]
return {
'total_sent': total_sent,
'total_received': total_received,
'sent_count': sent_count,
'received_count': received_count,
'net_gift': total_received - total_sent
}
def close(self):
"""关闭数据库连接"""
if self._conn:
self._conn.close()
self._conn = None
logger.info("数据库连接已关闭")
# 全局数据库实例
_db_instance: Optional[Database] = None
def get_db() -> Database:
"""获取全局数据库实例(单例模式)"""
global _db_instance
if _db_instance is None:
_db_instance = Database()
return _db_instance