import asyncio
import importlib
import inspect
import json
import time
from typing import Any, Callable, Dict, Optional, Sequence

from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from ..CoreModules.database import (
    STATUS_COMPLETED,
    STATUS_FAILED,
    STATUS_PENDING,
    STATUS_RUNNING,
    Database,
    get_db,
)


class ClockScheduler:
    """Database-backed scheduler that periodically runs due plugin callbacks.

    A background asyncio task wakes up every ``scheduler_tick_ms``
    milliseconds, fetches up to ``scheduler_max_batch`` due tasks from the
    database, resolves each task's callback by module/class/attribute name,
    invokes it, and records the outcome back in the database.
    """

    def __init__(self) -> None:
        """Read scheduler settings from the project config and set up state."""
        self._db: Database = get_db()
        self._config: ProjectConfig = Architecture.Get(ProjectConfig)
        # The project config object doubles as the logging sink (``Log``).
        self._logger = self._config
        # NOTE(review): the second FindItem argument is presumably the
        # default used when the key is missing -- confirm against
        # ProjectConfig.
        self._tick_ms = int(self._config.FindItem("scheduler_tick_ms", 1000 * 60))
        self._batch_size = int(self._config.FindItem("scheduler_max_batch", 1000))
        self._loop_task: Optional[asyncio.Task] = None
        self._running = False
        # Serializes start()/stop() so they cannot interleave.
        self._lock = asyncio.Lock()
        # NOTE(review): presumably persists the looked-up settings -- confirm.
        self._config.SaveProperties()

    @classmethod
    def instance(cls) -> "ClockScheduler":
        """Return the scheduler registered in ``Architecture``.

        A new scheduler is created and registered on first use.
        """
        if not Architecture.Contains(ClockScheduler):
            # NOTE(review): the third argument is an opaque no-op callback;
            # its meaning is defined by Architecture.Register.
            Architecture.Register(ClockScheduler, ClockScheduler(), lambda: None)
        return Architecture.Get(ClockScheduler)

    async def start(self) -> None:
        """Start the background polling loop; a no-op when already running."""
        async with self._lock:
            if self._running:
                return
            # Reset *before* flipping the running flag: if the database call
            # raises, the scheduler must not be left marked as running with
            # no loop task, which would make every later start() a no-op.
            self._db.reset_running_tasks()
            self._running = True
            self._loop_task = asyncio.create_task(self._run_loop())
        self._logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}ClockScheduler started with tick {self._tick_ms} ms{ConsoleFrontColor.RESET}",
        )

    async def stop(self) -> None:
        """Cancel the polling loop and wait for it to finish.

        A no-op when the scheduler is not running.
        """
        async with self._lock:
            if not self._running:
                return
            self._running = False
            if self._loop_task:
                self._loop_task.cancel()
                try:
                    await self._loop_task
                except asyncio.CancelledError:
                    pass
                self._loop_task = None
        self._logger.Log(
            "Info",
            f"{ConsoleFrontColor.GREEN}ClockScheduler stopped{ConsoleFrontColor.RESET}",
        )

    def register_task(
        self,
        plugin_module: str,
        plugin_class: Optional[str],
        callback_name: str,
        delay_ms: int,
        *,
        args: Optional[Sequence[Any]] = None,
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> int:
        """Persist a task to be executed ``delay_ms`` milliseconds from now.

        Args:
            plugin_module: Importable module path holding the callback.
            plugin_class: Optional class name inside the module; when given,
                the callback is looked up on the instance registered in
                ``Architecture`` for that class.
            callback_name: Attribute name of the callable to invoke.
            delay_ms: Delay in milliseconds; negative values are treated as 0.
            args: Positional arguments for the callback (must be
                JSON-serializable).
            kwargs: Keyword arguments for the callback (must be
                JSON-serializable).

        Returns:
            The identifier returned by the database for the new task.

        Raises:
            TypeError: If ``args``/``kwargs`` cannot be serialized to JSON.
        """
        execute_at = int(time.time() * 1000) + max(delay_ms, 0)
        payload = json.dumps({
            "args": list(args) if args else [],
            "kwargs": kwargs or {},
        })
        task_id = self._db.create_scheduled_task(
            plugin_module,
            plugin_class,
            callback_name,
            payload,
            execute_at,
        )
        self._logger.Log(
            "Info",
            f"{ConsoleFrontColor.LIGHTCYAN_EX}Scheduled task {task_id} for {plugin_module}.{callback_name} at {execute_at}{ConsoleFrontColor.RESET}",
        )
        return task_id

    async def _run_loop(self) -> None:
        """Process due tasks once per tick until stopped or cancelled."""
        try:
            while self._running:
                try:
                    await self._process_due_tasks()
                except asyncio.CancelledError:
                    # Never swallow cancellation; stop() relies on it.
                    raise
                except Exception as exc:  # pylint: disable=broad-except
                    # A failing tick (e.g. a database error while fetching
                    # or updating tasks) must not silently kill the
                    # scheduler; log it and retry on the next tick.
                    self._logger.Log(
                        "Error",
                        f"{ConsoleFrontColor.RED}Scheduler tick failed: {type(exc).__name__}: {exc}{ConsoleFrontColor.RESET}",
                    )
                await asyncio.sleep(self._tick_ms / 1000)
        except asyncio.CancelledError:
            pass
        finally:
            self._running = False

    async def _process_due_tasks(self) -> None:
        """Fetch the current batch of due tasks and execute them in order.

        A failure of one task is logged and recorded as ``STATUS_FAILED``
        (with an incremented attempt count and the error text) and does not
        prevent the remaining tasks in the batch from running.
        """
        now_ms = int(time.time() * 1000)
        tasks = self._db.get_due_tasks(now_ms, self._batch_size)
        for task in tasks:
            task_id = int(task["id"])
            attempts = int(task["attempts"])
            try:
                self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts)
                await self._execute_task(task)
            except Exception as exc:  # pylint: disable=broad-except
                message = f"{type(exc).__name__}: {exc}"
                self._logger.Log(
                    "Error",
                    f"{ConsoleFrontColor.RED}Task {task_id} failed: {message}{ConsoleFrontColor.RESET}",
                )
                self._db.update_task_status(
                    task_id,
                    STATUS_FAILED,
                    attempts=attempts + 1,
                    last_error=message,
                )

    async def _execute_task(self, task: Any) -> None:
        """Resolve and invoke the callback of a single task row.

        The task is marked ``STATUS_COMPLETED`` after the callback returns
        (and, if it returned an awaitable, after that has been awaited).
        When the callback cannot be resolved the task is marked
        ``STATUS_FAILED`` with an explanatory ``last_error``. Exceptions
        raised by the callback propagate to the caller.
        """
        task_id = int(task["id"])
        attempts = int(task["attempts"])
        plugin_module = task["plugin_module"]
        plugin_class = task["plugin_class"]
        callback_name = task["callback_name"]
        payload_raw = task["payload"]

        args, kwargs = self._decode_payload(payload_raw)
        callback = self._resolve_callback(plugin_module, plugin_class, callback_name)
        if not callback:
            if plugin_class:
                target = f"{plugin_module}.{plugin_class}.{callback_name}"
            else:
                target = f"{plugin_module}.{callback_name}"
            message = f"Callback {target} could not be resolved"
            # Record why the task failed, consistent with the exception
            # path in _process_due_tasks.
            self._logger.Log(
                "Error",
                f"{ConsoleFrontColor.RED}Task {task_id} failed: {message}{ConsoleFrontColor.RESET}",
            )
            self._db.update_task_status(
                task_id,
                STATUS_FAILED,
                attempts=attempts + 1,
                last_error=message,
            )
            return

        result = callback(*args, **kwargs)
        if inspect.isawaitable(result):
            await result
        self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts)

    def _resolve_callback(
        self,
        plugin_module: str,
        plugin_class: Optional[str],
        callback_name: str,
    ) -> Optional[Callable[..., Any]]:
        """Look up the callable named by module, optional class and attribute.

        With ``plugin_class`` set, the callback is taken from the instance
        registered in ``Architecture`` for that class; ``None`` is returned
        when no such instance is registered. Without ``plugin_class`` the
        callback is a module-level attribute.

        Raises:
            ImportError: If ``plugin_module`` cannot be imported.
            AttributeError: If the class or callback attribute is missing.
        """
        callback: Optional[Callable[..., Any]] = None
        module = importlib.import_module(plugin_module)
        if plugin_class:
            target_class = getattr(module, plugin_class)
            if Architecture.Contains(target_class):
                instance = Architecture.Get(target_class)
                callback = getattr(instance, callback_name)
        else:
            callback = getattr(module, callback_name)
        return callback

    def _decode_payload(self, payload_raw: Optional[str]) -> tuple[list[Any], Dict[str, Any]]:
        """Decode a stored JSON payload into ``(args, kwargs)``.

        Best-effort: an empty, malformed or wrongly-shaped payload yields
        ``([], {})``; decode failures are logged as warnings, not raised.
        """
        if not payload_raw:
            return [], {}
        try:
            payload_obj = json.loads(payload_raw)
            args = payload_obj.get("args", [])
            kwargs = payload_obj.get("kwargs", {})
            if not isinstance(args, list) or not isinstance(kwargs, dict):
                raise ValueError("Invalid payload structure")
            return args, kwargs
        except Exception as exc:  # pylint: disable=broad-except
            self._logger.Log(
                "Warning",
                f"{ConsoleFrontColor.YELLOW}Failed to decode payload: {exc}{ConsoleFrontColor.RESET}",
            )
            return [], {}


def get_clock_scheduler() -> ClockScheduler:
    """Return the shared ``ClockScheduler`` instance."""
    return ClockScheduler.instance()


__all__ = ["ClockScheduler", "get_clock_scheduler"]