Files
PWF/CoreModules/plugin_interface.py
2025-11-06 10:55:39 +08:00

132 lines
4.7 KiB
Python

from pickle import NONE
from ..Convention.Runtime.Config import *
from ..Convention.Runtime.GlobalConfig import ProjectConfig
from ..Convention.Runtime.Architecture import Architecture
from ..Convention.Runtime.File import ToolFile
from ..CoreModules.database import get_db
from fastapi import APIRouter, FastAPI
from typing import *
from pydantic import *
from abc import ABC
import importlib
import os
config = ProjectConfig()
class DatabaseModel(BaseModel):
table_name: str = Field(default="main_table")
column_names: List[str] = Field(default=[])
column_defs: Dict[str, str] = Field(default={})
class PluginInterface(ABC):
plugin_instances: Dict[str, "PluginInterface"] = {}
async def callback(self, message: str, chat_id: int, user_id: int) -> str|None:
'''
继承后重写该方法接受消息并返回消息
返回空字符串代表不进行反馈
Args:
message: 消息内容
chat_id: 会话ID
user_id: 用户ID
'''
config.Log("Warning", f"插件{self.__class__.__name__}未实现callback方法")
return ""
def execute(self, path:str) -> Optional[APIRouter]:
'''
继承后是否返回路由决定是否启动该插件
若返回None, 则不启动该插件
'''
Architecture.Register(self.__class__, self, self.wake_up, *self.dependencies())
router = APIRouter()
router.post(path)(self.generate_router_callback())
# 在数据库保证必要的表和列存在
db = get_db()
db_model = self.register_db_model()
if db_model:
db.define_table(db_model.table_name)
for field in db_model.column_names:
db.define_column(db_model.table_name, field, db_model.column_defs[field])
return router
def generate_router_callback(self) -> Callable|Coroutine:
'''
继承后重写该方法生成路由回调函数
'''
async def callback(message: str, chat_id: int, user_id: int) -> Any:
return await self.callback(message, chat_id, user_id)
return callback
def dependencies(self) -> List[Type]:
'''
继承后重写该方法注册依赖插件
若返回[], 则不需要依赖插件
'''
return []
def wake_up(self) -> None:
'''
依赖插件全部注册后被调用, 用于通知插件实例依赖项已完全注册
'''
pass
def register_plugin(self, command: str) -> None:
'''
将插件注册, 使其可以被命令匹配
'''
PluginInterface.plugin_instances[command] = self
def register_db_model(self) -> DatabaseModel:
'''
继承后重写该方法注册数据库模型
'''
return DatabaseModel()
def is_enable_plugin(self) -> bool:
'''
继承后重写该方法判断是否启用该插件
'''
return False
def get_plugin_tag(self) -> str:
return "plugin"
def ImportPlugins(app: FastAPI, plugin_dir:str = "Plugins") -> None:
'''
导入插件
Args:
app: FastAPI应用
plugin_dir: 插件目录
'''
plugin_tool_dir = ToolFile(plugin_dir)|None
if plugin_tool_dir.Exists() == False:
plugin_tool_dir.MustExistsPath()
return
if plugin_tool_dir.IsDir() == False:
config.Log("Error", f"插件目录不是目录: {plugin_tool_dir.GetFullPath()}")
return
for dir_name, sub_dirs, files in plugin_tool_dir.DirWalk():
for file_name in files:
module_file = ToolFile(dir_name)|file_name
if file_name.endswith(".py") and not file_name.startswith("__"):
try:
module = importlib.import_module(f"{module_file.GetFullPath().replace(".py", '').replace('/', '.').replace('\\', '.')}")
for class_name in dir(module):
plugin_class = getattr(module, class_name)
if not isinstance(plugin_class, type):
continue
if issubclass(plugin_class, PluginInterface):
plugin = plugin_class()
if plugin.is_enable_plugin() == False:
continue
router = plugin.execute(f"/{module_file}")
if router:
app.include_router(router, prefix=f"/api", tags=[plugin.get_plugin_tag()])
except Exception as e:
config.Log("Error", f"{ConsoleFrontColor.RED}加载插件 {module_file} 失败: {e}{ConsoleFrontColor.RESET}")
__all__ = ["ImportPlugins", "PluginInterface", "DatabaseModel"]