from pickle import NONE from ..Convention.Runtime.Config import * from ..Convention.Runtime.GlobalConfig import ProjectConfig from ..Convention.Runtime.Architecture import Architecture from ..Convention.Runtime.File import ToolFile from ..CoreModules.database import get_db from ..CoreModules.clock_scheduler import get_clock_scheduler from fastapi import APIRouter, FastAPI from typing import * from pydantic import * from abc import ABC import importlib import os config: ProjectConfig = Architecture.Get(ProjectConfig) class DatabaseModel(BaseModel): table_name: str = Field(default="main_table") column_defs: Dict[str, str] = Field(default={}) class PluginInterface(ABC): plugin_instances: Dict[str, "PluginInterface"] = {} async def callback(self, message: str|None|Literal[""], chat_id: int, user_id: int) -> str|None: ''' 继承后重写该方法接受消息并返回消息 返回空字符串代表不进行反馈 Args: message: 消息内容 chat_id: 会话ID user_id: 用户ID ''' config.Log("Warning", f"{ConsoleFrontColor.YELLOW}插件{self.__class__.__name__}未实现callback方法{ConsoleFrontColor.RESET}") return "" @final def register_table(self, db_model: DatabaseModel) -> None: db = get_db() cursor = db.conn.cursor() sql = f"CREATE TABLE IF NOT EXISTS {db_model.table_name} ({', '.join([f'{name} {field_def}' for name, field_def in db_model.column_defs.items()])})" config.Log("Info", f"{ConsoleFrontColor.LIGHTMAGENTA_EX}为表 {db_model.table_name} 创建: {sql}{ConsoleFrontColor.RESET}") try: cursor.execute(sql) except Exception as e: config.Log("Error", f"{ConsoleFrontColor.RED}为表 {db_model.table_name} 创建失败: {e}{ConsoleFrontColor.RESET}") try: cursor.execute(f"PRAGMA table_info({db_model.table_name})") existing_columns = {row["name"] for row in cursor.fetchall()} except Exception as e: config.Log("Error", f"{ConsoleFrontColor.RED}查询表 {db_model.table_name} 列信息失败: {e}{ConsoleFrontColor.RESET}") return constraint_keywords = ("PRIMARY KEY", "FOREIGN KEY", "UNIQUE", "CHECK") for column_name, column_def in db_model.column_defs.items(): column_name_upper = column_name.upper() column_def_upper = column_def.upper() if any(keyword in column_name_upper for keyword in constraint_keywords) or any( column_def_upper.startswith(keyword) for keyword in constraint_keywords ): continue if " " in column_name or "(" in column_name: continue if column_name in existing_columns: continue alter_sql = f"ALTER TABLE {db_model.table_name} ADD COLUMN {column_name} {column_def}" config.Log( "Info", f"{ConsoleFrontColor.LIGHTMAGENTA_EX}为表 {db_model.table_name} 添加缺失列: {alter_sql}{ConsoleFrontColor.RESET}", ) try: cursor.execute(alter_sql) except Exception as e: config.Log("Error", f"{ConsoleFrontColor.RED}为表 {db_model.table_name} 添加列 {column_name} 失败: {e}{ConsoleFrontColor.RESET}") continue try: db.conn.commit() except Exception as e: config.Log("Error", f"{ConsoleFrontColor.RED}提交表 {db_model.table_name} 列更新失败: {e}{ConsoleFrontColor.RESET}") @final def execute(self) -> Optional[APIRouter]: ''' 继承后是否返回路由决定是否启动该插件 若返回None, 则不启动该插件 ''' def setup() -> None: # 在数据库保证必要的表和列存在 db_model = self.register_db_model() if db_model is None: pass elif isinstance(db_model, DatabaseModel): self.register_table(db_model) else: for model in db_model: self.register_table(model) self.wake_up() Architecture.Register(self.__class__, self, setup, *self.dependencies()) router = APIRouter() router.post(f"/{self.__class__.__name__}/callback")(self.generate_router_callback()) if self.generate_router_illustrated_guide() is not None: router.get(f"/{self.__class__.__name__}")(self.generate_router_illustrated_guide()) return router def generate_router_callback(self) -> Callable|Coroutine: ''' 继承后重写该方法生成路由回调函数 ''' return self.callback def generate_router_illustrated_guide(self) -> Callable|Coroutine|None: ''' 继承后重写该方法生成渲染图鉴与攻略网页的函数 ''' return None def dependencies(self) -> List[Type]: ''' 继承后重写该方法注册依赖插件 若返回[], 则不需要依赖插件 ''' return [] def wake_up(self) -> None: ''' 依赖插件全部注册后被调用, 用于通知插件实例依赖项已完全注册 ''' pass @final def register_plugin(self, command: str) -> None: ''' 将插件注册, 使其可以被命令匹配 ''' if command in PluginInterface.plugin_instances: config.Log("Warning", f"{ConsoleFrontColor.YELLOW}插件{PluginInterface.plugin_instances[command].__class__.__name__}已注册命令{command}, 将被新插件{self.__class__.__name__}覆盖{ConsoleFrontColor.RESET}") else: config.Log("Info", f"插件{self.__class__.__name__}已注册命令{command}") PluginInterface.plugin_instances[command] = self def register_db_model(self) -> List[DatabaseModel]|DatabaseModel|None: ''' 继承后重写该方法注册数据库模型 ''' return None def register_clock( self, callback: Callable[..., Any], delay_ms: int, *, args: Optional[Sequence[Any]] = None, kwargs: Optional[Dict[str, Any]] = None, ) -> int: ''' 注册一次性延时任务 Args: callback: 时间到期后调用的函数/方法 delay_ms: 延迟毫秒数 args: 传入回调的位置参数 kwargs: 传入回调的关键字参数 ''' if not callable(callback): raise ValueError("callback must be callable") scheduler = get_clock_scheduler() plugin_module = callback.__module__ plugin_class: Optional[str] = None if hasattr(callback, "__self__") and callback.__self__ is self: plugin_module = self.__class__.__module__ plugin_class = self.__class__.__name__ callback_name = getattr(callback, "__name__", None) if not callback_name: raise ValueError("callback must have a __name__ attribute") task_id = scheduler.register_task( plugin_module, plugin_class, callback_name, delay_ms, args=args, kwargs=kwargs, ) return task_id def is_enable_plugin(self) -> bool: ''' 继承后重写该方法判断是否启用该插件 ''' return False def get_plugin_tag(self) -> str: return "plugin" def ImportPlugins(app: FastAPI, plugin_dir:str = "Plugins") -> None: ''' 导入插件 Args: app: FastAPI应用 plugin_dir: 插件目录 ''' plugin_tool_dir = ToolFile(plugin_dir)|None if plugin_tool_dir.Exists() == False: plugin_tool_dir.MustExistsPath() return if plugin_tool_dir.IsDir() == False: config.Log("Error", f"插件目录不是目录: {plugin_tool_dir.GetFullPath()}") return plugin_registered_class = set[type[PluginInterface]]() for dir_name, sub_dirs, files in plugin_tool_dir.DirWalk(): for file_name in files: module_file = ToolFile(dir_name)|file_name if file_name.endswith(".py") and not file_name.startswith("__"): try: module = importlib.import_module(f"{module_file.GetFullPath().replace(".py", '').replace('/', '.').replace('\\', '.')}") for class_name in dir(module): plugin_class = getattr(module, class_name) if not isinstance(plugin_class, type): continue if issubclass(plugin_class, PluginInterface) and plugin_class not in plugin_registered_class: plugin_registered_class.add(plugin_class) plugin = plugin_class() if plugin.is_enable_plugin() == False: continue router = plugin.execute() if router: app.include_router(router, prefix=f"/api", tags=[plugin.get_plugin_tag()]) except Exception as e: config.Log("Error", f"{ConsoleFrontColor.RED}加载插件 {module_file} 失败: {e}{ConsoleFrontColor.RESET}") __all__ = ["ImportPlugins", "PluginInterface", "DatabaseModel"]