放弃打包流程, 退化目录

This commit is contained in:
2025-11-05 11:15:49 +08:00
parent e07d604d12
commit d102ed124c
17 changed files with 1 additions and 38 deletions

230
Runtime/Architecture.py Normal file
View File

@@ -0,0 +1,230 @@
from .Config import *
from .Reflection import *
from abc import ABC, abstractmethod
class ISignal(ABC):
pass
class IModel(ABC):
pass
class IDataModel(ABC, IModel):
@abstractmethod
def Save(self) -> str:
pass
@abstractmethod
def Load(self, data:str) -> None:
pass
class IConvertable[T](ABC):
@abstractmethod
def ConvertTo(self) -> T:
pass
class IConvertModel[T](IConvertable[T], IModel):
pass
class SingletonModel[T](IModel):
_InjectInstances:Dict[type,Any] = {}
@staticmethod
def GetInstance(t:Typen[T]) -> T:
return SingletonModel._InjectInstances[t]
@staticmethod
def SetInstance(t:Typen[T], obj:T) -> None:
SingletonModel._InjectInstances[t] = obj
def __init__(self, t:Typen[T]) -> None:
self.typen: type = t
class DependenceModel(IConvertModel[bool]):
def __init__(self, queries:Sequence[IConvertModel[bool]]) -> None:
self.queries:list[IConvertModel[bool]] = list(queries)
@override
def ConvertTo(self) -> bool:
for query in self.queries:
if query.ConvertTo() == False:
return False
return True
def __iter__(self):
return iter(self.queries)
SignalListener = Callable[[ISignal], None]
class Architecture:
@staticmethod
def FormatType(t:type) -> str:
return f"{t.__module__}::{t.__name__}"
@staticmethod
def LoadFromFormat(data:str, exception:Exception|None=None) -> type|None:
try:
module,name = data.split("::")
return StringWithModel2Type(name, module=module)
except Exception as ex:
if exception is not None:
exception = ex
return None
@classmethod
def InternalReset(cls) -> None:
# Register System
cls._RegisteredObjects.clear()
cls._RegisteringRuntime.clear()
# Signal Listener
cls._SignalListener.clear()
# Timeline/Chain
cls._TimelineQueues.clear()
cls._TimelineContentID = 0
#region Objects Registered
class TypeQuery(IConvertModel[bool]):
def __init__(self, queryType:type) -> None:
self._queryType = queryType
@override
def ConvertTo(self) -> bool:
return self._queryType in Architecture._RegisteredObjects
class Registering(IConvertModel[bool]):
def __init__(self, registerSlot:type, target:Any, dependences:DependenceModel, action:Action) -> None:
self.registerSlot = registerSlot
self.target = target
self.dependences = dependences
self.action = action
@override
def ConvertTo(self) -> bool:
return self.dependences.ConvertTo()
_RegisteringRuntime: Dict[type, Registering] = {}
_RegisteredObjects: Dict[type, Any] = {}
@classmethod
def _InternalRegisteringComplete(cls) -> None:
CompletedSet: Set[Architecture.Registering] = set()
for dependence in cls._RegisteringRuntime.keys():
if cls._RegisteringRuntime[dependence].dependences.ConvertTo():
CompletedSet.add(cls._RegisteringRuntime[dependence])
for complete in CompletedSet:
del cls._RegisteringRuntime[complete.registerSlot]
complete.action()
cls._RegisteredObjects[complete.registerSlot] = complete.target
if len(CompletedSet) > 0:
cls._InternalRegisteringComplete()
@classmethod
def Register(cls, slot:type, target:Any, action:Action, *dependences:type) -> DependenceModel:
if slot in cls._RegisteringRuntime:
raise InvalidOperationError("Illegal duplicate registrations")
cls._RegisteringRuntime[slot] = Architecture.Registering(slot, target, DependenceModel(Architecture.TypeQuery(dependence) for dependence in dependences), action)
cls._InternalRegisteringComplete()
return cls._RegisteringRuntime[slot].dependences
@classmethod
def Contains(cls, type_:type) -> bool:
return type_ in cls._RegisteredObjects
@classmethod
def Get(cls, type_:type) -> Any:
return cls._RegisteredObjects[type_]
@classmethod
def Unregister(cls, slot:type) -> bool:
if slot in cls._RegisteredObjects:
del cls._RegisteredObjects[slot]
return True
if slot in cls._RegisteringRuntime:
del cls._RegisteringRuntime[slot]
return True
return False
#endregion
#region Signal & Update
_SignalListener: Dict[type, List[SignalListener]] = {}
@classmethod
def AddListener(cls, slot:type, listener:SignalListener) -> None:
if slot not in cls._SignalListener:
cls._SignalListener[slot] = []
cls._SignalListener[slot].append(listener)
@classmethod
def SendMessage(cls, slot:type, signal:ISignal):
if slot in cls._SignalListener:
for listener in cls._SignalListener[slot]:
listener(signal)
#endregion
#region Timeline/Chain & Update
class TimelineQueueEntry:
def __init__(self):
self.predicate: Callable[[], bool] = lambda: False
self.actions: List[Action] = []
class Timeline:
def __init__(self):
self.predicate_mapper: Dict[Callable[[], bool], int] = {}
self.queue: List[Architecture.TimelineQueueEntry] = []
self.context: int = 0
_TimelineQueues: Dict[int, Timeline] = {}
_TimelineContentID: int = 0
@classmethod
def CreateTimeline(cls) -> int:
cls._TimelineQueues[cls._TimelineContentID] = cls.Timeline()
cls._TimelineContentID += 1
return cls._TimelineContentID - 1
@classmethod
def AddStep(cls, timeline_id:int, predicate:Callable[[], bool], *actions:Action):
timeline = cls._TimelineQueues[timeline_id]
if predicate in timeline.predicate_mapper:
time = timeline.predicate_mapper[predicate]
timeline.queue[time].actions.extend(actions)
else:
time = len(timeline.queue)
timeline.predicate_mapper[predicate] = time
entry = cls.TimelineQueueEntry()
entry.predicate = predicate
entry.actions = list(actions)
timeline.queue.append(entry)
@classmethod
def UpdateTimeline(cls):
stats = True
while stats:
stats = False
for timeline in cls._TimelineQueues.values():
if timeline.context < len(timeline.queue):
if timeline.queue[timeline.context].predicate():
stats = True
for action in timeline.queue[timeline.context].actions:
action()
timeline.context += 1
@classmethod
def ResetTimelineContext(cls, timeline_id:int):
cls._TimelineQueues[timeline_id].context = 0
#endregion

353
Runtime/Asynchrony.py Normal file
View File

