Files
PWF/CoreModules/database.py

264 lines
8.8 KiB
Python
Raw Permalink Normal View History

2025-11-05 16:21:05 +08:00
"""SQLite数据库操作模块 - 使用标准库sqlite3"""
import sqlite3
2025-11-08 01:24:50 +08:00
import time
from typing import Any, Optional
2025-11-07 15:52:06 +08:00
from ..Convention.Runtime.Config import *
2025-11-05 23:23:42 +08:00
from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor
from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.File import ToolFile
2025-11-05 16:21:05 +08:00
2025-11-07 23:47:33 +08:00
logger: ProjectConfig = Architecture.Get(ProjectConfig)
DATABASE_PATH = logger.GetFile(logger.FindItem("database_path", "db.db"), False).GetFullPath()
logger.SaveProperties()
2025-11-05 16:21:05 +08:00
2025-11-08 01:24:50 +08:00
SCHEDULED_TASK_TABLE = "scheduled_tasks"
STATUS_PENDING = "pending"
STATUS_RUNNING = "running"
STATUS_COMPLETED = "completed"
STATUS_FAILED = "failed"
2025-11-05 16:21:05 +08:00
class Database:
"""数据库管理类"""
def __init__(self, db_path: str = DATABASE_PATH):
"""初始化数据库连接
Args:
db_path: 数据库文件路径
"""
self.db_path = db_path
self._conn: Optional[sqlite3.Connection] = None
self._ensure_db_exists()
self.init_tables()
Architecture.Register(Database, self, lambda: None)
def _ensure_db_exists(self):
"""确保数据库目录存在"""
db_dir = ToolFile(self.db_path).BackToParentDir()
db_dir.MustExistsPath()
@property
def conn(self) -> sqlite3.Connection:
"""获取数据库连接(懒加载)"""
if self._conn is None:
try:
self._conn = sqlite3.connect(
self.db_path,
check_same_thread=False, # 允许多线程访问
isolation_level=None, # 自动提交
timeout=30.0 # 增加超时时间
)
self._conn.row_factory = sqlite3.Row # 支持字典式访问
# 启用WAL模式以提高并发性能
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA synchronous=NORMAL")
self._conn.execute("PRAGMA cache_size=1000")
self._conn.execute("PRAGMA temp_store=MEMORY")
2025-11-07 23:47:33 +08:00
logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接成功: {self.db_path}{ConsoleFrontColor.RESET}")
2025-11-05 16:21:05 +08:00
except Exception as e:
2025-11-07 23:47:33 +08:00
logger.Log("Error", f"{ConsoleFrontColor.RED}数据库连接失败: {e}{ConsoleFrontColor.RESET}")
2025-11-05 16:21:05 +08:00
raise
return self._conn
def _table_exists(self, table_name: str) -> bool:
"""检查表是否存在
Args:
table_name: 表名
Returns:
是否存在
"""
cursor = self.conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
return cursor.fetchone() is not None
def define_table(self, table_name: str):
"""定义表
Args:
table_name: 表名
"""
if not self._table_exists(table_name):
cursor = self.conn.cursor()
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)")
2025-11-07 23:47:33 +08:00
logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 创建{ConsoleFrontColor.RESET}")
2025-11-05 16:21:05 +08:00
return self
def _column_exists(self, table_name: str, column_name: str) -> bool:
"""检查表中列是否存在
Args:
table_name: 表名
column_name: 列名
Returns:
是否存在
"""
cursor = self.conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [row[1] for row in cursor.fetchall()]
return column_name in columns
def _add_column_if_not_exists(self, table_name: str, column_name: str, column_def: str):
"""安全地添加列(如果不存在)
Args:
table_name: 表名
column_name: 列名
column_def: 列定义 "INTEGER" "TEXT DEFAULT ''"
"""
if not self._column_exists(table_name, column_name):
try:
cursor = self.conn.cursor()
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}")
2025-11-07 23:47:33 +08:00
logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 添加列 {column_name}{ConsoleFrontColor.RESET}")
2025-11-05 16:21:05 +08:00
except Exception as e:
2025-11-07 23:47:33 +08:00
logger.Log("Warning", f"{ConsoleFrontColor.YELLOW}添加列失败: {e}{ConsoleFrontColor.RESET}")
2025-11-05 16:21:05 +08:00
def define_column(self, table_name: str, column_name: str, column_def: str):
"""定义列
Args:
table_name: 表名
column_name: 列名
column_def: 列定义 "INTEGER" "TEXT DEFAULT ''"
"""
self._add_column_if_not_exists(table_name, column_name, column_def)
return self
def init_tables(self):
"""初始化数据库表"""
2025-11-08 01:24:50 +08:00
self._ensure_scheduled_tasks_table()
def _ensure_scheduled_tasks_table(self) -> None:
cursor = self.conn.cursor()
cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {SCHEDULED_TASK_TABLE} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
plugin_module TEXT NOT NULL,
plugin_class TEXT,
callback_name TEXT NOT NULL,
payload TEXT,
execute_at INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT '{STATUS_PENDING}',
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
)
"""
)
cursor.execute(
f"CREATE INDEX IF NOT EXISTS idx_{SCHEDULED_TASK_TABLE}_status_execute_at ON {SCHEDULED_TASK_TABLE}(status, execute_at)"
)
def create_scheduled_task(
self,
plugin_module: str,
plugin_class: Optional[str],
callback_name: str,
payload: Optional[str],
execute_at_ms: int
) -> int:
now_ms = int(time.time() * 1000)
cursor = self.conn.cursor()
cursor.execute(
f"""
INSERT INTO {SCHEDULED_TASK_TABLE} (
plugin_module, plugin_class, callback_name, payload,
execute_at, status, attempts, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
plugin_module,
plugin_class,
callback_name,
payload,
execute_at_ms,
STATUS_PENDING,
0,
now_ms,
now_ms,
)
)
task_id = cursor.lastrowid
return int(task_id)
def get_due_tasks(self, now_ms: int, limit: int) -> list[sqlite3.Row]:
cursor = self.conn.cursor()
cursor.execute(
f"""
SELECT * FROM {SCHEDULED_TASK_TABLE}
WHERE status = ? AND execute_at <= ?
ORDER BY execute_at ASC
LIMIT ?
""",
(STATUS_PENDING, now_ms, limit)
)
return cursor.fetchall()
def update_task_status(
self,
task_id: int,
status: str,
*,
attempts: Optional[int] = None,
last_error: Optional[str] = None
) -> None:
now_ms = int(time.time() * 1000)
sets = ["status = ?"]
params: list[Any] = [status]
if attempts is not None:
sets.append("attempts = ?")
params.append(attempts)
if last_error is not None:
sets.append("last_error = ?")
params.append(last_error)
params.append(task_id)
set_clause = ", ".join(sets)
cursor = self.conn.cursor()
cursor.execute(
f"UPDATE {SCHEDULED_TASK_TABLE} SET {set_clause} WHERE id = ?",
params
)
def reset_running_tasks(self) -> None:
cursor = self.conn.cursor()
now_ms = int(time.time() * 1000)
cursor.execute(
f"""
UPDATE {SCHEDULED_TASK_TABLE}
SET status = ?, updated_at = ?
WHERE status = ?
""",
(STATUS_PENDING, now_ms, STATUS_RUNNING)
)
2025-11-05 16:21:05 +08:00
def close(self):
"""关闭数据库连接"""
if self._conn:
self._conn.close()
self._conn = None
2025-11-07 23:47:33 +08:00
logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接已关闭{ConsoleFrontColor.RESET}")
2025-11-05 16:21:05 +08:00
2025-11-08 01:24:50 +08:00
2025-11-05 16:21:05 +08:00
def get_db() -> Database:
"""获取全局数据库实例(单例模式)"""
if not Architecture.Contains(Database):
return Database()
return Architecture.Get(Database)
2025-11-08 01:24:50 +08:00
__all__ = [
"get_db",
"Database",
"SCHEDULED_TASK_TABLE",
"STATUS_PENDING",
"STATUS_RUNNING",
"STATUS_COMPLETED",
"STATUS_FAILED",
]