Compare commits

...

14 Commits

Author SHA1 Message Date
4d6b0e1c28 添加图像模块 2025-10-22 16:17:54 +08:00
f98fd3b4c4 移除Web.md 2025-10-22 09:30:30 +08:00
ef266f4a17 PrintColorful增强 2025-10-21 09:40:22 +08:00
48a8318fe7 Merge branch 'main' of http://www.liubai.site:3000/ninemine/Convention-Python 2025-10-21 09:34:34 +08:00
11e1aa0f86 Colorful独立 2025-10-21 09:34:26 +08:00
007db5a06b Merge branch 'main' of http://www.liubai.site:3000/ninemine/Convention-Python 2025-10-14 19:29:03 +08:00
c11469a108 ToolFile+AbsPath 2025-10-14 19:26:05 +08:00
81209e85ee 增强ToolFile对斜杠的判断与相关行为 2025-10-14 17:09:04 +08:00
97c57f65df Update ToolFile 2025-10-13 17:01:21 +08:00
5b38b6239e Update ToolFile 2025-10-13 16:25:19 +08:00
f146d241eb 1.File新增逐行读取迭代2.Rename PrintColorful 2025-10-13 15:39:29 +08:00
7ff00a8ab9 放弃EP Visual 2025-09-30 14:49:39 +08:00
b02eafcb35 EP Interaction 2025-09-30 10:40:58 +08:00
ecaab13948 EP Interaction 2025-09-29 16:22:10 +08:00
13 changed files with 1956 additions and 3329 deletions

862
Convention/Image/OpenCV.py Normal file
View File