@@ -0,0 +1,353 @@
from .Config import *
from .Reflection import *
from collections import defaultdict
import asyncio
import threading
from typing import Optional
from pydantic import BaseModel
from abc import ABC, abstractmethod
class AsyncContextDetector:
"""异步上下文检测工具类"""
@staticmethod
def is_in_async_context() -> bool:
"""检查是否在异步上下文中运行"""
try:
asyncio.current_task()
return True
except RuntimeError:
return False
@staticmethod
def get_current_loop() -> Optional[asyncio.AbstractEventLoop]:
"""获取当前事件循环如果没有则返回None"""
try:
return asyncio.get_running_loop()
except RuntimeError:
return None
@staticmethod
def ensure_async_context_safe(operation_name: str) -> None:
"""确保在异步上下文中执行是安全的"""
if AsyncContextDetector.is_in_async_context():
raise RuntimeError(
f"Cannot perform '{operation_name}' from within an async context. "
f"Use await or async methods instead."
)
class AsyncFieldAccessor:
"""异步字段访问器,封装字段访问逻辑"""
def __init__(
self,
async_fields: Dict[str, 'AsynchronyExpression'],
origin_fields: Dict[str, FieldInfo]
) -> None:
self._async_fields = async_fields
self._origin_fields = origin_fields
async def get_field_value_async(self, field_name: str):
"""异步获取字段值"""
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
return await self._async_fields[field_name].get_value()
def get_field_value_sync(self, field_name: str):
"""同步获取字段值(仅在非异步上下文中使用)"""
AsyncContextDetector.ensure_async_context_safe(f"sync access to field '{field_name}'")
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
async_expr = self._async_fields[field_name]
if not async_expr.is_initialize and async_expr.timeout > 0:
# 需要等待但在同步上下文中使用run_async
return run_async(async_expr.get_value())
elif not async_expr.is_initialize:
raise RuntimeError(f"Field '{field_name}' is not initialized and has no timeout")
else:
return run_async(async_expr.get_value())
def is_field_initialized(self, field_name: str) -> bool:
"""检查字段是否已初始化"""
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
return self._async_fields[field_name].is_initialize
def set_field_value(self, field_name: str, value: Any) -> None:
"""设置字段值"""
if field_name not in self._origin_fields:
raise AttributeError(f"No async field '{field_name}' found")
self._async_fields[field_name].set_value(value)
class AsynchronyUninitialized:
"""表示未初始化状态的单例类"""
__instance__ = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if cls.__instance__ is None:
with cls._lock:
if cls.__instance__ is None:
cls.__instance__ = super().__new__(cls)
return cls.__instance__
def __repr__(self):
return "uninitialized"
def __str__(self):
return "None"
class AsynchronyExpression:
def __init__(
self,
field: FieldInfo,
value: Any = AsynchronyUninitialized(),
*,
time_wait: float = 0.1,
timeout: float = 0,
callback: Optional[Action] = None,
):
'''
参数:
field: 字段
value: 初始化, 默认为AsynchronyUninitialized, 即无初始化
time_wait: 等待时间, 默认为0.1秒
timeout: 超时时间, 默认为0秒
callback: 回调函数, 默认为None, 当状态为无初始化时get_value会调用callback
'''
self.field = field
self._value = value
self.callback = callback
self.is_initialize = not isinstance(value, AsynchronyUninitialized)
self.time_wait = time_wait
self.timeout = timeout
def get_value_sync(self):
if self.is_initialize:
return self._value
elif self.callback is not None:
self.callback()
if self.is_initialize:
return self._value
else:
raise RuntimeError(f"Field {self.field.FieldName} is not initialized")
async def get_value(self):
"""异步获取字段值,改进的超时机制"""
if self.is_initialize:
return self._value
elif self.callback is not None:
self.callback()
if self.timeout > 0:
try:
# 使用 asyncio.wait_for 提供更精确的超时控制
async def wait_for_initialization():
while not self.is_initialize:
await asyncio.sleep(self.time_wait)
return self._value
return await asyncio.wait_for(wait_for_initialization(), timeout=self.timeout)
except asyncio.TimeoutError:
raise TimeoutError(f"Timeout waiting for uninitialized field {self.field.FieldName}")
else:
# 无超时,一直等待
while not self.is_initialize:
await asyncio.sleep(self.time_wait)
return self._value
def set_value(self, value: Any) -> None:
"""设置字段值"""
if isinstance(value, AsynchronyUninitialized):
self.set_uninitialized()
elif self.field.Verify(type(value)):
self._value = value
self.is_initialize = True
else:
raise ValueError(f"Value {value} is not valid for field {self.field.FieldName}")
def SetUninitialized(self) -> None:
"""设置为未初始化状态(保持兼容性的旧方法名)"""
self.set_uninitialized()
def set_uninitialized(self) -> None:
"""设置为未初始化状态"""
if self.is_initialize:
del self._value
self._value = AsynchronyUninitialized()
self.is_initialize = False
class Asynchronous(ABC):
__Asynchronous_Origin_Fields__: Dict[Type, Dict[str, FieldInfo]] = defaultdict(dict)
_fields_lock = threading.Lock()
def _GetAsynchronousOriginFields(self) -> Dict[str, FieldInfo]:
return Asynchronous.__Asynchronous_Origin_Fields__[type(self)]
def __init__(self, **kwargs: Dict[str, dict]):
super().__init__()
self.__Asynchronous_Fields__: Dict[str, AsynchronyExpression] = {}
# 使用线程锁保护类变量访问
with Asynchronous._fields_lock:
origin_fields = self._GetAsynchronousOriginFields()
for field_info in TypeManager.GetInstance().CreateOrGetRefTypeFromType(type(self)).GetAllFields():
if field_info.FieldName == "__Asynchronous_Origin_Fields__":
continue
origin_fields[field_info.FieldName] = field_info
self.__Asynchronous_Fields__[field_info.FieldName] = AsynchronyExpression(
field_info, **kwargs.get(field_info.FieldName, {})
)
# 创建字段访问器以提升性能
self._field_accessor = AsyncFieldAccessor(self.__Asynchronous_Fields__, origin_fields)
def __getattribute__(self, name: str) -> Any:
# 快速路径:非异步字段直接返回
if name in ("__Asynchronous_Fields__", "_GetAsynchronousOriginFields", "_field_accessor"):
return super().__getattribute__(name)
# 一次性获取所需属性,避免重复调用
try:
field_accessor:AsyncFieldAccessor = super().__getattribute__("_field_accessor")
origin_fields:Dict[str, FieldInfo] = super().__getattribute__("_GetAsynchronousOriginFields")()
except AttributeError:
# 对象可能尚未完全初始化
return super().__getattribute__(name)
if name in origin_fields:
# 这是一个异步字段
if AsyncContextDetector.is_in_async_context():
# 在异步上下文中,提供友好的错误提示
async_fields:Dict[str, AsynchronyExpression] = super().__getattribute__("__Asynchronous_Fields__")
async_expr = async_fields[name]
if not async_expr.is_initialize:
timeout_info = f" with {async_expr.timeout}s timeout" if async_expr.timeout > 0 else ""
raise RuntimeError(
f"Field '{name}' is not initialized{timeout_info}. "
)
else:
# 字段已初始化,直接返回值
return async_expr.get_value_sync()
else:
# 在同步上下文中,使用字段访问器
try:
return field_accessor.get_field_value_sync(name)
except RuntimeError as e:
if "Cannot perform" in str(e):
# 重新包装错误信息,提供更友好的提示
raise RuntimeError(
f"Cannot access async field '{name}' from sync context when it requires initialization. "
f"Use async context or ensure field is pre-initialized."
) from e
else:
raise
return super().__getattribute__(name)
def __setattr__(self, name: str, value: Any) -> None:
if name in ("__Asynchronous_Fields__", "_GetAsynchronousOriginFields", "_field_accessor"):
super().__setattr__(name, value)
elif hasattr(self, '_field_accessor'):
# 对象已初始化,使用字段访问器
try:
field_accessor = super().__getattribute__("_field_accessor")
field_accessor.set_field_value(name, value)
return
except AttributeError:
# 不是异步字段
pass
super().__setattr__(name, value)
def __delattr__(self, name: str) -> None:
if name in ("__Asynchronous_Fields__", "_GetAsynchronousOriginFields", "_field_accessor"):
super().__delattr__(name)
elif hasattr(self, '_field_accessor'):
# 对象已初始化,使用字段访问器
try:
field_accessor = super().__getattribute__("_field_accessor")
origin_fields = super().__getattribute__("_GetAsynchronousOriginFields")()
if name in origin_fields:
async_fields = super().__getattribute__("__Asynchronous_Fields__")
async_fields[name].set_uninitialized()
return
except AttributeError:
# 不是异步字段
pass
super().__delattr__(name)
def is_field_initialized(self, field_name: str) -> bool:
"""检查字段是否已初始化"""
return self._field_accessor.is_field_initialized(field_name)
def run_until_complete(coro: Coroutine) -> Any:
"""Gets an existing event loop to run the coroutine.
If there is no existing event loop, creates a new one.
"""
try:
# Check if there's an existing event loop
loop = asyncio.get_event_loop()
# If we're here, there's an existing loop but it's not running
return loop.run_until_complete(coro)
except RuntimeError:
# If we can't get the event loop, we're likely in a different thread, or its already running
try:
return asyncio.run(coro)
except RuntimeError:
raise RuntimeError(
"Detected nested async. Please use nest_asyncio.apply() to allow nested event loops."
"Or, use async entry methods like `aquery()`, `aretriever`, `achat`, etc."
)
def run_async_coroutine(coro: Coroutine) -> Any:
try:
# Check if there's an existing event loop
loop = asyncio.get_event_loop()
# If we're here, there's an existing loop but it's not running
return loop.create_task(coro)
except RuntimeError:
# If we can't get the event loop, we're likely in a different thread, or its already running
try:
return asyncio.run(coro)
except RuntimeError:
raise RuntimeError(
"Detected nested async. Please use nest_asyncio.apply() to allow nested event loops."
"Or, use async entry methods like `aquery()`, `aretriever`, `achat`, etc."
)
def run_async(coro: Coroutine):
"""安全地运行异步协程,避免事件循环死锁"""
# 使用统一的异步上下文检测
AsyncContextDetector.ensure_async_context_safe("run_async")
# 尝试获取当前事件循环
current_loop = AsyncContextDetector.get_current_loop()
if current_loop is not None and not current_loop.is_running():
# 有事件循环但未运行,直接使用
return current_loop.run_until_complete(coro)
elif current_loop is None:
# 没有事件循环,创建新的
try:
return asyncio.run(coro)
except RuntimeError as e:
raise RuntimeError(
"Failed to run async coroutine. "
"Please ensure proper async environment or use nest_asyncio.apply() for nested loops."
) from e
else:
# 事件循环正在运行这种情况应该被AsyncContextDetector捕获
raise RuntimeError(
"Unexpected state: running event loop detected but context check passed. "
"This should not happen."
)

511
Runtime/Config.py Normal file
View File

