Files
PWF/CoreModules/database.py
2025-11-05 16:21:05 +08:00

153 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SQLite数据库操作模块 - 使用标准库sqlite3"""
import sqlite3
import json
import time
from typing import *
from Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor
from Convention.Runtime.Architecture import Architecture
from Convention.Runtime.File import ToolFile
config = ProjectConfig()
DATABASE_PATH = config.GetFile(config.FindItem("database_path", "db.db"), False).GetFullPath()
class Database:
"""数据库管理类"""
def __init__(self, db_path: str = DATABASE_PATH):
"""初始化数据库连接
Args:
db_path: 数据库文件路径
"""
self.db_path = db_path
self._conn: Optional[sqlite3.Connection] = None
self._ensure_db_exists()
self.init_tables()
Architecture.Register(Database, self, lambda: None)
def _ensure_db_exists(self):
"""确保数据库目录存在"""
db_dir = ToolFile(self.db_path).BackToParentDir()
db_dir.MustExistsPath()
@property
def conn(self) -> sqlite3.Connection:
"""获取数据库连接(懒加载)"""
if self._conn is None:
try:
self._conn = sqlite3.connect(
self.db_path,
check_same_thread=False, # 允许多线程访问
isolation_level=None, # 自动提交
timeout=30.0 # 增加超时时间
)
self._conn.row_factory = sqlite3.Row # 支持字典式访问
# 启用WAL模式以提高并发性能
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA synchronous=NORMAL")
self._conn.execute("PRAGMA cache_size=1000")
self._conn.execute("PRAGMA temp_store=MEMORY")
config.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接成功: {self.db_path}{ConsoleFrontColor.RESET}")
except Exception as e:
config.Log("Error", f"{ConsoleFrontColor.RED}数据库连接失败: {e}{ConsoleFrontColor.RESET}", exc_info=True)
raise
return self._conn
def _table_exists(self, table_name: str) -> bool:
"""检查表是否存在
Args:
table_name: 表名
Returns:
是否存在
"""
cursor = self.conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
return cursor.fetchone() is not None
def define_table(self, table_name: str):
"""定义表
Args:
table_name: 表名
"""
if not self._table_exists(table_name):
cursor = self.conn.cursor()
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)")
config.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 创建{ConsoleFrontColor.RESET}")
return self
def _column_exists(self, table_name: str, column_name: str) -> bool:
"""检查表中列是否存在
Args:
table_name: 表名
column_name: 列名
Returns:
是否存在
"""
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()]
return column_name in columns
def _add_column_if_not_exists(self, table_name: str, column_name: str, column_def: str):
"""安全地添加列(如果不存在)
Args:
table_name: 表名
column_name: 列名
column_def: 列定义(如 "INTEGER""TEXT DEFAULT ''"
"""
if not self._column_exists(table_name, column_name):
try:
cursor = self.conn.cursor()
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}")
config.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 添加列 {column_name}{ConsoleFrontColor.RESET}")
except Exception as e:
config.Log("Warning", f"{ConsoleFrontColor.YELLOW}添加列失败: {e}{ConsoleFrontColor.RESET}")
def define_column(self, table_name: str, column_name: str, column_def: str):
"""定义列
Args:
table_name: 表名
column_name: 列名
column_def: 列定义(如 "INTEGER""TEXT DEFAULT ''"
"""
self._add_column_if_not_exists(table_name, column_name, column_def)
return self
def init_tables(self):
"""初始化数据库表"""
cursor = self.conn.cursor()
# 用户表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
created_at INTEGER NOT NULL,
last_active INTEGER NOT NULL
)
""")
def close(self):
"""关闭数据库连接"""
if self._conn:
self._conn.close()
self._conn = None
config.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接已关闭{ConsoleFrontColor.RESET}")
def get_db() -> Database:
"""获取全局数据库实例(单例模式)"""
if not Architecture.Contains(Database):
return Database()
return Architecture.Get(Database)
__all__ = ["get_db"]