from ..Convention.Runtime.GlobalConfig import ProjectConfig from ..Convention.Runtime.Architecture import Architecture from ..CoreModules.database import get_db from fastapi import APIRouter, FastAPI from typing import * from pydantic import * from abc import ABC import importlib import os config = ProjectConfig() class DatabaseModel(BaseModel): table_name: str = Field(default="main_table") column_names: List[str] = Field(default=[]) column_defs: Dict[str, str] = Field(default={}) class PluginInterface(ABC): plugin_instances: Dict[str, "PluginInterface"] = {} def callback(self, message: str, chat_id: int, user_id: int) -> str: ''' 继承后重写该方法接受消息并返回消息 返回空字符串代表不进行反馈 Args: message: 消息内容 chat_id: 会话ID user_id: 用户ID Returns: str: 消息内容 ''' return "" def execute(self, path:str) -> Optional[APIRouter]: ''' 继承后是否返回路由决定是否启动该插件 若返回None, 则不启动该插件 ''' Architecture.Register(self.__class__, self, self.wake_up, *self.dependencies()) router = APIRouter() router.get(path)(self.generate_router_callback()) # 在数据库保证必要的表和列存在 db = get_db() db_model = self.register_db_model() if db_model: db.define_table(db_model.table_name) for field in db_model.column_names: db.define_column(db_model.table_name, field, db_model.column_defs[field]) return router def generate_router_callback(self) -> Callable|Coroutine: ''' 继承后重写该方法生成路由回调函数 ''' async def callback(*args: Any, **kwargs: Any) -> Any: pass return callback def dependencies(self) -> List[Type]: ''' 继承后重写该方法注册依赖插件 若返回[], 则不需要依赖插件 ''' return [] def wake_up(self) -> None: ''' 依赖插件全部注册后被调用, 用于通知插件实例依赖项已完全注册 ''' pass def register_plugin(self, command: str) -> None: ''' 将插件注册, 使其可以被命令匹配 ''' PluginInterface.plugin_instances[command] = self def register_db_model(self) -> DatabaseModel: ''' 继承后重写该方法注册数据库模型 ''' return DatabaseModel() def ImportPlugins(app: FastAPI, plugin_dir:str = "Plugins") -> None: ''' 导入插件 Args: app: FastAPI应用 plugin_dir: 插件目录 ''' for file in os.listdir(plugin_dir): if file.endswith(".py") and not file.startswith("__"): module_name = file[:-3] try: module = importlib.import_module(module_name) for class_name in dir(module): plugin_class = getattr(module, class_name) if issubclass(plugin_class, PluginInterface): plugin = plugin_class() router = plugin.execute(f"/{module_name}") if router: app.include_router(router, prefix=f"/api", tags=[module_name]) except Exception as e: config.Log("Error", f"加载插件{module_name}失败: {e}") __all__ = ["ImportPlugins", "PluginInterface", "DatabaseModel"]