@@ -0,0 +1,511 @@
from types import TracebackType
from typing import *
from abc import *
import sys
import threading
import traceback
import datetime
# region ansi colorful
# Copyright Jonathan Hartley 2013. BSD 3-Clause license
'''
This module generates ANSI character codes to printing colors to terminals.
See: http://en.wikipedia.org/wiki/ANSI_escape_code
'''
CSI = '\033['
OSC = '\033]'
BEL = '\a'
def code_to_chars(code):
return CSI + str(code) + 'm'
def set_title(title):
return OSC + '2;' + title + BEL
def clear_screen(mode=2):
return CSI + str(mode) + 'J'
def clear_line(mode=2):
return CSI + str(mode) + 'K'
class AnsiCodes(object):
def __init__(self):
# the subclasses declare class attributes which are numbers.
# Upon instantiation we define instance attributes, which are the same
# as the class attributes but wrapped with the ANSI escape sequence
for name in dir(self):
if not name.startswith('_'):
value = getattr(self, name)
setattr(self, name, code_to_chars(value))
class ConsoleCursor(object):
def UP(self, n=1):
return CSI + str(n) + 'A'
def DOWN(self, n=1):
return CSI + str(n) + 'B'
def FORWARD(self, n=1):
return CSI + str(n) + 'C'
def BACK(self, n=1):
return CSI + str(n) + 'D'
def POS(self, x=1, y=1):
return CSI + str(y) + ';' + str(x) + 'H'
class ConsoleFrontColorClass(AnsiCodes):
BLACK = 30
RED = 31
GREEN = 32
YELLOW = 33
BLUE = 34
MAGENTA = 35
CYAN = 36
WHITE = 37
RESET = 39
# These are fairly well supported, but not part of the standard.
LIGHTBLACK_EX = 90
LIGHTRED_EX = 91
LIGHTGREEN_EX = 92
LIGHTYELLOW_EX = 93
LIGHTBLUE_EX = 94
LIGHTMAGENTA_EX = 95
LIGHTCYAN_EX = 96
LIGHTWHITE_EX = 97
ConsoleFrontColor = ConsoleFrontColorClass()
class ConsoleBackgroundColorClass(AnsiCodes):
BLACK = 40
RED = 41
GREEN = 42
YELLOW = 43
BLUE = 44
MAGENTA = 45
CYAN = 46
WHITE = 47
RESET = 49
# These are fairly well supported, but not part of the standard.
LIGHTBLACK_EX = 100
LIGHTRED_EX = 101
LIGHTGREEN_EX = 102
LIGHTYELLOW_EX = 103
LIGHTBLUE_EX = 104
LIGHTMAGENTA_EX = 105
LIGHTCYAN_EX = 106
LIGHTWHITE_EX = 107
ConsoleBackgroundColor = ConsoleBackgroundColorClass()
class ConsoleStyleClass(AnsiCodes):
BRIGHT = 1
DIM = 2
NORMAL = 22
RESET_ALL = 0
ConsoleStyle = ConsoleStyleClass()
def PrintColorful(color:str, *args, is_reset:bool=True, **kwargs):
with lock_guard():
if is_reset:
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
else:
print(color,*args, **kwargs)
def PrintAsError(message:str):
PrintColorful(ConsoleFrontColor.RED, message)
def PrintAsWarning(message:str):
PrintColorful(ConsoleFrontColor.YELLOW, message)
def PrintAsInfo(message:str):
PrintColorful(ConsoleFrontColor.GREEN, message)
def PrintAsDebug(message:str):
PrintColorful(ConsoleFrontColor.BLUE, message)
def PrintAsSuccess(message:str):
PrintColorful(ConsoleFrontColor.GREEN, message)
def PrintAsLight(message:str):
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, message)
# endregion
class NotImplementedError(Exception):
def __init__(self, message:Optional[str]=None) -> None:
if message is not None:
super().__init__(message)
else:
super().__init__()
class InvalidOperationError(Exception):
def __init__(self, message:Optional[str]=None) -> None:
if message is not None:
super().__init__(message)
else:
super().__init__()
def format_traceback_info(char:str='\n', back:int=1):
return char.join(traceback.format_stack()[:-back])
INTERNAL_DEBUG = False
def SetInternalDebug(mode:bool):
global INTERNAL_DEBUG
INTERNAL_DEBUG = mode
def GetInternalDebug() -> bool:
global INTERNAL_DEBUG
return INTERNAL_DEBUG
ImportingFailedSet:Set[str] = set()
def ImportingThrow(
ex: ImportError,
moduleName: str,
requierds: Sequence[str],
*,
messageBase: str = ConsoleFrontColor.RED+"{module} Module requires {required} package."+ConsoleFrontColor.RESET,
installBase: str = ConsoleFrontColor.GREEN+"\tpip install {name}"+ConsoleFrontColor.RESET
):
with lock_guard():
requierds_str = ",".join([f"<{r}>" for r in requierds])
print(messageBase.format_map(dict(module=moduleName, required=requierds_str)))
print('Install it via command:')
for i in requierds:
global ImportingFailedSet
ImportingFailedSet.add(i)
install = installBase.format_map({"name":i})
print(install)
if ex:
print(ConsoleFrontColor.RED, f"Import Error On {moduleName} Module: {ex}, \b{ex.path}\n"\
f"[{ConsoleFrontColor.RESET}{format_traceback_info(back=2)}{ConsoleFrontColor.RED}]")
def InternalImportingThrow(
moduleName: str,
requierds: Sequence[str],
*,
messageBase: str = ConsoleFrontColor.RED+"{module} Module requires internal Convention package: {required}."+ConsoleFrontColor.RESET,
):
with lock_guard():
requierds_str = ",".join([f"<{r}>" for r in requierds])
print(f"Internal Convention package is not installed.\n{messageBase.format_map({
"module": moduleName,
"required": requierds_str
})}\n[{ConsoleFrontColor.RESET}{format_traceback_info(back=2)}{ConsoleFrontColor.RED}]")
def ReleaseFailed2Requirements():
global ImportingFailedSet
if len(ImportingFailedSet) == 0:
return
with open("requirements.txt", 'w') as f:
f.write("\n".join(ImportingFailedSet))
try:
from pydantic import *
except ImportError:
InternalImportingThrow("Internal", ["pydantic"])
type Typen[_T] = type
type Action = Callable[[], None]
type ClosuresCallable[_T] = Union[Callable[[Optional[None]], _T], Typen[_T]]
def AssemblyTypen(obj:Any) -> str:
if isinstance(obj, type):
return f"{obj.__module__}.{obj.__name__}, "\
f"{obj.Assembly() if hasattr(obj, "Assembly") else "Global"}"
else:
return f"{obj.__class__.__module__}.{obj.__class__.__name__}, "\
f"{obj.GetAssembly() if hasattr(obj, "GetAssembly") else "Global"}"
def ReadAssemblyTypen(
assembly_typen: str,
*,
premodule: Optional[str|Callable[[str], str]] = None
) -> Tuple[type, str]:
typen, assembly_name = assembly_typen.split(",")
module_name, _, class_name = typen.rpartition(".")
if premodule is not None:
if isinstance(premodule, str):
module_name = premodule
else:
module_name = premodule(module_name)
import importlib
target_type = getattr(importlib.import_module(module_name), class_name)
return target_type, assembly_name
# using as c#: event
class ActionEvent[_Call:Callable]:
def __init__(self, actions:Sequence[_Call]):
super().__init__()
self._actions: List[Callable] = [action for action in actions]
self.call_indexs: List[int] = [i for i in range(len(actions))]
self.last_result: List[Any] = []
def CallFuncWithoutCallIndexControl(self, index:int, *args, **kwargs) -> Union[Any, Exception]:
try:
return self._actions[index](*args, **kwargs)
except Exception as ex:
return ex
def CallFunc(self, index:int, *args, **kwargs) -> Union[Any, Exception]:
return self.CallFuncWithoutCallIndexControl(self.call_indexs[index], *args, **kwargs)
def _InjectInvoke(self, *args, **kwargs):
result:List[Any] = []
for index in range(self.CallMaxCount):
result.append(self.CallFunc(index, *args, **kwargs))
return result
def Invoke(self, *args, **kwargs) -> Union[Self, bool]:
self.last_result = self._InjectInvoke(*args, **kwargs)
return self
def InitCallIndex(self):
self.call_indexs = [i for i in range(len(self._actions))]
def AddAction(self, action:_Call):
self._actions.append(action)
self.call_indexs.append(len(self._actions)-1)
return self
def AddActions(self, actions:Sequence[_Call]):
for action in actions:
self.AddAction(action)
return self
def _InternalRemoveAction(self, action:_Call):
if action in self._actions:
index = self._actions.index(action)
self._actions.remove(action)
self.call_indexs.remove(index)
for i in range(len(self.call_indexs)):
if self.call_indexs[i] > index:
self.call_indexs[i] -= 1
return True
return False
def RemoveAction(self, action:_Call):
while self._InternalRemoveAction(action):
pass
return self
def IsValid(self):
return not any(isinstance(x, Exception) for x in self.last_result)
def __bool__(self):
return self.IsValid()
@property
def CallMaxCount(self):
return len(self.call_indexs)
@property
def ActionCount(self):
return len(self._actions)
# region instance
# threads
class atomic[_T]:
def __init__(
self,
value: _T,
locker: Optional[threading.Lock] = None,
) -> None:
self._value: _T = value
self._is_in_with: bool = False
self.locker: threading.Lock = locker if locker is not None else threading.Lock()
def FetchAdd(self, value:_T):
with lock_guard(self.locker):
self._value += value
return self._value
def FetchSub(self, value:_T):
with lock_guard(self.locker):
self._value -= value
return self._value
def Load(self) -> _T:
with lock_guard(self.locker):
return self._value
def Store(self, value: _T):
with lock_guard(self.locker):
self._value = value
def __add__(self, value:_T):
return self.FetchAdd(value)
def __sub__(self, value:_T):
return self.FetchSub(value)
def __iadd__(self, value:_T) -> Self:
self.FetchAdd(value)
return self
def __isub__(self, value:_T) -> Self:
self.FetchSub(value)
return self
def __enter__(self) -> Self:
self._is_in_with = True
self.locker.acquire()
return self
def __exit__(
self,
exc_type: Optional[type],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType]
) -> bool:
self._is_in_with = False
self.locker.release()
if exc_type is None:
return True
else:
return False
@property
def Value(self) -> _T:
if self._is_in_with:
return self._value
raise NotImplementedError("This method can only be called within a with statement")
@Value.setter
def Value(self, value:_T) -> _T:
if self._is_in_with:
self._value = value
raise NotImplementedError("This method can only be called within a with statement")
def __str__(self) -> str:
return str(self.Load())
def __repr__(self) -> str:
return repr(self.Load())
InternalGlobalLocker = threading.Lock()
InternalGlobalLockerCount = atomic[int](0)
class lock_guard:
def __init__(
self,
lock: Optional[Union[threading.RLock, threading.Lock]] = None
):
if lock is None:
lock = InternalGlobalLocker
self._locker = lock
self._locker.acquire()
def __del__(self):
self._locker.release()
def __enter__(self):
return
def __exit__(self,*args,**kwargs):
return True
class global_lock_guard(lock_guard):
def __init__(self):
super().__init__(None)
class thread_instance(threading.Thread):
def __init__(
self,
call: Action,
*,
is_del_join: bool = True,
**kwargs
):
kwargs.update({"target": call})
super().__init__(**kwargs)
self.is_del_join = is_del_join
self.start()
def __del__(self):
if self.is_del_join:
self.join()
# region end
def Nowf() -> str:
'''
printf now time to YYYY-MM-DD_HH-MM-SS format,
return: str
'''
return datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
true: Literal[True] = True
false: Literal[False] = False
class PlatformIndicator:
IsRelease : bool = False
IsPlatformWindows : bool = sys.platform == "Windows"
IsPlatformLinux : bool = sys.platform == "Linux"
IsPlatformOsx : bool = sys.platform == "OSX"
IsPlatformX64 : bool = True
CompanyName : str = "DefaultCompany"
ProductName : str = "DefaultProject"
PrettyFace : str = r"""
⣇⣿⠘⣿⣿⣿⡿⡿⣟⣟⢟⢟⢝⠵⡝⣿⡿⢂⣼⣿⣷⣌⠩⡫⡻⣝⠹⢿⣿⣷
⡆⣿⣆⠱⣝⡵⣝⢅⠙⣿⢕⢕⢕⢕⢝⣥⢒⠅⣿⣿⣿⡿⣳⣌⠪⡪⣡⢑⢝⣇
⡆⣿⣿⣦⠹⣳⣳⣕⢅⠈⢗⢕⢕⢕⢕⢕⢈⢆⠟⠋⠉⠁⠉⠉⠁⠈⣸⢐⢕⢽
⡗⢰⣶⣶⣦⣝⢝⢕⢕⠅⡆⢕⢕⢕⢕⢕⣴⠏⣠⡶⠛⡉⡉⡛⢶⣦⡀⠐⣕⢕
⡝⡄⢻⢟⣿⣿⣷⣕⣕⣅⣿⣔⣕⣵⣵⣿⣿⢠⣿⢠⣮⡈⣌⠨⠅⠹⣷⡀⢱⢕
⡝⡵⠟⠈⠀⠀⠀⠀⠉⢿⣿⣿⣿⣿⣿⣿⣿⣼⣿⢈⡋⠴⢿⡟⣡⡇⣿⡇⢀⢕
⡝⠁⣠⣾⠟⡉⡉⡉⠻⣦⣻⣿⣿⣿⣿⣿⣿⣿⣿⣧⠸⣿⣦⣥⣿⡇⡿⣰⢗⢄
⠁⢰⣿⡏⣴⣌⠈⣌⠡⠈⢻⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣬⣉⣉⣁⣄⢖⢕⢕⢕
⡀⢻⣿⡇⢙⠁⠴⢿⡟⣡⡆⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣷⣵⣵⣿
⡻⣄⣻⣿⣌⠘⢿⣷⣥⣿⠇⣿⣿⣿⣿⣿⣿⠛⠻⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿
⣷⢄⠻⣿⣟⠿⠦⠍⠉⣡⣾⣿⣿⣿⣿⣿⣿⢸⣿⣦⠙⣿⣿⣿⣿⣿⣿⣿⣿⠟
⡕⡑⣑⣈⣻⢗⢟⢞⢝⣻⣿⣿⣿⣿⣿⣿⣿⠸⣿⠿⠃⣿⣿⣿⣿⣿⣿⡿⠁⣠
⡝⡵⡈⢟⢕⢕⢕⢕⣵⣿⣿⣿⣿⣿⣿⣿⣿⣿⣶⣶⣿⣿⣿⣿⣿⠿⠋⣀⣈⠙
⡝⡵⡕⡀⠑⠳⠿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⣿⠿⠛⢉⡠⡲⡫⡪⡪⡣
""".strip()
@staticmethod
def GetFileSeparator(is_not_this_platform:bool = False) -> str:
if PlatformIndicator.IsPlatformWindows and not is_not_this_platform:
return "\\"
return "/"
@staticmethod
def GetApplicationPath() -> str:
"""获取应用程序所在目录"""
import os
return os.path.dirname(os.path.abspath(__file__))
@staticmethod
def GetCurrentWorkingDirectory() -> str:
"""获取当前工作目录"""
import os
return os.getcwd()
# 使用类方法获取路径
@classmethod
def ApplicationPath(cls) -> str:
"""应用程序路径属性"""
return cls.GetApplicationPath()
@classmethod
def StreamingAssetsPath(cls) -> str:
"""流媒体资源路径属性"""
return cls.ApplicationPath() + "/StreamingAssets/"
@staticmethod
def PersistentDataPath() -> str:
"""
获取持久化数据路径,根据平台返回不同的路径
"""
import os
if PlatformIndicator.IsPlatformWindows:
return os.path.expandvars(f"%userprofile%\\AppData\\LocalLow\\{PlatformIndicator.CompanyName}\\{PlatformIndicator.ProductName}\\")
elif PlatformIndicator.IsPlatformLinux:
return os.path.expandvars("$HOME/.config/")
return ""
@staticmethod
def DataPath() -> str:
"""
获取数据路径
"""
return "Assets/"
class DescriptiveIndicator[T]:
def __init__(self, description:str, value:T) -> None:
self.descripion : str = description
self.value : T = value
class Switch:
def __init__(self, value, isThougth = False) -> None:
self.value = value
self.isThougth = False
self.caseStats = False
self.result = None
def Case(self, caseValue, callback:Callable[[], Any]) -> 'Switch':
if self.caseStats and self.isThougth:
self.result = callback()
elif caseValue == self.value:
self.caseStats = True
self.result = callback()
return self
def Default(self, callback:Callable[[], Any]) -> Any:
if self.caseStats and self.isThougth:
self.result = callback()
elif self.caseStats == False:
self.caseStats = True
self.result = callback()
return self.result

