233 lines
9.3 KiB
Python
233 lines
9.3 KiB
Python
from pickle import NONE
|
|
|
|
from ..Convention.Runtime.Config import *
|
|
from ..Convention.Runtime.GlobalConfig import ProjectConfig
|
|
from ..Convention.Runtime.Architecture import Architecture
|
|
from ..Convention.Runtime.File import ToolFile
|
|
from ..CoreModules.database import get_db
|
|
from ..CoreModules.clock_scheduler import get_clock_scheduler
|
|
from fastapi import APIRouter, FastAPI
|
|
from typing import *
|
|
from pydantic import *
|
|
from abc import ABC
|
|
import importlib
|
|
import os
|
|
|
|
# Shared ProjectConfig instance, fetched through Architecture.Get when this
# module is imported; the code in this module uses it for logging (config.Log).
config: ProjectConfig = Architecture.Get(ProjectConfig)
|
|
|
|
class DatabaseModel(BaseModel):
    """Description of one database table that a plugin wants to exist.

    Attributes:
        table_name: Name of the table; defaults to ``"main_table"``.
        column_defs: Mapping of column name to its SQL definition text. Each
            pair is emitted as ``"<name> <definition>"`` when the table is
            created by ``PluginInterface.register_table``.
    """

    table_name: str = "main_table"
    column_defs: Dict[str, str] = Field(default={})
