Compare commits

...

18 Commits

Author SHA1 Message Date
89de330e2d 修复默认入口忽略command参数的错误 2025-11-13 17:11:20 +08:00
cd54036ab7 新增默认入口 2025-11-13 15:44:30 +08:00
9cb259f2c7 1.新增可选的插件网页2.修改插件路由 2025-11-12 22:58:07 +08:00
ca3cf114e3 添加静态样式文件 2025-11-10 20:35:22 +08:00
ac68bb27a3 修复at导致命令无法识别的问题 2025-11-10 10:06:26 +08:00
36324398c3 修复at导致命令无法识别的问题 2025-11-10 09:55:26 +08:00
7deef9092a 数据库模型兼容性增强 2025-11-09 21:38:50 +08:00
4d3d841dda 开放了更多接口内容 2025-11-08 15:34:25 +08:00
6c53a3a18f 现在允许在任意位置注册数据库模型, 并允许注册任意数量数据库模型 2025-11-08 15:05:32 +08:00
16ef75c3ce 被调用的任务的来源必须已经实例化并注册进架构中 2025-11-08 14:09:33 +08:00
b5fe23342d 新增定时调度任务 2025-11-08 01:24:50 +08:00
9899387697 将ProjectConfig加入Architecture 2025-11-07 23:47:33 +08:00
c49f55808e Update gitmodule 2025-11-07 22:13:57 +08:00
477fbf1876 一些Log格式错误被删除 2025-11-07 15:52:06 +08:00
a1b3f51b61 移除不必要的内容 2025-11-07 00:59:06 +08:00
14ce7e6e3f 现在同一个模块内可以声明多个插件类了 2025-11-06 20:38:43 +08:00
4ad222cbc7 修复插件注册中由于引用导致的重复 2025-11-06 16:16:11 +08:00
34da3f8459 去除插件调用地址的.py文件名后缀 2025-11-06 11:46:31 +08:00
14 changed files with 566 additions and 188 deletions

1
.gitmodules vendored
View File

@@ -1,3 +1,4 @@
[submodule "Convention"] [submodule "Convention"]
path = Convention path = Convention
url = http://www.liubai.site:3000/ninemine/Convention-Python.git url = http://www.liubai.site:3000/ninemine/Convention-Python.git
branch = main

View File

@@ -1,6 +1,10 @@
from ..Convention.Runtime.Config import * from ..Convention.Runtime.Config import *
from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor
from ..CoreModules.flags import set_internal_verbose, get_internal_debug
Architecture.Register(ProjectConfig, ProjectConfig(), lambda: None)
from ..CoreModules.flags import set_internal_verbose
from .web import app from .web import app
from argparse import ArgumentParser from argparse import ArgumentParser
from typing import * from typing import *
@@ -11,7 +15,7 @@ except ImportError as ex:
ImportingThrow(ex, "Internal", ["uvicorn"]) ImportingThrow(ex, "Internal", ["uvicorn"])
def main() -> int: def main() -> int:
config = ProjectConfig() config: ProjectConfig = Architecture.Get(ProjectConfig)
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument("--host", type=str, default=config.FindItem("host", "0.0.0.0")) parser.add_argument("--host", type=str, default=config.FindItem("host", "0.0.0.0"))

View File

