"""SQLite数据库操作模块 - 使用标准库sqlite3""" import sqlite3 import time from typing import Any, Optional from ..Convention.Runtime.Config import * from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor from ..Convention.Runtime.Architecture import Architecture from ..Convention.Runtime.File import ToolFile logger: ProjectConfig = Architecture.Get(ProjectConfig) DATABASE_PATH = logger.GetFile(logger.FindItem("database_path", "db.db"), False).GetFullPath() logger.SaveProperties() SCHEDULED_TASK_TABLE = "scheduled_tasks" STATUS_PENDING = "pending" STATUS_RUNNING = "running" STATUS_COMPLETED = "completed" STATUS_FAILED = "failed" class Database: """数据库管理类""" def __init__(self, db_path: str = DATABASE_PATH): """初始化数据库连接 Args: db_path: 数据库文件路径 """ self.db_path = db_path self._conn: Optional[sqlite3.Connection] = None self._ensure_db_exists() self.init_tables() Architecture.Register(Database, self, lambda: None) def _ensure_db_exists(self): """确保数据库目录存在""" db_dir = ToolFile(self.db_path).BackToParentDir() db_dir.MustExistsPath() @property def conn(self) -> sqlite3.Connection: """获取数据库连接(懒加载)""" if self._conn is None: try: self._conn = sqlite3.connect( self.db_path, check_same_thread=False, # 允许多线程访问 isolation_level=None, # 自动提交 timeout=30.0 # 增加超时时间 ) self._conn.row_factory = sqlite3.Row # 支持字典式访问 # 启用WAL模式以提高并发性能 self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA synchronous=NORMAL") self._conn.execute("PRAGMA cache_size=1000") self._conn.execute("PRAGMA temp_store=MEMORY") logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接成功: {self.db_path}{ConsoleFrontColor.RESET}") except Exception as e: logger.Log("Error", f"{ConsoleFrontColor.RED}数据库连接失败: {e}{ConsoleFrontColor.RESET}") raise return self._conn def _table_exists(self, table_name: str) -> bool: """检查表是否存在 Args: table_name: 表名 Returns: 是否存在 """ cursor = self.conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,)) return cursor.fetchone() is not None def define_table(self, table_name: str): """定义表 Args: table_name: 表名 """ if not self._table_exists(table_name): cursor = self.conn.cursor() cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)") logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 创建{ConsoleFrontColor.RESET}") return self def _column_exists(self, table_name: str, column_name: str) -> bool: """检查表中列是否存在 Args: table_name: 表名 column_name: 列名 Returns: 是否存在 """ cursor = self.conn.cursor() cursor.execute(f"PRAGMA table_info({table_name})") columns = [row[1] for row in cursor.fetchall()] return column_name in columns def _add_column_if_not_exists(self, table_name: str, column_name: str, column_def: str): """安全地添加列(如果不存在) Args: table_name: 表名 column_name: 列名 column_def: 列定义(如 "INTEGER" 或 "TEXT DEFAULT ''") """ if not self._column_exists(table_name, column_name): try: cursor = self.conn.cursor() cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}") logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 添加列 {column_name}{ConsoleFrontColor.RESET}") except Exception as e: logger.Log("Warning", f"{ConsoleFrontColor.YELLOW}添加列失败: {e}{ConsoleFrontColor.RESET}") def define_column(self, table_name: str, column_name: str, column_def: str): """定义列 Args: table_name: 表名 column_name: 列名 column_def: 列定义(如 "INTEGER" 或 "TEXT DEFAULT ''") """ self._add_column_if_not_exists(table_name, column_name, column_def) return self def init_tables(self): """初始化数据库表""" self._ensure_scheduled_tasks_table() def _ensure_scheduled_tasks_table(self) -> None: cursor = self.conn.cursor() cursor.execute( f""" CREATE TABLE IF NOT EXISTS {SCHEDULED_TASK_TABLE} ( id INTEGER PRIMARY KEY AUTOINCREMENT, plugin_module TEXT NOT NULL, plugin_class TEXT, callback_name TEXT NOT NULL, payload TEXT, execute_at INTEGER NOT NULL, status TEXT NOT NULL DEFAULT '{STATUS_PENDING}', attempts INTEGER NOT NULL DEFAULT 0, last_error TEXT, created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL ) """ ) cursor.execute( f"CREATE INDEX IF NOT EXISTS idx_{SCHEDULED_TASK_TABLE}_status_execute_at ON {SCHEDULED_TASK_TABLE}(status, execute_at)" ) def create_scheduled_task( self, plugin_module: str, plugin_class: Optional[str], callback_name: str, payload: Optional[str], execute_at_ms: int ) -> int: now_ms = int(time.time() * 1000) cursor = self.conn.cursor() cursor.execute( f""" INSERT INTO {SCHEDULED_TASK_TABLE} ( plugin_module, plugin_class, callback_name, payload, execute_at, status, attempts, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( plugin_module, plugin_class, callback_name, payload, execute_at_ms, STATUS_PENDING, 0, now_ms, now_ms, ) ) task_id = cursor.lastrowid return int(task_id) def get_due_tasks(self, now_ms: int, limit: int) -> list[sqlite3.Row]: cursor = self.conn.cursor() cursor.execute( f""" SELECT * FROM {SCHEDULED_TASK_TABLE} WHERE status = ? AND execute_at <= ? ORDER BY execute_at ASC LIMIT ? """, (STATUS_PENDING, now_ms, limit) ) return cursor.fetchall() def update_task_status( self, task_id: int, status: str, *, attempts: Optional[int] = None, last_error: Optional[str] = None ) -> None: now_ms = int(time.time() * 1000) sets = ["status = ?"] params: list[Any] = [status] if attempts is not None: sets.append("attempts = ?") params.append(attempts) if last_error is not None: sets.append("last_error = ?") params.append(last_error) params.append(task_id) set_clause = ", ".join(sets) cursor = self.conn.cursor() cursor.execute( f"UPDATE {SCHEDULED_TASK_TABLE} SET {set_clause} WHERE id = ?", params ) def reset_running_tasks(self) -> None: cursor = self.conn.cursor() now_ms = int(time.time() * 1000) cursor.execute( f""" UPDATE {SCHEDULED_TASK_TABLE} SET status = ?, updated_at = ? WHERE status = ? """, (STATUS_PENDING, now_ms, STATUS_RUNNING) ) def close(self): """关闭数据库连接""" if self._conn: self._conn.close() self._conn = None logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接已关闭{ConsoleFrontColor.RESET}") def get_db() -> Database: """获取全局数据库实例(单例模式)""" if not Architecture.Contains(Database): return Database() return Architecture.Get(Database) __all__ = [ "get_db", "Database", "SCHEDULED_TASK_TABLE", "STATUS_PENDING", "STATUS_RUNNING", "STATUS_COMPLETED", "STATUS_FAILED", ]