新增定时调度任务
This commit is contained in:
@@ -4,6 +4,7 @@ from fastapi.responses import JSONResponse
|
||||
from contextlib import asynccontextmanager
|
||||
from ..CoreModules.middleware import ConcurrencyLimitMiddleware
|
||||
from ..CoreModules.plugin_interface import ImportPlugins
|
||||
from ..CoreModules.clock_scheduler import get_clock_scheduler
|
||||
from ..CoreRouters import callback, health
|
||||
from ..Convention.Runtime.GlobalConfig import *
|
||||
from ..Convention.Runtime.Architecture import Architecture
|
||||
@@ -16,6 +17,8 @@ APP_CONFIG = {
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""应用生命周期管理"""
|
||||
scheduler = get_clock_scheduler()
|
||||
await scheduler.start()
|
||||
# 启动
|
||||
logger.Log("Info", "应用启动中...")
|
||||
|
||||
@@ -25,17 +28,17 @@ async def lifespan(app: FastAPI):
|
||||
# 启动后台清理
|
||||
logger.Log("Info", "启动后台清理...")
|
||||
|
||||
try:
|
||||
yield
|
||||
|
||||
finally:
|
||||
# 关闭
|
||||
try:
|
||||
logger.Log("Info", "关闭应用...")
|
||||
# await cleanup_task
|
||||
await scheduler.stop()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
logger.Log("Info", "关闭应用完成...")
|
||||
# db.close()
|
||||
|
||||
def generate_app(kwargs: dict) -> FastAPI:
|
||||
'''
|
||||
|
||||
182
CoreModules/clock_scheduler.py
Normal file
182
CoreModules/clock_scheduler.py
Normal file
@@ -0,0 +1,182 @@
|
||||
import asyncio
|
||||
import importlib
|
||||
import inspect
|
||||
import json
|
||||
import time
|
||||
from typing import Any, Callable, Dict, Optional, Sequence
|
||||
|
||||
from ..Convention.Runtime.Architecture import Architecture
|
||||
from ..Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
|
||||
from ..CoreModules.database import (
|
||||
STATUS_COMPLETED,
|
||||
STATUS_FAILED,
|
||||
STATUS_PENDING,
|
||||
STATUS_RUNNING,
|
||||
Database,
|
||||
get_db,
|
||||
)
|
||||
|
||||
|
||||
class ClockScheduler:
|
||||
_instance: Optional["ClockScheduler"] = None
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._db: Database = get_db()
|
||||
self._config: ProjectConfig = Architecture.Get(ProjectConfig)
|
||||
self._logger = self._config
|
||||
self._tick_ms = max(int(self._config.FindItem("scheduler_tick_ms", 1000)), 10)
|
||||
self._batch_size = max(int(self._config.FindItem("scheduler_max_batch", 50)), 1)
|
||||
self._loop_task: Optional[asyncio.Task] = None
|
||||
self._running = False
|
||||
self._lock = asyncio.Lock()
|
||||
Architecture.Register(ClockScheduler, self, lambda: None)
|
||||
self._config.SaveProperties()
|
||||
|
||||
@classmethod
|
||||
def instance(cls) -> "ClockScheduler":
|
||||
if not Architecture.Contains(ClockScheduler):
|
||||
cls._instance = ClockScheduler()
|
||||
return Architecture.Get(ClockScheduler)
|
||||
|
||||
async def start(self) -> None:
|
||||
async with self._lock:
|
||||
if self._running:
|
||||
return
|
||||
self._running = True
|
||||
self._db.reset_running_tasks()
|
||||
self._loop_task = asyncio.create_task(self._run_loop())
|
||||
self._logger.Log(
|
||||
"Info",
|
||||
f"{ConsoleFrontColor.GREEN}ClockScheduler started with tick {self._tick_ms} ms{ConsoleFrontColor.RESET}",
|
||||
)
|
||||
|
||||
async def stop(self) -> None:
|
||||
async with self._lock:
|
||||
if not self._running:
|
||||
return
|
||||
self._running = False
|
||||
if self._loop_task:
|
||||
self._loop_task.cancel()
|
||||
try:
|
||||
await self._loop_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._loop_task = None
|
||||
self._logger.Log(
|
||||
"Info",
|
||||
f"{ConsoleFrontColor.GREEN}ClockScheduler stopped{ConsoleFrontColor.RESET}",
|
||||
)
|
||||
|
||||
def register_task(
|
||||
self,
|
||||
plugin_module: str,
|
||||
plugin_class: Optional[str],
|
||||
callback_name: str,
|
||||
delay_ms: int,
|
||||
*,
|
||||
args: Optional[Sequence[Any]] = None,
|
||||
kwargs: Optional[Dict[str, Any]] = None,
|
||||
) -> int:
|
||||
execute_at = int(time.time() * 1000) + max(delay_ms, 0)
|
||||
payload = json.dumps({
|
||||
"args": list(args) if args else [],
|
||||
"kwargs": kwargs or {},
|
||||
})
|
||||
task_id = self._db.create_scheduled_task(
|
||||
plugin_module,
|
||||
plugin_class,
|
||||
callback_name,
|
||||
payload,
|
||||
execute_at,
|
||||
)
|
||||
self._logger.Log(
|
||||
"Info",
|
||||
f"{ConsoleFrontColor.LIGHTCYAN_EX}Scheduled task {task_id} for {plugin_module}.{callback_name} at {execute_at}{ConsoleFrontColor.RESET}",
|
||||
)
|
||||
return task_id
|
||||
|
||||
async def _run_loop(self) -> None:
|
||||
try:
|
||||
while self._running:
|
||||
await self._process_due_tasks()
|
||||
await asyncio.sleep(self._tick_ms / 1000)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
self._running = False
|
||||
|
||||
async def _process_due_tasks(self) -> None:
|
||||
now_ms = int(time.time() * 1000)
|
||||
tasks = self._db.get_due_tasks(now_ms, self._batch_size)
|
||||
for task in tasks:
|
||||
task_id = int(task["id"])
|
||||
attempts = int(task["attempts"])
|
||||
try:
|
||||
self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts)
|
||||
await self._execute_task(task)
|
||||
self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
message = f"{type(exc).__name__}: {exc}"
|
||||
self._logger.Log(
|
||||
"Error",
|
||||
f"{ConsoleFrontColor.RED}Task {task_id} failed: {message}{ConsoleFrontColor.RESET}",
|
||||
)
|
||||
self._db.update_task_status(
|
||||
task_id,
|
||||
STATUS_FAILED,
|
||||
attempts=attempts + 1,
|
||||
last_error=message,
|
||||
)
|
||||
|
||||
async def _execute_task(self, task: Any) -> None:
|
||||
plugin_module = task["plugin_module"]
|
||||
plugin_class = task["plugin_class"]
|
||||
callback_name = task["callback_name"]
|
||||
payload_raw = task["payload"]
|
||||
args, kwargs = self._decode_payload(payload_raw)
|
||||
callback = self._resolve_callback(plugin_module, plugin_class, callback_name)
|
||||
result = callback(*args, **kwargs)
|
||||
if inspect.isawaitable(result):
|
||||
await result
|
||||
|
||||
def _resolve_callback(
|
||||
self,
|
||||
plugin_module: str,
|
||||
plugin_class: Optional[str],
|
||||
callback_name: str,
|
||||
) -> Callable[..., Any]:
|
||||
module = importlib.import_module(plugin_module)
|
||||
if plugin_class:
|
||||
target_class = getattr(module, plugin_class)
|
||||
if Architecture.Contains(target_class):
|
||||
instance = Architecture.Get(target_class)
|
||||
else:
|
||||
instance = target_class()
|
||||
callback = getattr(instance, callback_name)
|
||||
else:
|
||||
callback = getattr(module, callback_name)
|
||||
return callback
|
||||
|
||||
def _decode_payload(self, payload_raw: Optional[str]) -> tuple[list[Any], Dict[str, Any]]:
|
||||
if not payload_raw:
|
||||
return [], {}
|
||||
try:
|
||||
payload_obj = json.loads(payload_raw)
|
||||
args = payload_obj.get("args", [])
|
||||
kwargs = payload_obj.get("kwargs", {})
|
||||
if not isinstance(args, list) or not isinstance(kwargs, dict):
|
||||
raise ValueError("Invalid payload structure")
|
||||
return args, kwargs
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
self._logger.Log(
|
||||
"Warning",
|
||||
f"{ConsoleFrontColor.YELLOW}Failed to decode payload: {exc}{ConsoleFrontColor.RESET}",
|
||||
)
|
||||
return [], {}
|
||||
|
||||
|
||||
def get_clock_scheduler() -> ClockScheduler:
|
||||
return ClockScheduler.instance()
|
||||
|
||||
|
||||
__all__ = ["ClockScheduler", "get_clock_scheduler"]
|
||||
@@ -1,5 +1,7 @@
|
||||
"""SQLite数据库操作模块 - 使用标准库sqlite3"""
|
||||
import sqlite3
|
||||
import time
|
||||
from typing import Any, Optional
|
||||
from ..Convention.Runtime.Config import *
|
||||
from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor
|
||||
from ..Convention.Runtime.Architecture import Architecture
|
||||
@@ -9,6 +11,12 @@ logger: ProjectConfig = Architecture.Get(ProjectConfig)
|
||||
DATABASE_PATH = logger.GetFile(logger.FindItem("database_path", "db.db"), False).GetFullPath()
|
||||
logger.SaveProperties()
|
||||
|
||||
SCHEDULED_TASK_TABLE = "scheduled_tasks"
|
||||
STATUS_PENDING = "pending"
|
||||
STATUS_RUNNING = "running"
|
||||
STATUS_COMPLETED = "completed"
|
||||
STATUS_FAILED = "failed"
|
||||
|
||||
class Database:
|
||||
"""数据库管理类"""
|
||||
|
||||
@@ -123,7 +131,112 @@ class Database:
|
||||
|
||||
def init_tables(self):
|
||||
"""初始化数据库表"""
|
||||
pass
|
||||
self._ensure_scheduled_tasks_table()
|
||||
|
||||
def _ensure_scheduled_tasks_table(self) -> None:
|
||||
cursor = self.conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
CREATE TABLE IF NOT EXISTS {SCHEDULED_TASK_TABLE} (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
plugin_module TEXT NOT NULL,
|
||||
plugin_class TEXT,
|
||||
callback_name TEXT NOT NULL,
|
||||
payload TEXT,
|
||||
execute_at INTEGER NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT '{STATUS_PENDING}',
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
last_error TEXT,
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL
|
||||
)
|
||||
"""
|
||||
)
|
||||
cursor.execute(
|
||||
f"CREATE INDEX IF NOT EXISTS idx_{SCHEDULED_TASK_TABLE}_status_execute_at ON {SCHEDULED_TASK_TABLE}(status, execute_at)"
|
||||
)
|
||||
|
||||
def create_scheduled_task(
|
||||
self,
|
||||
plugin_module: str,
|
||||
plugin_class: Optional[str],
|
||||
callback_name: str,
|
||||
payload: Optional[str],
|
||||
execute_at_ms: int
|
||||
) -> int:
|
||||
now_ms = int(time.time() * 1000)
|
||||
cursor = self.conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
INSERT INTO {SCHEDULED_TASK_TABLE} (
|
||||
plugin_module, plugin_class, callback_name, payload,
|
||||
execute_at, status, attempts, created_at, updated_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
plugin_module,
|
||||
plugin_class,
|
||||
callback_name,
|
||||
payload,
|
||||
execute_at_ms,
|
||||
STATUS_PENDING,
|
||||
0,
|
||||
now_ms,
|
||||
now_ms,
|
||||
)
|
||||
)
|
||||
task_id = cursor.lastrowid
|
||||
return int(task_id)
|
||||
|
||||
def get_due_tasks(self, now_ms: int, limit: int) -> list[sqlite3.Row]:
|
||||
cursor = self.conn.cursor()
|
||||
cursor.execute(
|
||||
f"""
|
||||
SELECT * FROM {SCHEDULED_TASK_TABLE}
|
||||
WHERE status = ? AND execute_at <= ?
|
||||
ORDER BY execute_at ASC
|
||||
LIMIT ?
|
||||
""",
|
||||
(STATUS_PENDING, now_ms, limit)
|
||||
)
|
||||
return cursor.fetchall()
|
||||
|
||||
def update_task_status(
|
||||
self,
|
||||
task_id: int,
|
||||
status: str,
|
||||
*,
|
||||
attempts: Optional[int] = None,
|
||||
last_error: Optional[str] = None
|
||||
) -> None:
|
||||
now_ms = int(time.time() * 1000)
|
||||
sets = ["status = ?"]
|
||||
params: list[Any] = [status]
|
||||
if attempts is not None:
|
||||
sets.append("attempts = ?")
|
||||
params.append(attempts)
|
||||
if last_error is not None:
|
||||
sets.append("last_error = ?")
|
||||
params.append(last_error)
|
||||
params.append(task_id)
|
||||
set_clause = ", ".join(sets)
|
||||
cursor = self.conn.cursor()
|
||||
cursor.execute(
|
||||
f"UPDATE {SCHEDULED_TASK_TABLE} SET {set_clause} WHERE id = ?",
|
||||
params
|
||||
)
|
||||
|
||||
def reset_running_tasks(self) -> None:
|
||||
cursor = self.conn.cursor()
|
||||
now_ms = int(time.time() * 1000)
|
||||
cursor.execute(
|
||||
f"""
|
||||
UPDATE {SCHEDULED_TASK_TABLE}
|
||||
SET status = ?, updated_at = ?
|
||||
WHERE status = ?
|
||||
""",
|
||||
(STATUS_PENDING, now_ms, STATUS_RUNNING)
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""关闭数据库连接"""
|
||||
@@ -132,10 +245,19 @@ class Database:
|
||||
self._conn = None
|
||||
logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接已关闭{ConsoleFrontColor.RESET}")
|
||||
|
||||
|
||||
def get_db() -> Database:
|
||||
"""获取全局数据库实例(单例模式)"""
|
||||
if not Architecture.Contains(Database):
|
||||
return Database()
|
||||
return Architecture.Get(Database)
|
||||
|
||||
__all__ = ["get_db"]
|
||||
__all__ = [
|
||||
"get_db",
|
||||
"Database",
|
||||
"SCHEDULED_TASK_TABLE",
|
||||
"STATUS_PENDING",
|
||||
"STATUS_RUNNING",
|
||||
"STATUS_COMPLETED",
|
||||
"STATUS_FAILED",
|
||||
]
|
||||
|
||||
@@ -5,6 +5,7 @@ from ..Convention.Runtime.GlobalConfig import ProjectConfig
|
||||
from ..Convention.Runtime.Architecture import Architecture
|
||||
from ..Convention.Runtime.File import ToolFile
|
||||
from ..CoreModules.database import get_db
|
||||
from ..CoreModules.clock_scheduler import get_clock_scheduler
|
||||
from fastapi import APIRouter, FastAPI
|
||||
from typing import *
|
||||
from pydantic import *
|
||||
@@ -33,6 +34,7 @@ class PluginInterface(ABC):
|
||||
config.Log("Warning", f"{ConsoleFrontColor.YELLOW}插件{self.__class__.__name__}未实现callback方法{ConsoleFrontColor.RESET}")
|
||||
return ""
|
||||
|
||||
@final
|
||||
def execute(self, path:str) -> Optional[APIRouter]:
|
||||
'''
|
||||
继承后是否返回路由决定是否启动该插件
|
||||
@@ -76,6 +78,7 @@ class PluginInterface(ABC):
|
||||
'''
|
||||
pass
|
||||
|
||||
@final
|
||||
def register_plugin(self, command: str) -> None:
|
||||
'''
|
||||
将插件注册, 使其可以被命令匹配
|
||||
@@ -92,6 +95,43 @@ class PluginInterface(ABC):
|
||||
'''
|
||||
return None
|
||||
|
||||
def register_clock(
|
||||
self,
|
||||
callback: Callable[..., Any],
|
||||
delay_ms: int,
|
||||
*,
|
||||
args: Optional[Sequence[Any]] = None,
|
||||
kwargs: Optional[Dict[str, Any]] = None,
|
||||
) -> int:
|
||||
'''
|
||||
注册一次性延时任务
|
||||
Args:
|
||||
callback: 时间到期后调用的函数/方法
|
||||
delay_ms: 延迟毫秒数
|
||||
args: 传入回调的位置参数
|
||||
kwargs: 传入回调的关键字参数
|
||||
'''
|
||||
if not callable(callback):
|
||||
raise ValueError("callback must be callable")
|
||||
scheduler = get_clock_scheduler()
|
||||
plugin_module = callback.__module__
|
||||
plugin_class: Optional[str] = None
|
||||
if hasattr(callback, "__self__") and callback.__self__ is self:
|
||||
plugin_module = self.__class__.__module__
|
||||
plugin_class = self.__class__.__name__
|
||||
callback_name = getattr(callback, "__name__", None)
|
||||
if not callback_name:
|
||||
raise ValueError("callback must have a __name__ attribute")
|
||||
task_id = scheduler.register_task(
|
||||
plugin_module,
|
||||
plugin_class,
|
||||
callback_name,
|
||||
delay_ms,
|
||||
args=args,
|
||||
kwargs=kwargs,
|
||||
)
|
||||
return task_id
|
||||
|
||||
def is_enable_plugin(self) -> bool:
|
||||
'''
|
||||
继承后重写该方法判断是否启用该插件
|
||||
|
||||
Reference in New Issue
Block a user