From b5fe23342df074068a87446c9094036e2e36ba92 Mon Sep 17 00:00:00 2001 From: ninemine <1371605831@qq.com> Date: Sat, 8 Nov 2025 01:24:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=AE=9A=E6=97=B6=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Application/web.py | 21 ++-- CoreModules/clock_scheduler.py | 182 ++++++++++++++++++++++++++++++++ CoreModules/database.py | 126 +++++++++++++++++++++- CoreModules/plugin_interface.py | 40 +++++++ 4 files changed, 358 insertions(+), 11 deletions(-) create mode 100644 CoreModules/clock_scheduler.py diff --git a/Application/web.py b/Application/web.py index 7522ac1..11d0c44 100644 --- a/Application/web.py +++ b/Application/web.py @@ -4,6 +4,7 @@ from fastapi.responses import JSONResponse from contextlib import asynccontextmanager from ..CoreModules.middleware import ConcurrencyLimitMiddleware from ..CoreModules.plugin_interface import ImportPlugins +from ..CoreModules.clock_scheduler import get_clock_scheduler from ..CoreRouters import callback, health from ..Convention.Runtime.GlobalConfig import * from ..Convention.Runtime.Architecture import Architecture @@ -16,6 +17,8 @@ APP_CONFIG = { @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" + scheduler = get_clock_scheduler() + await scheduler.start() # 启动 logger.Log("Info", "应用启动中...") @@ -25,17 +28,17 @@ async def lifespan(app: FastAPI): # 启动后台清理 logger.Log("Info", "启动后台清理...") - yield - - # 关闭 try: - logger.Log("Info", "关闭应用...") - # await cleanup_task - except asyncio.CancelledError: - pass + yield finally: - logger.Log("Info", "关闭应用完成...") - # db.close() + # 关闭 + try: + logger.Log("Info", "关闭应用...") + await scheduler.stop() + except asyncio.CancelledError: + pass + finally: + logger.Log("Info", "关闭应用完成...") def generate_app(kwargs: dict) -> FastAPI: ''' diff --git a/CoreModules/clock_scheduler.py b/CoreModules/clock_scheduler.py new file mode 100644 index 0000000..59a5f17 --- /dev/null +++ b/CoreModules/clock_scheduler.py @@ -0,0 +1,182 @@ +import asyncio +import importlib +import inspect +import json +import time +from typing import Any, Callable, Dict, Optional, Sequence + +from ..Convention.Runtime.Architecture import Architecture +from ..Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig +from ..CoreModules.database import ( + STATUS_COMPLETED, + STATUS_FAILED, + STATUS_PENDING, + STATUS_RUNNING, + Database, + get_db, +) + + +class ClockScheduler: + _instance: Optional["ClockScheduler"] = None + + def __init__(self) -> None: + self._db: Database = get_db() + self._config: ProjectConfig = Architecture.Get(ProjectConfig) + self._logger = self._config + self._tick_ms = max(int(self._config.FindItem("scheduler_tick_ms", 1000)), 10) + self._batch_size = max(int(self._config.FindItem("scheduler_max_batch", 50)), 1) + self._loop_task: Optional[asyncio.Task] = None + self._running = False + self._lock = asyncio.Lock() + Architecture.Register(ClockScheduler, self, lambda: None) + self._config.SaveProperties() + + @classmethod + def instance(cls) -> "ClockScheduler": + if not Architecture.Contains(ClockScheduler): + cls._instance = ClockScheduler() + return Architecture.Get(ClockScheduler) + + async def start(self) -> None: + async with self._lock: + if self._running: + return + self._running = True + self._db.reset_running_tasks() + self._loop_task = asyncio.create_task(self._run_loop()) + self._logger.Log( + "Info", + f"{ConsoleFrontColor.GREEN}ClockScheduler started with tick {self._tick_ms} ms{ConsoleFrontColor.RESET}", + ) + + async def stop(self) -> None: + async with self._lock: + if not self._running: + return + self._running = False + if self._loop_task: + self._loop_task.cancel() + try: + await self._loop_task + except asyncio.CancelledError: + pass + self._loop_task = None + self._logger.Log( + "Info", + f"{ConsoleFrontColor.GREEN}ClockScheduler stopped{ConsoleFrontColor.RESET}", + ) + + def register_task( + self, + plugin_module: str, + plugin_class: Optional[str], + callback_name: str, + delay_ms: int, + *, + args: Optional[Sequence[Any]] = None, + kwargs: Optional[Dict[str, Any]] = None, + ) -> int: + execute_at = int(time.time() * 1000) + max(delay_ms, 0) + payload = json.dumps({ + "args": list(args) if args else [], + "kwargs": kwargs or {}, + }) + task_id = self._db.create_scheduled_task( + plugin_module, + plugin_class, + callback_name, + payload, + execute_at, + ) + self._logger.Log( + "Info", + f"{ConsoleFrontColor.LIGHTCYAN_EX}Scheduled task {task_id} for {plugin_module}.{callback_name} at {execute_at}{ConsoleFrontColor.RESET}", + ) + return task_id + + async def _run_loop(self) -> None: + try: + while self._running: + await self._process_due_tasks() + await asyncio.sleep(self._tick_ms / 1000) + except asyncio.CancelledError: + pass + finally: + self._running = False + + async def _process_due_tasks(self) -> None: + now_ms = int(time.time() * 1000) + tasks = self._db.get_due_tasks(now_ms, self._batch_size) + for task in tasks: + task_id = int(task["id"]) + attempts = int(task["attempts"]) + try: + self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts) + await self._execute_task(task) + self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts) + except Exception as exc: # pylint: disable=broad-except + message = f"{type(exc).__name__}: {exc}" + self._logger.Log( + "Error", + f"{ConsoleFrontColor.RED}Task {task_id} failed: {message}{ConsoleFrontColor.RESET}", + ) + self._db.update_task_status( + task_id, + STATUS_FAILED, + attempts=attempts + 1, + last_error=message, + ) + + async def _execute_task(self, task: Any) -> None: + plugin_module = task["plugin_module"] + plugin_class = task["plugin_class"] + callback_name = task["callback_name"] + payload_raw = task["payload"] + args, kwargs = self._decode_payload(payload_raw) + callback = self._resolve_callback(plugin_module, plugin_class, callback_name) + result = callback(*args, **kwargs) + if inspect.isawaitable(result): + await result + + def _resolve_callback( + self, + plugin_module: str, + plugin_class: Optional[str], + callback_name: str, + ) -> Callable[..., Any]: + module = importlib.import_module(plugin_module) + if plugin_class: + target_class = getattr(module, plugin_class) + if Architecture.Contains(target_class): + instance = Architecture.Get(target_class) + else: + instance = target_class() + callback = getattr(instance, callback_name) + else: + callback = getattr(module, callback_name) + return callback + + def _decode_payload(self, payload_raw: Optional[str]) -> tuple[list[Any], Dict[str, Any]]: + if not payload_raw: + return [], {} + try: + payload_obj = json.loads(payload_raw) + args = payload_obj.get("args", []) + kwargs = payload_obj.get("kwargs", {}) + if not isinstance(args, list) or not isinstance(kwargs, dict): + raise ValueError("Invalid payload structure") + return args, kwargs + except Exception as exc: # pylint: disable=broad-except + self._logger.Log( + "Warning", + f"{ConsoleFrontColor.YELLOW}Failed to decode payload: {exc}{ConsoleFrontColor.RESET}", + ) + return [], {} + + +def get_clock_scheduler() -> ClockScheduler: + return ClockScheduler.instance() + + +__all__ = ["ClockScheduler", "get_clock_scheduler"] diff --git a/CoreModules/database.py b/CoreModules/database.py index 4d1bda9..82e001c 100644 --- a/CoreModules/database.py +++ b/CoreModules/database.py @@ -1,5 +1,7 @@ """SQLite数据库操作模块 - 使用标准库sqlite3""" import sqlite3 +import time +from typing import Any, Optional from ..Convention.Runtime.Config import * from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor from ..Convention.Runtime.Architecture import Architecture @@ -9,6 +11,12 @@ logger: ProjectConfig = Architecture.Get(ProjectConfig) DATABASE_PATH = logger.GetFile(logger.FindItem("database_path", "db.db"), False).GetFullPath() logger.SaveProperties() +SCHEDULED_TASK_TABLE = "scheduled_tasks" +STATUS_PENDING = "pending" +STATUS_RUNNING = "running" +STATUS_COMPLETED = "completed" +STATUS_FAILED = "failed" + class Database: """数据库管理类""" @@ -123,7 +131,112 @@ class Database: def init_tables(self): """初始化数据库表""" - pass + self._ensure_scheduled_tasks_table() + + def _ensure_scheduled_tasks_table(self) -> None: + cursor = self.conn.cursor() + cursor.execute( + f""" + CREATE TABLE IF NOT EXISTS {SCHEDULED_TASK_TABLE} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + plugin_module TEXT NOT NULL, + plugin_class TEXT, + callback_name TEXT NOT NULL, + payload TEXT, + execute_at INTEGER NOT NULL, + status TEXT NOT NULL DEFAULT '{STATUS_PENDING}', + attempts INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ) + """ + ) + cursor.execute( + f"CREATE INDEX IF NOT EXISTS idx_{SCHEDULED_TASK_TABLE}_status_execute_at ON {SCHEDULED_TASK_TABLE}(status, execute_at)" + ) + + def create_scheduled_task( + self, + plugin_module: str, + plugin_class: Optional[str], + callback_name: str, + payload: Optional[str], + execute_at_ms: int + ) -> int: + now_ms = int(time.time() * 1000) + cursor = self.conn.cursor() + cursor.execute( + f""" + INSERT INTO {SCHEDULED_TASK_TABLE} ( + plugin_module, plugin_class, callback_name, payload, + execute_at, status, attempts, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + plugin_module, + plugin_class, + callback_name, + payload, + execute_at_ms, + STATUS_PENDING, + 0, + now_ms, + now_ms, + ) + ) + task_id = cursor.lastrowid + return int(task_id) + + def get_due_tasks(self, now_ms: int, limit: int) -> list[sqlite3.Row]: + cursor = self.conn.cursor() + cursor.execute( + f""" + SELECT * FROM {SCHEDULED_TASK_TABLE} + WHERE status = ? AND execute_at <= ? + ORDER BY execute_at ASC + LIMIT ? + """, + (STATUS_PENDING, now_ms, limit) + ) + return cursor.fetchall() + + def update_task_status( + self, + task_id: int, + status: str, + *, + attempts: Optional[int] = None, + last_error: Optional[str] = None + ) -> None: + now_ms = int(time.time() * 1000) + sets = ["status = ?"] + params: list[Any] = [status] + if attempts is not None: + sets.append("attempts = ?") + params.append(attempts) + if last_error is not None: + sets.append("last_error = ?") + params.append(last_error) + params.append(task_id) + set_clause = ", ".join(sets) + cursor = self.conn.cursor() + cursor.execute( + f"UPDATE {SCHEDULED_TASK_TABLE} SET {set_clause} WHERE id = ?", + params + ) + + def reset_running_tasks(self) -> None: + cursor = self.conn.cursor() + now_ms = int(time.time() * 1000) + cursor.execute( + f""" + UPDATE {SCHEDULED_TASK_TABLE} + SET status = ?, updated_at = ? + WHERE status = ? + """, + (STATUS_PENDING, now_ms, STATUS_RUNNING) + ) def close(self): """关闭数据库连接""" @@ -132,10 +245,19 @@ class Database: self._conn = None logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接已关闭{ConsoleFrontColor.RESET}") + def get_db() -> Database: """获取全局数据库实例(单例模式)""" if not Architecture.Contains(Database): return Database() return Architecture.Get(Database) -__all__ = ["get_db"] +__all__ = [ + "get_db", + "Database", + "SCHEDULED_TASK_TABLE", + "STATUS_PENDING", + "STATUS_RUNNING", + "STATUS_COMPLETED", + "STATUS_FAILED", +] diff --git a/CoreModules/plugin_interface.py b/CoreModules/plugin_interface.py index f85dd3e..07492e7 100644 --- a/CoreModules/plugin_interface.py +++ b/CoreModules/plugin_interface.py @@ -5,6 +5,7 @@ from ..Convention.Runtime.GlobalConfig import ProjectConfig from ..Convention.Runtime.Architecture import Architecture from ..Convention.Runtime.File import ToolFile from ..CoreModules.database import get_db +from ..CoreModules.clock_scheduler import get_clock_scheduler from fastapi import APIRouter, FastAPI from typing import * from pydantic import * @@ -33,6 +34,7 @@ class PluginInterface(ABC): config.Log("Warning", f"{ConsoleFrontColor.YELLOW}插件{self.__class__.__name__}未实现callback方法{ConsoleFrontColor.RESET}") return "" + @final def execute(self, path:str) -> Optional[APIRouter]: ''' 继承后是否返回路由决定是否启动该插件 @@ -76,6 +78,7 @@ class PluginInterface(ABC): ''' pass + @final def register_plugin(self, command: str) -> None: ''' 将插件注册, 使其可以被命令匹配 @@ -92,6 +95,43 @@ class PluginInterface(ABC): ''' return None + def register_clock( + self, + callback: Callable[..., Any], + delay_ms: int, + *, + args: Optional[Sequence[Any]] = None, + kwargs: Optional[Dict[str, Any]] = None, + ) -> int: + ''' + 注册一次性延时任务 + Args: + callback: 时间到期后调用的函数/方法 + delay_ms: 延迟毫秒数 + args: 传入回调的位置参数 + kwargs: 传入回调的关键字参数 + ''' + if not callable(callback): + raise ValueError("callback must be callable") + scheduler = get_clock_scheduler() + plugin_module = callback.__module__ + plugin_class: Optional[str] = None + if hasattr(callback, "__self__") and callback.__self__ is self: + plugin_module = self.__class__.__module__ + plugin_class = self.__class__.__name__ + callback_name = getattr(callback, "__name__", None) + if not callback_name: + raise ValueError("callback must have a __name__ attribute") + task_id = scheduler.register_task( + plugin_module, + plugin_class, + callback_name, + delay_ms, + args=args, + kwargs=kwargs, + ) + return task_id + def is_enable_plugin(self) -> bool: ''' 继承后重写该方法判断是否启用该插件