264 lines
8.8 KiB
Python
264 lines
8.8 KiB
Python
"""SQLite数据库操作模块 - 使用标准库sqlite3"""
|
||
import sqlite3
|
||
import time
|
||
from typing import Any, Optional
|
||
from ..Convention.Runtime.Config import *
|
||
from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor
|
||
from ..Convention.Runtime.Architecture import Architecture
|
||
from ..Convention.Runtime.File import ToolFile
|
||
|
||
logger: ProjectConfig = Architecture.Get(ProjectConfig)
|
||
DATABASE_PATH = logger.GetFile(logger.FindItem("database_path", "db.db"), False).GetFullPath()
|
||
logger.SaveProperties()
|
||
|
||
SCHEDULED_TASK_TABLE = "scheduled_tasks"
|
||
STATUS_PENDING = "pending"
|
||
STATUS_RUNNING = "running"
|
||
STATUS_COMPLETED = "completed"
|
||
STATUS_FAILED = "failed"
|
||
|
||
class Database:
|
||
"""数据库管理类"""
|
||
|
||
def __init__(self, db_path: str = DATABASE_PATH):
|
||
"""初始化数据库连接
|
||
|
||
Args:
|
||
db_path: 数据库文件路径
|
||
"""
|
||
self.db_path = db_path
|
||
self._conn: Optional[sqlite3.Connection] = None
|
||
self._ensure_db_exists()
|
||
self.init_tables()
|
||
Architecture.Register(Database, self, lambda: None)
|
||
|
||
def _ensure_db_exists(self):
|
||
"""确保数据库目录存在"""
|
||
db_dir = ToolFile(self.db_path).BackToParentDir()
|
||
db_dir.MustExistsPath()
|
||
|
||
@property
|
||
def conn(self) -> sqlite3.Connection:
|
||
"""获取数据库连接(懒加载)"""
|
||
if self._conn is None:
|
||
try:
|
||
self._conn = sqlite3.connect(
|
||
self.db_path,
|
||
check_same_thread=False, # 允许多线程访问
|
||
isolation_level=None, # 自动提交
|
||
timeout=30.0 # 增加超时时间
|
||
)
|
||
self._conn.row_factory = sqlite3.Row # 支持字典式访问
|
||
|
||
# 启用WAL模式以提高并发性能
|
||
self._conn.execute("PRAGMA journal_mode=WAL")
|
||
self._conn.execute("PRAGMA synchronous=NORMAL")
|
||
self._conn.execute("PRAGMA cache_size=1000")
|
||
self._conn.execute("PRAGMA temp_store=MEMORY")
|
||
|
||
logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接成功: {self.db_path}{ConsoleFrontColor.RESET}")
|
||
except Exception as e:
|
||
logger.Log("Error", f"{ConsoleFrontColor.RED}数据库连接失败: {e}{ConsoleFrontColor.RESET}")
|
||
raise
|
||
return self._conn
|
||
|
||
def _table_exists(self, table_name: str) -> bool:
|
||
"""检查表是否存在
|
||
|
||
Args:
|
||
table_name: 表名
|
||
|
||
Returns:
|
||
是否存在
|
||
"""
|
||
cursor = self.conn.cursor()
|
||
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
|
||
return cursor.fetchone() is not None
|
||
|
||
def define_table(self, table_name: str):
|
||
"""定义表
|
||
|
||
Args:
|
||
table_name: 表名
|
||
"""
|
||
if not self._table_exists(table_name):
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)")
|
||
logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 创建{ConsoleFrontColor.RESET}")
|
||
return self
|
||
|
||
def _column_exists(self, table_name: str, column_name: str) -> bool:
|
||
"""检查表中列是否存在
|
||
|
||
Args:
|
||
table_name: 表名
|
||
column_name: 列名
|
||
|
||
Returns:
|
||
是否存在
|
||
"""
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(f"PRAGMA table_info({table_name})")
|
||
columns = [row[1] for row in cursor.fetchall()]
|
||
return column_name in columns
|
||
|
||
def _add_column_if_not_exists(self, table_name: str, column_name: str, column_def: str):
|
||
"""安全地添加列(如果不存在)
|
||
|
||
Args:
|
||
table_name: 表名
|
||
column_name: 列名
|
||
column_def: 列定义(如 "INTEGER" 或 "TEXT DEFAULT ''")
|
||
"""
|
||
if not self._column_exists(table_name, column_name):
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}")
|
||
logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 添加列 {column_name}{ConsoleFrontColor.RESET}")
|
||
except Exception as e:
|
||
logger.Log("Warning", f"{ConsoleFrontColor.YELLOW}添加列失败: {e}{ConsoleFrontColor.RESET}")
|
||
|
||
def define_column(self, table_name: str, column_name: str, column_def: str):
|
||
"""定义列
|
||
|
||
Args:
|
||
table_name: 表名
|
||
column_name: 列名
|
||
column_def: 列定义(如 "INTEGER" 或 "TEXT DEFAULT ''")
|
||
"""
|
||
self._add_column_if_not_exists(table_name, column_name, column_def)
|
||
return self
|
||
|
||
def init_tables(self):
|
||
"""初始化数据库表"""
|
||
self._ensure_scheduled_tasks_table()
|
||
|
||
def _ensure_scheduled_tasks_table(self) -> None:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(
|
||
f"""
|
||
CREATE TABLE IF NOT EXISTS {SCHEDULED_TASK_TABLE} (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
plugin_module TEXT NOT NULL,
|
||
plugin_class TEXT,
|
||
callback_name TEXT NOT NULL,
|
||
payload TEXT,
|
||
execute_at INTEGER NOT NULL,
|
||
status TEXT NOT NULL DEFAULT '{STATUS_PENDING}',
|
||
attempts INTEGER NOT NULL DEFAULT 0,
|
||
last_error TEXT,
|
||
created_at INTEGER NOT NULL,
|
||
updated_at INTEGER NOT NULL
|
||
)
|
||
"""
|
||
)
|
||
cursor.execute(
|
||
f"CREATE INDEX IF NOT EXISTS idx_{SCHEDULED_TASK_TABLE}_status_execute_at ON {SCHEDULED_TASK_TABLE}(status, execute_at)"
|
||
)
|
||
|
||
def create_scheduled_task(
|
||
self,
|
||
plugin_module: str,
|
||
plugin_class: Optional[str],
|
||
callback_name: str,
|
||
payload: Optional[str],
|
||
execute_at_ms: int
|
||
) -> int:
|
||
now_ms = int(time.time() * 1000)
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(
|
||
f"""
|
||
INSERT INTO {SCHEDULED_TASK_TABLE} (
|
||
plugin_module, plugin_class, callback_name, payload,
|
||
execute_at, status, attempts, created_at, updated_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
plugin_module,
|
||
plugin_class,
|
||
callback_name,
|
||
payload,
|
||
execute_at_ms,
|
||
STATUS_PENDING,
|
||
0,
|
||
now_ms,
|
||
now_ms,
|
||
)
|
||
)
|
||
task_id = cursor.lastrowid
|
||
return int(task_id)
|
||
|
||
def get_due_tasks(self, now_ms: int, limit: int) -> list[sqlite3.Row]:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(
|
||
f"""
|
||
SELECT * FROM {SCHEDULED_TASK_TABLE}
|
||
WHERE status = ? AND execute_at <= ?
|
||
ORDER BY execute_at ASC
|
||
LIMIT ?
|
||
""",
|
||
(STATUS_PENDING, now_ms, limit)
|
||
)
|
||
return cursor.fetchall()
|
||
|
||
def update_task_status(
|
||
self,
|
||
task_id: int,
|
||
status: str,
|
||
*,
|
||
attempts: Optional[int] = None,
|
||
last_error: Optional[str] = None
|
||
) -> None:
|
||
now_ms = int(time.time() * 1000)
|
||
sets = ["status = ?"]
|
||
params: list[Any] = [status]
|
||
if attempts is not None:
|
||
sets.append("attempts = ?")
|
||
params.append(attempts)
|
||
if last_error is not None:
|
||
sets.append("last_error = ?")
|
||
params.append(last_error)
|
||
params.append(task_id)
|
||
set_clause = ", ".join(sets)
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(
|
||
f"UPDATE {SCHEDULED_TASK_TABLE} SET {set_clause} WHERE id = ?",
|
||
params
|
||
)
|
||
|
||
def reset_running_tasks(self) -> None:
|
||
cursor = self.conn.cursor()
|
||
now_ms = int(time.time() * 1000)
|
||
cursor.execute(
|
||
f"""
|
||
UPDATE {SCHEDULED_TASK_TABLE}
|
||
SET status = ?, updated_at = ?
|
||
WHERE status = ?
|
||
""",
|
||
(STATUS_PENDING, now_ms, STATUS_RUNNING)
|
||
)
|
||
|
||
def close(self):
|
||
"""关闭数据库连接"""
|
||
if self._conn:
|
||
self._conn.close()
|
||
self._conn = None
|
||
logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接已关闭{ConsoleFrontColor.RESET}")
|
||
|
||
|
||
def get_db() -> Database:
|
||
"""获取全局数据库实例(单例模式)"""
|
||
if not Architecture.Contains(Database):
|
||
return Database()
|
||
return Architecture.Get(Database)
|
||
|
||
__all__ = [
|
||
"get_db",
|
||
"Database",
|
||
"SCHEDULED_TASK_TABLE",
|
||
"STATUS_PENDING",
|
||
"STATUS_RUNNING",
|
||
"STATUS_COMPLETED",
|
||
"STATUS_FAILED",
|
||
]
|