Compare commits
6 Commits
3fa432a2bb
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 058975c37d | |||
| efd826f677 | |||
| ad17b905c4 | |||
| 4ba5bfdfee | |||
| 8121899d32 | |||
| a79ba69443 |
@@ -34,14 +34,14 @@ class SingletonModel[T](IModel):
|
|||||||
_InjectInstances:Dict[type,Any] = {}
|
_InjectInstances:Dict[type,Any] = {}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def GetInstance(t:Typen[T]) -> T:
|
def GetInstance(t:type) -> T:
|
||||||
return SingletonModel._InjectInstances[t]
|
return SingletonModel._InjectInstances[t]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def SetInstance(t:Typen[T], obj:T) -> None:
|
def SetInstance(t:type, obj:T) -> None:
|
||||||
SingletonModel._InjectInstances[t] = obj
|
SingletonModel._InjectInstances[t] = obj
|
||||||
|
|
||||||
def __init__(self, t:Typen[T]) -> None:
|
def __init__(self, t:type) -> None:
|
||||||
self.typen: type = t
|
self.typen: type = t
|
||||||
|
|
||||||
|
|
||||||
@@ -131,8 +131,9 @@ class Architecture:
|
|||||||
if slot in cls._RegisteringRuntime:
|
if slot in cls._RegisteringRuntime:
|
||||||
raise InvalidOperationError("Illegal duplicate registrations")
|
raise InvalidOperationError("Illegal duplicate registrations")
|
||||||
cls._RegisteringRuntime[slot] = Architecture.Registering(slot, target, DependenceModel(Architecture.TypeQuery(dependence) for dependence in dependences), action)
|
cls._RegisteringRuntime[slot] = Architecture.Registering(slot, target, DependenceModel(Architecture.TypeQuery(dependence) for dependence in dependences), action)
|
||||||
|
dependences = cls._RegisteringRuntime[slot].dependences
|
||||||
cls._InternalRegisteringComplete()
|
cls._InternalRegisteringComplete()
|
||||||
return cls._RegisteringRuntime[slot].dependences
|
return dependences
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def Contains(cls, type_:type) -> bool:
|
def Contains(cls, type_:type) -> bool:
|
||||||
|
|||||||
@@ -187,10 +187,10 @@ def InternalImportingThrow(
|
|||||||
):
|
):
|
||||||
with lock_guard():
|
with lock_guard():
|
||||||
requierds_str = ",".join([f"<{r}>" for r in requierds])
|
requierds_str = ",".join([f"<{r}>" for r in requierds])
|
||||||
print(f"Internal Convention package is not installed.\n{messageBase.format_map({
|
print("Internal Convention package is not installed.\n", messageBase.format_map({
|
||||||
"module": moduleName,
|
"module": moduleName,
|
||||||
"required": requierds_str
|
"required": requierds_str
|
||||||
})}\n[{ConsoleFrontColor.RESET}{format_traceback_info(back=2)}{ConsoleFrontColor.RED}]")
|
}), "\n[",ConsoleFrontColor.RESET,format_traceback_info(back=2),ConsoleFrontColor.RED,"]")
|
||||||
|
|
||||||
def ReleaseFailed2Requirements():
|
def ReleaseFailed2Requirements():
|
||||||
global ImportingFailedSet
|
global ImportingFailedSet
|
||||||
@@ -204,18 +204,17 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
InternalImportingThrow("Internal", ["pydantic"])
|
InternalImportingThrow("Internal", ["pydantic"])
|
||||||
|
|
||||||
type Typen[_T] = type
|
T = TypeVar("T")
|
||||||
|
Action = TypeVar("Action", bound=Callable[[], None])
|
||||||
type Action = Callable[[], None]
|
ClosuresCallable = Union[Callable[[Optional[None]], Any], type]
|
||||||
type ClosuresCallable[_T] = Union[Callable[[Optional[None]], _T], Typen[_T]]
|
|
||||||
|
|
||||||
def AssemblyTypen(obj:Any) -> str:
|
def AssemblyTypen(obj:Any) -> str:
|
||||||
if isinstance(obj, type):
|
if isinstance(obj, type):
|
||||||
return f"{obj.__module__}.{obj.__name__}, "\
|
return f"{obj.__module__}.{obj.__name__}, "\
|
||||||
f"{obj.Assembly() if hasattr(obj, "Assembly") else "Global"}"
|
f"{obj.Assembly() if hasattr(obj, 'Assembly') else 'Global'}"
|
||||||
else:
|
else:
|
||||||
return f"{obj.__class__.__module__}.{obj.__class__.__name__}, "\
|
return f"{obj.__class__.__module__}.{obj.__class__.__name__}, "\
|
||||||
f"{obj.GetAssembly() if hasattr(obj, "GetAssembly") else "Global"}"
|
f"{obj.GetAssembly() if hasattr(obj, 'GetAssembly') else 'Global'}"
|
||||||
def ReadAssemblyTypen(
|
def ReadAssemblyTypen(
|
||||||
assembly_typen: str,
|
assembly_typen: str,
|
||||||
*,
|
*,
|
||||||
@@ -232,68 +231,13 @@ def ReadAssemblyTypen(
|
|||||||
target_type = getattr(importlib.import_module(module_name), class_name)
|
target_type = getattr(importlib.import_module(module_name), class_name)
|
||||||
return target_type, assembly_name
|
return target_type, assembly_name
|
||||||
|
|
||||||
# using as c#: event
|
|
||||||
class ActionEvent[_Call:Callable]:
|
|
||||||
def __init__(self, actions:Sequence[_Call]):
|
|
||||||
super().__init__()
|
|
||||||
self._actions: List[Callable] = [action for action in actions]
|
|
||||||
self.call_indexs: List[int] = [i for i in range(len(actions))]
|
|
||||||
self.last_result: List[Any] = []
|
|
||||||
def CallFuncWithoutCallIndexControl(self, index:int, *args, **kwargs) -> Union[Any, Exception]:
|
|
||||||
try:
|
|
||||||
return self._actions[index](*args, **kwargs)
|
|
||||||
except Exception as ex:
|
|
||||||
return ex
|
|
||||||
def CallFunc(self, index:int, *args, **kwargs) -> Union[Any, Exception]:
|
|
||||||
return self.CallFuncWithoutCallIndexControl(self.call_indexs[index], *args, **kwargs)
|
|
||||||
def _InjectInvoke(self, *args, **kwargs):
|
|
||||||
result:List[Any] = []
|
|
||||||
for index in range(self.CallMaxCount):
|
|
||||||
result.append(self.CallFunc(index, *args, **kwargs))
|
|
||||||
return result
|
|
||||||
def Invoke(self, *args, **kwargs) -> Union[Self, bool]:
|
|
||||||
self.last_result = self._InjectInvoke(*args, **kwargs)
|
|
||||||
return self
|
|
||||||
def InitCallIndex(self):
|
|
||||||
self.call_indexs = [i for i in range(len(self._actions))]
|
|
||||||
def AddAction(self, action:_Call):
|
|
||||||
self._actions.append(action)
|
|
||||||
self.call_indexs.append(len(self._actions)-1)
|
|
||||||
return self
|
|
||||||
def AddActions(self, actions:Sequence[_Call]):
|
|
||||||
for action in actions:
|
|
||||||
self.AddAction(action)
|
|
||||||
return self
|
|
||||||
def _InternalRemoveAction(self, action:_Call):
|
|
||||||
if action in self._actions:
|
|
||||||
index = self._actions.index(action)
|
|
||||||
self._actions.remove(action)
|
|
||||||
self.call_indexs.remove(index)
|
|
||||||
for i in range(len(self.call_indexs)):
|
|
||||||
if self.call_indexs[i] > index:
|
|
||||||
self.call_indexs[i] -= 1
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
def RemoveAction(self, action:_Call):
|
|
||||||
while self._InternalRemoveAction(action):
|
|
||||||
pass
|
|
||||||
return self
|
|
||||||
def IsValid(self):
|
|
||||||
return not any(isinstance(x, Exception) for x in self.last_result)
|
|
||||||
def __bool__(self):
|
|
||||||
return self.IsValid()
|
|
||||||
@property
|
|
||||||
def CallMaxCount(self):
|
|
||||||
return len(self.call_indexs)
|
|
||||||
@property
|
|
||||||
def ActionCount(self):
|
|
||||||
return len(self._actions)
|
|
||||||
|
|
||||||
# region instance
|
# region instance
|
||||||
|
|
||||||
# threads
|
# threads
|
||||||
|
|
||||||
class atomic[_T]:
|
_T = TypeVar("_T")
|
||||||
|
|
||||||
|
class atomic(Generic[_T]):
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
value: _T,
|
value: _T,
|
||||||
@@ -320,13 +264,13 @@ class atomic[_T]:
|
|||||||
return self.FetchAdd(value)
|
return self.FetchAdd(value)
|
||||||
def __sub__(self, value:_T):
|
def __sub__(self, value:_T):
|
||||||
return self.FetchSub(value)
|
return self.FetchSub(value)
|
||||||
def __iadd__(self, value:_T) -> Self:
|
def __iadd__(self, value:_T) -> 'atomic[_T]':
|
||||||
self.FetchAdd(value)
|
self.FetchAdd(value)
|
||||||
return self
|
return self
|
||||||
def __isub__(self, value:_T) -> Self:
|
def __isub__(self, value:_T) -> 'atomic[_T]':
|
||||||
self.FetchSub(value)
|
self.FetchSub(value)
|
||||||
return self
|
return self
|
||||||
def __enter__(self) -> Self:
|
def __enter__(self) -> 'atomic[_T]':
|
||||||
self._is_in_with = True
|
self._is_in_with = True
|
||||||
self.locker.acquire()
|
self.locker.acquire()
|
||||||
return self
|
return self
|
||||||
@@ -482,7 +426,7 @@ class PlatformIndicator:
|
|||||||
"""
|
"""
|
||||||
return "Assets/"
|
return "Assets/"
|
||||||
|
|
||||||
class DescriptiveIndicator[T]:
|
class DescriptiveIndicator(Generic[T]):
|
||||||
def __init__(self, description:str, value:T) -> None:
|
def __init__(self, description:str, value:T) -> None:
|
||||||
self.descripion : str = description
|
self.descripion : str = description
|
||||||
self.value : T = value
|
self.value : T = value
|
||||||
|
|||||||
@@ -366,7 +366,7 @@ class EasySave:
|
|||||||
@overload
|
@overload
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def Read[T](
|
def Read[T](
|
||||||
rtype: Typen[T],
|
rtype: type,
|
||||||
file: Optional[ToolFile|str] = None,
|
file: Optional[ToolFile|str] = None,
|
||||||
*,
|
*,
|
||||||
setting: Optional[EasySaveSetting] = None
|
setting: Optional[EasySaveSetting] = None
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ class ToolFile(BaseModel):
|
|||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
filePath: Union[str, Self],
|
filePath: Union[str, 'ToolFile'],
|
||||||
):
|
):
|
||||||
filePath = os.path.expandvars(str(filePath))
|
filePath = os.path.expandvars(str(filePath))
|
||||||
if ":" in filePath:
|
if ":" in filePath:
|
||||||
@@ -152,7 +152,7 @@ class ToolFile(BaseModel):
|
|||||||
else:
|
else:
|
||||||
os.remove(self.OriginFullPath)
|
os.remove(self.OriginFullPath)
|
||||||
return self
|
return self
|
||||||
def Copy(self, targetPath:Optional[Union[Self, str]]=None):
|
def Copy(self, targetPath:Optional[Union['ToolFile', str]]=None):
|
||||||
if targetPath is None:
|
if targetPath is None:
|
||||||
return ToolFile(self.OriginFullPath)
|
return ToolFile(self.OriginFullPath)
|
||||||
if self.Exists() == False:
|
if self.Exists() == False:
|
||||||
@@ -162,7 +162,7 @@ class ToolFile(BaseModel):
|
|||||||
target_file = target_file|self.GetFilename()
|
target_file = target_file|self.GetFilename()
|
||||||
shutil.copy(self.OriginFullPath, str(target_file))
|
shutil.copy(self.OriginFullPath, str(target_file))
|
||||||
return target_file
|
return target_file
|
||||||
def Move(self, targetPath:Union[Self, str]):
|
def Move(self, targetPath:Union['ToolFile', str]):
|
||||||
if self.Exists() is False:
|
if self.Exists() is False:
|
||||||
raise FileNotFoundError("file not found")
|
raise FileNotFoundError("file not found")
|
||||||
target_file = ToolFile(str(targetPath))
|
target_file = ToolFile(str(targetPath))
|
||||||
@@ -171,7 +171,7 @@ class ToolFile(BaseModel):
|
|||||||
shutil.move(self.OriginFullPath, str(target_file))
|
shutil.move(self.OriginFullPath, str(target_file))
|
||||||
self.OriginFullPath = target_file.OriginFullPath
|
self.OriginFullPath = target_file.OriginFullPath
|
||||||
return self
|
return self
|
||||||
def Rename(self, newpath:Union[Self, str]):
|
def Rename(self, newpath:Union['ToolFile', str]):
|
||||||
if self.Exists() is False:
|
if self.Exists() is False:
|
||||||
raise FileNotFoundError("file not found")
|
raise FileNotFoundError("file not found")
|
||||||
newpath = str(newpath)
|
newpath = str(newpath)
|
||||||
@@ -327,7 +327,7 @@ class ToolFile(BaseModel):
|
|||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
|
with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
|
||||||
json.dump(json_data, f, indent=4)
|
json.dump(json_data, f, indent=4, ensure_ascii=False)
|
||||||
return self
|
return self
|
||||||
def SaveAsCsv(self, csv_data:"pd.DataFrame"):
|
def SaveAsCsv(self, csv_data:"pd.DataFrame"):
|
||||||
'''
|
'''
|
||||||
@@ -504,12 +504,11 @@ class ToolFile(BaseModel):
|
|||||||
return result
|
return result
|
||||||
def DirWalk(
|
def DirWalk(
|
||||||
self,
|
self,
|
||||||
top,
|
|
||||||
topdown: bool = True,
|
topdown: bool = True,
|
||||||
onerror: Optional[Callable] = None,
|
onerror: Optional[Callable] = None,
|
||||||
followlinks: bool = False
|
followlinks: bool = False
|
||||||
) -> Iterator[tuple[dir_name_type, list[dir_name_type], list[file_name_type]]]:
|
) -> Iterator[tuple[dir_name_type, list[dir_name_type], list[file_name_type]]]:
|
||||||
return os.walk(self.OriginFullPath, top=top, topdown=topdown, onerror=onerror, followlinks=followlinks)
|
return os.walk(self.OriginFullPath, topdown=topdown, onerror=onerror, followlinks=followlinks)
|
||||||
|
|
||||||
|
|
||||||
def bool(self):
|
def bool(self):
|
||||||
@@ -522,7 +521,7 @@ class ToolFile(BaseModel):
|
|||||||
self.Create()
|
self.Create()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def MakeFileInside(self, data:Self, is_delete_source = False):
|
def MakeFileInside(self, data:'ToolFile', is_delete_source = False):
|
||||||
if self.IsDir() is False:
|
if self.IsDir() is False:
|
||||||
raise Exception("Cannot make file inside a file, because this object target is not a directory")
|
raise Exception("Cannot make file inside a file, because this object target is not a directory")
|
||||||
result:ToolFile = self|data.GetFilename()
|
result:ToolFile = self|data.GetFilename()
|
||||||
@@ -644,7 +643,7 @@ class ToolFile(BaseModel):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise EncryptionError(f"Encryption failed: {str(e)}")
|
raise EncryptionError(f"Encryption failed: {str(e)}")
|
||||||
|
|
||||||
def decrypt(self, key: str, algorithm: str = 'AES') -> Self:
|
def decrypt(self, key: str, algorithm: str = 'AES') -> 'ToolFile':
|
||||||
"""
|
"""
|
||||||
解密文件
|
解密文件
|
||||||
Args:
|
Args:
|
||||||
@@ -740,7 +739,7 @@ class ToolFile(BaseModel):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise HashError(f"Hash verification failed: {str(e)}")
|
raise HashError(f"Hash verification failed: {str(e)}")
|
||||||
|
|
||||||
def save_hash(self, algorithm: str = 'md5', output_path: Optional[str] = None) -> Self:
|
def save_hash(self, algorithm: str = 'md5', output_path: Optional[str] = None) -> 'ToolFile':
|
||||||
"""
|
"""
|
||||||
保存文件的哈希值到文件
|
保存文件的哈希值到文件
|
||||||
Args:
|
Args:
|
||||||
@@ -861,7 +860,7 @@ class ToolFile(BaseModel):
|
|||||||
max_backups: int = 5,
|
max_backups: int = 5,
|
||||||
backup_format: str = 'zip',
|
backup_format: str = 'zip',
|
||||||
include_metadata: bool = True
|
include_metadata: bool = True
|
||||||
) -> Self:
|
) -> 'ToolFile':
|
||||||
"""
|
"""
|
||||||
创建文件或目录的备份
|
创建文件或目录的备份
|
||||||
Args:
|
Args:
|
||||||
@@ -879,7 +878,7 @@ class ToolFile(BaseModel):
|
|||||||
# 生成备份目录
|
# 生成备份目录
|
||||||
if backup_dir is None:
|
if backup_dir is None:
|
||||||
backup_dir = os.path.join(self.GetDir(), '.backup')
|
backup_dir = os.path.join(self.GetDir(), '.backup')
|
||||||
backup_dir:Self = ToolFile(backup_dir)
|
backup_dir:'ToolFile' = ToolFile(backup_dir)
|
||||||
backup_dir.MustExistsPath()
|
backup_dir.MustExistsPath()
|
||||||
|
|
||||||
# 生成备份文件名
|
# 生成备份文件名
|
||||||
@@ -935,10 +934,10 @@ class ToolFile(BaseModel):
|
|||||||
|
|
||||||
def restore_backup(
|
def restore_backup(
|
||||||
self,
|
self,
|
||||||
backup_file: Union[str, Self],
|
backup_file: Union[str, 'ToolFile'],
|
||||||
restore_path: Optional[str] = None,
|
restore_path: Optional[str] = None,
|
||||||
verify_hash: bool = True
|
verify_hash: bool = True
|
||||||
) -> Self:
|
) -> 'ToolFile':
|
||||||
"""
|
"""
|
||||||
从备份恢复文件或目录
|
从备份恢复文件或目录
|
||||||
Args:
|
Args:
|
||||||
@@ -949,7 +948,7 @@ class ToolFile(BaseModel):
|
|||||||
恢复后的文件对象
|
恢复后的文件对象
|
||||||
"""
|
"""
|
||||||
if not isinstance(backup_file, ToolFile):
|
if not isinstance(backup_file, ToolFile):
|
||||||
backup_file:Self = ToolFile(backup_file)
|
backup_file:'ToolFile' = ToolFile(backup_file)
|
||||||
|
|
||||||
if not backup_file.Exists():
|
if not backup_file.Exists():
|
||||||
raise FileNotFoundError(f"Backup file not found: {backup_file.GetFullPath()}")
|
raise FileNotFoundError(f"Backup file not found: {backup_file.GetFullPath()}")
|
||||||
@@ -958,7 +957,7 @@ class ToolFile(BaseModel):
|
|||||||
# 确定恢复路径
|
# 确定恢复路径
|
||||||
if restore_path is None:
|
if restore_path is None:
|
||||||
restore_path = self.GetFullPath()
|
restore_path = self.GetFullPath()
|
||||||
restore_path:Self = ToolFile(restore_path)
|
restore_path:'ToolFile' = ToolFile(restore_path)
|
||||||
|
|
||||||
# 解压备份
|
# 解压备份
|
||||||
if backup_file.get_extension() == 'zip':
|
if backup_file.get_extension() == 'zip':
|
||||||
@@ -985,7 +984,7 @@ class ToolFile(BaseModel):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
raise BackupError(f"Restore failed: {str(e)}")
|
raise BackupError(f"Restore failed: {str(e)}")
|
||||||
|
|
||||||
def list_backups(self) -> List[Self]:
|
def list_backups(self) -> List['ToolFile']:
|
||||||
"""
|
"""
|
||||||
列出所有备份
|
列出所有备份
|
||||||
Returns:
|
Returns:
|
||||||
@@ -995,7 +994,7 @@ class ToolFile(BaseModel):
|
|||||||
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
|
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
backup_dir:Self = ToolFile(os.path.join(self.GetDir(), '.backup'))
|
backup_dir:'ToolFile' = ToolFile(os.path.join(self.GetDir(), '.backup'))
|
||||||
if not backup_dir.Exists():
|
if not backup_dir.Exists():
|
||||||
return []
|
return []
|
||||||
|
|
||||||
@@ -1037,7 +1036,7 @@ class ToolFile(BaseModel):
|
|||||||
execute: Optional[bool] = None,
|
execute: Optional[bool] = None,
|
||||||
hidden: Optional[bool] = None,
|
hidden: Optional[bool] = None,
|
||||||
recursive: bool = False
|
recursive: bool = False
|
||||||
) -> Self:
|
) -> 'ToolFile':
|
||||||
"""
|
"""
|
||||||
设置文件或目录的权限
|
设置文件或目录的权限
|
||||||
Args:
|
Args:
|
||||||
|
|||||||
@@ -228,7 +228,7 @@ class GlobalConfig:
|
|||||||
return self
|
return self
|
||||||
|
|
||||||
# 配置查找
|
# 配置查找
|
||||||
def FindItem[T](self, key: str, default: Optional[T] = None) -> Optional[T]:
|
def FindItem(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
|
||||||
"""查找配置项,支持默认值"""
|
"""查找配置项,支持默认值"""
|
||||||
if key in self._data_pair:
|
if key in self._data_pair:
|
||||||
return self._data_pair[key]
|
return self._data_pair[key]
|
||||||
|
|||||||
Reference in New Issue
Block a user