396
Runtime/EasySave.py Normal file
View File

@@ -0,0 +1,396 @@
from .Reflection import *
from .File import ToolFile
from .String import LimitStringLength
_Internal_EasySave_Debug:bool = False
def GetInternalEasySaveDebug() -> bool:
return _Internal_EasySave_Debug and GetInternalDebug()
def SetInternalEasySaveDebug(debug:bool) -> None:
global _Internal_EasySave_Debug
_Internal_EasySave_Debug = debug
class EasySaveSetting(BaseModel):
key: str = Field(description="目标键", default="easy")
# 从目标文件进行序列化/反序列化
file: str = Field(description="目标文件")
# 序列化/反序列化的格式方法
formatMode: Literal["json", "binary"] = Field(description="保存模式", default="json")
# TODO: refChain: bool = Field(description="是否以保留引用的方式保存", default=True)
# 文件形式与参数
# TODO: encoding: str = Field(description="编码", default="utf-8")
isBackup: bool = Field(description="是否备份", default=True)
backupSuffix: str = Field(description="备份后缀", default=".backup")
# 序列化/反序列化时, 如果设置了忽略字段的谓词, 则被谓词选中的字段将不会工作
# 如果设置了选择字段的谓词, 则被选中的字段才会工作
ignorePr: Optional[Callable[[FieldInfo], bool]] = Field(description="忽略字段的谓词", default=None)
selectPr: Optional[Callable[[FieldInfo], bool]] = Field(description="选择字段的谓词", default=None)
class ESWriter(BaseModel):
setting: EasySaveSetting = Field(description="设置")
def _GetFields(self, rtype:RefType) -> List[FieldInfo]:
'''
获取字段
'''
fields: List[FieldInfo] = []
if self.setting.ignorePr is not None and self.setting.selectPr is not None:
fields = [ field for field in rtype.GetAllFields() if self.setting.selectPr(field) and not self.setting.ignorePr(field) ]
elif self.setting.selectPr is None and self.setting.ignorePr is None:
fields = rtype.GetFields()
elif self.setting.ignorePr is not None:
fields = [ field for field in rtype.GetAllFields() if not self.setting.ignorePr(field) ]
else:
fields = [ field for field in rtype.GetAllFields() if self.setting.selectPr(field) ]
return fields
def _DoJsonSerialize(self, result_file:ToolFile, rtype:RefType, rinstance:Any) -> Any:
'''
序列化: json格式
'''
def dfs(rtype:RefType, rinstance:Any) -> Dict[str, Any]|Any:
if rinstance is None:
return rinstance
if rtype.IsUnion:
rtype = TypeManager.GetInstance().CreateOrGetRefType(rinstance)
if rtype.IsValueType:
return rinstance
elif rtype.IsCollection:
try:
if rtype.IsList:
return [ dfs(TypeManager.GetInstance().CreateOrGetRefType(iter_), iter_) for iter_ in rinstance ]
elif rtype.IsSet:
return { dfs(TypeManager.GetInstance().CreateOrGetRefType(iter_), iter_) for iter_ in rinstance }
elif rtype.IsTuple:
return tuple(dfs(TypeManager.GetInstance().CreateOrGetRefType(iter_), iter_) for iter_ in rinstance)
elif rtype.IsDictionary:
return {
dfs(TypeManager.GetInstance().CreateOrGetRefType(key), key):
dfs(TypeManager.GetInstance().CreateOrGetRefType(iter_), iter_)
for key, iter_ in rinstance.items()
}
except Exception as e:
raise ReflectionException(f"{ConsoleFrontColor.RED}容器<{rtype.RealType}>"\
f"在序列化时遇到错误:{ConsoleFrontColor.RESET}\n{e}") from e
raise NotImplementedError(f"{ConsoleFrontColor.RED}不支持的容器: {rinstance}"\
f"<{rtype.Print2Str(verbose=GetInternalEasySaveDebug())}>{ConsoleFrontColor.RESET}")
elif hasattr(rtype.RealType, "__easy_serialize__"):
custom_data, is_need_type = rtype.RealType.__easy_serialize__(rinstance)
if is_need_type:
return {
"__type": AssemblyTypen(rtype.RealType),
**custom_data
}
else:
return custom_data
else:
fields: List[FieldInfo] = self._GetFields(rtype)
layer: Dict[str, Any] = {
"__type": AssemblyTypen(rtype.RealType)
}
for field in fields:
try:
layer[field.FieldName] = dfs(
TypeManager.GetInstance().CreateOrGetRefType(field.FieldType),
field.GetValue(rinstance)
)
except Exception as e:
raise ReflectionException(f"{ConsoleFrontColor.RED}字段{field.FieldName}"\
f"<{field.FieldType}>在序列化时遇到错误:{ConsoleFrontColor.RESET}\n{e}") from e
return layer
layers: Dict[str, Any] = {}
if result_file.Exists():
filedata = result_file.LoadAsJson()
if isinstance(filedata, dict):
layers = filedata
layers[self.setting.key] = {
"__type": AssemblyTypen(rtype.RealType),
"value": dfs(rtype, rinstance)
}
result_file.SaveAsJson(layers)
def _DoBinarySerialize(self, result_file:ToolFile, rinstance:Any) -> Any:
'''
序列化: 二进制格式
'''
result_file.SaveAsBinary(rinstance)
def Serialize(self, result_file:ToolFile, rtype:RefType, rinstance:Any) -> Any:
'''
序列化
'''
if self.setting.formatMode == "json":
self._DoJsonSerialize(result_file, rtype, rinstance)
elif self.setting.formatMode == "binary":
self._DoBinarySerialize(result_file, rinstance)
else:
raise NotImplementedError(f"不支持的格式: {self.setting.formatMode}")
def Write[T](self, rinstance:T) -> ToolFile:
'''
写入数据
'''
result_file: ToolFile = ToolFile(self.setting.file)
backup_file: ToolFile = None
if result_file.GetDir() is not None and not ToolFile(result_file.GetDir()).Exists():
raise FileNotFoundError(f"文件路径不存在: {result_file.GetDir()}")
if result_file.Exists() and self.setting.isBackup:
if result_file.GetDir() is not None:
backup_file = ToolFile(result_file.GetDir()) | (result_file.GetFilename(True) + self.setting.backupSuffix)
else:
backup_file = ToolFile(result_file.GetFilename(True) + self.setting.backupSuffix)
result_file.Copy(backup_file)
try:
self.Serialize(result_file, TypeManager.GetInstance().CreateOrGetRefType(rinstance), rinstance)
except Exception:
if backup_file is not None:
result_file.Remove()
backup_file.Copy(result_file)
backup_file.Remove()
raise
finally:
if backup_file is not None:
backup_file.Remove()
return result_file
class ESReader(BaseModel):
setting: EasySaveSetting = Field(description="设置")
def _GetFields(self, rtype:RefType) -> List[FieldInfo]:
'''
获取字段
'''
fields: List[FieldInfo] = []
if self.setting.ignorePr is not None and self.setting.selectPr is not None:
fields = [ field for field in rtype.GetAllFields() if self.setting.selectPr(field) and not self.setting.ignorePr(field) ]
elif self.setting.selectPr is None and self.setting.ignorePr is None:
fields = rtype.GetFields()
elif self.setting.ignorePr is not None:
fields = [ field for field in rtype.GetAllFields() if not self.setting.ignorePr(field) ]
else:
fields = [ field for field in rtype.GetAllFields() if self.setting.selectPr(field) ]
return fields
def GetRtypeFromTypen(self, type_label:str) -> RefType:
'''
从类型标签中获取类型
'''
#module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.')
#if GetInternalEasySaveDebug():
# PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
# f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\
# f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}")
#typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name)
#return TypeManager.GetInstance().CreateOrGetRefType(typen_to)
typen, assembly_name = ReadAssemblyTypen(type_label)
if GetInternalEasySaveDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\
f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}")
return TypeManager.GetInstance().CreateOrGetRefType(typen)
def _DoJsonDeserialize(self, read_file:ToolFile, rtype:Optional[RefType] = None) -> Any:
'''
反序列化: json格式
Args:
read_file (ToolFile): 要读取的文件对象
rtype (Optional[RTypen[Any]], optional): 目标类型. 如果为None, 则从文件中读取类型信息. Defaults to None.
Returns:
Any: 反序列化后的对象
Raises:
NotImplementedError: 当遇到不支持的集合类型时抛出
ValueError: 当rinstance不为None时抛出
'''
# 从文件中加载JSON数据
layers: Dict[str, Any] = read_file.LoadAsJson()
if self.setting.key not in layers:
raise ValueError(f"{ConsoleFrontColor.RED}文件中不包含目标键: {ConsoleFrontColor.RESET}{self.setting.key}")
# 如果未指定类型, 则从JSON数据中获取类型信息
if rtype is None:
rtype: RefType = self.GetRtypeFromTypen(layers["__type"])
layers: Dict[str, Any] = layers[self.setting.key]["value"]
result_instance: Any = None
def dfs(rtype:Optional[RefType], layer:Dict[str, Any]|Any) -> Any:
'''
深度优先遍历反序列化
Args:
rtype (Optional[RefType]): 当前处理的类型
layer (Dict[str, Any]|Any): 当前处理的JSON数据层
rinstance (Any): 当前处理的对象实例
Returns:
Any: 反序列化后的对象
'''
# 如果类型为None且当前层包含类型信息, 则获取类型
if isinstance(layer, dict) and "__type" in layer:
rtype = self.GetRtypeFromTypen(layer["__type"])
if rtype is None:
raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}")
if GetInternalEasySaveDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\
f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}")
# 处理值类型
if (rtype.IsValueType or
rtype.Verify(Any) or
(layer is None and rtype.Verify(type(None)))
):
return layer
# 处理集合类型
elif rtype.IsCollection:
try:
if rtype.IsList:
element_type = rtype.GenericArgs[0] if len(rtype.GenericArgs) > 0 else Any
return [ dfs(TypeManager.GetInstance().CreateOrGetRefType(element_type), iter_) for iter_ in layer ]
elif rtype.IsSet:
element_type = rtype.GenericArgs[0] if len(rtype.GenericArgs) > 0 else Any
return { dfs(TypeManager.GetInstance().CreateOrGetRefType(element_type), iter_) for iter_ in layer }
elif rtype.IsTuple:
element_types: List[type] = rtype.GenericArgs
result: tuple = tuple(None for _ in layer)
if element_types is None or len(element_types) == 0:
element_types = [Any] * len(layer)
for index, iter_ in enumerate(layer):
result[index] = dfs(TypeManager.GetInstance().CreateOrGetRefType(element_types[index]), iter_)
return result
elif rtype.IsDictionary:
element_key, element_value = (rtype.GenericArgs[0], rtype.GenericArgs[1]) if len(rtype.GenericArgs) > 1 else (Any, Any)
return {
dfs(TypeManager.GetInstance().CreateOrGetRefType(element_key), keyname):
dfs(TypeManager.GetInstance().CreateOrGetRefType(element_value), iter_)
for keyname, iter_ in layer.items()
}
except Exception as e:
raise ReflectionException(f"容器<{LimitStringLength(str(layer), 100)}>在反序列化时遇到错误:\n{e}") from e
raise NotImplementedError(f"{ConsoleFrontColor.RED}不支持的容器: {LimitStringLength(str(layer), 100)}"\
f"<{rtype.Print2Str(verbose=GetInternalEasySaveDebug())}>{ConsoleFrontColor.RESET}")
# 处理对象类型
elif isinstance(rtype.RealType, type) and hasattr(rtype.RealType, "__easy_deserialize__"):
return rtype.RealType.__easy_deserialize__(layer)
else:
rinstance = rtype.CreateInstance()
if GetInternalEasySaveDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\
f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}")
fields:List[FieldInfo] = self._GetFields(rtype)
for field in fields:
if field.FieldName not in layer:
continue
field_rtype:RefType = None
try:
if field.FieldType == list and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\
f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == set and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\
f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == tuple and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\
f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == dict and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(
DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1])
)
if GetInternalEasySaveDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\
f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>")
else:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)
if GetInternalEasySaveDebug():
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\
f"<{field_rtype.GenericArgs}>")
field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName]))
except Exception as e:
raise ReflectionException(f"Json字段{field.FieldName}={LimitStringLength(str(layer[field.FieldName]), 100)}: \n{e}") from e
return rinstance
# 从根节点开始反序列化
result_instance = dfs(rtype, layers)
return result_instance
def _DoBinaryDeserialize(self, read_file:ToolFile, rtype:RefType) -> Any:
'''
反序列化: 二进制格式
'''
return read_file.LoadAsBinary()
def Deserialize(self, read_file:ToolFile, rtype:Optional[RefType]=None) -> Any:
'''
反序列化
'''
if self.setting.formatMode == "json":
return self._DoJsonDeserialize(read_file, rtype)
elif self.setting.formatMode == "binary":
return self._DoBinaryDeserialize(read_file, rtype)
else:
raise NotImplementedError(f"不支持的格式: {self.setting.formatMode}")
def Read[T](self, rtype:Optional[RTypen[T]]=None) -> T:
'''
读取数据
'''
read_file: ToolFile = ToolFile(self.setting.file)
if not read_file.Exists():
raise FileNotFoundError(f"文件不存在: {read_file}")
if read_file.IsDir():
raise IsADirectoryError(f"文件是目录: {read_file}")
return self.Deserialize(read_file, rtype)
class EasySave:
@staticmethod
def Write[T](rinstance:T, file:Optional[ToolFile|str]=None, *, setting:Optional[EasySaveSetting]=None) -> ToolFile:
'''
写入数据
'''
return ESWriter(setting=(setting if setting is not None else EasySaveSetting(file=str(file)))).Write(rinstance)
@overload
@staticmethod
def Read[T](
rtype: Typen[T],
file: Optional[ToolFile|str] = None,
*,
setting: Optional[EasySaveSetting] = None
) -> T:
...
@overload
@staticmethod
def Read[T](
rtype: RTypen[T],
file: Optional[ToolFile|str] = None,
*,
setting: Optional[EasySaveSetting] = None
) -> T:
...
@staticmethod
def Read[T](
rtype: RTypen[T]|type,
file: Optional[ToolFile|str] = None,
*,
setting: Optional[EasySaveSetting] = None
) -> T:
'''
读取数据
'''
if isinstance(rtype, type):
rtype = TypeManager.GetInstance().CreateOrGetRefType(rtype)
return ESReader(setting=(setting if setting is not None else EasySaveSetting(file=str(file)))).Read(rtype)

