被调用的任务的来源必须已经实例化并注册进架构中
This commit is contained in:
@@ -18,24 +18,21 @@ from ..CoreModules.database import (
|
||||
|
||||
|
||||
class ClockScheduler:
|
||||
_instance: Optional["ClockScheduler"] = None
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._db: Database = get_db()
|
||||
self._config: ProjectConfig = Architecture.Get(ProjectConfig)
|
||||
self._logger = self._config
|
||||
self._tick_ms = max(int(self._config.FindItem("scheduler_tick_ms", 1000)), 10)
|
||||
self._batch_size = max(int(self._config.FindItem("scheduler_max_batch", 50)), 1)
|
||||
self._tick_ms = int(self._config.FindItem("scheduler_tick_ms", 1000*60))
|
||||
self._batch_size = int(self._config.FindItem("scheduler_max_batch", 1000))
|
||||
self._loop_task: Optional[asyncio.Task] = None
|
||||
self._running = False
|
||||
self._lock = asyncio.Lock()
|
||||
Architecture.Register(ClockScheduler, self, lambda: None)
|
||||
self._config.SaveProperties()
|
||||
|
||||
@classmethod
|
||||
def instance(cls) -> "ClockScheduler":
|
||||
if not Architecture.Contains(ClockScheduler):
|
||||
cls._instance = ClockScheduler()
|
||||
Architecture.Register(ClockScheduler, ClockScheduler(), lambda: None)
|
||||
return Architecture.Get(ClockScheduler)
|
||||
|
||||
async def start(self) -> None:
|
||||
@@ -114,7 +111,6 @@ class ClockScheduler:
|
||||
try:
|
||||
self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts)
|
||||
await self._execute_task(task)
|
||||
self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts)
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
message = f"{type(exc).__name__}: {exc}"
|
||||
self._logger.Log(
|
||||
@@ -129,30 +125,35 @@ class ClockScheduler:
|
||||
)
|
||||
|
||||
async def _execute_task(self, task: Any) -> None:
|
||||
task_id = int(task["id"])
|
||||
attempts = int(task["attempts"])
|
||||
plugin_module = task["plugin_module"]
|
||||
plugin_class = task["plugin_class"]
|
||||
callback_name = task["callback_name"]
|
||||
payload_raw = task["payload"]
|
||||
args, kwargs = self._decode_payload(payload_raw)
|
||||
callback = self._resolve_callback(plugin_module, plugin_class, callback_name)
|
||||
if not callback:
|
||||
self._db.update_task_status(task_id, STATUS_FAILED, attempts=attempts+1)
|
||||
return
|
||||
result = callback(*args, **kwargs)
|
||||
if inspect.isawaitable(result):
|
||||
await result
|
||||
self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts)
|
||||
|
||||
def _resolve_callback(
|
||||
self,
|
||||
plugin_module: str,
|
||||
plugin_class: Optional[str],
|
||||
callback_name: str,
|
||||
) -> Callable[..., Any]:
|
||||
) -> Optional[Callable[..., Any]]:
|
||||
callback: Optional[Callable[..., Any]] = None
|
||||
module = importlib.import_module(plugin_module)
|
||||
if plugin_class:
|
||||
target_class = getattr(module, plugin_class)
|
||||
if Architecture.Contains(target_class):
|
||||
instance = Architecture.Get(target_class)
|
||||
else:
|
||||
instance = target_class()
|
||||
callback = getattr(instance, callback_name)
|
||||
callback = getattr(instance, callback_name)
|
||||
else:
|
||||
callback = getattr(module, callback_name)
|
||||
return callback
|
||||
|
||||
Reference in New Issue
Block a user