@@ -4,11 +4,12 @@ from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from ..CoreModules.middleware import ConcurrencyLimitMiddleware from ..CoreModules.middleware import ConcurrencyLimitMiddleware
from ..CoreModules.plugin_interface import ImportPlugins from ..CoreModules.plugin_interface import ImportPlugins
from ..CoreModules.clock_scheduler import get_clock_scheduler
from ..CoreRouters import callback, health from ..CoreRouters import callback, health
from ..Convention.Runtime.GlobalConfig import * from ..Convention.Runtime.GlobalConfig import *
from ..Convention.Runtime.Architecture import Architecture from ..Convention.Runtime.Architecture import Architecture
config = ProjectConfig() logger: ProjectConfig = Architecture.Get(ProjectConfig)
APP_CONFIG = { APP_CONFIG = {
"docs_url": "/docs", "docs_url": "/docs",
} }
@@ -16,31 +17,34 @@ APP_CONFIG = {
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""应用生命周期管理""" """应用生命周期管理"""
scheduler = get_clock_scheduler()
await scheduler.start()
# 启动 # 启动
config.Log("Info", "应用启动中...") logger.Log("Info", "应用启动中...")
# 初始化数据 # 初始化数据
config.Log("Info", "初始化数据...") logger.Log("Info", "初始化数据...")
# 启动后台清理 # 启动后台清理
config.Log("Info", "启动后台清理...") logger.Log("Info", "启动后台清理...")
yield
# 关闭
try: try:
config.Log("Info", "关闭应用...") yield
# await cleanup_task
except asyncio.CancelledError:
pass
finally: finally:
config.Log("Info", "关闭应用完成...") # 关闭
# db.close() try:
logger.Log("Info", "关闭应用...")
await scheduler.stop()
except asyncio.CancelledError:
pass
finally:
logger.Log("Info", "关闭应用完成...")
def generate_app(kwargs: dict) -> FastAPI: def generate_app(kwargs: dict) -> FastAPI:
''' '''
生成FastAPI应用 生成FastAPI应用
''' '''
config: ProjectConfig = Architecture.Get(ProjectConfig)
kwargs.update(**APP_CONFIG) kwargs.update(**APP_CONFIG)
app = FastAPI(**kwargs, lifespan=lifespan) app = FastAPI(**kwargs, lifespan=lifespan)
@@ -59,11 +63,12 @@ def generate_app(kwargs: dict) -> FastAPI:
return app return app
app: FastAPI = generate_app(config.FindItem("app_config", {})) app: FastAPI = generate_app(logger.FindItem("app_config", {}))
@app.get("/") @app.get("/")
async def root(): async def root():
"""根路径""" """根路径"""
config: ProjectConfig = Architecture.Get(ProjectConfig)
app_name = config.FindItem("app_name", "Application") app_name = config.FindItem("app_name", "Application")
app_version = config.FindItem("app_version", "0.0.0") app_version = config.FindItem("app_version", "0.0.0")
config.SaveProperties() config.SaveProperties()
@@ -76,11 +81,13 @@ async def root():
@app.exception_handler(Exception) @app.exception_handler(Exception)
async def global_exception_handler(request, exc): async def global_exception_handler(request, exc):
"""全局异常处理""" """全局异常处理"""
config.Log("Error", f"未捕获的异常: {exc}", exc_info=True) logger.Log("Error", f"未捕获的异常: {exc}\n{format_traceback_info()}")
return JSONResponse( return JSONResponse(
status_code=500, status_code=500,
content={"error": "Internal Server Error", "detail": str(exc)} content={"error": "Internal Server Error", "detail": str(exc)}
) )
logger.SaveProperties()
# 除了从本模块导出的app使用API实例外, 还可以从Architecture.Get(FastAPI)获取 # 除了从本模块导出的app使用API实例外, 还可以从Architecture.Get(FastAPI)获取
__all__ = ["app"] __all__ = ["app"]

View File

@@ -0,0 +1,183 @@
import asyncio
import importlib
import inspect
import json
import time
from typing import Any, Callable, Dict, Optional, Sequence
from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.GlobalConfig import ConsoleFrontColor, ProjectConfig
from ..CoreModules.database import (
STATUS_COMPLETED,
STATUS_FAILED,
STATUS_PENDING,
STATUS_RUNNING,
Database,
get_db,
)
class ClockScheduler:
def __init__(self) -> None:
self._db: Database = get_db()
self._config: ProjectConfig = Architecture.Get(ProjectConfig)
self._logger = self._config
self._tick_ms = int(self._config.FindItem("scheduler_tick_ms", 1000*60))
self._batch_size = int(self._config.FindItem("scheduler_max_batch", 1000))
self._loop_task: Optional[asyncio.Task] = None
self._running = False
self._lock = asyncio.Lock()
self._config.SaveProperties()
@classmethod
def instance(cls) -> "ClockScheduler":
if not Architecture.Contains(ClockScheduler):
Architecture.Register(ClockScheduler, ClockScheduler(), lambda: None)
return Architecture.Get(ClockScheduler)
async def start(self) -> None:
async with self._lock:
if self._running:
return
self._running = True
self._db.reset_running_tasks()
self._loop_task = asyncio.create_task(self._run_loop())
self._logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}ClockScheduler started with tick {self._tick_ms} ms{ConsoleFrontColor.RESET}",
)
async def stop(self) -> None:
async with self._lock:
if not self._running:
return
self._running = False
if self._loop_task:
self._loop_task.cancel()
try:
await self._loop_task
except asyncio.CancelledError:
pass
self._loop_task = None
self._logger.Log(
"Info",
f"{ConsoleFrontColor.GREEN}ClockScheduler stopped{ConsoleFrontColor.RESET}",
)
def register_task(
self,
plugin_module: str,
plugin_class: Optional[str],
callback_name: str,
delay_ms: int,
*,
args: Optional[Sequence[Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
) -> int:
execute_at = int(time.time() * 1000) + max(delay_ms, 0)
payload = json.dumps({
"args": list(args) if args else [],
"kwargs": kwargs or {},
})
task_id = self._db.create_scheduled_task(
plugin_module,
plugin_class,
callback_name,
payload,
execute_at,
)
self._logger.Log(
"Info",
f"{ConsoleFrontColor.LIGHTCYAN_EX}Scheduled task {task_id} for {plugin_module}.{callback_name} at {execute_at}{ConsoleFrontColor.RESET}",
)
return task_id
async def _run_loop(self) -> None:
try:
while self._running:
await self._process_due_tasks()
await asyncio.sleep(self._tick_ms / 1000)
except asyncio.CancelledError:
pass
finally:
self._running = False
async def _process_due_tasks(self) -> None:
now_ms = int(time.time() * 1000)
tasks = self._db.get_due_tasks(now_ms, self._batch_size)
for task in tasks:
task_id = int(task["id"])
attempts = int(task["attempts"])
try:
self._db.update_task_status(task_id, STATUS_RUNNING, attempts=attempts)
await self._execute_task(task)
except Exception as exc: # pylint: disable=broad-except
message = f"{type(exc).__name__}: {exc}"
self._logger.Log(
"Error",
f"{ConsoleFrontColor.RED}Task {task_id} failed: {message}{ConsoleFrontColor.RESET}",
)
self._db.update_task_status(
task_id,
STATUS_FAILED,
attempts=attempts + 1,
last_error=message,
)
async def _execute_task(self, task: Any) -> None:
task_id = int(task["id"])
attempts = int(task["attempts"])
plugin_module = task["plugin_module"]
plugin_class = task["plugin_class"]
callback_name = task["callback_name"]
payload_raw = task["payload"]
args, kwargs = self._decode_payload(payload_raw)
callback = self._resolve_callback(plugin_module, plugin_class, callback_name)
if not callback:
self._db.update_task_status(task_id, STATUS_FAILED, attempts=attempts+1)
return
result = callback(*args, **kwargs)
if inspect.isawaitable(result):
await result
self._db.update_task_status(task_id, STATUS_COMPLETED, attempts=attempts)
def _resolve_callback(
self,
plugin_module: str,
plugin_class: Optional[str],
callback_name: str,
) -> Optional[Callable[..., Any]]:
callback: Optional[Callable[..., Any]] = None
module = importlib.import_module(plugin_module)
if plugin_class:
target_class = getattr(module, plugin_class)
if Architecture.Contains(target_class):
instance = Architecture.Get(target_class)
callback = getattr(instance, callback_name)
else:
callback = getattr(module, callback_name)
return callback
def _decode_payload(self, payload_raw: Optional[str]) -> tuple[list[Any], Dict[str, Any]]:
if not payload_raw:
return [], {}
try:
payload_obj = json.loads(payload_raw)
args = payload_obj.get("args", [])
kwargs = payload_obj.get("kwargs", {})
if not isinstance(args, list) or not isinstance(kwargs, dict):
raise ValueError("Invalid payload structure")
return args, kwargs
except Exception as exc: # pylint: disable=broad-except
self._logger.Log(
"Warning",
f"{ConsoleFrontColor.YELLOW}Failed to decode payload: {exc}{ConsoleFrontColor.RESET}",
)
return [], {}
def get_clock_scheduler() -> ClockScheduler:
return ClockScheduler.instance()
__all__ = ["ClockScheduler", "get_clock_scheduler"]

View File

@@ -1,14 +1,21 @@
"""SQLite数据库操作模块 - 使用标准库sqlite3""" """SQLite数据库操作模块 - 使用标准库sqlite3"""
import sqlite3 import sqlite3
import json
import time import time
from typing import * from typing import Any, Optional
from ..Convention.Runtime.Config import *
from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor
from ..Convention.Runtime.Architecture import Architecture from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.File import ToolFile from ..Convention.Runtime.File import ToolFile
config = ProjectConfig() logger: ProjectConfig = Architecture.Get(ProjectConfig)
DATABASE_PATH = config.GetFile(config.FindItem("database_path", "db.db"), False).GetFullPath() DATABASE_PATH = logger.GetFile(logger.FindItem("database_path", "db.db"), False).GetFullPath()
logger.SaveProperties()
SCHEDULED_TASK_TABLE = "scheduled_tasks"
STATUS_PENDING = "pending"
STATUS_RUNNING = "running"
STATUS_COMPLETED = "completed"
STATUS_FAILED = "failed"
class Database: class Database:
"""数据库管理类""" """数据库管理类"""
@@ -49,9 +56,9 @@ class Database:
self._conn.execute("PRAGMA cache_size=1000") self._conn.execute("PRAGMA cache_size=1000")
self._conn.execute("PRAGMA temp_store=MEMORY") self._conn.execute("PRAGMA temp_store=MEMORY")
config.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接成功: {self.db_path}{ConsoleFrontColor.RESET}") logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接成功: {self.db_path}{ConsoleFrontColor.RESET}")
except Exception as e: except Exception as e:
config.Log("Error", f"{ConsoleFrontColor.RED}数据库连接失败: {e}{ConsoleFrontColor.RESET}", exc_info=True) logger.Log("Error", f"{ConsoleFrontColor.RED}数据库连接失败: {e}{ConsoleFrontColor.RESET}")
raise raise
return self._conn return self._conn
@@ -77,7 +84,7 @@ class Database:
if not self._table_exists(table_name): if not self._table_exists(table_name):
cursor = self.conn.cursor() cursor = self.conn.cursor()
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)") cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT)")
config.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 创建{ConsoleFrontColor.RESET}") logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 创建{ConsoleFrontColor.RESET}")
return self return self
def _column_exists(self, table_name: str, column_name: str) -> bool: def _column_exists(self, table_name: str, column_name: str) -> bool:
@@ -107,9 +114,9 @@ class Database:
try: try:
cursor = self.conn.cursor() cursor = self.conn.cursor()
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}") cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_def}")
config.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 添加列 {column_name}{ConsoleFrontColor.RESET}") logger.Log("Info", f"{ConsoleFrontColor.GREEN}为表 {table_name} 添加列 {column_name}{ConsoleFrontColor.RESET}")
except Exception as e: except Exception as e:
config.Log("Warning", f"{ConsoleFrontColor.YELLOW}添加列失败: {e}{ConsoleFrontColor.RESET}") logger.Log("Warning", f"{ConsoleFrontColor.YELLOW}添加列失败: {e}{ConsoleFrontColor.RESET}")
def define_column(self, table_name: str, column_name: str, column_def: str): def define_column(self, table_name: str, column_name: str, column_def: str):
"""定义列 """定义列
@@ -124,24 +131,120 @@ class Database:
def init_tables(self): def init_tables(self):
"""初始化数据库表""" """初始化数据库表"""
self._ensure_scheduled_tasks_table()
def _ensure_scheduled_tasks_table(self) -> None:
cursor = self.conn.cursor() cursor = self.conn.cursor()
cursor.execute(
# 用户表 f"""
cursor.execute(""" CREATE TABLE IF NOT EXISTS {SCHEDULED_TASK_TABLE} (
CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER PRIMARY KEY, plugin_module TEXT NOT NULL,
username TEXT, plugin_class TEXT,
callback_name TEXT NOT NULL,
payload TEXT,
execute_at INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT '{STATUS_PENDING}',
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
created_at INTEGER NOT NULL, created_at INTEGER NOT NULL,
last_active INTEGER NOT NULL updated_at INTEGER NOT NULL
) )
""") """
)
cursor.execute(
f"CREATE INDEX IF NOT EXISTS idx_{SCHEDULED_TASK_TABLE}_status_execute_at ON {SCHEDULED_TASK_TABLE}(status, execute_at)"
)
def create_scheduled_task(
self,
plugin_module: str,
plugin_class: Optional[str],
callback_name: str,
payload: Optional[str],
execute_at_ms: int
) -> int:
now_ms = int(time.time() * 1000)
cursor = self.conn.cursor()
cursor.execute(
f"""
INSERT INTO {SCHEDULED_TASK_TABLE} (
plugin_module, plugin_class, callback_name, payload,
execute_at, status, attempts, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
plugin_module,
plugin_class,
callback_name,
payload,
execute_at_ms,
STATUS_PENDING,
0,
now_ms,
now_ms,
)
)
task_id = cursor.lastrowid
return int(task_id)
def get_due_tasks(self, now_ms: int, limit: int) -> list[sqlite3.Row]:
cursor = self.conn.cursor()
cursor.execute(
f"""
SELECT * FROM {SCHEDULED_TASK_TABLE}
WHERE status = ? AND execute_at <= ?
ORDER BY execute_at ASC
LIMIT ?
""",
(STATUS_PENDING, now_ms, limit)
)
return cursor.fetchall()
def update_task_status(
self,
task_id: int,
status: str,
*,
attempts: Optional[int] = None,
last_error: Optional[str] = None
) -> None:
now_ms = int(time.time() * 1000)
sets = ["status = ?"]
params: list[Any] = [status]
if attempts is not None:
sets.append("attempts = ?")
params.append(attempts)
if last_error is not None:
sets.append("last_error = ?")
params.append(last_error)
params.append(task_id)
set_clause = ", ".join(sets)
cursor = self.conn.cursor()
cursor.execute(
f"UPDATE {SCHEDULED_TASK_TABLE} SET {set_clause} WHERE id = ?",
params
)
def reset_running_tasks(self) -> None:
cursor = self.conn.cursor()
now_ms = int(time.time() * 1000)
cursor.execute(
f"""
UPDATE {SCHEDULED_TASK_TABLE}
SET status = ?, updated_at = ?
WHERE status = ?
""",
(STATUS_PENDING, now_ms, STATUS_RUNNING)
)
def close(self): def close(self):
"""关闭数据库连接""" """关闭数据库连接"""
if self._conn: if self._conn:
self._conn.close() self._conn.close()
self._conn = None self._conn = None
config.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接已关闭{ConsoleFrontColor.RESET}") logger.Log("Info", f"{ConsoleFrontColor.GREEN}数据库连接已关闭{ConsoleFrontColor.RESET}")
def get_db() -> Database: def get_db() -> Database:
"""获取全局数据库实例(单例模式)""" """获取全局数据库实例(单例模式)"""
@@ -149,4 +252,12 @@ def get_db() -> Database:
return Database() return Database()
return Architecture.Get(Database) return Architecture.Get(Database)
__all__ = ["get_db"] __all__ = [
"get_db",
"Database",
"SCHEDULED_TASK_TABLE",
"STATUS_PENDING",
"STATUS_RUNNING",
"STATUS_COMPLETED",
"STATUS_FAILED",
]