@@ -0,0 +1,862 @@
from ..Runtime.Config import *
try:
import cv2 as cv2
import cv2.data as cv2data
from cv2.typing import *
except ImportError as e:
ImportingThrow(e, "OpenCV", ["opencv-python", "opencv-python-headless"])
try:
import numpy as np
except ImportError as e:
ImportingThrow(e, "OpenCV", ["numpy"])
try:
from PIL import ImageFile as ImageFile
from PIL import Image as Image
except ImportError as e:
ImportingThrow(e, "OpenCV", ["pillow"])
from ..Runtime.File import ToolFile
_Unwrapper2Str = lambda x: str(x)
_Wrapper2File = lambda x: ToolFile(x)
VideoWriter = cv2.VideoWriter
def mp4_with_MPEG4_fourcc() -> int:
return VideoWriter.fourcc(*"mp4v")
def avi_with_Xvid_fourcc() -> int:
return VideoWriter.fourcc(*"XVID")
def avi_with_DivX_fourcc() -> int:
return VideoWriter.fourcc(*"DIVX")
def avi_with_MJPG_fourcc() -> int:
return VideoWriter.fourcc(*"MJPG")
def mp4_or_avi_with_H264_fourcc() -> int:
return VideoWriter.fourcc(*"X264")
def avi_with_H265_fourcc() -> int:
return VideoWriter.fourcc(*"H264")
def wmv_with_WMV1_fourcc() -> int:
return VideoWriter.fourcc(*"WMV1")
def wmv_with_WMV2_fourcc() -> int:
return VideoWriter.fourcc(*"WMV2")
def oggTheora_with_THEO_fourcc() -> int:
return VideoWriter.fourcc(*"THEO")
def flv_with_FLV1_fourcc() -> int:
return VideoWriter.fourcc(*"FLV1")
class VideoWriterInstance(VideoWriter):
def __init__(
self,
file_name: Union[ToolFile, str],
fourcc: int,
fps: float,
frame_size: tuple[int, int],
is_color: bool = True
):
super().__init__(_Unwrapper2Str(file_name), fourcc, fps, frame_size, is_color)
def __del__(self):
self.release()
def wait_key(delay:int):
return cv2.waitKey(delay)
def until_esc():
return wait_key(0)
def is_current_key(key:str, *, wait_delay:int = 1):
return wait_key(wait_delay) & 0xFF == ord(key[0])
class BasicViewable:
def __init__(self, filename_or_index:Union[str, ToolFile, int]):
self._capture: cv2.VideoCapture = None
self.stats: bool = True
self.Retarget(filename_or_index)
def __del__(self):
self.Release()
def __bool__(self):
return self.stats
def IsOpened(self):
return self._capture.isOpened()
def Release(self):
if self._capture is not None:
self._capture.release()
def Retarget(self, filename_or_index:Union[str, ToolFile, int]):
self.Release()
if isinstance(filename_or_index, int):
self._capture = cv2.VideoCapture(filename_or_index)
else:
self._capture = cv2.VideoCapture(_Unwrapper2Str(filename_or_index))
return self
def NextFrame(self) -> MatLike:
self.stats, frame =self._capture.read()
if self.stats:
return frame
else:
return None
def GetCaptrueInfo(self, id:int):
return self._capture.get(id)
def GetPropPosMsec(self):
return self.GetCaptrueInfo(0)
def GetPropPosFrames(self):
return self.GetCaptrueInfo(1)
def GetPropAviRatio(self):
return self.GetCaptrueInfo(2)
def GetPropFrameWidth(self):
return self.GetCaptrueInfo(3)
def GetPropFrameHeight(self):
return self.GetCaptrueInfo(4)
def GetPropFPS(self):
return self.GetCaptrueInfo(5)
def GetPropFourcc(self):
return self.GetCaptrueInfo(6)
def GetPropFrameCount(self):
return self.GetCaptrueInfo(7)
def GetPropFormat(self):
return self.GetCaptrueInfo(8)
def GetPropMode(self):
return self.GetCaptrueInfo(9)
def GetPropBrightness(self):
return self.GetCaptrueInfo(10)
def GetPropContrast(self):
return self.GetCaptrueInfo(11)
def GetPropSaturation(self):
return self.GetCaptrueInfo(12)
def GetPropHue(self):
return self.GetCaptrueInfo(13)
def GetPropGain(self):
return self.GetCaptrueInfo(14)
def GetPropExposure(self):
return self.GetCaptrueInfo(15)
def GetPropConvertRGB(self):
return self.GetCaptrueInfo(16)
def SetupCapture(self, id:int, value):
self._capture.set(id, value)
return self
def SetPropPosMsec(self, value:int):
return self.SetupCapture(0, value)
def SetPropPosFrames(self, value:int):
return self.SetupCapture(1, value)
def SetPropAviRatio(self, value:float):
return self.SetupCapture(2, value)
def SetPropFrameWidth(self, value:int):
return self.SetupCapture(3, value)
def SetPropFrameHeight(self, value:int):
return self.SetupCapture(4, value)
def SetPropFPS(self, value:int):
return self.SetupCapture(5, value)
def SetPropFourcc(self, value):
return self.SetupCapture(6, value)
def SetPropFrameCount(self, value):
return self.SetupCapture(7, value)
def SetPropFormat(self, value):
return self.SetupCapture(8, value)
def SetPropMode(self, value):
return self.SetupCapture(9, value)
def SetPropBrightness(self, value):
return self.SetupCapture(10, value)
def SetPropContrast(self, value):
return self.SetupCapture(11, value)
def SetPropSaturation(self, value):
return self.SetupCapture(12, value)
def SetPropHue(self, value):
return self.SetupCapture(13, value)
def SetPropGain(self, value):
return self.SetupCapture(14, value)
def SetPropExposure(self, value):
return self.SetupCapture(15, value)
def SetPropConvertRGB(self, value:int):
return self.SetupCapture(16, value)
def SetPropRectification(self, value:int):
return self.SetupCapture(17, value)
@property
def FrameSize(self) -> Tuple[float, float]:
return self.GetPropFrameWidth(), self.GetPropFrameHeight()
class BasicCamera(BasicViewable):
def __init__(self, index:int = 0):
self.writer: VideoWriter = None
super().__init__(int(index))
@override
def Release(self):
super().Release()
if self.writer is not None:
self.writer.release()
def CurrentFrame(self):
return self.NextFrame()
def recording(
self,
stop_pr: Callable[[], bool],
writer: VideoWriter,
):
self.writer = writer
while self.IsOpened():
if stop_pr():
break
frame = self.CurrentFrame()
cv2.imshow("__recording__", frame)
writer.write(frame)
cv2.destroyWindow("__recording__")
return self
class ImageObject:
def __init__(
self,
image: Optional[Union[
str,
Self,
BasicCamera,
ToolFile,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]],
flags: int = -1):
self.__image: MatLike = None
self.__camera: BasicCamera = None
self.current: MatLike = None
if isinstance(image, BasicCamera):
self.lock_from_camera(image)
else:
self.load_image(image, flags)
@property
def camera(self) -> BasicCamera:
if self.__camera is None or self.__camera.IsOpened() is False:
return None
else:
return self.__camera
@property
def image(self) -> MatLike:
if self.current is not None:
return self.current
elif self.camera is None:
return self.__image
else:
return self.__camera.CurrentFrame()
@image.setter
def image(self, image: Optional[Union[
str,
Self,
ToolFile,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]]):
self.load_image(image)
def load_from_nparray(
self,
array_: np.ndarray,
code: int = cv2.COLOR_RGB2BGR,
*args, **kwargs
):
self.__image = cv2.cvtColor(array_, code, *args, **kwargs)
return self
def load_from_PIL_image(
self,
image: Image.Image,
code: int = cv2.COLOR_RGB2BGR,
*args, **kwargs
):
self.load_from_nparray(np.array(image), code, *args, **kwargs)
return self
def load_from_PIL_ImageFile(
self,
image: ImageFile.ImageFile,
rect: Optional[Tuple[float, float, float, float]] = None
):
return self.load_from_PIL_image(image.crop(rect))
def load_from_cv2_image(self, image: MatLike):
self.__image = image
return self
def lock_from_camera(self, camera: BasicCamera):
self.__camera = camera
return self
@property
def dimension(self) -> int:
return self.image.ndim
@property
def shape(self) -> Tuple[int, int, int]:
'''height, width, depth'''
return self.image.shape
@property
def height(self) -> int:
return self.shape[0]
@property
def width(self) -> int:
return self.shape[1]
def is_enable(self):
return self.image is not None
def is_invalid(self):
return self.is_enable() is False
def __bool__(self):
return self.is_enable()
def __MatLike__(self):
return self.image
def load_image(
self,
image: Optional[Union[
str,
ToolFile,
Self,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]],
flags: int = -1
):
"""加载图片"""
if image is None:
self.__image = None
return self
elif isinstance(image, type(self)):
self.__image = image.image
elif isinstance(image, MatLike):
self.__image = image
elif isinstance(image, np.ndarray):
self.load_from_nparray(image, flags)
elif isinstance(image, ImageFile.ImageFile):
self.load_from_PIL_ImageFile(image, flags)
elif isinstance(image, Image.Image):
self.load_from_PIL_image(image, flags)
else:
self.__image = cv2.imread(_Unwrapper2Str(image), flags)
return self
def save_image(self, save_path:Union[str, ToolFile], is_path_must_exist = False):
"""保存图片"""
if is_path_must_exist:
_Wrapper2File(save_path).try_create_parent_path()
if self.is_enable():
cv2.imwrite(_Unwrapper2Str(save_path), self.image)
return self
def show_image(
self,
window_name: str = "Image",
delay: Union[int,str] = 0,
image_show_func: Callable[[Self], None] = None,
*args, **kwargs
):
"""显示图片"""
if self.is_invalid():
return self
if self.camera is not None:
while (wait_key(1) & 0xFF != ord(str(delay)[0])) and self.camera is not None:
# dont delete this line, self.image is camera flame now, see<self.current = None>
self.current = self.image
if image_show_func is not None:
image_show_func(self)
if self.current is not None:
cv2.imshow(window_name, self.current)
# dont delete this line, see property<image>
self.current = None
else:
cv2.imshow(window_name, self.image)
cv2.waitKey(delay = int(delay), *args, **kwargs)
if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) > 0:
cv2.destroyWindow(window_name)
return self
# 分离通道
def split(self):
"""分离通道"""
return cv2.split(self.image)
def split_to_image_object(self):
"""分离通道"""
return [ImageObject(channel) for channel in self.split()]
@property
def channels(self):
return self.split()
@property
def blue_channel(self):
return self.channels[0]
@property
def green_channel(self):
return self.channels[1]
@property
def red_channel(self):
return self.channels[2]
@property
def alpha_channel(self):
return self.channels[3]
def get_blue_image(self):
return ImageObject(self.blue_channel)
def get_green_image(self):
return ImageObject(self.green_channel)
def get_red_image(self):
return ImageObject(self.red_channel)
def get_alpha_image(self):
return ImageObject(self.alpha_channel)
# 混合通道
def merge_channels_from_list(self, channels:List[MatLike]):
"""合并通道"""
self.image = cv2.merge(channels)
return self
def merge_channels(self, blue:MatLike, green:MatLike, red:MatLike):
"""合并通道"""
return self.merge_channels_from_list([blue, green, red])
def merge_channel_list(self, bgr:List[MatLike]):
"""合并通道"""
return self.merge_channels_from_list(bgr)
# Transform
def get_resize_image(self, width:int, height:int):
if self.is_enable():
return cv2.resize(self.image, (width, height))
return None
def get_rotate_image(self, angle:float):
if self.is_invalid():
return None
(h, w) = self.image.shape[:2]
center = (w // 2, h // 2)
M = cv2.getRotationMatrix2D(center, angle, 1.0)
return cv2.warpAffine(self.image, M, (w, h))
def resize_image(self, width:int, height:int):
"""调整图片大小"""
new_image = self.get_resize_image(width, height)
if new_image is not None:
self.image = new_image
return self
def rotate_image(self, angle:float):
"""旋转图片"""
new_image = self.get_rotate_image(angle)
if new_image is not None:
self.image = new_image
return self
# 图片翻折
def flip(self, flip_code:int):
"""翻转图片"""
if self.is_enable():
self.image = cv2.flip(self.image, flip_code)
return self
def horizon_flip(self):
"""水平翻转图片"""
return self.flip(1)
def vertical_flip(self):
"""垂直翻转图片"""
return self.flip(0)
def both_flip(self):
"""双向翻转图片"""
return self.flip(-1)
# 色彩空间猜测
def guess_color_space(self) -> Optional[str]:
"""猜测色彩空间"""
if self.is_invalid():
return None
image = self.image
# 计算每个通道的像素值分布
hist_b = cv2.calcHist([image], [0], None, [256], [0, 256])
hist_g = cv2.calcHist([image], [1], None, [256], [0, 256])
hist_r = cv2.calcHist([image], [2], None, [256], [0, 256])
# 计算每个通道的像素值总和
sum_b = np.sum(hist_b)
sum_g = np.sum(hist_g)
sum_r = np.sum(hist_r)
# 根据像素值总和判断色彩空间
if sum_b > sum_g and sum_b > sum_r:
#print("The image might be in BGR color space.")
return "BGR"
elif sum_g > sum_b and sum_g > sum_r:
#print("The image might be in GRAY color space.")
return "GRAY"
else:
#print("The image might be in RGB color space.")
return "RGB"
# 颜色转化
def get_convert(self, color_convert:int):
"""颜色转化"""
if self.is_invalid():
return None
return cv2.cvtColor(self.image, color_convert)
def convert_to(self, color_convert:int):
"""颜色转化"""
if self.is_invalid():
return None
self.image = self.get_convert(color_convert)
def is_grayscale(self):
return self.dimension == 2
def get_grayscale(self):
if self.is_invalid():
return None
return cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
def convert_to_grayscale(self):
"""将图片转换为灰度图"""
self.image = self.get_grayscale()
return self
def get_convert_flag(
self,
targetColorTypeName:Literal[
"BGR", "RGB", "GRAY", "YCrCb"
]
) -> Optional[int]:
"""获取颜色转化标志"""
flag = self.guess_color_space()
if flag is None:
return None
if targetColorTypeName == "BGR":
if flag == "RGB":
return cv2.COLOR_RGB2BGR
elif flag == "GRAY":
return cv2.COLOR_GRAY2BGR
elif flag == "YCrCb":
return cv2.COLOR_YCrCb2BGR
elif targetColorTypeName == "RGB":
if flag == "BGR":
return cv2.COLOR_BGR2RGB
elif flag == "GRAY":
return cv2.COLOR_GRAY2RGB
elif flag == "YCrCb":
return cv2.COLOR_YCrCb2RGB
elif targetColorTypeName == "GRAY":
if flag == "RGB":
return cv2.COLOR_RGB2GRAY
elif flag == "RGB":
return cv2.COLOR_BGR2GRAY
return None
# 原址裁切
def sub_image(self, x:int, y:int ,width:int ,height:int):
"""裁剪图片"""
if self.is_invalid():
return self
self.image = self.image[y:y+height, x:x+width]
return self
# 直方图
def equalizeHist(self, is_cover = False) -> MatLike:
"""直方图均衡化"""
if self.is_invalid():
return self
result:MatLike = cv2.equalizeHist(self.image)
if is_cover:
self.image = result
return result
def calcHist(
self,
channel: Union[List[int], int],
mask: Optional[MatLike] = None,
hist_size: Sequence[int] = [256],
ranges: Sequence[float] = [0, 256]
) -> MatLike:
"""计算直方图"""
if self.is_invalid():
return None
return cv2.calcHist(
[self.image],
channel if isinstance(channel, list) else [channel],
mask,
hist_size,
ranges)
# 子集操作
def sub_image_with_rect(self, rect:Tuple[float, float, float, float]):
"""裁剪图片"""
if self.is_invalid():
return self
self.image = self.image[rect[1]:rect[1]+rect[3], rect[0]:rect[0]+rect[2]]
return self
def sub_image_with_box(self, box:Tuple[float, float, float, float]):
"""裁剪图片"""
if self.is_invalid():
return self
self.image = self.image[box[1]:box[3], box[0]:box[2]]
return self
def sub_cover_with_rect(self, image:Union[Self, MatLike], rect:Tuple[float, float, float, float]):
"""覆盖图片"""
if self.is_invalid():
raise ValueError("Real Image is none")
if isinstance(image, MatLike):
image = ImageObject(image)
self.image[rect[1]:rect[1]+rect[3], rect[0]:rect[0]+rect[2]] = image.image
return self
def sub_cover_with_box(self, image:Union[Self, MatLike], box:Tuple[float, float, float, float]):
"""覆盖图片"""
if self.is_invalid():
raise ValueError("Real Image is none")
if isinstance(image, MatLike):
image = ImageObject(image)
self.image[box[1]:box[3], box[0]:box[2]] = image.image
return self
def operator_cv(self, func:Callable[[MatLike], Any], *args, **kwargs):
func(self.image, *args, **kwargs)
return self
def stack(self, *args:Self, **kwargs) -> Self:
images = [ image for image in args]
images.append(self)
return ImageObject(np.stack([np.uint8(image.image) for image in images], *args, **kwargs))
def vstack(self, *args:Self) -> Self:
images = [ image for image in args]
images.append(self)
return ImageObject(np.vstack([np.uint8(image.image) for image in images]))
def hstack(self, *args:Self) -> Self:
images = [ image for image in args]
images.append(self)
return ImageObject(np.hstack([np.uint8(image.image) for image in images]))
def merge_with_blending(self, other:Self, weights:Tuple[float, float]):
return ImageObject(cv2.addWeighted(self.image, weights[0], other.image, weights[1], 0))
def add(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.add(self.image, image_or_value)
else:
self.image = cv2.add(self.image, image_or_value.image)
return self
def __add__(self, image_or_value:Union[Self, int]):
return ImageObject(self.image.copy()).add(image_or_value)
def subtract(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.subtract(self.image, image_or_value)
else:
self.image = cv2.subtract(self.image, image_or_value.image)
return self
def __sub__(self, image_or_value:Union[Self, int]):
return ImageObject(self.image.copy()).subtract(image_or_value)
def multiply(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.multiply(self.image, image_or_value)
else:
self.image = cv2.multiply(self.image, image_or_value.image)
return self
def __mul__(self, image_or_value:Union[Self, int]):
return ImageObject(self.image.copy()).multiply(image_or_value)
def divide(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.divide(self.image, image_or_value)
else:
self.image = cv2.divide(self.image, image_or_value.image)
return self
def __truediv__(self, image_or_value:Union[Self, int]):
return ImageObject(self.image.copy()).divide(image_or_value)
def bitwise_and(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.bitwise_and(self.image, image_or_value)
else:
self.image = cv2.bitwise_and(self.image, image_or_value.image)
return self
def bitwise_or(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.bitwise_or(self.image, image_or_value)
else:
self.image = cv2.bitwise_or(self.image, image_or_value.image)
return self
def bitwise_xor(self, image_or_value:Union[Self]):
if isinstance(image_or_value, int):
self.image = cv2.bitwise_xor(self.image, image_or_value)
else:
self.image = cv2.bitwise_xor(self.image, image_or_value.image)
return self
def bitwise_not(self):
self.image = cv2.bitwise_not(self.image)
return self
def __neg__(self):
return ImageObject(self.image.copy()).bitwise_not()
class NoiseImageObject(ImageObject):
def __init__(
self,
height: int,
weight: int,
*,
mean: float = 0,
sigma: float = 25,
dtype = np.uint8
):
super().__init__(NoiseImageObject.get_new_noise(
None, height, weight, mean=mean, sigma=sigma, dtype=dtype
))
@classmethod
def get_new_noise(
raw_image: Optional[MatLike],
height: int,
weight: int,
*,
mean: float = 0,
sigma: float = 25,
dtype = np.uint8
) -> MatLike:
noise = raw_image
if noise is None:
noise = np.zeros((height, weight), dtype=dtype)
cv2.randn(noise, mean, sigma)
return cv2.cvtColor(noise, cv2.COLOR_GRAY2BGR)
def Unwrapper(image:Optional[Union[
str,
ImageObject,
ToolFile,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]]) -> MatLike:
return image.image if isinstance(image, ImageObject) else ImageObject(image).image
def Wrapper(image:Optional[Union[
str,
ImageObject,
ToolFile,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]]) -> ImageObject:
return ImageObject(image)
class light_cv_window:
def __init__(self, name:str):
self.__my_window_name = name
cv2.namedWindow(self.__my_window_name)
def __del__(self):
self.destroy()
def show_image(self, image:Union[ImageObject, MatLike]):
if self.__my_window_name is None:
self.__my_window_name = "window"
if isinstance(image, ImageObject):
image = image.image
cv2.imshow(self.__my_window_name, image)
return self
def destroy(self):
if self.__my_window_name is not None and cv2.getWindowProperty(self.__my_window_name, cv2.WND_PROP_VISIBLE) > 0:
cv2.destroyWindow(self.__my_window_name)
return self
@property
def window_rect(self):
return cv2.getWindowImageRect(self.__my_window_name)
@window_rect.setter
def window_rect(self, rect:Tuple[float, float, float, float]):
self.set_window_rect(rect[0], rect[1], rect[2], rect[3])
def set_window_size(self, weight:int, height:int):
cv2.resizeWindow(self.__my_window_name, weight, height)
return self
def get_window_size(self) -> Tuple[float, float]:
rect = self.window_rect
return rect[2], rect[3]
def get_window_property(self, prop_id:int):
return cv2.getWindowProperty(self.__my_window_name, prop_id)
def set_window_property(self, prop_id:int, prop_value:int):
cv2.setWindowProperty(self.__my_window_name, prop_id, prop_value)
return self
def get_prop_frame_width(self):
return self.window_rect[2]
def get_prop_frame_height(self):
return self.window_rect[3]
def is_full_window(self):
return cv2.getWindowProperty(self.__my_window_name, cv2.WINDOW_FULLSCREEN) > 0
def set_full_window(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_FULLSCREEN, 1)
return self
def set_normal_window(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_FULLSCREEN, 0)
return self
def is_using_openGL(self):
return cv2.getWindowProperty(self.__my_window_name, cv2.WINDOW_OPENGL) > 0
def set_using_openGL(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_OPENGL, 1)
return self
def set_not_using_openGL(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_OPENGL, 0)
return self
def is_autosize(self):
return cv2.getWindowProperty(self.__my_window_name, cv2.WINDOW_AUTOSIZE) > 0
def set_autosize(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_AUTOSIZE, 1)
return self
def set_not_autosize(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_AUTOSIZE, 0)
return self
def set_window_rect(self, x:int, y:int, weight:int, height:int):
cv2.moveWindow(self.__my_window_name, x, y)
return self.set_window_size(weight, height)
def set_window_pos(self, x:int, y:int):
cv2.moveWindow(self.__my_window_name, x, y)
return self
def wait_key(self, wait_time:int=0):
return cv2.waitKey(wait_time)
def get_haarcascade_frontalface(name_or_default:Optional[str]=None):
if name_or_default is None:
name_or_default = "haarcascade_frontalface_default"
return cv2.CascadeClassifier(cv2data.haarcascades+'haarcascade_frontalface_default.xml')
def detect_human_face(
image: ImageObject,
detecter: cv2.CascadeClassifier,
scaleFactor: float = 1.1,
minNeighbors: int = 4,
*args, **kwargs):
'''return is Rect[]'''
return detecter.detectMultiScale(image.image, scaleFactor, minNeighbors, *args, **kwargs)
class internal_detect_faces_oop(Callable[[ImageObject], None]):
def __init__(self):
self.face_cascade = get_haarcascade_frontalface()
def __call__(self, image:ImageObject):
gray = image.convert_to_grayscale()
faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
for (x,y,w,h) in faces:
image.operator_cv(cv2.rectangle,(x,y),(x+w,y+h),(255,0,0),2)
def easy_detect_faces(camera:BasicCamera):
ImageObject(camera).show_image("window", 'q', internal_detect_faces_oop())
# 示例使用
if __name__ == "__main__":
img_obj = ImageObject("path/to/your/image.jpg")
img_obj.show_image()
img_obj.resize_image(800, 600)
img_obj.rotate_image(45)
img_obj.convert_to_grayscale()
img_obj.save_image("path/to/save/image.jpg")
# Override tool_file to tool_file_ex
class tool_file_cvex(ToolFile):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@override
def load_as_image(self) -> ImageObject:
self.data = ImageObject(self.get_path())
return self.data
@override
def save(self, path = None):
image:ImageObject = self.data
image.save_image(path if path is not None else self.get_path())
return self

View File

@@ -5,35 +5,132 @@ import sys
import threading
import traceback
import datetime
try:
from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle
except:
print("colorama is not installed, using default colors")
class ConsoleFrontColor:
RED = ""
GREEN = ""
YELLOW = ""
BLUE = ""
MAGENTA = ""
CYAN = ""
WHITE = ""
RESET = ""
class ConsoleBackgroundColor:
RED = ""
GREEN = ""
YELLOW = ""
BLUE = ""
MAGENTA = ""
CYAN = ""
WHITE = ""
RESET = ""
class ConsoleStyle:
RESET = ""
BOLD = ""
DIM = ""
UNDERLINE = ""
REVERSE = ""
HIDDEN = ""
# region ansi colorful
# Copyright Jonathan Hartley 2013. BSD 3-Clause license
'''
This module generates ANSI character codes to printing colors to terminals.
See: http://en.wikipedia.org/wiki/ANSI_escape_code
'''
CSI = '\033['
OSC = '\033]'
BEL = '\a'
def code_to_chars(code):
return CSI + str(code) + 'm'
def set_title(title):
return OSC + '2;' + title + BEL
def clear_screen(mode=2):
return CSI + str(mode) + 'J'
def clear_line(mode=2):
return CSI + str(mode) + 'K'
class AnsiCodes(object):
def __init__(self):
# the subclasses declare class attributes which are numbers.
# Upon instantiation we define instance attributes, which are the same
# as the class attributes but wrapped with the ANSI escape sequence
for name in dir(self):
if not name.startswith('_'):
value = getattr(self, name)
setattr(self, name, code_to_chars(value))
class ConsoleCursor(object):
def UP(self, n=1):
return CSI + str(n) + 'A'
def DOWN(self, n=1):
return CSI + str(n) + 'B'
def FORWARD(self, n=1):
return CSI + str(n) + 'C'
def BACK(self, n=1):
return CSI + str(n) + 'D'
def POS(self, x=1, y=1):
return CSI + str(y) + ';' + str(x) + 'H'
class ConsoleFrontColorClass(AnsiCodes):
BLACK = 30
RED = 31
GREEN = 32
YELLOW = 33
BLUE = 34
MAGENTA = 35
CYAN = 36
WHITE = 37
RESET = 39
# These are fairly well supported, but not part of the standard.
LIGHTBLACK_EX = 90
LIGHTRED_EX = 91
LIGHTGREEN_EX = 92
LIGHTYELLOW_EX = 93
LIGHTBLUE_EX = 94
LIGHTMAGENTA_EX = 95
LIGHTCYAN_EX = 96
LIGHTWHITE_EX = 97
ConsoleFrontColor = ConsoleFrontColorClass()
class ConsoleBackgroundColorClass(AnsiCodes):
BLACK = 40
RED = 41
GREEN = 42
YELLOW = 43
BLUE = 44
MAGENTA = 45
CYAN = 46
WHITE = 47
RESET = 49
# These are fairly well supported, but not part of the standard.
LIGHTBLACK_EX = 100
LIGHTRED_EX = 101
LIGHTGREEN_EX = 102
LIGHTYELLOW_EX = 103
LIGHTBLUE_EX = 104
LIGHTMAGENTA_EX = 105
LIGHTCYAN_EX = 106
LIGHTWHITE_EX = 107
ConsoleBackgroundColor = ConsoleBackgroundColorClass()
class ConsoleStyleClass(AnsiCodes):
BRIGHT = 1
DIM = 2
NORMAL = 22
RESET_ALL = 0
ConsoleStyle = ConsoleStyleClass()
def PrintColorful(color:str, *args, is_reset:bool=True, **kwargs):
with lock_guard():
if is_reset:
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
else:
print(color,*args, **kwargs)
def PrintAsError(message:str):
PrintColorful(ConsoleFrontColor.RED, message)
def PrintAsWarning(message:str):
PrintColorful(ConsoleFrontColor.YELLOW, message)
def PrintAsInfo(message:str):
PrintColorful(ConsoleFrontColor.GREEN, message)
def PrintAsDebug(message:str):
PrintColorful(ConsoleFrontColor.BLUE, message)
def PrintAsSuccess(message:str):
PrintColorful(ConsoleFrontColor.GREEN, message)
def PrintAsLight(message:str):
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, message)
# endregion
class NotImplementedError(Exception):
def __init__(self, message:Optional[str]=None) -> None:
@@ -60,13 +157,6 @@ def GetInternalDebug() -> bool:
global INTERNAL_DEBUG
return INTERNAL_DEBUG
def print_colorful(color:str, *args, is_reset:bool=True, **kwargs):
with lock_guard():
if is_reset:
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
else:
print(color,*args, **kwargs)
ImportingFailedSet:Set[str] = set()
def ImportingThrow(
ex: ImportError,
@@ -328,6 +418,12 @@ class PlatformIndicator:
CompanyName : str = "DefaultCompany"
ProductName : str = "DefaultProject"
@staticmethod
def GetFileSeparator(is_not_this_platform:bool = False) -> str:
if PlatformIndicator.IsPlatformWindows and not is_not_this_platform:
return "\\"
return "/"
@staticmethod
def GetApplicationPath() -> str:
"""获取应用程序所在目录"""

View File

@@ -180,14 +180,14 @@ class ESReader(BaseModel):
'''
#module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.')
#if GetInternalEasySaveDebug():
# print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
# PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
# f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\
# f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}")
#typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name)
#return TypeManager.GetInstance().CreateOrGetRefType(typen_to)
typen, assembly_name = ReadAssemblyTypen(type_label)
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\
f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}")
return TypeManager.GetInstance().CreateOrGetRefType(typen)
@@ -235,7 +235,7 @@ class ESReader(BaseModel):
if rtype is None:
raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}")
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\
f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}")
# 处理值类型
@@ -278,7 +278,7 @@ class ESReader(BaseModel):
else:
rinstance = rtype.CreateInstance()
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\
f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}")
fields:List[FieldInfo] = self._GetFields(rtype)
for field in fields:
@@ -289,19 +289,19 @@ class ESReader(BaseModel):
if field.FieldType == list and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\
f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == set and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\
f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == tuple and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\
f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == dict and field.ValueType.IsGeneric:
@@ -309,13 +309,13 @@ class ESReader(BaseModel):
DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1])
)
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\
f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>")
else:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)
if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\
f"<{field_rtype.GenericArgs}>")
field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName]))

View File

@@ -1,10 +1,8 @@
import os.path
from .Config import *
import json
import shutil
import pandas as pd
import os
import sys
import pickle
import zipfile
import tarfile
import base64
@@ -14,21 +12,6 @@ import datetime
import stat
from typing import *
from pathlib import Path
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
from .String import Bytes2String
def GetExtensionName(file:str):
return os.path.splitext(file)[1][1:]
@@ -67,8 +50,10 @@ class PermissionError(FileOperationError):
"""权限操作异常"""
pass
from pydantic import BaseModel, GetCoreSchemaHandler, Field
from pydantic_core import core_schema
try:
from pydantic import BaseModel
except ImportError as e:
ImportingThrow(e, "File", ["pydantic"])
class ToolFile(BaseModel):
OriginFullPath:str
@@ -78,7 +63,7 @@ class ToolFile(BaseModel):
filePath: Union[str, Self],
):
filePath = os.path.expandvars(str(filePath))
if filePath[1:].startswith(":/") or filePath[1:].startswith(":\\"):
if ":" in filePath:
filePath = os.path.abspath(filePath)
super().__init__(OriginFullPath=filePath)
def __del__(self):
@@ -92,15 +77,21 @@ class ToolFile(BaseModel):
def __or__(self, other):
if other is None:
return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}\\")
return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}{PlatformIndicator.GetFileSeparator()}")
else:
# 不使用os.path.join因为os.path.join存在如下机制
# 当参数路径中存在绝对路径风格时,会忽略前面的参数,例如:
# os.path.join("E:/dev", "/analyze/") = "E:/analyze/"
# 而我们需要的是 "E:/dev/analyze"
first = self.GetFullPath().replace('/','\\').strip('\\')
second = str(other).replace('/','\\')
return ToolFile(f"{first}\\{second}")
separator = PlatformIndicator.GetFileSeparator()
separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
first = self.GetFullPath().replace(separator_not_this_platform,separator).strip(separator)
second = str(other).replace(separator_not_this_platform,separator)
if first == "./":
return ToolFile(f"{second}")
elif first == "../":
first = ToolFile(f"{os.path.abspath(first)}").BackToParentDir()
return ToolFile(f"{first}{separator}{second}")
def __idiv__(self, other):
temp = self.__or__(other)
self.OriginFullPath = temp.GetFullPath()
@@ -122,16 +113,19 @@ class ToolFile(BaseModel):
# 获取比较对象的路径
other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other)
self_path = self.OriginFullPath
separator = PlatformIndicator.GetFileSeparator()
separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
# 如果两个文件都存在,则直接比较路径
if self.Exists() == True and other.Exists() == True:
return self_path.strip('\\/') == other_path.strip('\\/')
return self_path.strip(separator_not_this_platform) == other_path.strip(separator_not_this_platform)
# 如果一个文件存在另一个不被判定为存在则一定不同
elif self.Exists() != other.Exists():
return False
# 如果两个文件都不存在,则直接比较文件名在视正反斜杠相同的情况下比较路径字符串
else:
return self_path.replace('/','\\') == other_path.replace('/','\\')
return self_path.replace(separator_not_this_platform,separator) == other_path.replace(separator_not_this_platform,separator)
def ToPath(self):
return Path(self.OriginFullPath)
@@ -181,7 +175,7 @@ class ToolFile(BaseModel):
if self.Exists() is False:
raise FileNotFoundError("file not found")
newpath = str(newpath)
if '\\' in newpath or '/' in newpath:
if PlatformIndicator.GetFileSeparator() in newpath or PlatformIndicator.GetFileSeparator(True) in newpath:
newpath = GetBaseFilename(newpath)
new_current_path = os.path.join(self.GetDir(), newpath)
os.rename(self.OriginFullPath, new_current_path)
@@ -192,16 +186,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r', encoding=encoding) as f:
json_data = json.load(f, **kwargs)
return json_data
def LoadAsCsv(self) -> pd.DataFrame:
def LoadAsCsv(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
def LoadAsXml(self) -> pd.DataFrame:
def LoadAsXml(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_xml(f)
def LoadAsDataframe(self) -> pd.DataFrame:
def LoadAsDataframe(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f)
def LoadAsExcel(self) -> pd.DataFrame:
def LoadAsExcel(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f:
return pd.read_excel(f)
def LoadAsBinary(self) -> bytes:
@@ -211,18 +221,103 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r') as f:
return f.read()
def LoadAsWav(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_wav(self.OriginFullPath)
def LoadAsAudio(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_file(self.OriginFullPath)
def LoadAsImage(self) -> ImageFile.ImageFile:
def LoadAsImage(self):
try:
from PIL import Image
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
return Image.open(self.OriginFullPath)
def LoadAsDocx(self) -> DocumentObject:
def LoadAsDocx(self) -> "docx.document.Document":
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
return Document(self.OriginFullPath)
def LoadAsUnknown(self, suffix:str) -> Any:
return self.LoadAsText()
def LoadAsModel(self, model:type[BaseModel]) -> BaseModel:
def LoadAsModel(self, model:type["BaseModel"]) -> "BaseModel":
return model.model_validate(self.LoadAsJson())
def ReadLines(self, **kwargs):
with open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = f.readline()
if not line or line == '':
break
yield line
async def ReadLinesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = await f.readline()
if not line or line == '':
break
yield line
def ReadBytes(self, **kwargs):
with open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = f.read(1024)
if not data or data == '':
break
yield data
async def ReadBytesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = await f.read(1024)
if not data or data == '':
break
yield data
def WriteBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'wb', **kwargs) as f:
f.write(data)
async def WriteBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'wb', **kwargs) as f:
await f.write(data)
def WriteLines(self, data:List[str], **kwargs):
with open(self.OriginFullPath, 'w', **kwargs) as f:
f.writelines(data)
async def WriteLinesAsync(self, data:List[str], **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'w', **kwargs) as f:
await f.writelines(data)
def AppendText(self, data:str, **kwargs):
with open(self.OriginFullPath, 'a', **kwargs) as f:
f.write(data)
async def AppendTextAsync(self, data:str, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'a', **kwargs) as f:
await f.write(data)
def AppendBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'ab', **kwargs) as f:
f.write(data)
async def AppendBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'ab', **kwargs) as f:
await f.write(data)
def SaveAsJson(self, json_data):
try:
from pydantic import BaseModel
@@ -234,16 +329,40 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=4)
return self
def SaveAsCsv(self, csv_data:pd.DataFrame):
def SaveAsCsv(self, csv_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
csv_data.to_csv(self.OriginFullPath)
return self
def SaveAsXml(self, xml_data:pd.DataFrame):
def SaveAsXml(self, xml_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
xml_data.to_xml(self.OriginFullPath)
return self
def SaveAsDataframe(self, dataframe_data:pd.DataFrame):
def SaveAsDataframe(self, dataframe_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
dataframe_data.to_csv(self.OriginFullPath)
return self
def SaveAsExcel(self, excel_data:pd.DataFrame):
def SaveAsExcel(self, excel_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
excel_data.to_excel(self.OriginFullPath, index=False)
return self
def SaveAsBinary(self, binary_data:bytes):
@@ -254,13 +373,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w') as f:
f.writelines(text_data)
return self
def SaveAsAudio(self, audio_data:AudioSegment):
def SaveAsAudio(self, audio_data:"AudioSegment"):
'''
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
'''
audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath))
return self
def SaveAsImage(self, image_data:ImageFile.ImageFile):
def SaveAsImage(self, image_data:"ImageFile.ImageFile"):
'''
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
'''
image_data.save(self.OriginFullPath)
return self
def SaveAsDocx(self, docx_data:DocumentObject):
def SaveAsDocx(self, docx_data:"DocumentObject"):
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
docx_data.save(self.OriginFullPath)
return self
def SaveAsUnknown(self, unknown_data:Any):
@@ -276,6 +414,8 @@ class ToolFile(BaseModel):
return os.path.getsize(self.OriginFullPath)
def GetExtension(self):
return GetExtensionName(self.OriginFullPath)
def GetAbsPath(self) -> str:
return os.path.abspath(self.OriginFullPath)
def GetFullPath(self) -> str:
return self.OriginFullPath
def GetFilename(self, is_without_extension = False):
@@ -285,7 +425,7 @@ class ToolFile(BaseModel):
'''
if is_without_extension and '.' in self.OriginFullPath:
return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)]
elif self.OriginFullPath[-1] == '/' or self.OriginFullPath[-1] == '\\':
elif self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator(True):
return GetBaseFilename(self.OriginFullPath[:-1])
else:
return GetBaseFilename(self.OriginFullPath)
@@ -297,7 +437,7 @@ class ToolFile(BaseModel):
return os.path.dirname(self.OriginFullPath)
def IsDir(self):
if self.OriginFullPath[-1] == '\\' or self.GetFullPath()[-1] == '/':
if self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.GetFullPath()[-1] == PlatformIndicator.GetFileSeparator(True):
return True
else:
return os.path.isdir(self.OriginFullPath)
@@ -646,8 +786,11 @@ class ToolFile(BaseModel):
ignore_directories: 是否忽略目录事件
case_sensitive: 是否区分大小写
"""
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
except ImportError as e:
ImportingThrow(e, "File", ["watchdog"])
if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
@@ -1008,32 +1151,4 @@ class ToolFile(BaseModel):
是否隐藏
"""
return self.get_permissions()['hidden']
def split_elements(
file: Union[ToolFile, str],
*,
ratios: List[float] = [1,1],
pr: Optional[Callable[[ToolFile], bool]] = None,
shuffler: Optional[Callable[[List[ToolFile]], None]] = None,
output_dirs: Optional[List[ToolFile]] = None,
output_must_exist: bool = True,
output_callback: Optional[Callable[[ToolFile], None]] = None
) -> List[List[ToolFile]]:
result: List[List[ToolFile]] = tool_split_elements(WrapperFile(file).dir_tool_file_iter(),
ratios=ratios,
pr=pr,
shuffler=shuffler)
if output_dirs is None:
return result
for i in range(min(len(output_dirs), len(result))):
output_dir: ToolFile = output_dirs[i]
if output_dir.IsDir() is False:
raise Exception("Outputs must be directory")
if output_must_exist:
output_dir.must_exists_as_new()
for file in result[i]:
current = output_dirs[i].MakeFileInside(file)
if output_callback:
output_callback(current)
return result

View File

@@ -0,0 +1,743 @@
from .Config import *
from .File import ToolFile
from .Web import ToolURL
import json
import urllib.parse
import os
from typing import *
try:
from pydantic import BaseModel, PrivateAttr, Field
except ImportError as e:
ImportingThrow(e, "Interaction", ["pydantic"])
try:
import aiofiles
except ImportError as e:
ImportingThrow(e, "Interaction", ["aiofiles"])
class InteractionError(Exception):
"""交互操作异常基类"""
pass
class PathValidationError(InteractionError):
"""路径验证异常"""
pass
class LoadError(InteractionError):
"""加载异常"""
pass
class SaveError(InteractionError):
"""保存异常"""
pass
class Interaction(BaseModel):
"""统一的文件交互类,自适应处理本地文件和网络文件"""
originPath: str
_is_url: bool = PrivateAttr(False)
_is_local: bool = PrivateAttr(False)
_tool_file: Optional[ToolFile] = PrivateAttr(None)
_tool_url: Optional[ToolURL] = PrivateAttr(None)
def __init__(self, path):
"""
从路径字符串创建对象自动识别本地文件或网络URL
Args:
path: 路径字符串或是可以转换为路径字符串的对象
"""
super().__init__(originPath=str(path))
# 自动识别路径类型
self._detect_path_type()
def _detect_path_type(self):
"""自动检测路径类型"""
path = self.originPath.strip()
# 检查是否为HTTP/HTTPS URL
if path.startswith(('http://', 'https://', 'file://')):
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(path)
return
# 检查是否为localhost URL
if path.startswith('localhost'):
# 转换为完整的HTTP URL
if not path.startswith('localhost:'):
# 默认端口80
full_url = f"http://{path}"
else:
full_url = f"http://{path}"
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(full_url)
self.originPath = full_url
return
# 检查是否为绝对路径或相对路径
if (os.path.isabs(path) or
path.startswith('./') or
path.startswith('../') or
':' in path[:3]): # Windows盘符
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
return
# 默认作为相对路径处理
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
def __str__(self) -> str:
"""隐式字符串转换"""
return self.originPath
def __bool__(self) -> bool:
"""隐式布尔转换,检查路径是否有效"""
return self.IsValid
@property
def IsValid(self) -> bool:
"""检查路径是否有效"""
if self._is_url:
return self._tool_url.IsValid if self._tool_url else False
else:
return self._tool_file.Exists() if self._tool_file else False
@property
def IsURL(self) -> bool:
"""是否为网络URL"""
return self._is_url
@property
def IsLocal(self) -> bool:
"""是否为本地文件"""
return self._is_local
@property
def IsFile(self) -> bool:
"""是否为文件对于URL检查是否存在文件名"""
if self._is_url:
return bool(self._tool_url.GetFilename()) if self._tool_url else False
else:
return self._tool_file.IsFile() if self._tool_file else False
@property
def IsDir(self) -> bool:
"""是否为目录(仅对本地路径有效)"""
if self._is_local:
return self._tool_file.IsDir() if self._tool_file else False
return False
def GetFilename(self) -> str:
"""获取文件名"""
if self._is_url:
return self._tool_url.GetFilename() if self._tool_url else ""
else:
return self._tool_file.GetFilename() if self._tool_file else ""
def GetExtension(self) -> str:
"""获取文件扩展名"""
if self._is_url:
return self._tool_url.GetExtension() if self._tool_url else ""
else:
return self._tool_file.GetExtension() if self._tool_file else ""
def ExtensionIs(self, *extensions: str) -> bool:
"""检查扩展名是否匹配"""
if self._is_url:
return self._tool_url.ExtensionIs(*extensions) if self._tool_url else False
else:
current_ext = self.GetExtension()
return current_ext.lower() in [ext.lower().lstrip('.') for ext in extensions]
# 文件类型判断属性
@property
def IsText(self) -> bool:
"""是否为文本文件"""
return self.ExtensionIs('txt', 'html', 'htm', 'css', 'js', 'xml', 'csv', 'md', 'py', 'java', 'cpp', 'c', 'h')
@property
def IsJson(self) -> bool:
"""是否为JSON文件"""
return self.ExtensionIs('json')
@property
def IsImage(self) -> bool:
"""是否为图像文件"""
return self.ExtensionIs('jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp')
@property
def IsDocument(self) -> bool:
"""是否为文档文件"""
return self.ExtensionIs('pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx')
def Open(self, path: str) -> 'Interaction':
"""在当前对象上打开新路径"""
new_obj = Interaction(path)
self.originPath = new_obj.originPath
self._is_url = new_obj._is_url
self._is_local = new_obj._is_local
self._tool_file = new_obj._tool_file
self._tool_url = new_obj._tool_url
return self
# 同步加载方法
def LoadAsText(self) -> str:
"""
同步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsText()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsText()
def LoadAsBinary(self) -> bytes:
"""
同步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsBinary()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsBinary()
def LoadAsJson(self, model_type: Optional[type] = None) -> Any:
"""
同步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsJson(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
json_data = self._tool_file.LoadAsJson()
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
# 异步加载方法
async def LoadAsTextAsync(self) -> str:
"""
异步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsTextAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'r', encoding='utf-8') as f:
return await f.read()
async def LoadAsBinaryAsync(self) -> bytes:
"""
异步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsBinaryAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'rb') as f:
return await f.read()
async def LoadAsJsonAsync(self, model_type: Optional[type] = None) -> Any:
"""
异步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsJsonAsync(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地JSON文件
text_content = await self.LoadAsTextAsync()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise LoadError(f"Failed to parse JSON from {self.originPath}: {str(e)}")
# 同步保存方法
def SaveAsText(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL先下载然后保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsText(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsText(content)
return self
def SaveAsBinary(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsBinary(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsBinary(content)
return self
def SaveAsJson(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.json"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsJson(data)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsJson(data)
return self
# 异步保存方法
async def SaveAsTextAsync(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
return self
async def SaveAsBinaryAsync(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'wb') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'wb') as f:
await f.write(content)
return self
async def SaveAsJsonAsync(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
# 序列化JSON数据
try:
from pydantic import BaseModel
if isinstance(data, BaseModel):
json_data = data.model_dump()
json_data["__type"] = f"{data.__class__.__name__}, pydantic.BaseModel"
else:
json_data = data
json_content = json.dumps(json_data, indent=4, ensure_ascii=False)
except Exception as e:
raise SaveError(f"Failed to serialize JSON data: {str(e)}")
# 保存JSON内容
return await self.SaveAsTextAsync(json_content, local_path)
# HTTP请求方法仅对URL有效
def Get(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
同步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Get(callback)
def Post(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
同步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Post(callback, form_data)
async def GetAsync(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
异步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.GetAsync(callback)
async def PostAsync(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
异步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.PostAsync(callback, form_data)
# 便利方法
def Save(self, local_path: Optional[str] = None) -> 'Interaction':
"""
自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL先下载内容再保存
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
self._tool_url.Save(local_path)
return self
async def SaveAsync(self, local_path: Optional[str] = None) -> 'Interaction':
"""
异步自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL异步下载内容
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
try:
if self.IsText:
content = await self.LoadAsTextAsync()
await self.SaveAsTextAsync(content, local_path)
elif self.IsJson:
content = await self.LoadAsJsonAsync()
await self.SaveAsJsonAsync(content, local_path)
else:
content = await self.LoadAsBinaryAsync()
await self.SaveAsBinaryAsync(content, local_path)
except Exception as e:
raise SaveError(f"Failed to save {self.originPath}: {str(e)}")
return self
def Downloadable(self) -> bool:
"""检查是否可下载"""
return self._is_url and self._tool_url.IsValid if self._tool_url else False
def Download(self, local_path: Optional[str] = None) -> ToolFile:
"""
下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("Download method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.Download(local_path)
async def DownloadAsync(self, local_path: Optional[str] = None) -> ToolFile:
"""
异步下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("DownloadAsync method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.DownloadAsync(local_path)
def Copy(self, target_path) -> ToolFile:
"""
复制文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
新的Interaction对象
"""
if not self._is_local:
raise InteractionError("Copy method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Copy(str(target_path))
def Move(self, target_path) -> ToolFile:
"""
移动文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
更新后的Interaction对象
"""
if not self._is_local:
raise InteractionError("Move method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Move(str(target_path))
def Remove(self) -> 'Interaction':
"""
删除文件(仅对本地文件有效)
Returns:
Interaction对象本身
"""
if not self._is_local:
raise InteractionError("Remove method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.Remove()
return self
def Exists(self) -> bool:
"""
检查文件是否存在
Returns:
是否存在
"""
return self.IsValid
def GetSize(self) -> int:
"""
获取文件大小(仅对本地文件有效)
Returns:
文件大小(字节)
"""
if not self._is_local:
raise InteractionError("GetSize method is only available for local files")
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.GetSize()
def GetDir(self) -> str:
"""
获取目录路径
Returns:
目录路径
"""
if self._is_local:
return self._tool_file.GetDir() if self._tool_file else ""
else:
# 对于URL返回基础URL
if self._tool_url:
parsed = urllib.parse.urlparse(self._tool_url.url)
return f"{parsed.scheme}://{parsed.netloc}"
return ""
def GetParentDir(self) -> 'Interaction':
"""
获取父目录的Interaction对象
Returns:
父目录的Interaction对象
"""
if self._is_local:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
parent_dir = self._tool_file.GetParentDir()
return Interaction(parent_dir.GetFullPath())
else:
# 对于URL返回基础URL
base_url = self.GetDir()
return Interaction(base_url)
def ToString(self) -> str:
"""获取完整路径"""
return self.originPath
def GetFullPath(self) -> str:
"""获取完整路径"""
return self.originPath

View File

@@ -241,7 +241,7 @@ def ToType(
type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None)
type_final = type_components[-1]
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\
PrintColorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\
f"typen: {typen}, type_components: {type_components}")
if type_module is not None:
return sys.modules[type_module].__dict__[type_final]
@@ -304,7 +304,7 @@ def DecayType(
return type_hint
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}")
PrintColorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}")
result: type|List[type] = None
@@ -333,7 +333,7 @@ def DecayType(
raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>")
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Result: {result}")
PrintColorful(ConsoleFrontColor.YELLOW, f"Result: {result}")
return result
def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool:
@@ -456,7 +456,7 @@ class ValueInfo(BaseInfo):
super().__init__(**kwargs)
self._RealType = metaType
if GetInternalReflectionDebug() and len(generic_args) > 0:
print_colorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\
PrintColorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\
f"metaType={metaType}, generic_args={generic_args}")
self._GenericArgs = generic_args
if not isinstance(metaType, type):
@@ -546,7 +546,7 @@ class ValueInfo(BaseInfo):
**kwargs
) -> 'ValueInfo':
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\
PrintColorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\
f"metaType={metaType}, SelfType={SelfType}")
if isinstance(metaType, type):
if metaType is list:
@@ -601,7 +601,7 @@ class FieldInfo(MemberInfo):
selfType: type|Any|None = None
):
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\
PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\
f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ")
super().__init__(
name = name,
@@ -611,7 +611,7 @@ class FieldInfo(MemberInfo):
)
self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType)
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\
PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\
f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}")
@property
@@ -746,7 +746,7 @@ class MethodInfo(MemberInfo):
is_class_method: bool,
):
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\
PrintColorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\
f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})")
MemberInfo.__init__(self, name, ctype, is_static, is_public)
self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType)
@@ -1143,12 +1143,12 @@ class RefType(ValueInfo):
def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]:
if currentType.IsPrimitive:
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return f"{currentType.RealType}"
elif currentType.RealType in type_set:
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return {
"type": f"{currentType.RealType}",
@@ -1156,13 +1156,13 @@ class RefType(ValueInfo):
}
else:
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
type_set.add(currentType.RealType)
value = {}
fields = currentType.GetFields()
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}")
PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}")
for field in fields:
value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType))
return {
@@ -1429,7 +1429,7 @@ class TypeManager(BaseModel):
if data is None:
raise ReflectionException("data is None")
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}")
PrintColorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}")
# 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型
if isinstance(data, str) and data in self._string_to_type_cache:
@@ -1454,7 +1454,7 @@ class TypeManager(BaseModel):
# 添加到弱引用缓存
self._weak_refs[type_id] = weakref.ref(ref_type)
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Get "\
PrintColorful(ConsoleFrontColor.YELLOW, f"Get "\
f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\
f"{ConsoleFrontColor.RESET}{ref_type.ToString()}")
return ref_type
@@ -1507,7 +1507,7 @@ class TypeManager(BaseModel):
try:
ref_type = RefType(metaType)
if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Create "\
PrintColorful(ConsoleFrontColor.RED, f"Create "\
f"{ConsoleFrontColor.RESET}{metaType} "\
f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}")
self._RefTypes[metaType] = ref_type

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,264 +0,0 @@
# Visual 模块
Visual模块提供了数据可视化和图像处理相关的功能包括数据图表、图像处理、词云等。
## 目录结构
- `Core.py`: 核心数据可视化功能
- `OpenCV.py`: OpenCV图像处理功能
- `WordCloud.py`: 词云生成功能
- `Manim.py`: 数学动画功能
## 功能特性
### 1. 数据可视化 (Core.py)
#### 1.1 基础图表
- 折线图
- 柱状图
- 散点图
- 直方图
- 饼图
- 箱线图
- 热力图
- 分类数据图
- 联合图
#### 1.2 数据处理
- 缺失值处理
- 重复值处理
- 数据标准化
- 数据归一化
### 2. 图像处理 (OpenCV.py)
#### 2.1 图像操作
- 图像加载
- 支持多种格式jpg, png, bmp等
- 支持从文件路径或URL加载
- 支持从内存缓冲区加载
- 图像保存
- 支持多种格式输出
- 支持质量参数设置
- 支持压缩选项
- 图像显示
- 支持窗口标题设置
- 支持窗口大小调整
- 支持键盘事件处理
- 图像转换
- RGB转灰度
- RGB转HSV
- RGB转LAB
- 支持自定义转换矩阵
- 图像缩放
- 支持多种插值方法
- 支持保持宽高比
- 支持指定目标尺寸
- 图像旋转
- 支持任意角度旋转
- 支持旋转中心点设置
- 支持旋转后尺寸调整
- 图像翻转
- 水平翻转
- 垂直翻转
- 对角线翻转
- 图像合并
- 支持多图像拼接
- 支持透明度混合
- 支持蒙版处理
#### 2.2 ImageObject类详解
ImageObject类提供了完整的图像处理功能
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 基本属性
width = image.width # 图像宽度
height = image.height # 图像高度
channels = image.channels # 通道数
dtype = image.dtype # 数据类型
# 图像处理
image.resize_image(800, 600) # 调整大小
image.convert_to_grayscale() # 转换为灰度图
image.filter_gaussian((5, 5), 1.5, 1.5) # 高斯滤波
image.rotate_image(45) # 旋转45度
image.flip_image(horizontal=True) # 水平翻转
# 图像增强
image.adjust_brightness(1.2) # 调整亮度
image.adjust_contrast(1.5) # 调整对比度
image.adjust_saturation(0.8) # 调整饱和度
image.equalize_histogram() # 直方图均衡化
# 边缘检测
image.detect_edges(threshold1=100, threshold2=200) # Canny边缘检测
image.detect_contours() # 轮廓检测
# 特征提取
keypoints = image.detect_keypoints() # 关键点检测
descriptors = image.compute_descriptors() # 描述子计算
# 图像保存
image.save_image("output.jpg", quality=95) # 保存图像
image.save_image("output.png", compression=9) # 保存PNG
# 图像显示
image.show_image("预览") # 显示图像
image.wait_key(0) # 等待按键
# 图像信息
print(image.get_info()) # 获取图像信息
print(image.get_histogram()) # 获取直方图
```
#### 2.3 图像增强
- 边缘检测
- 滤波处理
- 阈值处理
- 形态学操作
- 轮廓检测
- 特征匹配
#### 2.4 视频处理
- 视频读取
- 视频写入
- 摄像头控制
- 帧处理
### 3. 词云生成 (WordCloud.py)
#### 3.1 词云功能
- 词云创建
- 标题设置
- 渲染输出
- 样式定制
### 4. 数学动画 (Manim.py)
#### 4.1 动画功能
- 数学公式动画
- 几何图形动画
- 图表动画
- 场景管理
## 使用示例
### 1. 数据可视化示例
```python
from Convention.Visual import Core
# 创建数据可视化生成器
generator = Core.data_visual_generator("data.csv")
# 绘制折线图
generator.plot_line("x", "y", title="折线图示例")
# 绘制柱状图
generator.plot_bar("category", "value", title="柱状图示例")
# 绘制散点图
generator.plot_scatter("x", "y", title="散点图示例")
# 绘制饼图
generator.plot_pie("category", title="饼图示例")
```
### 2. 图像处理示例
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 图像处理
image.resize_image(800, 600)
image.convert_to_grayscale()
image.filter_gaussian((5, 5), 1.5, 1.5)
# 保存图像
image.save_image("output.jpg")
```
### 3. 词云生成示例
```python
from Convention.Visual import WordCloud
# 创建词云
wordcloud = WordCloud.make_word_cloud("词云", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
])
# 设置标题
WordCloud.set_title(wordcloud, "编程语言词云")
# 渲染输出
WordCloud.render_to(wordcloud, "wordcloud.html")
```
### 4. 视频处理示例
```python
from Convention.Visual import OpenCV
# 创建视频捕获对象
camera = OpenCV.light_cv_camera(0)
# 创建视频写入对象
writer = OpenCV.VideoWriterInstance(
"output.avi",
OpenCV.avi_with_Xvid_fourcc(),
30.0,
(640, 480)
)
# 录制视频
def stop_condition():
return OpenCV.is_current_key('q')
camera.recording(stop_condition, writer)
```
## 依赖项
- matplotlib: 数据可视化
- seaborn: 高级数据可视化
- opencv-python: 图像处理
- pyecharts: 词云生成
- manim: 数学动画
## 注意事项
1. 使用图像处理时注意内存占用
2. 视频处理时注意帧率设置
3. 词云生成时注意数据量
4. 动画制作时注意性能优化
## 性能优化
1. 使用图像处理时注意批量处理
2. 视频处理时使用合适的编码格式
3. 词云生成时控制词数
4. 动画制作时优化渲染设置
## 贡献指南
欢迎提交Issue和Pull Request来改进功能或添加新特性。

View File

@@ -1,66 +0,0 @@
from ..Internal import *
from pyecharts.charts import WordCloud
from pyecharts import options as opts
from pyecharts import types
#from ..File.Core import tool_file, UnWrapper as UnWrapper2Str
def make_word_cloud(
series_name: str,
data_pair: Sequence[Tuple[str, int]],
**kwargs,
):
wordcloud = WordCloud()
wordcloud.add(series_name, data_pair, **kwargs)
return wordcloud
def set_title(
wordcloud: WordCloud,
title: str
):
wordcloud.set_global_opts(
title_opts=opts.TitleOpts(title=title)
)
def render_to(
wordcloud: WordCloud,
file_name: Union[tool_file, str]
):
wordcloud.render(UnWrapper2Str(file_name))
class light_word_cloud(left_value_reference[WordCloud]):
def __init__(
self,
series_name: str,
data_pair: types.Sequence,
**kwargs,
):
super().__init__(make_word_cloud(series_name, data_pair, **kwargs))
def set_title(
self,
title: str
):
set_title(self.ref_value, title)
def render_to(
self,
file_name: Union[tool_file, str]
):
render_to(self.ref_value, file_name)
if __name__ == "__main__":
# 准备数据
wordcloud = make_word_cloud("", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
("Go", 60),
("Rust", 50),
("C#", 40),
("PHP", 30),
("Swift", 20),
("Kotlin", 10),
], word_size_range=[20, 100])
set_title(wordcloud, "cloud")
render_to(wordcloud, "wordcloud.html")