|
|
|
|
class PluginInterface(ABC):
    """Base class for plugins loaded by ``ImportPlugins``.

    Subclasses customise behaviour by overriding the hook methods
    (``callback``, ``generate_router_callback``,
    ``generate_router_illustrated_guide``, ``dependencies``, ``wake_up``,
    ``register_db_model``, ``is_enable_plugin``, ``get_plugin_tag``).
    The methods marked ``@final`` implement the shared registration logic.
    """

    # Command string -> plugin instance registered for it. Stored on the base
    # class, so the mapping is shared by every plugin subclass.
    plugin_instances: Dict[str, "PluginInterface"] = {}

    # Keywords that introduce a table-level constraint instead of a column.
    _CONSTRAINT_KEYWORDS = ("PRIMARY KEY", "FOREIGN KEY", "UNIQUE", "CHECK")

    async def callback(self, message: str|None|Literal[""], chat_id: int, user_id: int) -> str|None:
        """Handle an incoming message and return the reply text.

        Override in subclasses. Returning an empty string means that no
        feedback is sent. This default implementation only logs a warning
        that the plugin did not implement ``callback``.

        Args:
            message: Message content.
            chat_id: Conversation ID.
            user_id: User ID.

        Returns:
            The reply text; this default implementation returns ``""``.
        """
        config.Log("Warning", f"{ConsoleFrontColor.YELLOW}插件{self.__class__.__name__}未实现callback方法{ConsoleFrontColor.RESET}")
        return ""

    @staticmethod
    def _is_constraint_entry(column_name: str, column_def: str) -> bool:
        """Return True when a ``column_defs`` entry is a table constraint.

        The name only counts as a constraint when it *starts* with a
        constraint keyword as a whole word, so ordinary identifiers such as
        ``checksum`` or ``unique_code`` are still treated as columns.
        """
        name_upper = column_name.upper()
        def_upper = column_def.upper()
        for keyword in PluginInterface._CONSTRAINT_KEYWORDS:
            if def_upper.startswith(keyword):
                return True
            if name_upper.startswith(keyword):
                following = name_upper[len(keyword):len(keyword) + 1]
                # Keyword must end the name or be followed by a non-identifier
                # character (e.g. "(" or " "), otherwise it is just a prefix.
                if not following or not (following.isalnum() or following == "_"):
                    return True
        return False

    @final
    def register_table(self, db_model: DatabaseModel) -> None:
        """Ensure the table described by ``db_model`` and its columns exist.

        Runs ``CREATE TABLE IF NOT EXISTS`` with every entry of
        ``column_defs``, then reads ``PRAGMA table_info`` and issues
        ``ALTER TABLE ... ADD COLUMN`` for each plain column that the existing
        table lacks. Failures are logged through ``config.Log`` and never
        raised to the caller.

        Args:
            db_model: Table name and column definitions to guarantee.
        """
        db = get_db()
        cursor = db.conn.cursor()
        table = db_model.table_name

        columns_sql = ", ".join(
            f"{name} {field_def}" for name, field_def in db_model.column_defs.items()
        )
        sql = f"CREATE TABLE IF NOT EXISTS {table} ({columns_sql})"
        config.Log("Info", f"{ConsoleFrontColor.LIGHTMAGENTA_EX}为表 {table} 创建: {sql}{ConsoleFrontColor.RESET}")
        try:
            cursor.execute(sql)
        except Exception as e:
            # Best effort: keep going so an already existing table can still
            # receive its missing columns below.
            config.Log("Error", f"{ConsoleFrontColor.RED}为表 {table} 创建失败: {e}{ConsoleFrontColor.RESET}")

        try:
            cursor.execute(f"PRAGMA table_info({table})")
            # SQLite column names are case-insensitive, so compare upper-cased.
            # NOTE(review): rows are indexed by name, which assumes the
            # connection uses a mapping-style row factory - confirm in get_db.
            existing_columns = {row["name"].upper() for row in cursor.fetchall()}
        except Exception as e:
            config.Log("Error", f"{ConsoleFrontColor.RED}查询表 {table} 列信息失败: {e}{ConsoleFrontColor.RESET}")
            return

        for column_name, column_def in db_model.column_defs.items():
            if self._is_constraint_entry(column_name, column_def):
                continue
            # Keys containing a space or "(" are constraint clauses, not
            # single column identifiers.
            if " " in column_name or "(" in column_name:
                continue
            if column_name.upper() in existing_columns:
                continue

            alter_sql = f"ALTER TABLE {table} ADD COLUMN {column_name} {column_def}"
            config.Log(
                "Info",
                f"{ConsoleFrontColor.LIGHTMAGENTA_EX}为表 {table} 添加缺失列: {alter_sql}{ConsoleFrontColor.RESET}",
            )
            try:
                cursor.execute(alter_sql)
            except Exception as e:
                config.Log("Error", f"{ConsoleFrontColor.RED}为表 {table} 添加列 {column_name} 失败: {e}{ConsoleFrontColor.RESET}")

        try:
            db.conn.commit()
        except Exception as e:
            config.Log("Error", f"{ConsoleFrontColor.RED}提交表 {table} 列更新失败: {e}{ConsoleFrontColor.RESET}")

    @final
    def execute(self) -> Optional[APIRouter]:
        """Register the plugin with ``Architecture`` and build its router.

        A ``setup`` callback is handed to ``Architecture.Register`` together
        with the plugin's ``dependencies()``. When invoked, it creates the
        tables returned by ``register_db_model`` and then calls ``wake_up``.

        The router exposes ``POST /<ClassName>/callback`` and, when
        ``generate_router_illustrated_guide`` returns a handler,
        ``GET /<ClassName>``.

        Returns:
            The router for this plugin. ``ImportPlugins`` mounts it only when
            it is truthy; this implementation always returns a router.
        """

        def setup() -> None:
            # Make sure the required tables and columns exist in the database.
            models = self.register_db_model()
            if models is not None:
                if isinstance(models, DatabaseModel):
                    models = [models]
                for model in models:
                    self.register_table(model)
            self.wake_up()

        Architecture.Register(self.__class__, self, setup, *self.dependencies())

        route_base = f"/{self.__class__.__name__}"
        router = APIRouter()
        router.post(f"{route_base}/callback")(self.generate_router_callback())
        # Build the guide handler once and reuse it for the None check and
        # the route registration.
        guide_handler = self.generate_router_illustrated_guide()
        if guide_handler is not None:
            router.get(route_base)(guide_handler)
        return router

    def generate_router_callback(self) -> Callable|Coroutine:
        """Return the handler mounted on the plugin's POST callback route.

        Override to supply a custom handler; the default is ``self.callback``.
        """
        return self.callback

    def generate_router_illustrated_guide(self) -> Callable|Coroutine|None:
        """Return the handler that renders the plugin's guide web page.

        Override to provide one; returning ``None`` (the default) means no
        GET route is registered.
        """
        return None

    def dependencies(self) -> List[Type]:
        """Return the plugin classes this plugin depends on.

        Override to declare dependencies; an empty list (the default) means
        there are none.
        """
        return []

    def wake_up(self) -> None:
        """Hook invoked from the ``setup`` callback built in ``execute``.

        Intended as the notification that all dependency plugins have been
        registered. The default does nothing.
        """
        pass

    @final
    def register_plugin(self, command: str) -> None:
        """Register this plugin under ``command`` so commands can match it.

        If another plugin already owns ``command`` a warning is logged and
        the entry is overwritten with this instance.

        Args:
            command: Command string used as the key in ``plugin_instances``.
        """
        registry = PluginInterface.plugin_instances
        if command in registry:
            config.Log("Warning", f"{ConsoleFrontColor.YELLOW}插件{registry[command].__class__.__name__}已注册命令{command}, 将被新插件{self.__class__.__name__}覆盖{ConsoleFrontColor.RESET}")
        else:
            config.Log("Info", f"插件{self.__class__.__name__}已注册命令{command}")
        registry[command] = self

    def register_db_model(self) -> List[DatabaseModel]|DatabaseModel|None:
        """Return the database model(s) this plugin needs.

        Override to return one ``DatabaseModel``, a list of them, or ``None``
        (the default) when the plugin needs no tables.
        """
        return None

    def register_clock(
        self,
        callback: Callable[..., Any],
        delay_ms: int,
        *,
        args: Optional[Sequence[Any]] = None,
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> int:
        """Register a one-shot delayed task with the clock scheduler.

        The task is described by module name, optional class name and
        callback name. The class name is only filled in when ``callback`` is
        a method bound to this plugin instance.

        Args:
            callback: Function or method to call when the delay expires.
            delay_ms: Delay in milliseconds.
            args: Positional arguments passed to the callback.
            kwargs: Keyword arguments passed to the callback.

        Returns:
            The task id returned by the scheduler's ``register_task``.

        Raises:
            ValueError: If ``callback`` is not callable or has no usable
                ``__name__``.
        """
        if not callable(callback):
            raise ValueError("callback must be callable")
        # Validate before touching the scheduler so invalid input has no
        # side effects.
        callback_name = getattr(callback, "__name__", None)
        if not callback_name:
            raise ValueError("callback must have a __name__ attribute")

        plugin_module = callback.__module__
        plugin_class: Optional[str] = None
        if getattr(callback, "__self__", None) is self:
            plugin_module = self.__class__.__module__
            plugin_class = self.__class__.__name__

        scheduler = get_clock_scheduler()
        return scheduler.register_task(
            plugin_module,
            plugin_class,
            callback_name,
            delay_ms,
            args=args,
            kwargs=kwargs,
        )

    def is_enable_plugin(self) -> bool:
        """Return whether this plugin should be enabled.

        Override to return ``True``; the default ``False`` makes
        ``ImportPlugins`` skip the plugin.
        """
        return False

    def get_plugin_tag(self) -> str:
        """Return the tag ``ImportPlugins`` attaches to this plugin's routes."""
        return "plugin"
|
|
|
|
def ImportPlugins(app: FastAPI, plugin_dir: str = "Plugins") -> None:
    """Discover plugin modules under ``plugin_dir`` and mount their routers.

    Every ``*.py`` file whose name does not start with ``__`` is imported.
    Each ``PluginInterface`` subclass found in it is instantiated once and,
    if ``is_enable_plugin()`` is true, executed. A truthy router returned by
    ``execute()`` is included in ``app`` under the ``/api`` prefix.

    A failure while loading one module is logged and does not stop the scan.

    Args:
        app: FastAPI application that receives the plugin routers.
        plugin_dir: Directory that is walked for plugin modules.
    """
    # NOTE(review): `| None` is a ToolFile operator overload - presumably it
    # marks the path as a directory; confirm against ToolFile.
    plugin_tool_dir = ToolFile(plugin_dir)|None
    if not plugin_tool_dir.Exists():
        # NOTE(review): presumably creates the missing directory - confirm.
        plugin_tool_dir.MustExistsPath()
        return
    if not plugin_tool_dir.IsDir():
        config.Log("Error", f"插件目录不是目录: {plugin_tool_dir.GetFullPath()}")
        return

    # Classes already handled, so a class re-exported by several modules is
    # instantiated only once.
    plugin_registered_class = set[type[PluginInterface]]()
    for dir_name, _sub_dirs, files in plugin_tool_dir.DirWalk():
        for file_name in files:
            module_file = ToolFile(dir_name)|file_name
            if not file_name.endswith(".py") or file_name.startswith("__"):
                continue
            try:
                # Strip only the trailing extension, then turn path separators
                # into dots to form the dotted module name. Building the name
                # outside an f-string keeps the code valid before Python 3.12.
                module_name = (
                    module_file.GetFullPath()
                    .removesuffix(".py")
                    .replace("/", ".")
                    .replace("\\", ".")
                )
                module = importlib.import_module(module_name)
                for class_name in dir(module):
                    plugin_class = getattr(module, class_name)
                    if not isinstance(plugin_class, type):
                        continue
                    # The base class shows up in any module that imports it;
                    # it is not a plugin itself.
                    if plugin_class is PluginInterface:
                        continue
                    if not issubclass(plugin_class, PluginInterface):
                        continue
                    if plugin_class in plugin_registered_class:
                        continue
                    plugin_registered_class.add(plugin_class)
                    plugin = plugin_class()
                    if not plugin.is_enable_plugin():
                        continue
                    router = plugin.execute()
                    if router:
                        app.include_router(router, prefix="/api", tags=[plugin.get_plugin_tag()])
            except Exception as e:
                config.Log("Error", f"{ConsoleFrontColor.RED}加载插件 {module_file} 失败: {e}{ConsoleFrontColor.RESET}")
|
|
|
|
# Public API of this module.
__all__ = ["ImportPlugins", "PluginInterface", "DatabaseModel"]