View File

@@ -2,7 +2,7 @@ from ..Convention.Runtime.Architecture import *
from ..Convention.Runtime.GlobalConfig import ProjectConfig from ..Convention.Runtime.GlobalConfig import ProjectConfig
from pydantic import * from pydantic import *
config = ProjectConfig() logger: ProjectConfig = Architecture.Get(ProjectConfig)
class DebugFlags(BaseModel): class DebugFlags(BaseModel):
debug: bool = Field(default=False) debug: bool = Field(default=False)
@@ -10,7 +10,7 @@ class DebugFlags(BaseModel):
class VerboseFlags(BaseModel): class VerboseFlags(BaseModel):
verbose: bool = Field(default=False) verbose: bool = Field(default=False)
Architecture.Register(DebugFlags, DebugFlags(debug=config.FindItem("debug", False)), lambda: None) Architecture.Register(DebugFlags, DebugFlags(debug=logger.FindItem("debug", False)), lambda: None)
Architecture.Register(VerboseFlags, VerboseFlags(verbose=False), lambda: None) Architecture.Register(VerboseFlags, VerboseFlags(verbose=False), lambda: None)
def set_internal_debug(debug:bool) -> None: def set_internal_debug(debug:bool) -> None:
@@ -23,6 +23,6 @@ def set_internal_verbose(verbose:bool) -> None:
def get_internal_verbose() -> bool: def get_internal_verbose() -> bool:
return Architecture.Get(VerboseFlags).verbose return Architecture.Get(VerboseFlags).verbose
config.SaveProperties() logger.SaveProperties()
__all__ = ["set_internal_debug", "get_internal_debug", "set_internal_verbose", "get_internal_verbose"] __all__ = ["set_internal_debug", "get_internal_debug", "set_internal_verbose", "get_internal_verbose"]

