Compare commits
1 Commits
b5fe23342d
...
16ef75c3ce
| Author | SHA1 | Date | |
|---|---|---|---|
| 16ef75c3ce |
@@ -18,24 +18,21 @@ from ..CoreModules.database import (
|
|||||||
|
|
||||||
|
|
||||||
class ClockScheduler:
|
class ClockScheduler:
|
||||||
_instance: Optional["ClockScheduler"] = None
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self._db: Database = get_db()
|
self._db: Database = get_db()
|
||||||
self._config: ProjectConfig = Architecture.Get(ProjectConfig)
|
self._config: ProjectConfig = Architecture.Get(ProjectConfig)
|
||||||
self._logger = self._config
|
self._logger = self._config
|
||||||
self._tick_ms = max(int(self._config.FindItem("scheduler_tick_ms", 1000)), 10)
|
self._tick_ms = int(self._config.FindItem("scheduler_tick_ms", 1000*60))
|
||||||
self._batch_size = max(int(self._config.FindItem("scheduler_max_batch", 50)), 1)
|
self._batch_size = int(self._config.FindItem("scheduler_max_batch", 1000))
|
||||||
self._loop_task: Optional[asyncio.Task] = None
|
self._loop_task: Optional[asyncio.Task] = None
|
||||||
self._running = False
|
self._running = False
|
||||||
self._lock = asyncio.Lock()
|
self._lock = asyncio.Lock()
|
||||||
Architecture.Register(ClockScheduler, self, lambda: None)
|
|
||||||
self._config.SaveProperties()
|
self._config.SaveProperties()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def instance(cls) -> "ClockScheduler":
|
def instance(cls) -> "ClockScheduler":
|
||||||
if not Architecture.Contains(ClockScheduler):
|
if not Architecture.Contains(ClockScheduler):
|
||||||
cls._instance = ClockScheduler()
|
Architecture.Register(ClockScheduler, ClockScheduler(), lambda: None)
|
||||||
return Architecture.Get(ClockScheduler)
|
return Architecture.Get(ClockScheduler)
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
@@ -114,7 +111,6 @@ class ClockScheduler:
|
|||||||
try:
|
try:
|
||||||
self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts)
|
self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts)
|
||||||
await self._execute_task(task)
|
await self._execute_task(task)
|
||||||
self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts)
|
|
||||||
except Exception as exc: # pylint: disable=broad-except
|
except Exception as exc: # pylint: disable=broad-except
|
||||||
message = f"{type(exc).__name__}: {exc}"
|
message = f"{type(exc).__name__}: {exc}"
|
||||||
self._logger.Log(
|
self._logger.Log(
|
||||||
@@ -129,30 +125,35 @@ class ClockScheduler:
|
|||||||
)
|
)
|
||||||
|
|
||||||
async def _execute_task(self, task: Any) -> None:
|
async def _execute_task(self, task: Any) -> None:
|
||||||
|
task_id = int(task["id"])
|
||||||
|
attempts = int(task["attempts"])
|
||||||
plugin_module = task["plugin_module"]
|
plugin_module = task["plugin_module"]
|
||||||
plugin_class = task["plugin_class"]
|
plugin_class = task["plugin_class"]
|
||||||
callback_name = task["callback_name"]
|
callback_name = task["callback_name"]
|
||||||
payload_raw = task["payload"]
|
payload_raw = task["payload"]
|
||||||
args, kwargs = self._decode_payload(payload_raw)
|
args, kwargs = self._decode_payload(payload_raw)
|
||||||
callback = self._resolve_callback(plugin_module, plugin_class, callback_name)
|
callback = self._resolve_callback(plugin_module, plugin_class, callback_name)
|
||||||
|
if not callback:
|
||||||
|
self._db.update_task_status(task_id, STATUS_FAILED, attempts=attempts+1)
|
||||||
|
return
|
||||||
result = callback(*args, **kwargs)
|
result = callback(*args, **kwargs)
|
||||||
if inspect.isawaitable(result):
|
if inspect.isawaitable(result):
|
||||||
await result
|
await result
|
||||||
|
self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts)
|
||||||
|
|
||||||
def _resolve_callback(
|
def _resolve_callback(
|
||||||
self,
|
self,
|
||||||
plugin_module: str,
|
plugin_module: str,
|
||||||
plugin_class: Optional[str],
|
plugin_class: Optional[str],
|
||||||
callback_name: str,
|
callback_name: str,
|
||||||
) -> Callable[..., Any]:
|
) -> Optional[Callable[..., Any]]:
|
||||||
|
callback: Optional[Callable[..., Any]] = None
|
||||||
module = importlib.import_module(plugin_module)
|
module = importlib.import_module(plugin_module)
|
||||||
if plugin_class:
|
if plugin_class:
|
||||||
target_class = getattr(module, plugin_class)
|
target_class = getattr(module, plugin_class)
|
||||||
if Architecture.Contains(target_class):
|
if Architecture.Contains(target_class):
|
||||||
instance = Architecture.Get(target_class)
|
instance = Architecture.Get(target_class)
|
||||||
else:
|
callback = getattr(instance, callback_name)
|
||||||
instance = target_class()
|
|
||||||
callback = getattr(instance, callback_name)
|
|
||||||
else:
|
else:
|
||||||
callback = getattr(module, callback_name)
|
callback = getattr(module, callback_name)
|
||||||
return callback
|
return callback
|
||||||
|
|||||||
Reference in New Issue
Block a user