1154
Runtime/File.py Normal file

File diff suppressed because it is too large Load Diff

264
Runtime/GlobalConfig.py Normal file
View File

@@ -0,0 +1,264 @@
from .Config import *
from .File import ToolFile
from .String import FillString
from typing import *
import json
import os
# 静态配置
ConstConfigFile = "config.json"
def InitExtensionEnv():
"""初始化扩展环境"""
global ConstConfigFile
ConstConfigFile = "config.json"
ProjectConfig.InitExtensionEnv()
def GenerateEmptyConfigJson(file: ToolFile):
"""生成空配置JSON"""
file.SaveAsJson({"properties": {}})
return file
class GlobalConfig:
"""全局配置管理类"""
def __init__(
self,
data_dir: Optional[Union[str, ToolFile]] = None,
is_try_create_data_dir: bool = False,
is_load: bool = True
):
"""
构造与初始化
Args:
data_dir: 数据目录路径或ToolFile对象
is_try_create_data_dir: 是否尝试创建数据目录
is_load: 是否自动加载现有配置
"""
# 设置数据目录,确保目录存在
if data_dir is None:
data_dir = ToolFile(os.path.abspath('./'))
self.data_dir: ToolFile = data_dir if isinstance(data_dir, ToolFile) else ToolFile(str(data_dir))
if not self.data_dir.IsDir():
self.data_dir.BackToParentDir()
if not self.data_dir.Exists():
if is_try_create_data_dir:
self.data_dir.MustExistsPath()
else:
raise FileNotFoundError(f"Can't find data dir: {self.data_dir.GetDir()}")
# 检查配置文件,不存在则生成空配置
self._data_pair: Dict[str, Any] = {}
self._data_find: Dict[str, Any] = {}
self._const_config_file = ConstConfigFile
config_file = self.ConfigFile
if not config_file.Exists():
GenerateEmptyConfigJson(config_file)
elif is_load:
self.LoadProperties()
def __del__(self):
pass
# 文件管理
def GetConfigFile(self) -> ToolFile:
"""获取配置文件对象"""
return self.data_dir | self._const_config_file
@property
def ConfigFile(self) -> ToolFile:
"""获取配置文件对象(属性形式)"""
return self.GetConfigFile()
def GetFile(self, path: str, is_must_exist: bool = False) -> ToolFile:
"""获取数据目录下的文件"""
result = self.data_dir | path
if is_must_exist and not result.Exists():
result.MustExistsPath()
return result
def CreateFile(self, path: str) -> bool:
"""创建文件"""
result = self.data_dir | path
if result.Exists():
return False
if not result.GetParentDir().Exists():
return False
result.Create()
return True
def EraseFile(self, path: str) -> bool:
"""清空文件内容"""
result = self.data_dir | path
if result.Exists():
try:
result.Remove()
result.Create()
return True
except:
pass
return False
def RemoveFile(self, path: str) -> bool:
"""删除文件"""
result = self.data_dir | path
if result.Exists():
try:
result.Remove()
return True
except:
pass
return False
# 配置数据操作 - 支持迭代器
def __setitem__(self, key: str, value: Any) -> Any:
"""索引器设置配置项"""
self._data_pair[key] = value
return value
def __getitem__(self, key: str) -> Any:
"""索引器获取配置项"""
return self._data_pair[key]
def __contains__(self, key: str) -> bool:
"""检查键是否存在"""
return key in self._data_pair
def __delitem__(self, key: str):
"""删除配置项"""
del self._data_pair[key]
def __iter__(self):
"""迭代器支持"""
return iter(self._data_pair.items())
def __len__(self) -> int:
"""获取配置项数量"""
return len(self._data_pair)
def Contains(self, key: str) -> bool:
"""检查键是否存在"""
return key in self._data_pair
def Remove(self, key: str) -> bool:
"""删除配置项"""
if key in self._data_pair:
del self._data_pair[key]
return True
return False
def DataSize(self) -> int:
"""获取配置项数量"""
return len(self._data_pair)
# 持久化
def SaveProperties(self) -> 'GlobalConfig':
"""保存配置到文件"""
config = self.ConfigFile
config.SaveAsJson({
"properties": self._data_pair,
"find": self._data_find
})
return self
def LoadProperties(self) -> 'GlobalConfig':
"""从文件加载配置"""
config = self.ConfigFile
if not config.Exists():
self._data_pair = {}
else:
data = config.LoadAsJson()
if "properties" in data:
self._data_pair = data["properties"]
else:
raise ValueError("Can't find properties in config file")
return self
# 日志系统
def GetLogFile(self) -> ToolFile:
"""获取日志文件对象"""
return self.GetFile(self.ConfigFile.GetFilename(True) + "_log.txt", True)
@property
def LogFile(self) -> ToolFile:
"""获取日志文件对象(属性形式)"""
return self.GetLogFile()
def DefaultLogger(self, message: str):
"""默认日志输出器"""
print(message)
def Log(self, message_type: str, message: Union[str, Any], logger: Optional[Callable[[str], None]] = None) -> 'GlobalConfig':
"""记录日志"""
str_message_type = str(message_type)
# 使用String中的工具函数自动调整消息类型的对齐宽度
aligned_message_type = FillString(str_message_type, max_length=len("Property not found"), side="center")
what = f"[{Nowf()}] {aligned_message_type} : {str(message)}"
if logger is None:
logger = self.DefaultLogger
logger(what)
# 写入日志文件
log = self.GetLogFile()
# 读取现有内容并追加新内容
try:
existing_content = log.LoadAsText()
except:
existing_content = ""
log.SaveAsText(existing_content + what + '\n')
return self
def LogPropertyNotFound(self, message: str, logger: Optional[Callable[[str], None]] = None, default: Any = None) -> 'GlobalConfig':
"""记录属性未找到的日志"""
if default is not None:
message = f"{message} (default = {default})"
self.Log("Property not found", message, logger)
return self
def LogMessageOfPleaseCompleteConfiguration(self) -> 'GlobalConfig':
"""记录配置提示信息"""
self.Log("Error", "Please complete configuration")
return self
# 配置查找
def FindItem[T](self, key: str, default: Optional[T] = None) -> Optional[T]:
"""查找配置项,支持默认值"""
if key in self._data_pair:
return self._data_pair[key]
else:
self.LogPropertyNotFound(key, default=default)
self._data_find[key] = default
return default
class ProjectConfig(GlobalConfig):
"""项目级配置管理类继承自GlobalConfig"""
# 静态配置
ProjectConfigFileFocus = "Assets/"
@staticmethod
def InitExtensionEnv():
"""初始化项目扩展环境"""
ProjectConfig.ProjectConfigFileFocus = "Assets/"
@staticmethod
def SetProjectConfigFileFocus(path: str):
"""设置项目配置焦点目录"""
ProjectConfig.ProjectConfigFileFocus = path
@staticmethod
def GetProjectConfigFileFocus() -> str:
"""获取项目配置焦点目录"""
return ProjectConfig.ProjectConfigFileFocus
def __init__(self, is_load: bool = True):
"""使用默认项目目录构造"""
super().__init__(ProjectConfig.GetProjectConfigFileFocus(), is_try_create_data_dir=True, is_load=is_load)

