2025-11-08 01:24:50 +08:00
|
|
|
import asyncio
import contextlib
import importlib
import inspect
import json
import time
from typing import Any, Callable, Dict, Optional, Sequence

from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from ..CoreModules.database import (
    STATUS_COMPLETED,
    STATUS_FAILED,
    STATUS_PENDING,
    STATUS_RUNNING,
    Database,
    get_db,
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ClockScheduler:
    """Tick-driven scheduler that runs delayed plugin callbacks stored in the database.

    Tasks are persisted through ``Database`` (see ``register_task``) and a
    background asyncio task polls for due entries every ``_tick_ms``
    milliseconds, resolving each callback by module / class / attribute name
    and invoking it with the JSON-decoded arguments.
    """

    def __init__(self) -> None:
        """Read scheduler settings from the project config and set up idle state."""
        self._db: Database = get_db()
        self._config: ProjectConfig = Architecture.Get(ProjectConfig)
        # The project config object doubles as the logger (it exposes ``Log``).
        self._logger = self._config
        # Second FindItem argument is presumably the fallback value when the
        # key is absent (60 000 ms = one minute) -- TODO confirm.
        self._tick_ms = int(self._config.FindItem("scheduler_tick_ms", 1000 * 60))
        self._batch_size = int(self._config.FindItem("scheduler_max_batch", 1000))
        self._loop_task: Optional[asyncio.Task] = None
        self._running = False
        # Serialises start()/stop() so they cannot interleave.
        self._lock = asyncio.Lock()
        # NOTE(review): presumably writes the (possibly defaulted) settings
        # back to disk -- confirm against ProjectConfig.
        self._config.SaveProperties()

    @classmethod
    def instance(cls) -> "ClockScheduler":
        """Return the scheduler registered in ``Architecture``, creating it on first use."""
        if not Architecture.Contains(ClockScheduler):
            Architecture.Register(ClockScheduler, ClockScheduler(), lambda: None)
        return Architecture.Get(ClockScheduler)

    async def start(self) -> None:
        """Start the background polling loop; a no-op if it is already running."""
        async with self._lock:
            if self._running:
                return
            self._running = True
            # NOTE(review): presumably re-queues tasks left in the running
            # state by a previous process -- confirm against Database.
            self._db.reset_running_tasks()
            self._loop_task = asyncio.create_task(self._run_loop())
            self._logger.Log(
                "Info",
                f"{ConsoleFrontColor.GREEN}ClockScheduler started with tick {self._tick_ms} ms{ConsoleFrontColor.RESET}",
            )

    async def stop(self) -> None:
        """Cancel the polling loop and wait for it to finish; a no-op if not running."""
        async with self._lock:
            if not self._running:
                return
            self._running = False
            if self._loop_task:
                self._loop_task.cancel()
                # _run_loop normally absorbs the cancellation itself, but a
                # task cancelled before its first step re-raises it here.
                with contextlib.suppress(asyncio.CancelledError):
                    await self._loop_task
                self._loop_task = None
            self._logger.Log(
                "Info",
                f"{ConsoleFrontColor.GREEN}ClockScheduler stopped{ConsoleFrontColor.RESET}",
            )

    def register_task(
        self,
        plugin_module: str,
        plugin_class: Optional[str],
        callback_name: str,
        delay_ms: int,
        *,
        args: Optional[Sequence[Any]] = None,
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> int:
        """Persist a callback to be executed after ``delay_ms`` milliseconds.

        Args:
            plugin_module: Importable module path that owns the callback.
            plugin_class: Name of a class in that module whose registered
                instance owns the callback, or ``None`` for a module-level
                callable.
            callback_name: Attribute name of the callable.
            delay_ms: Delay from now in milliseconds; negative values are
                treated as 0.
            args: Positional arguments; must be JSON-serialisable.
            kwargs: Keyword arguments; must be JSON-serialisable.

        Returns:
            The identifier returned by ``Database.create_scheduled_task``.

        Raises:
            TypeError: If ``args`` / ``kwargs`` cannot be serialised to JSON.
        """
        execute_at = int(time.time() * 1000) + max(delay_ms, 0)
        payload = json.dumps({
            "args": list(args) if args else [],
            "kwargs": kwargs or {},
        })
        task_id = self._db.create_scheduled_task(
            plugin_module,
            plugin_class,
            callback_name,
            payload,
            execute_at,
        )
        self._logger.Log(
            "Info",
            f"{ConsoleFrontColor.LIGHTCYAN_EX}Scheduled task {task_id} for {plugin_module}.{callback_name} at {execute_at}{ConsoleFrontColor.RESET}",
        )
        return task_id

    async def _run_loop(self) -> None:
        """Poll for due tasks every tick until stopped or cancelled."""
        try:
            while self._running:
                try:
                    await self._process_due_tasks()
                except Exception as exc:  # pylint: disable=broad-except
                    # A failing tick (e.g. a database error) must not end the
                    # loop silently; report it and retry on the next tick.
                    self._logger.Log(
                        "Error",
                        f"{ConsoleFrontColor.RED}Scheduler tick failed: {type(exc).__name__}: {exc}{ConsoleFrontColor.RESET}",
                    )
                await asyncio.sleep(self._tick_ms / 1000)
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown path triggered by stop().
            pass
        finally:
            self._running = False

    async def _process_due_tasks(self) -> None:
        """Fetch up to ``_batch_size`` due tasks and execute them one by one.

        A task whose execution raises is logged and marked failed with its
        attempt counter incremented; the remaining tasks are still processed.
        """
        now_ms = int(time.time() * 1000)
        tasks = self._db.get_due_tasks(now_ms, self._batch_size)
        for task in tasks:
            task_id = int(task["id"])
            attempts = int(task["attempts"])
            try:
                self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts)
                await self._execute_task(task)
            except Exception as exc:  # pylint: disable=broad-except
                message = f"{type(exc).__name__}: {exc}"
                self._logger.Log(
                    "Error",
                    f"{ConsoleFrontColor.RED}Task {task_id} failed: {message}{ConsoleFrontColor.RESET}",
                )
                self._db.update_task_status(
                    task_id,
                    STATUS_FAILED,
                    attempts=attempts + 1,
                    last_error=message,
                )

    async def _execute_task(self, task: Any) -> None:
        """Resolve and invoke the callback of one task row, then record the outcome.

        ``task`` is indexed by the keys ``id``, ``attempts``, ``plugin_module``,
        ``plugin_class``, ``callback_name`` and ``payload``. Exceptions raised
        by the callback propagate to the caller.
        """
        task_id = int(task["id"])
        attempts = int(task["attempts"])
        plugin_module = task["plugin_module"]
        plugin_class = task["plugin_class"]
        callback_name = task["callback_name"]
        payload_raw = task["payload"]
        args, kwargs = self._decode_payload(payload_raw)
        callback = self._resolve_callback(plugin_module, plugin_class, callback_name)
        if not callback:
            # Record why the task failed so the row is diagnosable, matching
            # the failure path in _process_due_tasks.
            self._db.update_task_status(
                task_id,
                STATUS_FAILED,
                attempts=attempts + 1,
                last_error=f"Callback {plugin_module}.{callback_name} could not be resolved",
            )
            return
        result = callback(*args, **kwargs)
        # Support both plain functions and coroutine functions.
        if inspect.isawaitable(result):
            await result
        self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts)

    def _resolve_callback(
        self,
        plugin_module: str,
        plugin_class: Optional[str],
        callback_name: str,
    ) -> Optional[Callable[..., Any]]:
        """Look up the callable named by a task.

        With ``plugin_class`` set, the callback is taken from the instance of
        that class registered in ``Architecture``; ``None`` is returned when no
        such instance is registered. Without it, the callback is a module-level
        attribute.

        Raises:
            ImportError: If ``plugin_module`` cannot be imported.
            AttributeError: If the class or callback attribute does not exist.
        """
        callback: Optional[Callable[..., Any]] = None
        module = importlib.import_module(plugin_module)
        if plugin_class:
            target_class = getattr(module, plugin_class)
            if Architecture.Contains(target_class):
                instance = Architecture.Get(target_class)
                callback = getattr(instance, callback_name)
        else:
            callback = getattr(module, callback_name)
        return callback

    def _decode_payload(self, payload_raw: Optional[str]) -> tuple[list[Any], Dict[str, Any]]:
        """Decode a stored JSON payload into ``(args, kwargs)``.

        An empty or missing payload yields ``([], {})``. A payload that is not
        valid JSON, or whose ``args`` / ``kwargs`` are not a list / dict, is
        logged as a warning and also yields ``([], {})``.
        """
        if not payload_raw:
            return [], {}
        try:
            payload_obj = json.loads(payload_raw)
            args = payload_obj.get("args", [])
            kwargs = payload_obj.get("kwargs", {})
            if not isinstance(args, list) or not isinstance(kwargs, dict):
                raise ValueError("Invalid payload structure")
            return args, kwargs
        except Exception as exc:  # pylint: disable=broad-except
            self._logger.Log(
                "Warning",
                f"{ConsoleFrontColor.YELLOW}Failed to decode payload: {exc}{ConsoleFrontColor.RESET}",
            )
            return [], {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_clock_scheduler() -> ClockScheduler:
    """Return the shared ``ClockScheduler`` via ``ClockScheduler.instance()``."""
    return ClockScheduler.instance()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Names exported by a star-import of this module.
__all__ = ["ClockScheduler", "get_clock_scheduler"]
|