View File

@@ -3,10 +3,12 @@ import asyncio
from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request from starlette.requests import Request
from starlette.responses import Response from starlette.responses import Response
from ..Convention.Runtime.GlobalConfig import ProjectConfig from ..Convention.Runtime.GlobalConfig import *
from ..Convention.Runtime.Architecture import Architecture
config = ProjectConfig() logger: ProjectConfig = Architecture.Get(ProjectConfig)
MAX_CONCURRENT_REQUESTS = config.FindItem("max_concurrent_requests", 100) MAX_CONCURRENT_REQUESTS = logger.FindItem("max_concurrent_requests", 100)
logger.SaveProperties()
class ConcurrencyLimitMiddleware(BaseHTTPMiddleware): class ConcurrencyLimitMiddleware(BaseHTTPMiddleware):
"""并发限制中间件 - 防止内存爆炸""" """并发限制中间件 - 防止内存爆炸"""
@@ -15,7 +17,7 @@ class ConcurrencyLimitMiddleware(BaseHTTPMiddleware):
super().__init__(app) super().__init__(app)
self.semaphore = asyncio.Semaphore(max_concurrent) self.semaphore = asyncio.Semaphore(max_concurrent)
self.max_concurrent = max_concurrent self.max_concurrent = max_concurrent
config.Log("Info", f"并发限制中间件已启用,最大并发数:{max_concurrent}") logger.Log("Info", f"并发限制中间件已启用,最大并发数:{max_concurrent}")
async def dispatch(self, request: Request, call_next) -> Response: async def dispatch(self, request: Request, call_next) -> Response:
"""处理请求""" """处理请求"""
@@ -24,7 +26,7 @@ class ConcurrencyLimitMiddleware(BaseHTTPMiddleware):
response = await call_next(request) response = await call_next(request)
return response return response
except Exception as e: except Exception as e:
config.Log("Error", f"请求处理错误: {e}", exc_info=True) logger.Log("Error", f"{ConsoleFrontColor.RED}请求处理错误: {e}{ConsoleFrontColor.RESET}")
return Response( return Response(
content='{"error": "Internal Server Error"}', content='{"error": "Internal Server Error"}',
status_code=500, status_code=500,

View File

@@ -1,95 +1,14 @@
"""数据模型定义""" """数据模型定义"""
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List from typing import *
class CallbackRequest(BaseModel): class CallbackRequest(BaseModel):
"""WPS Callback请求模型""" """Callback请求模型"""
chatid: int = Field(..., description="会话ID") chatid: int = Field(default=0, description="会话ID")
creator: int = Field(..., description="发送者ID") creator: int = Field(default=0, description="发送者ID")
content: str = Field(..., description="消息内容") content: str = Field(default="", description="消息内容")
reply: Optional[Dict[str, Any]] = Field(None, description="回复内容") reply: Dict[str, Any] = Field(default={}, description="回复内容")
robot_key: str = Field(..., description="机器人key") robot_key: str = Field(default="", description="机器人key")
url: str = Field(..., description="callback地址") url: str = Field(default="", description="callback地址")
ctime: int = Field(..., description="发送时间") ctime: int = Field(default=0, description="发送时间")
class TextMessage(BaseModel):
"""文本消息"""
msgtype: str = "text"
text: Dict[str, str]
@classmethod
def create(cls, content: str):
"""创建文本消息"""
return cls(text={"content": content})
class MarkdownMessage(BaseModel):
"""Markdown消息"""
msgtype: str = "markdown"
markdown: Dict[str, str]
@classmethod
def create(cls, text: str):
"""创建Markdown消息"""
return cls(markdown={"text": text})
class LinkMessage(BaseModel):
"""链接消息"""
msgtype: str = "link"
link: Dict[str, str]
@classmethod
def create(cls, title: str, text: str, message_url: str = "", btn_title: str = "查看详情"):
"""创建链接消息"""
return cls(link={
"title": title,
"text": text,
"messageUrl": message_url,
"btnTitle": btn_title
})
class GameState(BaseModel):
"""游戏状态基类"""
game_type: str
created_at: int
updated_at: int
class GuessGameState(GameState):
"""猜数字游戏状态"""
game_type: str = "guess"
target: int = Field(..., description="目标数字")
attempts: int = Field(0, description="尝试次数")
guesses: list[int] = Field(default_factory=list, description="历史猜测")
max_attempts: int = Field(10, description="最大尝试次数")
class QuizGameState(GameState):
"""问答游戏状态"""
game_type: str = "quiz"
question_id: int = Field(..., description="问题ID")
question: str = Field(..., description="问题内容")
attempts: int = Field(0, description="尝试次数")
max_attempts: int = Field(3, description="最大尝试次数")
class PrivateMessageRequest(BaseModel):
"""私聊消息请求模型"""
user_id: int = Field(..., description="目标用户ID")
content: str = Field(..., description="消息内容")
msg_type: str = Field(default="text", description="消息类型: text 或 markdown")
class CheckBatchRequest(BaseModel):
"""批量检查请求模型"""
user_ids: List[int] = Field(..., description="用户ID列表")
class CheckBatchResponse(BaseModel):
"""批量检查响应模型"""
results: Dict[int, bool] = Field(..., description="用户ID到是否有URL的映射")

View File

@@ -5,6 +5,7 @@ from ..Convention.Runtime.GlobalConfig import ProjectConfig
from ..Convention.Runtime.Architecture import Architecture from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.File import ToolFile from ..Convention.Runtime.File import ToolFile
from ..CoreModules.database import get_db from ..CoreModules.database import get_db
from ..CoreModules.clock_scheduler import get_clock_scheduler
from fastapi import APIRouter, FastAPI from fastapi import APIRouter, FastAPI
from typing import * from typing import *
from pydantic import * from pydantic import *
@@ -12,17 +13,16 @@ from abc import ABC
import importlib import importlib
import os import os
config = ProjectConfig() config: ProjectConfig = Architecture.Get(ProjectConfig)
class DatabaseModel(BaseModel): class DatabaseModel(BaseModel):
table_name: str = Field(default="main_table") table_name: str = Field(default="main_table")
column_names: List[str] = Field(default=[])
column_defs: Dict[str, str] = Field(default={}) column_defs: Dict[str, str] = Field(default={})
class PluginInterface(ABC): class PluginInterface(ABC):
plugin_instances: Dict[str, "PluginInterface"] = {} plugin_instances: Dict[str, "PluginInterface"] = {}
async def callback(self, message: str, chat_id: int, user_id: int) -> str|None: async def callback(self, message: str|None|Literal[""], chat_id: int, user_id: int) -> str|None:
''' '''
继承后重写该方法接受消息并返回消息 继承后重写该方法接受消息并返回消息
返回空字符串代表不进行反馈 返回空字符串代表不进行反馈
@@ -31,34 +31,91 @@ class PluginInterface(ABC):
chat_id: 会话ID chat_id: 会话ID
user_id: 用户ID user_id: 用户ID
''' '''
config.Log("Warning", f"插件{self.__class__.__name__}未实现callback方法") config.Log("Warning", f"{ConsoleFrontColor.YELLOW}插件{self.__class__.__name__}未实现callback方法{ConsoleFrontColor.RESET}")
return "" return ""
def execute(self, path:str) -> Optional[APIRouter]: @final
def register_table(self, db_model: DatabaseModel) -> None:
db = get_db()
cursor = db.conn.cursor()
sql = f"CREATE TABLE IF NOT EXISTS {db_model.table_name} ({', '.join([f'{name} {field_def}' for name, field_def in db_model.column_defs.items()])})"
config.Log("Info", f"{ConsoleFrontColor.LIGHTMAGENTA_EX}为表 {db_model.table_name} 创建: {sql}{ConsoleFrontColor.RESET}")
try:
cursor.execute(sql)
except Exception as e:
config.Log("Error", f"{ConsoleFrontColor.RED}为表 {db_model.table_name} 创建失败: {e}{ConsoleFrontColor.RESET}")
try:
cursor.execute(f"PRAGMA table_info({db_model.table_name})")
existing_columns = {row["name"] for row in cursor.fetchall()}
except Exception as e:
config.Log("Error", f"{ConsoleFrontColor.RED}查询表 {db_model.table_name} 列信息失败: {e}{ConsoleFrontColor.RESET}")
return
constraint_keywords = ("PRIMARY KEY", "FOREIGN KEY", "UNIQUE", "CHECK")
for column_name, column_def in db_model.column_defs.items():
column_name_upper = column_name.upper()
column_def_upper = column_def.upper()
if any(keyword in column_name_upper for keyword in constraint_keywords) or any(
column_def_upper.startswith(keyword) for keyword in constraint_keywords
):
continue
if " " in column_name or "(" in column_name:
continue
if column_name in existing_columns:
continue
alter_sql = f"ALTER TABLE {db_model.table_name} ADD COLUMN {column_name} {column_def}"
config.Log(
"Info",
f"{ConsoleFrontColor.LIGHTMAGENTA_EX}为表 {db_model.table_name} 添加缺失列: {alter_sql}{ConsoleFrontColor.RESET}",
)
try:
cursor.execute(alter_sql)
except Exception as e:
config.Log("Error", f"{ConsoleFrontColor.RED}为表 {db_model.table_name} 添加列 {column_name} 失败: {e}{ConsoleFrontColor.RESET}")
continue
try:
db.conn.commit()
except Exception as e:
config.Log("Error", f"{ConsoleFrontColor.RED}提交表 {db_model.table_name} 列更新失败: {e}{ConsoleFrontColor.RESET}")
@final
def execute(self) -> Optional[APIRouter]:
''' '''
继承后是否返回路由决定是否启动该插件 继承后是否返回路由决定是否启动该插件
若返回None, 则不启动该插件 若返回None, 则不启动该插件
''' '''
Architecture.Register(self.__class__, self, self.wake_up, *self.dependencies())
router = APIRouter()
router.post(path)(self.generate_router_callback())
# 在数据库保证必要的表和列存在
db = get_db()
db_model = self.register_db_model()
if db_model:
db.define_table(db_model.table_name)
for field in db_model.column_names:
db.define_column(db_model.table_name, field, db_model.column_defs[field])
def setup() -> None:
# 在数据库保证必要的表和列存在
db_model = self.register_db_model()
if db_model is None:
pass
elif isinstance(db_model, DatabaseModel):
self.register_table(db_model)
else:
for model in db_model:
self.register_table(model)
self.wake_up()
Architecture.Register(self.__class__, self, setup, *self.dependencies())
router = APIRouter()
router.post(f"/{self.__class__.__name__}/callback")(self.generate_router_callback())
if self.generate_router_illustrated_guide() is not None:
router.get(f"/{self.__class__.__name__}")(self.generate_router_illustrated_guide())
return router return router
def generate_router_callback(self) -> Callable|Coroutine: def generate_router_callback(self) -> Callable|Coroutine:
''' '''
继承后重写该方法生成路由回调函数 继承后重写该方法生成路由回调函数
''' '''
async def callback(message: str, chat_id: int, user_id: int) -> Any: return self.callback
return await self.callback(message, chat_id, user_id)
return callback def generate_router_illustrated_guide(self) -> Callable|Coroutine|None:
'''
继承后重写该方法生成渲染图鉴与攻略网页的函数
'''
return None
def dependencies(self) -> List[Type]: def dependencies(self) -> List[Type]:
''' '''
@@ -73,17 +130,59 @@ class PluginInterface(ABC):
''' '''
pass pass
@final
def register_plugin(self, command: str) -> None: def register_plugin(self, command: str) -> None:
''' '''
将插件注册, 使其可以被命令匹配 将插件注册, 使其可以被命令匹配
''' '''
PluginInterface.plugin_instances[command] = self if command in PluginInterface.plugin_instances:
config.Log("Warning", f"{ConsoleFrontColor.YELLOW}插件{PluginInterface.plugin_instances[command].__class__.__name__}已注册命令{command}, 将被新插件{self.__class__.__name__}覆盖{ConsoleFrontColor.RESET}")
else:
config.Log("Info", f"插件{self.__class__.__name__}已注册命令{command}")
PluginInterface.plugin_instances[command] = self
def register_db_model(self) -> DatabaseModel: def register_db_model(self) -> List[DatabaseModel]|DatabaseModel|None:
''' '''
继承后重写该方法注册数据库模型 继承后重写该方法注册数据库模型
''' '''
return DatabaseModel() return None
def register_clock(
self,
callback: Callable[..., Any],
delay_ms: int,
*,
args: Optional[Sequence[Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
) -> int:
'''
注册一次性延时任务
Args:
callback: 时间到期后调用的函数/方法
delay_ms: 延迟毫秒数
args: 传入回调的位置参数
kwargs: 传入回调的关键字参数
'''
if not callable(callback):
raise ValueError("callback must be callable")
scheduler = get_clock_scheduler()
plugin_module = callback.__module__
plugin_class: Optional[str] = None
if hasattr(callback, "__self__") and callback.__self__ is self:
plugin_module = self.__class__.__module__
plugin_class = self.__class__.__name__
callback_name = getattr(callback, "__name__", None)
if not callback_name:
raise ValueError("callback must have a __name__ attribute")
task_id = scheduler.register_task(
plugin_module,
plugin_class,
callback_name,
delay_ms,
args=args,
kwargs=kwargs,
)
return task_id
def is_enable_plugin(self) -> bool: def is_enable_plugin(self) -> bool:
''' '''
@@ -109,6 +208,7 @@ def ImportPlugins(app: FastAPI, plugin_dir:str = "Plugins") -> None:
if plugin_tool_dir.IsDir() == False: if plugin_tool_dir.IsDir() == False:
config.Log("Error", f"插件目录不是目录: {plugin_tool_dir.GetFullPath()}") config.Log("Error", f"插件目录不是目录: {plugin_tool_dir.GetFullPath()}")
return return
plugin_registered_class = set[type[PluginInterface]]()
for dir_name, sub_dirs, files in plugin_tool_dir.DirWalk(): for dir_name, sub_dirs, files in plugin_tool_dir.DirWalk():
for file_name in files: for file_name in files:
module_file = ToolFile(dir_name)|file_name module_file = ToolFile(dir_name)|file_name
@@ -119,11 +219,12 @@ def ImportPlugins(app: FastAPI, plugin_dir:str = "Plugins") -> None:
plugin_class = getattr(module, class_name) plugin_class = getattr(module, class_name)
if not isinstance(plugin_class, type): if not isinstance(plugin_class, type):
continue continue
if issubclass(plugin_class, PluginInterface): if issubclass(plugin_class, PluginInterface) and plugin_class not in plugin_registered_class:
plugin_registered_class.add(plugin_class)
plugin = plugin_class() plugin = plugin_class()
if plugin.is_enable_plugin() == False: if plugin.is_enable_plugin() == False:
continue continue
router = plugin.execute(f"/{module_file}") router = plugin.execute()
if router: if router:
app.include_router(router, prefix=f"/api", tags=[plugin.get_plugin_tag()]) app.include_router(router, prefix=f"/api", tags=[plugin.get_plugin_tag()])
except Exception as e: except Exception as e:

View File

@@ -1,47 +1,49 @@
"""Callback路由处理""" """Callback路由处理"""
from ..Convention.Runtime.GlobalConfig import ProjectConfig from ..Convention.Runtime.GlobalConfig import *
from ..Convention.Runtime.Architecture import Architecture
from fastapi import APIRouter, Request from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
import re
from ..CoreModules.models import CallbackRequest from ..CoreModules.models import CallbackRequest
from ..CoreModules.plugin_interface import PluginInterface from ..CoreModules.plugin_interface import PluginInterface
config = ProjectConfig()
ALWAYS_RETURN_OK = config.FindItem("always_return_ok", True) # 返回ok可以避免重试 logger: ProjectConfig = Architecture.Get(ProjectConfig)
ALWAYS_RETURN_OK = logger.FindItem("always_return_ok", True) # 返回ok可以避免重试
logger.SaveProperties()
router = APIRouter() router = APIRouter()
@router.get("/callback") @router.get("/callback")
async def callback_verify(): async def callback_verify():
"""Callback可用性校验""" """Callback可用性校验"""
config.Log("Info", "收到Callback验证请求") logger.Log("Info", "收到Callback验证请求")
return JSONResponse({"result": "ok"}) return JSONResponse({"result": "ok"})
# 机器人名称模式(用于从@消息中提取,兼容半角/全角空格)
AT_PATTERN = re.compile(r'@[^\s]+[\s\u3000]+(.+)', re.DOTALL)
@router.post("/callback") def parse_message_after_at(message: str) -> str:
async def callback_receive(request: Request): # 去除首尾空格
"""Callback消息""" message = message.strip()
# 尝试提取@后的内容
at_match = AT_PATTERN.search(message)
if at_match:
return at_match.group(1).strip()
return message
@router.post("/callback/construct")
async def callback_receive_construct(callback_data: CallbackRequest):
"""以构造好的Callback消息进行处理, 已知方式"""
try: try:
# 解析请求数据
data = await request.json()
config.Log("Info", f"完整callback数据: {data}")
# 验证请求
try:
callback_data = CallbackRequest(**data)
except Exception as e:
config.Log("Error", f"请求数据验证失败: {e}")
if ALWAYS_RETURN_OK:
return JSONResponse({"result": "ok"})
else:
return JSONResponse({"result": "error", "message": str(e)})
# 解析指令 # 解析指令
content = callback_data.content content = parse_message_after_at(callback_data.content)
command = content.split(" ")[0] command = content.split(" ")[0]
message = content[len(command):].strip() message = content[len(command):].strip()
config.Log("Info", f"识别指令: command={command}") logger.Log("Info", f"识别指令: command={command}")
# 处理指令 # 处理指令
result = await handle_command(command, message, callback_data.chatid, callback_data.creator) result = await handle_command(command, message, callback_data.chatid, callback_data.creator)
@@ -51,7 +53,45 @@ async def callback_receive(request: Request):
return JSONResponse({"result": "ok"}) return JSONResponse({"result": "ok"})
except Exception as e: except Exception as e:
config.Log("Error", f"处理Callback异常: {e}") logger.Log("Error", f"处理Callback异常: {e}")
if ALWAYS_RETURN_OK:
return JSONResponse({"result": "ok"})
else:
return JSONResponse({"result": "error", "message": str(e)})
@router.post("/callback")
async def callback_receive(request: Request):
"""接受未知的Callback消息并进行处理, 默认方式"""
try:
# 解析请求数据
data = await request.json()
logger.Log("Info", f"完整callback数据: {data}")
# 验证请求
try:
callback_data = CallbackRequest(**data)
except Exception as e:
logger.Log("Error", f"请求数据验证失败: {e}")
if ALWAYS_RETURN_OK:
return JSONResponse({"result": "ok"})
else:
return JSONResponse({"result": "error", "message": str(e)})
# 解析指令
content = parse_message_after_at(callback_data.content)
command = content.split(" ")[0]
message = content[len(command):].strip()
logger.Log("Info", f"识别指令: command={command}")
# 处理指令
result = await handle_command(command, message, callback_data.chatid, callback_data.creator)
if result:
return JSONResponse({"result": "ok", "message": result})
else:
return JSONResponse({"result": "ok"})
except Exception as e:
logger.Log("Error", f"处理Callback异常: {e}")
if ALWAYS_RETURN_OK: if ALWAYS_RETURN_OK:
return JSONResponse({"result": "ok"}) return JSONResponse({"result": "ok"})
else: else:
@@ -74,11 +114,14 @@ async def handle_command(command: str, message: str,
try: try:
plugin = PluginInterface.plugin_instances.get(command, None) plugin = PluginInterface.plugin_instances.get(command, None)
if plugin: if plugin:
config.Log("Info", f"已找到插件注册指令: {command}, class: {plugin.__class__.__name__}") logger.Log("Info", f"已找到插件注册指令: {command}, class: {plugin.__class__.__name__}")
return await plugin.callback(message, chat_id, user_id) return await plugin.callback(message, chat_id, user_id)
elif "default" in PluginInterface.plugin_instances:
logger.Log("Info", f"未找到插件注册指令: {command}, 使用默认插件: {PluginInterface.plugin_instances["default"].__class__.__name__}")
return await PluginInterface.plugin_instances["default"].callback(command+" "+message, chat_id, user_id)
else: else:
return f"❌ 未识别指令: {command}" return f"❌ 未识别指令: {command}"
except Exception as e: except Exception as e:
config.Log("Error", f"处理指令异常: {e}", exc_info=True) logger.Log("Error", f"{ConsoleFrontColor.RED}处理指令异常: {e}{ConsoleFrontColor.RESET}")
return f"❌ 处理指令时出错: {str(e)}" return f"❌ 处理指令时出错: {str(e)}"

View File

@@ -1,5 +1,6 @@
"""健康检查路由""" """健康检查路由"""
from ..Convention.Runtime.Config import * from ..Convention.Runtime.Config import *
from ..Convention.Runtime.Architecture import Architecture
try: try:
import psutil import psutil
except ImportError: except ImportError:
@@ -10,17 +11,16 @@ from fastapi.responses import JSONResponse
from ..CoreModules.database import get_db from ..CoreModules.database import get_db
from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor from ..Convention.Runtime.GlobalConfig import ProjectConfig, ConsoleFrontColor
config = ProjectConfig() logger: ProjectConfig = Architecture.Get(ProjectConfig)
router = APIRouter() router = APIRouter()
@router.get("/health") @router.get("/health")
async def health_check(): async def health_check():
"""健康检查""" """健康检查"""
return JSONResponse({ return JSONResponse({
"status": "healthy", "status": "healthy",
"service": config.FindItem("app_name", "Application") "service": logger.FindItem("app_name", "Application")
}) })
@@ -53,9 +53,10 @@ async def system_stats():
} }
}) })
except Exception as e: except Exception as e:
config.Log("Error", f"{ConsoleFrontColor.RED}获取系统统计失败: {e}{ConsoleFrontColor.RESET}", exc_info=True) logger.Log("Error", f"{ConsoleFrontColor.RED}获取系统统计失败: {e}{ConsoleFrontColor.RESET}")
return JSONResponse( return JSONResponse(
status_code=500, status_code=500,
content={"error": str(e)} content={"error": str(e)}
) )
logger.SaveProperties()

3
swagger-ui-bundle.js Normal file

File diff suppressed because one or more lines are too long

3
swagger-ui.css Normal file

File diff suppressed because one or more lines are too long