diff --git a/CoreModules/clock_scheduler.py b/CoreModules/clock_scheduler.py index 59a5f17..2e5e6f7 100644 --- a/CoreModules/clock_scheduler.py +++ b/CoreModules/clock_scheduler.py @@ -18,24 +18,21 @@ from ..CoreModules.database import ( class ClockScheduler: - _instance: Optional["ClockScheduler"] = None - def __init__(self) -> None: self._db: Database = get_db() self._config: ProjectConfig = Architecture.Get(ProjectConfig) self._logger = self._config - self._tick_ms = max(int(self._config.FindItem("scheduler_tick_ms", 1000)), 10) - self._batch_size = max(int(self._config.FindItem("scheduler_max_batch", 50)), 1) + self._tick_ms = int(self._config.FindItem("scheduler_tick_ms", 1000*60)) + self._batch_size = int(self._config.FindItem("scheduler_max_batch", 1000)) self._loop_task: Optional[asyncio.Task] = None self._running = False self._lock = asyncio.Lock() - Architecture.Register(ClockScheduler, self, lambda: None) self._config.SaveProperties() @classmethod def instance(cls) -> "ClockScheduler": if not Architecture.Contains(ClockScheduler): - cls._instance = ClockScheduler() + Architecture.Register(ClockScheduler, ClockScheduler(), lambda: None) return Architecture.Get(ClockScheduler) async def start(self) -> None: @@ -114,7 +111,6 @@ class ClockScheduler: try: self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts) await self._execute_task(task) - self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts) except Exception as exc: # pylint: disable=broad-except message = f"{type(exc).__name__}: {exc}" self._logger.Log( @@ -129,30 +125,35 @@ class ClockScheduler: ) async def _execute_task(self, task: Any) -> None: + task_id = int(task["id"]) + attempts = int(task["attempts"]) plugin_module = task["plugin_module"] plugin_class = task["plugin_class"] callback_name = task["callback_name"] payload_raw = task["payload"] args, kwargs = self._decode_payload(payload_raw) callback = self._resolve_callback(plugin_module, plugin_class, callback_name) + if not callback: + self._db.update_task_status(task_id, STATUS_FAILED, attempts=attempts+1) + return result = callback(*args, **kwargs) if inspect.isawaitable(result): await result + self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts) def _resolve_callback( self, plugin_module: str, plugin_class: Optional[str], callback_name: str, - ) -> Callable[..., Any]: + ) -> Optional[Callable[..., Any]]: + callback: Optional[Callable[..., Any]] = None module = importlib.import_module(plugin_module) if plugin_class: target_class = getattr(module, plugin_class) if Architecture.Contains(target_class): instance = Architecture.Get(target_class) - else: - instance = target_class() - callback = getattr(instance, callback_name) + callback = getattr(instance, callback_name) else: callback = getattr(module, callback_name) return callback