From 16ef75c3ce9132b189d5876531bfaba789e815e7 Mon Sep 17 00:00:00 2001 From: ninemine <1371605831@qq.com> Date: Sat, 8 Nov 2025 14:09:33 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A2=AB=E8=B0=83=E7=94=A8=E7=9A=84=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=9A=84=E6=9D=A5=E6=BA=90=E5=BF=85=E9=A1=BB=E5=B7=B2?= =?UTF-8?q?=E7=BB=8F=E5=AE=9E=E4=BE=8B=E5=8C=96=E5=B9=B6=E6=B3=A8=E5=86=8C?= =?UTF-8?q?=E8=BF=9B=E6=9E=B6=E6=9E=84=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CoreModules/clock_scheduler.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/CoreModules/clock_scheduler.py b/CoreModules/clock_scheduler.py index 59a5f17..2e5e6f7 100644 --- a/CoreModules/clock_scheduler.py +++ b/CoreModules/clock_scheduler.py @@ -18,24 +18,21 @@ from ..CoreModules.database import ( class ClockScheduler: - _instance: Optional["ClockScheduler"] = None - def __init__(self) -> None: self._db: Database = get_db() self._config: ProjectConfig = Architecture.Get(ProjectConfig) self._logger = self._config - self._tick_ms = max(int(self._config.FindItem("scheduler_tick_ms", 1000)), 10) - self._batch_size = max(int(self._config.FindItem("scheduler_max_batch", 50)), 1) + self._tick_ms = int(self._config.FindItem("scheduler_tick_ms", 1000*60)) + self._batch_size = int(self._config.FindItem("scheduler_max_batch", 1000)) self._loop_task: Optional[asyncio.Task] = None self._running = False self._lock = asyncio.Lock() - Architecture.Register(ClockScheduler, self, lambda: None) self._config.SaveProperties() @classmethod def instance(cls) -> "ClockScheduler": if not Architecture.Contains(ClockScheduler): - cls._instance = ClockScheduler() + Architecture.Register(ClockScheduler, ClockScheduler(), lambda: None) return Architecture.Get(ClockScheduler) async def start(self) -> None: @@ -114,7 +111,6 @@ class ClockScheduler: try: self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts) await self._execute_task(task) - self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts) except Exception as exc: # pylint: disable=broad-except message = f"{type(exc).__name__}: {exc}" self._logger.Log( @@ -129,30 +125,35 @@ class ClockScheduler: ) async def _execute_task(self, task: Any) -> None: + task_id = int(task["id"]) + attempts = int(task["attempts"]) plugin_module = task["plugin_module"] plugin_class = task["plugin_class"] callback_name = task["callback_name"] payload_raw = task["payload"] args, kwargs = self._decode_payload(payload_raw) callback = self._resolve_callback(plugin_module, plugin_class, callback_name) + if not callback: + self._db.update_task_status(task_id, STATUS_FAILED, attempts=attempts+1) + return result = callback(*args, **kwargs) if inspect.isawaitable(result): await result + self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts) def _resolve_callback( self, plugin_module: str, plugin_class: Optional[str], callback_name: str, - ) -> Callable[..., Any]: + ) -> Optional[Callable[..., Any]]: + callback: Optional[Callable[..., Any]] = None module = importlib.import_module(plugin_module) if plugin_class: target_class = getattr(module, plugin_class) if Architecture.Contains(target_class): instance = Architecture.Get(target_class) - else: - instance = target_class() - callback = getattr(instance, callback_name) + callback = getattr(instance, callback_name) else: callback = getattr(module, callback_name) return callback