View File

@@ -1,121 +0,0 @@
[返回](./Runtime-README.md)
# /Convention/Runtime/Web
---
网络工具模块提供HTTP客户端和URL操作功能
## ToolURL类
### 构造与基本信息
- `ToolURL(string url)` 从URL字符串创建对象
- `ToString()` / `GetFullURL()` / `FullURL` 获取完整URL
- `implicit operator string` 隐式字符串转换
### URL属性解析
- `GetFilename()` 获取URL中的文件名
- `GetExtension()` 获取文件扩展名
- `ExtensionIs(params string[] extensions)` 检查扩展名是否匹配
### URL验证
- `IsValid` 属性检查URL是否有效
- `ValidateURL()` 验证URL格式
- `implicit operator bool` 隐式布尔转换等同于IsValid
支持HTTP和HTTPS协议的绝对URL
### HTTP方法
#### GET请求
- `GetAsync(Action<HttpResponseMessage> callback)` 异步GET
- `Get(Action<HttpResponseMessage> callback)` 同步GET
#### POST请求
- `PostAsync(Action<HttpResponseMessage> callback, Dictionary<string, string> formData = null)` 异步POST
- `Post(Action<HttpResponseMessage> callback, Dictionary<string, string> formData = null)` 同步POST
支持表单数据提交
### 内容加载
#### 文本加载
- `LoadAsTextAsync()` 异步加载为文本
- `LoadAsText()` 同步加载为文本
#### 二进制加载
- `LoadAsBinaryAsync()` 异步加载为字节数组
- `LoadAsBinary()` 同步加载为字节数组
#### JSON加载
- `LoadAsJson<T>()` 同步加载并反序列化JSON
- `LoadAsJsonAsync<T>()` 异步加载并反序列化JSON
### 文件保存
- `Save(string localPath = null)` 自动选择格式保存到本地
- `SaveAsText(string localPath = null)` 保存为文本文件
- `SaveAsJson(string localPath = null)` 保存为JSON文件
- `SaveAsBinary(string localPath = null)` 保存为二进制文件
### 文件类型判断
- `IsText` 是否为文本文件txt, html, htm, css, js, xml, csv
- `IsJson` 是否为JSON文件
- `IsImage` 是否为图像文件jpg, jpeg, png, gif, bmp, svg
- `IsDocument` 是否为文档文件pdf, doc, docx, xls, xlsx, ppt, pptx
### 高级操作
- `Open(string url)` 在当前对象上打开新URL
- `DownloadAsync(string localPath = null)` 异步下载文件
- `Download(string localPath = null)` 同步下载文件
## 设计特点
### 统一的HTTP客户端
使用静态 `HttpClient` 实例,避免连接池耗尽
### 自动内容类型检测
基于文件扩展名自动判断内容类型,优化保存和处理策略
### 异步支持
所有网络操作都提供异步和同步两种版本
### 错误处理
网络请求失败时回调函数接收null参数方法返回false
### 文件管理集成
下载的文件自动转换为ToolFile对象与文件系统模块无缝集成
### 灵活的数据格式
支持文本、二进制、JSON等多种数据格式的加载和保存
## 使用示例
### 基本HTTP请求
```csharp
var url = new ToolURL("https://api.example.com/data");
if (url.IsValid)
{
url.Get(response => {
if (response != null && response.IsSuccessStatusCode)
{
// 处理响应
}
});
}
```
### 文件下载
```csharp
var url = new ToolURL("https://example.com/file.json");
var localFile = url.Download("./downloads/file.json");
if (localFile.Exists())
{
var data = localFile.LoadAsJson<MyDataType>();
}
```
### 类型安全的JSON加载
```csharp
var url = new ToolURL("https://api.example.com/users.json");
var users = url.LoadAsJson<List<User>>();
```

View File

@@ -1,9 +1,9 @@
import sys
import os
from time import sleep
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Convention.Runtime.File import *
from Convention.Runtime.Config import *
PrintColorful(ConsoleFrontColor.RED, "Hello, World!")
first = ToolFile("E:/dev/")
second = ToolFile("/analyze/")
print(first|second)