Files
PWF/CoreModules/database.py
2025-11-08 01:24:50 +08:00

264 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SQLite数据库操作模块 - 使用标准库sqlite3"""
import sqlite3
import time
from typing import Any, Optional
from ..Convention.Runtime.Config import *
from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor
from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.File import ToolFile
# Project configuration object fetched from the Architecture registry. This module
# also uses it as its logger (see the logger.Log calls in Database).
logger: ProjectConfig = Architecture.Get(ProjectConfig)
# Absolute path of the SQLite file, resolved from the "database_path" config item.
# NOTE(review): "db.db" is presumably the fallback when the item is missing, and the
# False flag presumably means "do not create the file" — confirm against ProjectConfig.
DATABASE_PATH = logger.GetFile(logger.FindItem("database_path", "db.db"), False).GetFullPath()
# NOTE(review): presumably writes the config back so a defaulted "database_path"
# is persisted — confirm against ProjectConfig.SaveProperties.
logger.SaveProperties()
# Name of the table that stores scheduled plugin callbacks.
SCHEDULED_TASK_TABLE = "scheduled_tasks"
# Values written to the "status" column of the scheduled task table.
STATUS_PENDING = "pending"
STATUS_RUNNING = "running"
STATUS_COMPLETED = "completed"
STATUS_FAILED = "failed"
class Database:
    """Thin wrapper around a lazily opened ``sqlite3`` connection.

    Besides generic table/column helpers, the class owns the scheduled task
    table (``SCHEDULED_TASK_TABLE``) and the queries that operate on it.
    """

    def __init__(self, db_path: str = DATABASE_PATH):
        """Prepare the database and register this instance with Architecture.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        # The connection is opened on first access of the ``conn`` property.
        self._conn: Optional[sqlite3.Connection] = None
        self._ensure_db_exists()
        self.init_tables()
        # NOTE(review): the third argument is a no-op callback; its exact
        # meaning is defined by Architecture.Register — confirm there.
        Architecture.Register(Database, self, lambda: None)

    def _ensure_db_exists(self):
        """Make sure the directory that holds the database file exists."""
        db_dir = ToolFile(self.db_path).BackToParentDir()
        db_dir.MustExistsPath()

    @property
    def conn(self) -> sqlite3.Connection:
        """Return the shared connection, opening and configuring it on first use.

        The connection is cached only after every PRAGMA succeeded. If opening
        or configuring fails, the partially set-up connection is closed, the
        error is logged and re-raised, and the next access retries from scratch.

        Raises:
            Exception: whatever ``sqlite3`` (or the logger) raised during setup.
        """
        if self._conn is None:
            connection: Optional[sqlite3.Connection] = None
            try:
                connection = sqlite3.connect(
                    self.db_path,
                    check_same_thread=False,  # allow use from multiple threads
                    isolation_level=None,  # autocommit mode
                    timeout=30.0,  # wait longer for locks before failing
                )
                connection.row_factory = sqlite3.Row  # rows support access by column name
                # WAL journal mode for better read/write concurrency.
                connection.execute("PRAGMA journal_mode=WAL")
                connection.execute("PRAGMA synchronous=NORMAL")
                connection.execute("PRAGMA cache_size=1000")
                connection.execute("PRAGMA temp_store=MEMORY")
                logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接成功: {self.db_path}{ConsoleFrontColor.RESET}")
            except Exception as e:
                # Do not keep (or leak) a half-configured connection.
                if connection is not None:
                    connection.close()
                logger.Log("Error", f"{ConsoleFrontColor.RED}数据库连接失败: {e}{ConsoleFrontColor.RESET}")
                raise
            self._conn = connection
        return self._conn

    def _table_exists(self, table_name: str) -> bool:
        """Check whether a table exists.

        Args:
            table_name: Table name to look up in ``sqlite_master``.

        Returns:
            True if a table with that name exists.
        """
        cursor = self.conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
        return cursor.fetchone() is not None

    def define_table(self, table_name: str) -> "Database":
        """Create a table with only an autoincrement ``id`` column if it is missing.

        Args:
            table_name: Table name.

        Returns:
            This instance, so calls can be chained.
        """
        # NOTE(review): table_name is interpolated into the SQL text unescaped
        # (identifiers cannot be bound as parameters); callers must pass
        # trusted names only.
        if not self._table_exists(table_name):
            cursor = self.conn.cursor()
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)")
            logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 创建{ConsoleFrontColor.RESET}")
        return self

    def _column_exists(self, table_name: str, column_name: str) -> bool:
        """Check whether a column exists in a table.

        Args:
            table_name: Table name.
            column_name: Column name.

        Returns:
            True if ``PRAGMA table_info`` lists a column with that name.
        """
        # NOTE(review): table_name is interpolated unescaped; trusted names only.
        cursor = self.conn.cursor()
        cursor.execute(f"PRAGMA table_info({table_name})")
        # Index 1 of every table_info row is the column name.
        return any(row[1] == column_name for row in cursor.fetchall())

    def _add_column_if_not_exists(self, table_name: str, column_name: str, column_def: str):
        """Add a column unless the table already has it (best effort).

        A failing ``ALTER TABLE`` is logged as a warning and not re-raised.

        Args:
            table_name: Table name.
            column_name: Column name.
            column_def: Column definition, e.g. "INTEGER" or "TEXT DEFAULT ''".
        """
        if not self._column_exists(table_name, column_name):
            try:
                # NOTE(review): all three arguments are interpolated into the
                # SQL text unescaped; callers must pass trusted values only.
                cursor = self.conn.cursor()
                cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}")
                logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 添加列 {column_name}{ConsoleFrontColor.RESET}")
            except Exception as e:
                logger.Log("Warning", f"{ConsoleFrontColor.YELLOW}添加列失败: {e}{ConsoleFrontColor.RESET}")

    def define_column(self, table_name: str, column_name: str, column_def: str) -> "Database":
        """Declare a column, adding it to the table if it is missing.

        Args:
            table_name: Table name.
            column_name: Column name.
            column_def: Column definition, e.g. "INTEGER" or "TEXT DEFAULT ''".

        Returns:
            This instance, so calls can be chained.
        """
        self._add_column_if_not_exists(table_name, column_name, column_def)
        return self

    def init_tables(self):
        """Create the tables this module owns."""
        self._ensure_scheduled_tasks_table()

    def _ensure_scheduled_tasks_table(self) -> None:
        """Create the scheduled task table and its (status, execute_at) index."""
        cursor = self.conn.cursor()
        cursor.execute(
            f"""
            CREATE TABLE IF NOT EXISTS {SCHEDULED_TASK_TABLE} (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                plugin_module TEXT NOT NULL,
                plugin_class TEXT,
                callback_name TEXT NOT NULL,
                payload TEXT,
                execute_at INTEGER NOT NULL,
                status TEXT NOT NULL DEFAULT '{STATUS_PENDING}',
                attempts INTEGER NOT NULL DEFAULT 0,
                last_error TEXT,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            )
            """
        )
        # Supports the lookup in get_due_tasks (status = ? AND execute_at <= ?).
        cursor.execute(
            f"CREATE INDEX IF NOT EXISTS idx_{SCHEDULED_TASK_TABLE}_status_execute_at ON {SCHEDULED_TASK_TABLE}(status, execute_at)"
        )

    def create_scheduled_task(
        self,
        plugin_module: str,
        plugin_class: Optional[str],
        callback_name: str,
        payload: Optional[str],
        execute_at_ms: int
    ) -> int:
        """Insert a new pending task and return its row id.

        Args:
            plugin_module: Value stored in the ``plugin_module`` column.
            plugin_class: Value stored in the ``plugin_class`` column (may be None).
            callback_name: Value stored in the ``callback_name`` column.
            payload: Value stored in the ``payload`` column (may be None).
            execute_at_ms: Due time; compared against millisecond timestamps
                in get_due_tasks.

        Returns:
            The id of the inserted row.
        """
        # created_at and updated_at both start at the current time in milliseconds.
        now_ms = int(time.time() * 1000)
        cursor = self.conn.cursor()
        cursor.execute(
            f"""
            INSERT INTO {SCHEDULED_TASK_TABLE} (
                plugin_module, plugin_class, callback_name, payload,
                execute_at, status, attempts, created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                plugin_module,
                plugin_class,
                callback_name,
                payload,
                execute_at_ms,
                STATUS_PENDING,
                0,
                now_ms,
                now_ms,
            )
        )
        task_id = cursor.lastrowid
        return int(task_id)

    def get_due_tasks(self, now_ms: int, limit: int) -> list[sqlite3.Row]:
        """Return pending tasks whose ``execute_at`` is not later than ``now_ms``.

        Args:
            now_ms: Upper bound for ``execute_at``.
            limit: Maximum number of rows to return.

        Returns:
            Matching rows ordered by ``execute_at`` ascending.
        """
        cursor = self.conn.cursor()
        cursor.execute(
            f"""
            SELECT * FROM {SCHEDULED_TASK_TABLE}
            WHERE status = ? AND execute_at <= ?
            ORDER BY execute_at ASC
            LIMIT ?
            """,
            (STATUS_PENDING, now_ms, limit)
        )
        return cursor.fetchall()

    def update_task_status(
        self,
        task_id: int,
        status: str,
        *,
        attempts: Optional[int] = None,
        last_error: Optional[str] = None
    ) -> None:
        """Set a task's status and refresh its ``updated_at`` timestamp.

        Args:
            task_id: Id of the task row to update.
            status: New status value.
            attempts: New attempt count; the column is left untouched when None.
            last_error: New error text; the column is left untouched when None
                (so this method cannot reset it to NULL).
        """
        now_ms = int(time.time() * 1000)
        # updated_at is always written so it tracks every status change,
        # consistent with reset_running_tasks.
        sets = ["status = ?", "updated_at = ?"]
        params: list[Any] = [status, now_ms]
        if attempts is not None:
            sets.append("attempts = ?")
            params.append(attempts)
        if last_error is not None:
            sets.append("last_error = ?")
            params.append(last_error)
        # The id is bound last because it belongs to the trailing WHERE clause.
        params.append(task_id)
        set_clause = ", ".join(sets)
        cursor = self.conn.cursor()
        cursor.execute(
            f"UPDATE {SCHEDULED_TASK_TABLE} SET {set_clause} WHERE id = ?",
            params
        )

    def reset_running_tasks(self) -> None:
        """Move every task in the running state back to pending."""
        cursor = self.conn.cursor()
        now_ms = int(time.time() * 1000)
        cursor.execute(
            f"""
            UPDATE {SCHEDULED_TASK_TABLE}
            SET status = ?, updated_at = ?
            WHERE status = ?
            """,
            (STATUS_PENDING, now_ms, STATUS_RUNNING)
        )

    def close(self):
        """Close the connection if one is open; a later ``conn`` access reopens it."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None
            logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接已关闭{ConsoleFrontColor.RESET}")
def get_db() -> Database:
    """Return the shared Database instance, creating it on first use.

    An instance already registered with Architecture is returned as-is;
    otherwise a new Database is constructed with its default path (its
    constructor registers it with Architecture).
    """
    if Architecture.Contains(Database):
        return Architecture.Get(Database)
    return Database()
# Names exported by a star-import of this module: the accessor, the class,
# the task table name and the task status values.
__all__ = [
"get_db",
"Database",
"SCHEDULED_TASK_TABLE",
"STATUS_PENDING",
"STATUS_RUNNING",
"STATUS_COMPLETED",
"STATUS_FAILED",
]