743
Runtime/Interaction.py Normal file
View File

@@ -0,0 +1,743 @@
from .Config import *
from .File import ToolFile
from .Web import ToolURL
import json
import urllib.parse
import os
from typing import *
try:
from pydantic import BaseModel, PrivateAttr, Field
except ImportError as e:
ImportingThrow(e, "Interaction", ["pydantic"])
try:
import aiofiles
except ImportError as e:
ImportingThrow(e, "Interaction", ["aiofiles"])
class InteractionError(Exception):
"""交互操作异常基类"""
pass
class PathValidationError(InteractionError):
"""路径验证异常"""
pass
class LoadError(InteractionError):
"""加载异常"""
pass
class SaveError(InteractionError):
"""保存异常"""
pass
class Interaction(BaseModel):
"""统一的文件交互类,自适应处理本地文件和网络文件"""
originPath: str
_is_url: bool = PrivateAttr(False)
_is_local: bool = PrivateAttr(False)
_tool_file: Optional[ToolFile] = PrivateAttr(None)
_tool_url: Optional[ToolURL] = PrivateAttr(None)
def __init__(self, path):
"""
从路径字符串创建对象自动识别本地文件或网络URL
Args:
path: 路径字符串或是可以转换为路径字符串的对象
"""
super().__init__(originPath=str(path))
# 自动识别路径类型
self._detect_path_type()
def _detect_path_type(self):
"""自动检测路径类型"""
path = self.originPath.strip()
# 检查是否为HTTP/HTTPS URL
if path.startswith(('http://', 'https://', 'file://')):
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(path)
return
# 检查是否为localhost URL
if path.startswith('localhost'):
# 转换为完整的HTTP URL
if not path.startswith('localhost:'):
# 默认端口80
full_url = f"http://{path}"
else:
full_url = f"http://{path}"
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(full_url)
self.originPath = full_url
return
# 检查是否为绝对路径或相对路径
if (os.path.isabs(path) or
path.startswith('./') or
path.startswith('../') or
':' in path[:3]): # Windows盘符
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
return
# 默认作为相对路径处理
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
def __str__(self) -> str:
"""隐式字符串转换"""
return self.originPath
def __bool__(self) -> bool:
"""隐式布尔转换,检查路径是否有效"""
return self.IsValid
@property
def IsValid(self) -> bool:
"""检查路径是否有效"""
if self._is_url:
return self._tool_url.IsValid if self._tool_url else False
else:
return self._tool_file.Exists() if self._tool_file else False
@property
def IsURL(self) -> bool:
"""是否为网络URL"""
return self._is_url
@property
def IsLocal(self) -> bool:
"""是否为本地文件"""
return self._is_local
@property
def IsFile(self) -> bool:
"""是否为文件对于URL检查是否存在文件名"""
if self._is_url:
return bool(self._tool_url.GetFilename()) if self._tool_url else False
else:
return self._tool_file.IsFile() if self._tool_file else False
@property
def IsDir(self) -> bool:
"""是否为目录(仅对本地路径有效)"""
if self._is_local:
return self._tool_file.IsDir() if self._tool_file else False
return False
def GetFilename(self) -> str:
"""获取文件名"""
if self._is_url:
return self._tool_url.GetFilename() if self._tool_url else ""
else:
return self._tool_file.GetFilename() if self._tool_file else ""
def GetExtension(self) -> str:
"""获取文件扩展名"""
if self._is_url:
return self._tool_url.GetExtension() if self._tool_url else ""
else:
return self._tool_file.GetExtension() if self._tool_file else ""
def ExtensionIs(self, *extensions: str) -> bool:
"""检查扩展名是否匹配"""
if self._is_url:
return self._tool_url.ExtensionIs(*extensions) if self._tool_url else False
else:
current_ext = self.GetExtension()
return current_ext.lower() in [ext.lower().lstrip('.') for ext in extensions]
# 文件类型判断属性
@property
def IsText(self) -> bool:
"""是否为文本文件"""
return self.ExtensionIs('txt', 'html', 'htm', 'css', 'js', 'xml', 'csv', 'md', 'py', 'java', 'cpp', 'c', 'h')
@property
def IsJson(self) -> bool:
"""是否为JSON文件"""
return self.ExtensionIs('json')
@property
def IsImage(self) -> bool:
"""是否为图像文件"""
return self.ExtensionIs('jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp')
@property
def IsDocument(self) -> bool:
"""是否为文档文件"""
return self.ExtensionIs('pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx')
def Open(self, path: str) -> 'Interaction':
"""在当前对象上打开新路径"""
new_obj = Interaction(path)
self.originPath = new_obj.originPath
self._is_url = new_obj._is_url
self._is_local = new_obj._is_local
self._tool_file = new_obj._tool_file
self._tool_url = new_obj._tool_url
return self
# 同步加载方法
def LoadAsText(self) -> str:
"""
同步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsText()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsText()
def LoadAsBinary(self) -> bytes:
"""
同步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsBinary()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsBinary()
def LoadAsJson(self, model_type: Optional[type] = None) -> Any:
"""
同步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsJson(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
json_data = self._tool_file.LoadAsJson()
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
# 异步加载方法
async def LoadAsTextAsync(self) -> str:
"""
异步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsTextAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'r', encoding='utf-8') as f:
return await f.read()
async def LoadAsBinaryAsync(self) -> bytes:
"""
异步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsBinaryAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'rb') as f:
return await f.read()
async def LoadAsJsonAsync(self, model_type: Optional[type] = None) -> Any:
"""
异步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsJsonAsync(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地JSON文件
text_content = await self.LoadAsTextAsync()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise LoadError(f"Failed to parse JSON from {self.originPath}: {str(e)}")
# 同步保存方法
def SaveAsText(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL先下载然后保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsText(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsText(content)
return self
def SaveAsBinary(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsBinary(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsBinary(content)
return self
def SaveAsJson(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.json"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsJson(data)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsJson(data)
return self
# 异步保存方法
async def SaveAsTextAsync(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
return self
async def SaveAsBinaryAsync(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'wb') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'wb') as f:
await f.write(content)
return self
async def SaveAsJsonAsync(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
# 序列化JSON数据
try:
from pydantic import BaseModel
if isinstance(data, BaseModel):
json_data = data.model_dump()
json_data["__type"] = f"{data.__class__.__name__}, pydantic.BaseModel"
else:
json_data = data
json_content = json.dumps(json_data, indent=4, ensure_ascii=False)
except Exception as e:
raise SaveError(f"Failed to serialize JSON data: {str(e)}")
# 保存JSON内容
return await self.SaveAsTextAsync(json_content, local_path)
# HTTP请求方法仅对URL有效
def Get(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
同步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Get(callback)
def Post(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
同步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Post(callback, form_data)
async def GetAsync(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
异步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.GetAsync(callback)
async def PostAsync(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
异步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.PostAsync(callback, form_data)
# 便利方法
def Save(self, local_path: Optional[str] = None) -> 'Interaction':
"""
自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL先下载内容再保存
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
self._tool_url.Save(local_path)
return self
async def SaveAsync(self, local_path: Optional[str] = None) -> 'Interaction':
"""
异步自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL异步下载内容
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
try:
if self.IsText:
content = await self.LoadAsTextAsync()
await self.SaveAsTextAsync(content, local_path)
elif self.IsJson:
content = await self.LoadAsJsonAsync()
await self.SaveAsJsonAsync(content, local_path)
else:
content = await self.LoadAsBinaryAsync()
await self.SaveAsBinaryAsync(content, local_path)
except Exception as e:
raise SaveError(f"Failed to save {self.originPath}: {str(e)}")
return self
def Downloadable(self) -> bool:
"""检查是否可下载"""
return self._is_url and self._tool_url.IsValid if self._tool_url else False
def Download(self, local_path: Optional[str] = None) -> ToolFile:
"""
下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("Download method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.Download(local_path)
async def DownloadAsync(self, local_path: Optional[str] = None) -> ToolFile:
"""
异步下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("DownloadAsync method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.DownloadAsync(local_path)
def Copy(self, target_path) -> ToolFile:
"""
复制文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
新的Interaction对象
"""
if not self._is_local:
raise InteractionError("Copy method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Copy(str(target_path))
def Move(self, target_path) -> ToolFile:
"""
移动文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
更新后的Interaction对象
"""
if not self._is_local:
raise InteractionError("Move method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Move(str(target_path))
def Remove(self) -> 'Interaction':
"""
删除文件(仅对本地文件有效)
Returns:
Interaction对象本身
"""
if not self._is_local:
raise InteractionError("Remove method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.Remove()
return self
def Exists(self) -> bool:
"""
检查文件是否存在
Returns:
是否存在
"""
return self.IsValid
def GetSize(self) -> int:
"""
获取文件大小(仅对本地文件有效)
Returns:
文件大小(字节)
"""
if not self._is_local:
raise InteractionError("GetSize method is only available for local files")
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.GetSize()
def GetDir(self) -> str:
"""
获取目录路径
Returns:
目录路径
"""
if self._is_local:
return self._tool_file.GetDir() if self._tool_file else ""
else:
# 对于URL返回基础URL
if self._tool_url:
parsed = urllib.parse.urlparse(self._tool_url.url)
return f"{parsed.scheme}://{parsed.netloc}"
return ""
def GetParentDir(self) -> 'Interaction':
"""
获取父目录的Interaction对象
Returns:
父目录的Interaction对象
"""
if self._is_local:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
parent_dir = self._tool_file.GetParentDir()
return Interaction(parent_dir.GetFullPath())
else:
# 对于URL返回基础URL
base_url = self.GetDir()
return Interaction(base_url)
def ToString(self) -> str:
"""获取完整路径"""
return self.originPath
def GetFullPath(self) -> str:
"""获取完整路径"""
return self.originPath

1529
Runtime/Reflection.py Normal file

File diff suppressed because it is too large Load Diff

313
Runtime/String.py Normal file
View File

@@ -0,0 +1,313 @@
from .Config import *
def LimitStringLength(data, max_length:int=50) -> str:
s:str = data if isinstance(data, str) else str(data)
if len(s) <= max_length:
return s
else:
inside_str = "\n...\n...\n"
# 计算头尾部分的长度
head_length = max_length // 2
tail_length = max_length - head_length - len(inside_str) # 3 是省略号的长度
# 截取头尾部分并连接
return s[:head_length] + inside_str + s[-tail_length:]
def FillString(data:Any,
max_length: int = 50,
fill_char: str = " ",
side: Literal["left", "right", "center"] = "right"
) -> str:
s:str = data if isinstance(data, str) else str(data)
char = fill_char[0]
if len(s) >= max_length:
return s
else:
if side == "left":
return s + char * (max_length - len(s))
elif side == "right":
return char * (max_length - len(s)) + s
elif side == "center":
left = (max_length - len(s)) // 2
right = max_length - len(s) - left
return char * left + s + char * right
else:
raise ValueError(f"Unsupported side: {side}")
def Bytes2Strings(lines:List[bytes], encoding='utf-8') -> List[str]:
return [line.decode(encoding) for line in lines]
def Bytes2String(lines:List[bytes], encoding='utf-8') -> str:
return "".join(Bytes2Strings(lines, encoding))
def word_segmentation(
sentence,
cut_all: bool = False,
HMM: bool = True,
use_paddle: bool = False
) -> Sequence[Optional[Union[Any, str]]]:
try:
import jieba
return jieba.dt.cut(str(sentence), cut_all=cut_all, HMM=HMM, use_paddle=use_paddle)
except ImportError:
raise ValueError("jieba is not install")
def GetEditorDistanceAndOperations(
s1:str,
s2:str,
) -> Tuple[int, List[Tuple[Literal["add","delete"], int, int, str]]]:
"""
计算两个字符串的编辑距离和操作序列
操作格式: (操作类型, 开始位置, 结束位置, 内容)
位置基于源字符串s1
"""
m, n = len(s1), len(s2)
# 使用简单的LCS算法来找到最长公共子序列
# 然后基于LCS生成操作序列
lcs = [[0] * (n + 1) for _ in range(m + 1)]
# 构建LCS表
for i in range(1, m + 1):
for j in range(1, n + 1):
if s1[i - 1] == s2[j - 1]:
lcs[i][j] = lcs[i - 1][j - 1] + 1
else:
lcs[i][j] = max(lcs[i - 1][j], lcs[i][j - 1])
# 基于LCS生成操作序列
operations = []
i, j = m, n
while i > 0 or j > 0:
if i > 0 and j > 0 and s1[i - 1] == s2[j - 1]:
# 字符匹配,不需要操作
i -= 1
j -= 1
elif j > 0 and (i == 0 or lcs[i][j - 1] >= lcs[i - 1][j]):
# 需要插入s2[j-1]
# 找到插入位置在s1中的位置
insert_pos = i
operations.insert(0, ("add", insert_pos, insert_pos, s2[j - 1]))
j -= 1
else:
# 需要删除s1[i-1]
operations.insert(0, ("delete", i - 1, i, s1[i - 1]))
i -= 1
# 合并连续的操作
merged_operations = []
for op in operations:
if merged_operations and merged_operations[-1][0] == op[0]:
last_op = merged_operations[-1]
if op[0] == "add" and last_op[2] == op[1]:
# 合并连续的添加操作
merged_operations[-1] = (op[0], last_op[1], op[2], last_op[3] + op[3])
elif op[0] == "delete" and last_op[2] == op[1]:
# 合并连续的删除操作
merged_operations[-1] = (op[0], last_op[1], op[2], last_op[3] + op[3])
else:
merged_operations.append(op)
else:
merged_operations.append(op)
# 计算编辑距离
edit_distance = m + n - 2 * lcs[m][n]
return edit_distance, merged_operations
def _build_line_lcs(lines1: List[str], lines2: List[str]) -> List[List[int]]:
"""
构建行级LCS动态规划表
"""
m, n = len(lines1), len(lines2)
lcs = [[0] * (n + 1) for _ in range(m + 1)]
# 使用哈希加速行比较
hash1 = [hash(line) for line in lines1]
hash2 = [hash(line) for line in lines2]
for i in range(1, m + 1):
for j in range(1, n + 1):
if hash1[i-1] == hash2[j-1] and lines1[i-1] == lines2[j-1]:
lcs[i][j] = lcs[i-1][j-1] + 1
else:
lcs[i][j] = max(lcs[i-1][j], lcs[i][j-1])
return lcs
def _extract_line_operations(lines1: List[str], lines2: List[str], lcs: List[List[int]]) -> List[Tuple[str, int, int, List[str]]]:
"""
从LCS表提取行级操作序列
返回: (操作类型, 起始行号, 结束行号, 行内容列表)
"""
operations = []
m, n = len(lines1), len(lines2)
i, j = m, n
while i > 0 or j > 0:
if i > 0 and j > 0 and lines1[i-1] == lines2[j-1]:
i -= 1
j -= 1
elif j > 0 and (i == 0 or lcs[i][j-1] >= lcs[i-1][j]):
operations.insert(0, ("add", i, i, [lines2[j-1]]))
j -= 1
else:
operations.insert(0, ("delete", i-1, i, [lines1[i-1]]))
i -= 1
# 合并连续的同类行操作
merged = []
for op_type, start, end, lines in operations:
if merged and merged[-1][0] == op_type and merged[-1][2] == start:
merged[-1] = (op_type, merged[-1][1], end, merged[-1][3] + lines)
else:
merged.append((op_type, start, end, lines))
return merged
def _char_diff_in_region(s1: str, s2: str) -> List[Tuple[str, int, int, str]]:
"""
对小范围区域进行字符级LCS比较
返回相对于输入字符串的位置
"""
m, n = len(s1), len(s2)
# 快速路径
if m == 0 and n == 0:
return []
if m == 0:
return [("add", 0, 0, s2)]
if n == 0:
return [("delete", 0, m, s1)]
if s1 == s2:
return []
# 字符级LCS
lcs = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(1, m + 1):
for j in range(1, n + 1):
if s1[i-1] == s2[j-1]:
lcs[i][j] = lcs[i-1][j-1] + 1
else:
lcs[i][j] = max(lcs[i-1][j], lcs[i][j-1])
# 回溯生成操作
operations = []
i, j = m, n
while i > 0 or j > 0:
if i > 0 and j > 0 and s1[i-1] == s2[j-1]:
i -= 1
j -= 1
elif j > 0 and (i == 0 or lcs[i][j-1] >= lcs[i-1][j]):
operations.insert(0, ("add", i, i, s2[j-1]))
j -= 1
else:
operations.insert(0, ("delete", i-1, i, s1[i-1]))
i -= 1
# 合并连续操作
merged = []
for op_type, start, end, content in operations:
if merged and merged[-1][0] == op_type:
last_op = merged[-1]
if op_type == "add" and last_op[2] == start:
merged[-1] = (op_type, last_op[1], end, last_op[3] + content)
elif op_type == "delete" and last_op[2] == start:
merged[-1] = (op_type, last_op[1], end, last_op[3] + content)
else:
merged.append((op_type, start, end, content))
else:
merged.append((op_type, start, end, content))
return merged
def GetDiffOperations(
s1:str,
s2:str,
) -> List[Tuple[Literal["add","delete"], int, int, str]]:
"""
计算两个字符串的差异操作序列(混合行级+字符级算法)
操作格式: (操作类型, 开始位置, 结束位置, 内容)
位置基于源字符串s1的字符偏移
"""
# 快速路径
if s1 == s2:
return []
if not s1:
return [("add", 0, 0, s2)]
if not s2:
return [("delete", 0, len(s1), s1)]
# 阶段1: 分行并建立位置映射
lines1 = s1.split('\n')
lines2 = s2.split('\n')
# 构建行号到字符位置的映射
line_offsets_s1 = [0]
for line in lines1[:-1]:
line_offsets_s1.append(line_offsets_s1[-1] + len(line) + 1) # +1 for '\n'
line_offsets_s2 = [0]
for line in lines2[:-1]:
line_offsets_s2.append(line_offsets_s2[-1] + len(line) + 1)
# 阶段2: 行级LCS分析
lcs = _build_line_lcs(lines1, lines2)
line_operations = _extract_line_operations(lines1, lines2, lcs)
# 阶段3: 转换为字符级操作
final_operations = []
for op_type, start_line, end_line, op_lines in line_operations:
if op_type == "add":
# 添加操作: 在s1的start_line位置插入
char_pos = line_offsets_s1[start_line] if start_line < len(line_offsets_s1) else len(s1)
content = '\n'.join(op_lines)
# 对于添加的行块,可以选择字符级细化或直接使用
# 这里先直接使用行级结果
final_operations.append(("add", char_pos, char_pos, content))
elif op_type == "delete":
# 删除操作: 删除s1的[start_line, end_line)行
char_start = line_offsets_s1[start_line]
if end_line < len(lines1):
char_end = line_offsets_s1[end_line]
else:
char_end = len(s1)
content = '\n'.join(op_lines)
final_operations.append(("delete", char_start, char_end, content))
# 阶段4: 对于连续的删除+添加,尝试字符级精细比较
optimized_operations = []
i = 0
while i < len(final_operations):
if (i + 1 < len(final_operations) and
final_operations[i][0] == "delete" and
final_operations[i+1][0] == "add" and
final_operations[i][2] == final_operations[i+1][1]):
# 这是一个修改操作,进行字符级细化
del_op = final_operations[i]
add_op = final_operations[i+1]
old_text = del_op[3]
new_text = add_op[3]
base_pos = del_op[1]
# 字符级比较
char_ops = _char_diff_in_region(old_text, new_text)
# 调整位置到全局坐标
for op_type, rel_start, rel_end, content in char_ops:
optimized_operations.append((op_type, base_pos + rel_start, base_pos + rel_end, content))
i += 2
else:
optimized_operations.append(final_operations[i])
i += 1
return optimized_operations

587
Runtime/Web.py Normal file
View File

@@ -0,0 +1,587 @@
from .Config import *
from .File import ToolFile
import json
import urllib.parse
import urllib.request
import urllib.error
import asyncio
import os
import re
from typing import *
from pydantic import BaseModel
try:
import aiohttp
import aiofiles
except ImportError as e:
ImportingThrow(e, "Web", ["aiohttp", "aiofiles"])
class WebError(Exception):
"""网络操作异常基类"""
pass
class URLValidationError(WebError):
"""URL验证异常"""
pass
class HTTPRequestError(WebError):
"""HTTP请求异常"""
pass
class DownloadError(WebError):
"""下载异常"""
pass
class ToolURL(BaseModel):
"""网络URL工具类提供HTTP客户端和URL操作功能"""
url: str
def __init__(self, url: Union[str, 'ToolURL']):
"""
从URL字符串创建对象
Args:
url: URL字符串或ToolURL对象
"""
if isinstance(url, ToolURL):
url = url.url
super().__init__(url=str(url))
def __str__(self) -> str:
"""隐式字符串转换"""
return self.url
def __bool__(self) -> bool:
"""隐式布尔转换等同于IsValid"""
return self.IsValid
def ToString(self) -> str:
"""获取完整URL"""
return self.url
def GetFullURL(self) -> str:
"""获取完整URL"""
return self.url
@property
def FullURL(self) -> str:
"""获取完整URL属性"""
return self.url
@property
def IsValid(self) -> bool:
"""检查URL是否有效"""
return self.ValidateURL()
def ValidateURL(self) -> bool:
"""
验证URL格式
Returns:
是否为有效的HTTP/HTTPS URL
"""
try:
parsed = urllib.parse.urlparse(self.url)
return parsed.scheme in ('http', 'https') and parsed.netloc != ''
except Exception:
return False
def GetFilename(self) -> str:
"""
获取URL中的文件名
Returns:
URL路径中的文件名
"""
try:
parsed = urllib.parse.urlparse(self.url)
path = parsed.path
if path:
return os.path.basename(path)
return ""
except Exception:
return ""
def GetExtension(self) -> str:
"""
获取文件扩展名
Returns:
文件扩展名(不包含点)
"""
filename = self.GetFilename()
if '.' in filename:
return filename.split('.')[-1].lower()
return ""
def ExtensionIs(self, *extensions: str) -> bool:
"""
检查扩展名是否匹配
Args:
*extensions: 要检查的扩展名列表
Returns:
是否匹配任一扩展名
"""
current_ext = self.GetExtension()
return current_ext in [ext.lower().lstrip('.') for ext in extensions]
def Open(self, url: str) -> 'ToolURL':
"""
在当前对象上打开新URL
Args:
url: 新的URL字符串
Returns:
更新后的ToolURL对象
"""
self.url = str(url)
return self
# 文件类型判断属性
@property
def IsText(self) -> bool:
"""是否为文本文件txt, html, htm, css, js, xml, csv"""
return self.ExtensionIs('txt', 'html', 'htm', 'css', 'js', 'xml', 'csv', 'md', 'py', 'java', 'cpp', 'c', 'h')
@property
def IsJson(self) -> bool:
"""是否为JSON文件"""
return self.ExtensionIs('json')
@property
def IsImage(self) -> bool:
"""是否为图像文件jpg, jpeg, png, gif, bmp, svg"""
return self.ExtensionIs('jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp')
@property
def IsDocument(self) -> bool:
"""是否为文档文件pdf, doc, docx, xls, xlsx, ppt, pptx"""
return self.ExtensionIs('pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx')
# HTTP请求方法
def Get(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
同步GET请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
with urllib.request.urlopen(self.url) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
def Post(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
同步POST请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
data = None
if form_data:
data = urllib.parse.urlencode(form_data).encode('utf-8')
req = urllib.request.Request(self.url, data=data, method='POST')
if form_data:
req.add_header('Content-Type', 'application/x-www-form-urlencoded')
with urllib.request.urlopen(req) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
# 异步HTTP请求方法
async def GetAsync(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
异步GET请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
async def PostAsync(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
异步POST请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
async with aiohttp.ClientSession() as session:
async with session.post(self.url, data=form_data) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
def PostJson(self, callback: Callable[[Optional[Any]], None],
json_data: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None) -> bool:
"""
同步JSON POST请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
json_data: JSON数据字典
headers: 自定义请求头字典
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
data = None
if json_data:
data = json.dumps(json_data).encode('utf-8')
req = urllib.request.Request(self.url, data=data, method='POST')
# 设置默认请求头
req.add_header('Content-Type', 'application/json')
# 添加自定义请求头
if headers:
for key, value in headers.items():
req.add_header(key, value)
with urllib.request.urlopen(req) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
async def PostJsonAsync(self, callback: Callable[[Optional[Any]], None],
json_data: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None) -> bool:
"""
异步JSON POST请求
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
json_data: JSON数据字典
headers: 自定义请求头字典
Returns:
是否请求成功
"""
if not self.IsValid:
callback(None)
return False
try:
# 准备请求头
request_headers = {'Content-Type': 'application/json'}
if headers:
request_headers.update(headers)
async with aiohttp.ClientSession() as session:
async with session.post(self.url, json=json_data, headers=request_headers) as response:
callback(response)
return True
except Exception as e:
callback(None)
return False
# 内容加载方法
def LoadAsText(self) -> str:
"""
同步加载为文本
Returns:
文本内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
with urllib.request.urlopen(self.url) as response:
content = response.read()
# 尝试检测编码
encoding = response.headers.get_content_charset() or 'utf-8'
return content.decode(encoding)
except Exception as e:
raise HTTPRequestError(f"Failed to load text from {self.url}: {str(e)}")
async def LoadAsTextAsync(self) -> str:
"""
异步加载为文本
Returns:
文本内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
return await response.text()
except Exception as e:
raise HTTPRequestError(f"Failed to load text from {self.url}: {str(e)}")
def LoadAsBinary(self) -> bytes:
"""
同步加载为字节数组
Returns:
二进制内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
with urllib.request.urlopen(self.url) as response:
return response.read()
except Exception as e:
raise HTTPRequestError(f"Failed to load binary from {self.url}: {str(e)}")
async def LoadAsBinaryAsync(self) -> bytes:
"""
异步加载为字节数组
Returns:
二进制内容
"""
if not self.IsValid:
raise URLValidationError(f"Invalid URL: {self.url}")
try:
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
return await response.read()
except Exception as e:
raise HTTPRequestError(f"Failed to load binary from {self.url}: {str(e)}")
def LoadAsJson(self, model_type: Optional[type] = None) -> Any:
"""
同步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
text_content = self.LoadAsText()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise HTTPRequestError(f"Failed to parse JSON from {self.url}: {str(e)}")
async def LoadAsJsonAsync(self, model_type: Optional[type] = None) -> Any:
"""
异步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
text_content = await self.LoadAsTextAsync()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise HTTPRequestError(f"Failed to parse JSON from {self.url}: {str(e)}")
# 文件保存和下载功能
def Save(self, local_path: Optional[str] = None) -> ToolFile:
"""
自动选择格式保存到本地
Args:
local_path: 本地保存路径如果为None则自动生成
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
if self.IsText:
return self.SaveAsText(local_path)
elif self.IsJson:
return self.SaveAsJson(local_path)
else:
return self.SaveAsBinary(local_path)
def SaveAsText(self, local_path: Optional[str] = None) -> ToolFile:
"""
保存为文本文件
Args:
local_path: 本地保存路径
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
text_content = self.LoadAsText()
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsText(text_content)
return file_obj
def SaveAsJson(self, local_path: Optional[str] = None) -> ToolFile:
"""
保存为JSON文件
Args:
local_path: 本地保存路径
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded.json"
json_data = self.LoadAsJson()
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsJson(json_data)
return file_obj
def SaveAsBinary(self, local_path: Optional[str] = None) -> ToolFile:
"""
保存为二进制文件
Args:
local_path: 本地保存路径
Returns:
保存的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
binary_content = self.LoadAsBinary()
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsBinary(binary_content)
return file_obj
def Download(self, local_path: Optional[str] = None) -> ToolFile:
"""
同步下载文件
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
return self.Save(local_path)
async def DownloadAsync(self, local_path: Optional[str] = None) -> ToolFile:
"""
异步下载文件
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
try:
if self.IsText:
content = await self.LoadAsTextAsync()
file_obj.SaveAsText(content)
elif self.IsJson:
content = await self.LoadAsJsonAsync()
file_obj.SaveAsJson(content)
else:
content = await self.LoadAsBinaryAsync()
file_obj.SaveAsBinary(content)
return file_obj
except Exception as e:
raise DownloadError(f"Failed to download {self.url}: {str(e)}")
# 静态HTTP客户端实例避免连接池耗尽
_http_session: Optional[aiohttp.ClientSession] = None
async def get_http_session() -> aiohttp.ClientSession:
"""获取全局HTTP会话实例"""
global _http_session
if _http_session is None or _http_session.closed:
_http_session = aiohttp.ClientSession()
return _http_session
async def close_http_session():
"""关闭全局HTTP会话"""
global _http_session
if _http_session and not _http_session.closed:
await _http_session.close()
_http_session = None

0
Runtime/__init__.py Normal file
View File