Compare commits

...

6 Commits

5 changed files with 39 additions and 95 deletions

View File

@@ -34,14 +34,14 @@ class SingletonModel[T](IModel):
_InjectInstances:Dict[type,Any] = {} _InjectInstances:Dict[type,Any] = {}
@staticmethod @staticmethod
def GetInstance(t:Typen[T]) -> T: def GetInstance(t:type) -> T:
return SingletonModel._InjectInstances[t] return SingletonModel._InjectInstances[t]
@staticmethod @staticmethod
def SetInstance(t:Typen[T], obj:T) -> None: def SetInstance(t:type, obj:T) -> None:
SingletonModel._InjectInstances[t] = obj SingletonModel._InjectInstances[t] = obj
def __init__(self, t:Typen[T]) -> None: def __init__(self, t:type) -> None:
self.typen: type = t self.typen: type = t
@@ -131,8 +131,9 @@ class Architecture:
if slot in cls._RegisteringRuntime: if slot in cls._RegisteringRuntime:
raise InvalidOperationError("Illegal duplicate registrations") raise InvalidOperationError("Illegal duplicate registrations")
cls._RegisteringRuntime[slot] = Architecture.Registering(slot, target, DependenceModel(Architecture.TypeQuery(dependence) for dependence in dependences), action) cls._RegisteringRuntime[slot] = Architecture.Registering(slot, target, DependenceModel(Architecture.TypeQuery(dependence) for dependence in dependences), action)
dependences = cls._RegisteringRuntime[slot].dependences
cls._InternalRegisteringComplete() cls._InternalRegisteringComplete()
return cls._RegisteringRuntime[slot].dependences return dependences
@classmethod @classmethod
def Contains(cls, type_:type) -> bool: def Contains(cls, type_:type) -> bool:

View File

