# NOTE(review): NONE is not referenced anywhere in this file; it looks like an
# accidental auto-import. Kept because other code may rely on it — confirm
# before removing.
from pickle import NONE
from ..Convention.Runtime.Config import *
from ..Convention.Runtime.GlobalConfig import ProjectConfig
from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.File import ToolFile
from ..CoreModules.database import get_db
from fastapi import APIRouter, FastAPI
from typing import *
from pydantic import *
from abc import ABC
import importlib
import os

# Shared project configuration; used in this module for logging only.
config = ProjectConfig()


class DatabaseModel(BaseModel):
    '''
    Description of the database table a plugin needs.

    PluginInterface.execute() reads this model and makes sure the table and
    each listed column are defined in the database.
    '''
    # Name of the table to define.
    table_name: str = Field(default="main_table")
    # Columns to define on the table, in this order.
    column_names: List[str] = Field(default=[])
    # Maps each column name to the definition passed to db.define_column().
    # Every entry of column_names must have a key here.
    column_defs: Dict[str, str] = Field(default={})


class PluginInterface(ABC):
    '''
    Base class for plugins discovered by ImportPlugins().

    Subclasses override the hook methods below. A plugin is only started when
    is_enable_plugin() returns True and execute() returns a router.
    '''
    # Registry shared by all plugins: command -> plugin instance handling it.
    plugin_instances: Dict[str, "PluginInterface"] = {}

    async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
        '''
        Override to receive a message and return a reply.
        Returning an empty string means no feedback is sent.

        Args:
            message: message content
            chat_id: conversation ID
            user_id: user ID

        Returns:
            The reply text. This default implementation logs a warning that
            the subclass did not implement callback and returns "".
        '''
        config.Log("Warning", f"插件{self.__class__.__name__}未实现callback方法")
        return ""

    def execute(self, path:str) -> Optional[APIRouter]:
        '''
        Register the plugin and build its router.

        Whether a router is returned decides whether the plugin is started;
        an override that returns None keeps the plugin from being started.

        Args:
            path: route path on which the plugin's POST endpoint is mounted

        Returns:
            A router with one POST route at `path`.

        Raises:
            KeyError: if a name in the model's column_names has no entry in
                its column_defs.
        '''
        Architecture.Register(self.__class__, self, self.wake_up, *self.dependencies())
        router = APIRouter()
        router.post(path)(self.generate_router_callback())
        # Make sure the required table and columns exist in the database.
        db = get_db()
        db_model = self.register_db_model()
        if db_model:
            db.define_table(db_model.table_name)
            for column_name in db_model.column_names:
                db.define_column(db_model.table_name, column_name, db_model.column_defs[column_name])

        return router

    def generate_router_callback(self) -> Callable|Coroutine:
        '''
        Override to customise the route handler.

        Returns:
            An async function taking (message, chat_id, user_id) that awaits
            self.callback() with the same arguments and returns its result.
        '''
        async def callback(message: str, chat_id: int, user_id: int) -> Any:
            return await self.callback(message, chat_id, user_id)
        return callback

    def dependencies(self) -> List[Type]:
        '''
        Override to declare the plugin types this plugin depends on.
        Returning [] means there are no dependencies.
        '''
        return []

    def wake_up(self) -> None:
        '''
        Called once all dependencies are registered, to notify the plugin
        instance that its dependencies are fully available.
        '''
        pass

    def register_plugin(self, command: str) -> None:
        '''
        Register this plugin so it can be matched by `command`.

        If another plugin already holds the command, a warning is logged and
        that plugin is replaced by this one.
        '''
        if command in PluginInterface.plugin_instances:
            config.Log("Warning", f"插件{PluginInterface.plugin_instances[command].__class__.__name__}已注册命令{command}, 将被新插件{self.__class__.__name__}覆盖")
        else:
            config.Log("Info", f"插件{self.__class__.__name__}已注册命令{command}")
        PluginInterface.plugin_instances[command] = self

    def register_db_model(self) -> DatabaseModel:
        '''
        Override to describe the database table this plugin needs.
        The default is a DatabaseModel with its default field values.
        '''
        return DatabaseModel()

    def is_enable_plugin(self) -> bool:
        '''
        Override to decide whether this plugin is enabled.
        The default is False, so a plugin must opt in explicitly.
        '''
        return False

    def get_plugin_tag(self) -> str:
        '''
        Return the tag under which this plugin's routes are included in the app.
        '''
        return "plugin"


def _load_module_plugins(app: FastAPI, module_path: str, registered: set) -> None:
    '''
    Import one plugin module and start every enabled plugin class it exposes.

    Args:
        app: FastAPI application that receives the plugin routers
        module_path: path of the .py file, as reported by ToolFile.GetFullPath()
        registered: plugin classes already handled; updated in place so a
            class visible from several modules is only instantiated once
    '''
    # Strip only the trailing extension: a blanket replace(".py", "") would
    # also eat ".py" occurring earlier in the path.
    stem = module_path.removesuffix(".py")
    module = importlib.import_module(stem.replace('/', '.').replace('\\', '.'))
    for class_name in dir(module):
        plugin_class = getattr(module, class_name)
        if not isinstance(plugin_class, type):
            continue
        # The base class is visible in any module that imports it; it is never
        # enabled, so skip it instead of instantiating it.
        if plugin_class is PluginInterface or not issubclass(plugin_class, PluginInterface):
            continue
        if plugin_class in registered:
            continue
        registered.add(plugin_class)
        plugin = plugin_class()
        if not plugin.is_enable_plugin():
            continue
        router = plugin.execute(f"/{stem}/{class_name}")
        if router:
            app.include_router(router, prefix="/api", tags=[plugin.get_plugin_tag()])


def ImportPlugins(app: FastAPI, plugin_dir:str = "Plugins") -> None:
    '''
    Import plugins and mount their routers on the application.

    Walks `plugin_dir`, imports every .py file whose name does not start with
    "__", and starts each enabled PluginInterface subclass found there. Routes
    are included under the "/api" prefix. A failure while loading one module
    is logged and does not stop the remaining modules from loading.

    If the directory does not exist it is created and nothing is loaded.

    Args:
        app: FastAPI application
        plugin_dir: plugin directory
    '''
    # NOTE(review): `| None` relies on ToolFile's overloaded `|` operator;
    # presumably it marks the path as a directory — confirm against ToolFile.
    plugin_tool_dir = ToolFile(plugin_dir)|None
    if not plugin_tool_dir.Exists():
        plugin_tool_dir.MustExistsPath()
        return
    if not plugin_tool_dir.IsDir():
        config.Log("Error", f"插件目录不是目录: {plugin_tool_dir.GetFullPath()}")
        return

    plugin_registered_class: set[type[PluginInterface]] = set()
    for dir_name, _sub_dirs, files in plugin_tool_dir.DirWalk():
        for file_name in files:
            module_file = ToolFile(dir_name)|file_name
            if not file_name.endswith(".py") or file_name.startswith("__"):
                continue
            try:
                _load_module_plugins(app, module_file.GetFullPath(), plugin_registered_class)
            except Exception as e:
                # Best-effort loading: report the broken module and carry on.
                config.Log("Error", f"{ConsoleFrontColor.RED}加载插件 {module_file} 失败: {e}{ConsoleFrontColor.RESET}")


__all__ = ["ImportPlugins", "PluginInterface", "DatabaseModel"]