@@ -187,10 +187,10 @@ def InternalImportingThrow(
): ):
with lock_guard(): with lock_guard():
requierds_str = ",".join([f"<{r}>" for r in requierds]) requierds_str = ",".join([f"<{r}>" for r in requierds])
print(f"Internal Convention package is not installed.\n{messageBase.format_map({ print("Internal Convention package is not installed.\n", messageBase.format_map({
"module": moduleName, "module": moduleName,
"required": requierds_str "required": requierds_str
})}\n[{ConsoleFrontColor.RESET}{format_traceback_info(back=2)}{ConsoleFrontColor.RED}]") }), "\n[",ConsoleFrontColor.RESET,format_traceback_info(back=2),ConsoleFrontColor.RED,"]")
def ReleaseFailed2Requirements(): def ReleaseFailed2Requirements():
global ImportingFailedSet global ImportingFailedSet
@@ -204,18 +204,17 @@ try:
except ImportError: except ImportError:
InternalImportingThrow("Internal", ["pydantic"]) InternalImportingThrow("Internal", ["pydantic"])
type Typen[_T] = type T = TypeVar("T")
Action = TypeVar("Action", bound=Callable[[], None])
type Action = Callable[[], None] ClosuresCallable = Union[Callable[[Optional[None]], Any], type]
type ClosuresCallable[_T] = Union[Callable[[Optional[None]], _T], Typen[_T]]
def AssemblyTypen(obj:Any) -> str: def AssemblyTypen(obj:Any) -> str:
if isinstance(obj, type): if isinstance(obj, type):
return f"{obj.__module__}.{obj.__name__}, "\ return f"{obj.__module__}.{obj.__name__}, "\
f"{obj.Assembly() if hasattr(obj, "Assembly") else "Global"}" f"{obj.Assembly() if hasattr(obj, 'Assembly') else 'Global'}"
else: else:
return f"{obj.__class__.__module__}.{obj.__class__.__name__}, "\ return f"{obj.__class__.__module__}.{obj.__class__.__name__}, "\
f"{obj.GetAssembly() if hasattr(obj, "GetAssembly") else "Global"}" f"{obj.GetAssembly() if hasattr(obj, 'GetAssembly') else 'Global'}"
def ReadAssemblyTypen( def ReadAssemblyTypen(
assembly_typen: str, assembly_typen: str,
*, *,
@@ -232,68 +231,13 @@ def ReadAssemblyTypen(
target_type = getattr(importlib.import_module(module_name), class_name) target_type = getattr(importlib.import_module(module_name), class_name)
return target_type, assembly_name return target_type, assembly_name
# using as c#: event
class ActionEvent[_Call:Callable]:
def __init__(self, actions:Sequence[_Call]):
super().__init__()
self._actions: List[Callable] = [action for action in actions]
self.call_indexs: List[int] = [i for i in range(len(actions))]
self.last_result: List[Any] = []
def CallFuncWithoutCallIndexControl(self, index:int, *args, **kwargs) -> Union[Any, Exception]:
try:
return self._actions[index](*args, **kwargs)
except Exception as ex:
return ex
def CallFunc(self, index:int, *args, **kwargs) -> Union[Any, Exception]:
return self.CallFuncWithoutCallIndexControl(self.call_indexs[index], *args, **kwargs)
def _InjectInvoke(self, *args, **kwargs):
result:List[Any] = []
for index in range(self.CallMaxCount):
result.append(self.CallFunc(index, *args, **kwargs))
return result
def Invoke(self, *args, **kwargs) -> Union[Self, bool]:
self.last_result = self._InjectInvoke(*args, **kwargs)
return self
def InitCallIndex(self):
self.call_indexs = [i for i in range(len(self._actions))]
def AddAction(self, action:_Call):
self._actions.append(action)
self.call_indexs.append(len(self._actions)-1)
return self
def AddActions(self, actions:Sequence[_Call]):
for action in actions:
self.AddAction(action)
return self
def _InternalRemoveAction(self, action:_Call):
if action in self._actions:
index = self._actions.index(action)
self._actions.remove(action)
self.call_indexs.remove(index)
for i in range(len(self.call_indexs)):
if self.call_indexs[i] > index:
self.call_indexs[i] -= 1
return True
return False
def RemoveAction(self, action:_Call):
while self._InternalRemoveAction(action):
pass
return self
def IsValid(self):
return not any(isinstance(x, Exception) for x in self.last_result)
def __bool__(self):
return self.IsValid()
@property
def CallMaxCount(self):
return len(self.call_indexs)
@property
def ActionCount(self):
return len(self._actions)
# region instance # region instance
# threads # threads
class atomic[_T]: _T = TypeVar("_T")
class atomic(Generic[_T]):
def __init__( def __init__(
self, self,
value: _T, value: _T,
@@ -320,13 +264,13 @@ class atomic[_T]:
return self.FetchAdd(value) return self.FetchAdd(value)
def __sub__(self, value:_T): def __sub__(self, value:_T):
return self.FetchSub(value) return self.FetchSub(value)
def __iadd__(self, value:_T) -> Self: def __iadd__(self, value:_T) -> 'atomic[_T]':
self.FetchAdd(value) self.FetchAdd(value)
return self return self
def __isub__(self, value:_T) -> Self: def __isub__(self, value:_T) -> 'atomic[_T]':
self.FetchSub(value) self.FetchSub(value)
return self return self
def __enter__(self) -> Self: def __enter__(self) -> 'atomic[_T]':
self._is_in_with = True self._is_in_with = True
self.locker.acquire() self.locker.acquire()
return self return self
@@ -482,7 +426,7 @@ class PlatformIndicator:
""" """
return "Assets/" return "Assets/"
class DescriptiveIndicator[T]: class DescriptiveIndicator(Generic[T]):
def __init__(self, description:str, value:T) -> None: def __init__(self, description:str, value:T) -> None:
self.descripion : str = description self.descripion : str = description
self.value : T = value self.value : T = value

View File

@@ -366,7 +366,7 @@ class EasySave:
@overload @overload
@staticmethod @staticmethod
def Read[T]( def Read[T](
rtype: Typen[T], rtype: type,
file: Optional[ToolFile|str] = None, file: Optional[ToolFile|str] = None,
*, *,
setting: Optional[EasySaveSetting] = None setting: Optional[EasySaveSetting] = None

View File

@@ -60,7 +60,7 @@ class ToolFile(BaseModel):
def __init__( def __init__(
self, self,
filePath: Union[str, Self], filePath: Union[str, 'ToolFile'],
): ):
filePath = os.path.expandvars(str(filePath)) filePath = os.path.expandvars(str(filePath))
if ":" in filePath: if ":" in filePath:
@@ -152,7 +152,7 @@ class ToolFile(BaseModel):
else: else:
os.remove(self.OriginFullPath) os.remove(self.OriginFullPath)
return self return self
def Copy(self, targetPath:Optional[Union[Self, str]]=None): def Copy(self, targetPath:Optional[Union['ToolFile', str]]=None):
if targetPath is None: if targetPath is None:
return ToolFile(self.OriginFullPath) return ToolFile(self.OriginFullPath)
if self.Exists() == False: if self.Exists() == False:
@@ -162,7 +162,7 @@ class ToolFile(BaseModel):
target_file = target_file|self.GetFilename() target_file = target_file|self.GetFilename()
shutil.copy(self.OriginFullPath, str(target_file)) shutil.copy(self.OriginFullPath, str(target_file))
return target_file return target_file
def Move(self, targetPath:Union[Self, str]): def Move(self, targetPath:Union['ToolFile', str]):
if self.Exists() is False: if self.Exists() is False:
raise FileNotFoundError("file not found") raise FileNotFoundError("file not found")
target_file = ToolFile(str(targetPath)) target_file = ToolFile(str(targetPath))
@@ -171,7 +171,7 @@ class ToolFile(BaseModel):
shutil.move(self.OriginFullPath, str(target_file)) shutil.move(self.OriginFullPath, str(target_file))
self.OriginFullPath = target_file.OriginFullPath self.OriginFullPath = target_file.OriginFullPath
return self return self
def Rename(self, newpath:Union[Self, str]): def Rename(self, newpath:Union['ToolFile', str]):
if self.Exists() is False: if self.Exists() is False:
raise FileNotFoundError("file not found") raise FileNotFoundError("file not found")
newpath = str(newpath) newpath = str(newpath)
@@ -327,7 +327,7 @@ class ToolFile(BaseModel):
except: except:
pass pass
with open(self.OriginFullPath, 'w', encoding='utf-8') as f: with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=4) json.dump(json_data, f, indent=4, ensure_ascii=False)
return self return self
def SaveAsCsv(self, csv_data:"pd.DataFrame"): def SaveAsCsv(self, csv_data:"pd.DataFrame"):
''' '''
@@ -504,12 +504,11 @@ class ToolFile(BaseModel):
return result return result
def DirWalk( def DirWalk(
self, self,
top,
topdown: bool = True, topdown: bool = True,
onerror: Optional[Callable] = None, onerror: Optional[Callable] = None,
followlinks: bool = False followlinks: bool = False
) -> Iterator[tuple[dir_name_type, list[dir_name_type], list[file_name_type]]]: ) -> Iterator[tuple[dir_name_type, list[dir_name_type], list[file_name_type]]]:
return os.walk(self.OriginFullPath, top=top, topdown=topdown, onerror=onerror, followlinks=followlinks) return os.walk(self.OriginFullPath, topdown=topdown, onerror=onerror, followlinks=followlinks)
def bool(self): def bool(self):
@@ -522,7 +521,7 @@ class ToolFile(BaseModel):
self.Create() self.Create()
return self return self
def MakeFileInside(self, data:Self, is_delete_source = False): def MakeFileInside(self, data:'ToolFile', is_delete_source = False):
if self.IsDir() is False: if self.IsDir() is False:
raise Exception("Cannot make file inside a file, because this object target is not a directory") raise Exception("Cannot make file inside a file, because this object target is not a directory")
result:ToolFile = self|data.GetFilename() result:ToolFile = self|data.GetFilename()
@@ -644,7 +643,7 @@ class ToolFile(BaseModel):
except Exception as e: except Exception as e:
raise EncryptionError(f"Encryption failed: {str(e)}") raise EncryptionError(f"Encryption failed: {str(e)}")
def decrypt(self, key: str, algorithm: str = 'AES') -> Self: def decrypt(self, key: str, algorithm: str = 'AES') -> 'ToolFile':
""" """
解密文件 解密文件
Args: Args:
@@ -740,7 +739,7 @@ class ToolFile(BaseModel):
except Exception as e: except Exception as e:
raise HashError(f"Hash verification failed: {str(e)}") raise HashError(f"Hash verification failed: {str(e)}")
def save_hash(self, algorithm: str = 'md5', output_path: Optional[str] = None) -> Self: def save_hash(self, algorithm: str = 'md5', output_path: Optional[str] = None) -> 'ToolFile':
""" """
保存文件的哈希值到文件 保存文件的哈希值到文件
Args: Args:
@@ -861,7 +860,7 @@ class ToolFile(BaseModel):
max_backups: int = 5, max_backups: int = 5,
backup_format: str = 'zip', backup_format: str = 'zip',
include_metadata: bool = True include_metadata: bool = True
) -> Self: ) -> 'ToolFile':
""" """
创建文件或目录的备份 创建文件或目录的备份
Args: Args:
@@ -879,7 +878,7 @@ class ToolFile(BaseModel):
# 生成备份目录 # 生成备份目录
if backup_dir is None: if backup_dir is None:
backup_dir = os.path.join(self.GetDir(), '.backup') backup_dir = os.path.join(self.GetDir(), '.backup')
backup_dir:Self = ToolFile(backup_dir) backup_dir:'ToolFile' = ToolFile(backup_dir)
backup_dir.MustExistsPath() backup_dir.MustExistsPath()
# 生成备份文件名 # 生成备份文件名
@@ -935,10 +934,10 @@ class ToolFile(BaseModel):
def restore_backup( def restore_backup(
self, self,
backup_file: Union[str, Self], backup_file: Union[str, 'ToolFile'],
restore_path: Optional[str] = None, restore_path: Optional[str] = None,
verify_hash: bool = True verify_hash: bool = True
) -> Self: ) -> 'ToolFile':
""" """
从备份恢复文件或目录 从备份恢复文件或目录
Args: Args:
@@ -949,7 +948,7 @@ class ToolFile(BaseModel):
恢复后的文件对象 恢复后的文件对象
""" """
if not isinstance(backup_file, ToolFile): if not isinstance(backup_file, ToolFile):
backup_file:Self = ToolFile(backup_file) backup_file:'ToolFile' = ToolFile(backup_file)
if not backup_file.Exists(): if not backup_file.Exists():
raise FileNotFoundError(f"Backup file not found: {backup_file.GetFullPath()}") raise FileNotFoundError(f"Backup file not found: {backup_file.GetFullPath()}")
@@ -958,7 +957,7 @@ class ToolFile(BaseModel):
# 确定恢复路径 # 确定恢复路径
if restore_path is None: if restore_path is None:
restore_path = self.GetFullPath() restore_path = self.GetFullPath()
restore_path:Self = ToolFile(restore_path) restore_path:'ToolFile' = ToolFile(restore_path)
# 解压备份 # 解压备份
if backup_file.get_extension() == 'zip': if backup_file.get_extension() == 'zip':
@@ -985,7 +984,7 @@ class ToolFile(BaseModel):
except Exception as e: except Exception as e:
raise BackupError(f"Restore failed: {str(e)}") raise BackupError(f"Restore failed: {str(e)}")
def list_backups(self) -> List[Self]: def list_backups(self) -> List['ToolFile']:
""" """
列出所有备份 列出所有备份
Returns: Returns:
@@ -995,7 +994,7 @@ class ToolFile(BaseModel):
raise FileNotFoundError(f"File not found: {self.GetFullPath()}") raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
try: try:
backup_dir:Self = ToolFile(os.path.join(self.GetDir(), '.backup')) backup_dir:'ToolFile' = ToolFile(os.path.join(self.GetDir(), '.backup'))
if not backup_dir.Exists(): if not backup_dir.Exists():
return [] return []
@@ -1037,7 +1036,7 @@ class ToolFile(BaseModel):
execute: Optional[bool] = None, execute: Optional[bool] = None,
hidden: Optional[bool] = None, hidden: Optional[bool] = None,
recursive: bool = False recursive: bool = False
) -> Self: ) -> 'ToolFile':
""" """
设置文件或目录的权限 设置文件或目录的权限
Args: Args:

View File

@@ -228,7 +228,7 @@ class GlobalConfig:
return self return self
# 配置查找 # 配置查找
def FindItem[T](self, key: str, default: Optional[T] = None) -> Optional[T]: def FindItem(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
"""查找配置项,支持默认值""" """查找配置项,支持默认值"""
if key in self._data_pair: if key in self._data_pair:
return self._data_pair[key] return self._data_pair[key]