Compare commits

...

14 Commits

Author SHA1 Message Date
4d6b0e1c28 添加图像模块 2025-10-22 16:17:54 +08:00
f98fd3b4c4 移除Web.md 2025-10-22 09:30:30 +08:00
ef266f4a17 PrintColorful增强 2025-10-21 09:40:22 +08:00
48a8318fe7 Merge branch 'main' of http://www.liubai.site:3000/ninemine/Convention-Python 2025-10-21 09:34:34 +08:00
11e1aa0f86 Colorful独立 2025-10-21 09:34:26 +08:00
007db5a06b Merge branch 'main' of http://www.liubai.site:3000/ninemine/Convention-Python 2025-10-14 19:29:03 +08:00
c11469a108 ToolFile+AbsPath 2025-10-14 19:26:05 +08:00
81209e85ee 增强ToolFile对斜杠的判断与相关行为 2025-10-14 17:09:04 +08:00
97c57f65df Update ToolFile 2025-10-13 17:01:21 +08:00
5b38b6239e Update ToolFile 2025-10-13 16:25:19 +08:00
f146d241eb 1.File新增逐行读取迭代2.Rename PrintColorful 2025-10-13 15:39:29 +08:00
7ff00a8ab9 放弃EP Visual 2025-09-30 14:49:39 +08:00
b02eafcb35 EP Interaction 2025-09-30 10:40:58 +08:00
ecaab13948 EP Interaction 2025-09-29 16:22:10 +08:00
13 changed files with 1956 additions and 3329 deletions

862
Convention/Image/OpenCV.py Normal file
View File

@@ -0,0 +1,862 @@
from ..Runtime.Config import *
try:
import cv2 as cv2
import cv2.data as cv2data
from cv2.typing import *
except ImportError as e:
ImportingThrow(e, "OpenCV", ["opencv-python", "opencv-python-headless"])
try:
import numpy as np
except ImportError as e:
ImportingThrow(e, "OpenCV", ["numpy"])
try:
from PIL import ImageFile as ImageFile
from PIL import Image as Image
except ImportError as e:
ImportingThrow(e, "OpenCV", ["pillow"])
from ..Runtime.File import ToolFile
_Unwrapper2Str = lambda x: str(x)
_Wrapper2File = lambda x: ToolFile(x)
VideoWriter = cv2.VideoWriter
def mp4_with_MPEG4_fourcc() -> int:
return VideoWriter.fourcc(*"mp4v")
def avi_with_Xvid_fourcc() -> int:
return VideoWriter.fourcc(*"XVID")
def avi_with_DivX_fourcc() -> int:
return VideoWriter.fourcc(*"DIVX")
def avi_with_MJPG_fourcc() -> int:
return VideoWriter.fourcc(*"MJPG")
def mp4_or_avi_with_H264_fourcc() -> int:
return VideoWriter.fourcc(*"X264")
def avi_with_H265_fourcc() -> int:
return VideoWriter.fourcc(*"H264")
def wmv_with_WMV1_fourcc() -> int:
return VideoWriter.fourcc(*"WMV1")
def wmv_with_WMV2_fourcc() -> int:
return VideoWriter.fourcc(*"WMV2")
def oggTheora_with_THEO_fourcc() -> int:
return VideoWriter.fourcc(*"THEO")
def flv_with_FLV1_fourcc() -> int:
return VideoWriter.fourcc(*"FLV1")
class VideoWriterInstance(VideoWriter):
def __init__(
self,
file_name: Union[ToolFile, str],
fourcc: int,
fps: float,
frame_size: tuple[int, int],
is_color: bool = True
):
super().__init__(_Unwrapper2Str(file_name), fourcc, fps, frame_size, is_color)
def __del__(self):
self.release()
def wait_key(delay:int):
return cv2.waitKey(delay)
def until_esc():
return wait_key(0)
def is_current_key(key:str, *, wait_delay:int = 1):
return wait_key(wait_delay) & 0xFF == ord(key[0])
class BasicViewable:
def __init__(self, filename_or_index:Union[str, ToolFile, int]):
self._capture: cv2.VideoCapture = None
self.stats: bool = True
self.Retarget(filename_or_index)
def __del__(self):
self.Release()
def __bool__(self):
return self.stats
def IsOpened(self):
return self._capture.isOpened()
def Release(self):
if self._capture is not None:
self._capture.release()
def Retarget(self, filename_or_index:Union[str, ToolFile, int]):
self.Release()
if isinstance(filename_or_index, int):
self._capture = cv2.VideoCapture(filename_or_index)
else:
self._capture = cv2.VideoCapture(_Unwrapper2Str(filename_or_index))
return self
def NextFrame(self) -> MatLike:
self.stats, frame =self._capture.read()
if self.stats:
return frame
else:
return None
def GetCaptrueInfo(self, id:int):
return self._capture.get(id)
def GetPropPosMsec(self):
return self.GetCaptrueInfo(0)
def GetPropPosFrames(self):
return self.GetCaptrueInfo(1)
def GetPropAviRatio(self):
return self.GetCaptrueInfo(2)
def GetPropFrameWidth(self):
return self.GetCaptrueInfo(3)
def GetPropFrameHeight(self):
return self.GetCaptrueInfo(4)
def GetPropFPS(self):
return self.GetCaptrueInfo(5)
def GetPropFourcc(self):
return self.GetCaptrueInfo(6)
def GetPropFrameCount(self):
return self.GetCaptrueInfo(7)
def GetPropFormat(self):
return self.GetCaptrueInfo(8)
def GetPropMode(self):
return self.GetCaptrueInfo(9)
def GetPropBrightness(self):
return self.GetCaptrueInfo(10)
def GetPropContrast(self):
return self.GetCaptrueInfo(11)
def GetPropSaturation(self):
return self.GetCaptrueInfo(12)
def GetPropHue(self):
return self.GetCaptrueInfo(13)
def GetPropGain(self):
return self.GetCaptrueInfo(14)
def GetPropExposure(self):
return self.GetCaptrueInfo(15)
def GetPropConvertRGB(self):
return self.GetCaptrueInfo(16)
def SetupCapture(self, id:int, value):
self._capture.set(id, value)
return self
def SetPropPosMsec(self, value:int):
return self.SetupCapture(0, value)
def SetPropPosFrames(self, value:int):
return self.SetupCapture(1, value)
def SetPropAviRatio(self, value:float):
return self.SetupCapture(2, value)
def SetPropFrameWidth(self, value:int):
return self.SetupCapture(3, value)
def SetPropFrameHeight(self, value:int):
return self.SetupCapture(4, value)
def SetPropFPS(self, value:int):
return self.SetupCapture(5, value)
def SetPropFourcc(self, value):
return self.SetupCapture(6, value)
def SetPropFrameCount(self, value):
return self.SetupCapture(7, value)
def SetPropFormat(self, value):
return self.SetupCapture(8, value)
def SetPropMode(self, value):
return self.SetupCapture(9, value)
def SetPropBrightness(self, value):
return self.SetupCapture(10, value)
def SetPropContrast(self, value):
return self.SetupCapture(11, value)
def SetPropSaturation(self, value):
return self.SetupCapture(12, value)
def SetPropHue(self, value):
return self.SetupCapture(13, value)
def SetPropGain(self, value):
return self.SetupCapture(14, value)
def SetPropExposure(self, value):
return self.SetupCapture(15, value)
def SetPropConvertRGB(self, value:int):
return self.SetupCapture(16, value)
def SetPropRectification(self, value:int):
return self.SetupCapture(17, value)
@property
def FrameSize(self) -> Tuple[float, float]:
return self.GetPropFrameWidth(), self.GetPropFrameHeight()
class BasicCamera(BasicViewable):
def __init__(self, index:int = 0):
self.writer: VideoWriter = None
super().__init__(int(index))
@override
def Release(self):
super().Release()
if self.writer is not None:
self.writer.release()
def CurrentFrame(self):
return self.NextFrame()
def recording(
self,
stop_pr: Callable[[], bool],
writer: VideoWriter,
):
self.writer = writer
while self.IsOpened():
if stop_pr():
break
frame = self.CurrentFrame()
cv2.imshow("__recording__", frame)
writer.write(frame)
cv2.destroyWindow("__recording__")
return self
class ImageObject:
def __init__(
self,
image: Optional[Union[
str,
Self,
BasicCamera,
ToolFile,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]],
flags: int = -1):
self.__image: MatLike = None
self.__camera: BasicCamera = None
self.current: MatLike = None
if isinstance(image, BasicCamera):
self.lock_from_camera(image)
else:
self.load_image(image, flags)
@property
def camera(self) -> BasicCamera:
if self.__camera is None or self.__camera.IsOpened() is False:
return None
else:
return self.__camera
@property
def image(self) -> MatLike:
if self.current is not None:
return self.current
elif self.camera is None:
return self.__image
else:
return self.__camera.CurrentFrame()
@image.setter
def image(self, image: Optional[Union[
str,
Self,
ToolFile,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]]):
self.load_image(image)
def load_from_nparray(
self,
array_: np.ndarray,
code: int = cv2.COLOR_RGB2BGR,
*args, **kwargs
):
self.__image = cv2.cvtColor(array_, code, *args, **kwargs)
return self
def load_from_PIL_image(
self,
image: Image.Image,
code: int = cv2.COLOR_RGB2BGR,
*args, **kwargs
):
self.load_from_nparray(np.array(image), code, *args, **kwargs)
return self
def load_from_PIL_ImageFile(
self,
image: ImageFile.ImageFile,
rect: Optional[Tuple[float, float, float, float]] = None
):
return self.load_from_PIL_image(image.crop(rect))
def load_from_cv2_image(self, image: MatLike):
self.__image = image
return self
def lock_from_camera(self, camera: BasicCamera):
self.__camera = camera
return self
@property
def dimension(self) -> int:
return self.image.ndim
@property
def shape(self) -> Tuple[int, int, int]:
'''height, width, depth'''
return self.image.shape
@property
def height(self) -> int:
return self.shape[0]
@property
def width(self) -> int:
return self.shape[1]
def is_enable(self):
return self.image is not None
def is_invalid(self):
return self.is_enable() is False
def __bool__(self):
return self.is_enable()
def __MatLike__(self):
return self.image
def load_image(
self,
image: Optional[Union[
str,
ToolFile,
Self,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]],
flags: int = -1
):
"""加载图片"""
if image is None:
self.__image = None
return self
elif isinstance(image, type(self)):
self.__image = image.image
elif isinstance(image, MatLike):
self.__image = image
elif isinstance(image, np.ndarray):
self.load_from_nparray(image, flags)
elif isinstance(image, ImageFile.ImageFile):
self.load_from_PIL_ImageFile(image, flags)
elif isinstance(image, Image.Image):
self.load_from_PIL_image(image, flags)
else:
self.__image = cv2.imread(_Unwrapper2Str(image), flags)
return self
def save_image(self, save_path:Union[str, ToolFile], is_path_must_exist = False):
"""保存图片"""
if is_path_must_exist:
_Wrapper2File(save_path).try_create_parent_path()
if self.is_enable():
cv2.imwrite(_Unwrapper2Str(save_path), self.image)
return self
def show_image(
self,
window_name: str = "Image",
delay: Union[int,str] = 0,
image_show_func: Callable[[Self], None] = None,
*args, **kwargs
):
"""显示图片"""
if self.is_invalid():
return self
if self.camera is not None:
while (wait_key(1) & 0xFF != ord(str(delay)[0])) and self.camera is not None:
# dont delete this line, self.image is camera flame now, see<self.current = None>
self.current = self.image
if image_show_func is not None:
image_show_func(self)
if self.current is not None:
cv2.imshow(window_name, self.current)
# dont delete this line, see property<image>
self.current = None
else:
cv2.imshow(window_name, self.image)
cv2.waitKey(delay = int(delay), *args, **kwargs)
if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) > 0:
cv2.destroyWindow(window_name)
return self
# 分离通道
def split(self):
"""分离通道"""
return cv2.split(self.image)
def split_to_image_object(self):
"""分离通道"""
return [ImageObject(channel) for channel in self.split()]
@property
def channels(self):
return self.split()
@property
def blue_channel(self):
return self.channels[0]
@property
def green_channel(self):
return self.channels[1]
@property
def red_channel(self):
return self.channels[2]
@property
def alpha_channel(self):
return self.channels[3]
def get_blue_image(self):
return ImageObject(self.blue_channel)
def get_green_image(self):
return ImageObject(self.green_channel)
def get_red_image(self):
return ImageObject(self.red_channel)
def get_alpha_image(self):
return ImageObject(self.alpha_channel)
# 混合通道
def merge_channels_from_list(self, channels:List[MatLike]):
"""合并通道"""
self.image = cv2.merge(channels)
return self
def merge_channels(self, blue:MatLike, green:MatLike, red:MatLike):
"""合并通道"""
return self.merge_channels_from_list([blue, green, red])
def merge_channel_list(self, bgr:List[MatLike]):
"""合并通道"""
return self.merge_channels_from_list(bgr)
# Transform
def get_resize_image(self, width:int, height:int):
if self.is_enable():
return cv2.resize(self.image, (width, height))
return None
def get_rotate_image(self, angle:float):
if self.is_invalid():
return None
(h, w) = self.image.shape[:2]
center = (w // 2, h // 2)
M = cv2.getRotationMatrix2D(center, angle, 1.0)
return cv2.warpAffine(self.image, M, (w, h))
def resize_image(self, width:int, height:int):
"""调整图片大小"""
new_image = self.get_resize_image(width, height)
if new_image is not None:
self.image = new_image
return self
def rotate_image(self, angle:float):
"""旋转图片"""
new_image = self.get_rotate_image(angle)
if new_image is not None:
self.image = new_image
return self
# 图片翻折
def flip(self, flip_code:int):
"""翻转图片"""
if self.is_enable():
self.image = cv2.flip(self.image, flip_code)
return self
def horizon_flip(self):
"""水平翻转图片"""
return self.flip(1)
def vertical_flip(self):
"""垂直翻转图片"""
return self.flip(0)
def both_flip(self):
"""双向翻转图片"""
return self.flip(-1)
# 色彩空间猜测
def guess_color_space(self) -> Optional[str]:
"""猜测色彩空间"""
if self.is_invalid():
return None
image = self.image
# 计算每个通道的像素值分布
hist_b = cv2.calcHist([image], [0], None, [256], [0, 256])
hist_g = cv2.calcHist([image], [1], None, [256], [0, 256])
hist_r = cv2.calcHist([image], [2], None, [256], [0, 256])
# 计算每个通道的像素值总和
sum_b = np.sum(hist_b)
sum_g = np.sum(hist_g)
sum_r = np.sum(hist_r)
# 根据像素值总和判断色彩空间
if sum_b > sum_g and sum_b > sum_r:
#print("The image might be in BGR color space.")
return "BGR"
elif sum_g > sum_b and sum_g > sum_r:
#print("The image might be in GRAY color space.")
return "GRAY"
else:
#print("The image might be in RGB color space.")
return "RGB"
# 颜色转化
def get_convert(self, color_convert:int):
"""颜色转化"""
if self.is_invalid():
return None
return cv2.cvtColor(self.image, color_convert)
def convert_to(self, color_convert:int):
"""颜色转化"""
if self.is_invalid():
return None
self.image = self.get_convert(color_convert)
def is_grayscale(self):
return self.dimension == 2
def get_grayscale(self):
if self.is_invalid():
return None
return cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
def convert_to_grayscale(self):
"""将图片转换为灰度图"""
self.image = self.get_grayscale()
return self
def get_convert_flag(
self,
targetColorTypeName:Literal[
"BGR", "RGB", "GRAY", "YCrCb"
]
) -> Optional[int]:
"""获取颜色转化标志"""
flag = self.guess_color_space()
if flag is None:
return None
if targetColorTypeName == "BGR":
if flag == "RGB":
return cv2.COLOR_RGB2BGR
elif flag == "GRAY":
return cv2.COLOR_GRAY2BGR
elif flag == "YCrCb":
return cv2.COLOR_YCrCb2BGR
elif targetColorTypeName == "RGB":
if flag == "BGR":
return cv2.COLOR_BGR2RGB
elif flag == "GRAY":
return cv2.COLOR_GRAY2RGB
elif flag == "YCrCb":
return cv2.COLOR_YCrCb2RGB
elif targetColorTypeName == "GRAY":
if flag == "RGB":
return cv2.COLOR_RGB2GRAY
elif flag == "RGB":
return cv2.COLOR_BGR2GRAY
return None
# 原址裁切
def sub_image(self, x:int, y:int ,width:int ,height:int):
"""裁剪图片"""
if self.is_invalid():
return self
self.image = self.image[y:y+height, x:x+width]
return self
# 直方图
def equalizeHist(self, is_cover = False) -> MatLike:
"""直方图均衡化"""
if self.is_invalid():
return self
result:MatLike = cv2.equalizeHist(self.image)
if is_cover:
self.image = result
return result
def calcHist(
self,
channel: Union[List[int], int],
mask: Optional[MatLike] = None,
hist_size: Sequence[int] = [256],
ranges: Sequence[float] = [0, 256]
) -> MatLike:
"""计算直方图"""
if self.is_invalid():
return None
return cv2.calcHist(
[self.image],
channel if isinstance(channel, list) else [channel],
mask,
hist_size,
ranges)
# 子集操作
def sub_image_with_rect(self, rect:Tuple[float, float, float, float]):
"""裁剪图片"""
if self.is_invalid():
return self
self.image = self.image[rect[1]:rect[1]+rect[3], rect[0]:rect[0]+rect[2]]
return self
def sub_image_with_box(self, box:Tuple[float, float, float, float]):
"""裁剪图片"""
if self.is_invalid():
return self
self.image = self.image[box[1]:box[3], box[0]:box[2]]
return self
def sub_cover_with_rect(self, image:Union[Self, MatLike], rect:Tuple[float, float, float, float]):
"""覆盖图片"""
if self.is_invalid():
raise ValueError("Real Image is none")
if isinstance(image, MatLike):
image = ImageObject(image)
self.image[rect[1]:rect[1]+rect[3], rect[0]:rect[0]+rect[2]] = image.image
return self
def sub_cover_with_box(self, image:Union[Self, MatLike], box:Tuple[float, float, float, float]):
"""覆盖图片"""
if self.is_invalid():
raise ValueError("Real Image is none")
if isinstance(image, MatLike):
image = ImageObject(image)
self.image[box[1]:box[3], box[0]:box[2]] = image.image
return self
def operator_cv(self, func:Callable[[MatLike], Any], *args, **kwargs):
func(self.image, *args, **kwargs)
return self
def stack(self, *args:Self, **kwargs) -> Self:
images = [ image for image in args]
images.append(self)
return ImageObject(np.stack([np.uint8(image.image) for image in images], *args, **kwargs))
def vstack(self, *args:Self) -> Self:
images = [ image for image in args]
images.append(self)
return ImageObject(np.vstack([np.uint8(image.image) for image in images]))
def hstack(self, *args:Self) -> Self:
images = [ image for image in args]
images.append(self)
return ImageObject(np.hstack([np.uint8(image.image) for image in images]))
def merge_with_blending(self, other:Self, weights:Tuple[float, float]):
return ImageObject(cv2.addWeighted(self.image, weights[0], other.image, weights[1], 0))
def add(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.add(self.image, image_or_value)
else:
self.image = cv2.add(self.image, image_or_value.image)
return self
def __add__(self, image_or_value:Union[Self, int]):
return ImageObject(self.image.copy()).add(image_or_value)
def subtract(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.subtract(self.image, image_or_value)
else:
self.image = cv2.subtract(self.image, image_or_value.image)
return self
def __sub__(self, image_or_value:Union[Self, int]):
return ImageObject(self.image.copy()).subtract(image_or_value)
def multiply(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.multiply(self.image, image_or_value)
else:
self.image = cv2.multiply(self.image, image_or_value.image)
return self
def __mul__(self, image_or_value:Union[Self, int]):
return ImageObject(self.image.copy()).multiply(image_or_value)
def divide(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.divide(self.image, image_or_value)
else:
self.image = cv2.divide(self.image, image_or_value.image)
return self
def __truediv__(self, image_or_value:Union[Self, int]):
return ImageObject(self.image.copy()).divide(image_or_value)
def bitwise_and(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.bitwise_and(self.image, image_or_value)
else:
self.image = cv2.bitwise_and(self.image, image_or_value.image)
return self
def bitwise_or(self, image_or_value:Union[Self, int]):
if isinstance(image_or_value, int):
self.image = cv2.bitwise_or(self.image, image_or_value)
else:
self.image = cv2.bitwise_or(self.image, image_or_value.image)
return self
def bitwise_xor(self, image_or_value:Union[Self]):
if isinstance(image_or_value, int):
self.image = cv2.bitwise_xor(self.image, image_or_value)
else:
self.image = cv2.bitwise_xor(self.image, image_or_value.image)
return self
def bitwise_not(self):
self.image = cv2.bitwise_not(self.image)
return self
def __neg__(self):
return ImageObject(self.image.copy()).bitwise_not()
class NoiseImageObject(ImageObject):
def __init__(
self,
height: int,
weight: int,
*,
mean: float = 0,
sigma: float = 25,
dtype = np.uint8
):
super().__init__(NoiseImageObject.get_new_noise(
None, height, weight, mean=mean, sigma=sigma, dtype=dtype
))
@classmethod
def get_new_noise(
raw_image: Optional[MatLike],
height: int,
weight: int,
*,
mean: float = 0,
sigma: float = 25,
dtype = np.uint8
) -> MatLike:
noise = raw_image
if noise is None:
noise = np.zeros((height, weight), dtype=dtype)
cv2.randn(noise, mean, sigma)
return cv2.cvtColor(noise, cv2.COLOR_GRAY2BGR)
def Unwrapper(image:Optional[Union[
str,
ImageObject,
ToolFile,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]]) -> MatLike:
return image.image if isinstance(image, ImageObject) else ImageObject(image).image
def Wrapper(image:Optional[Union[
str,
ImageObject,
ToolFile,
MatLike,
np.ndarray,
ImageFile.ImageFile,
Image.Image
]]) -> ImageObject:
return ImageObject(image)
class light_cv_window:
def __init__(self, name:str):
self.__my_window_name = name
cv2.namedWindow(self.__my_window_name)
def __del__(self):
self.destroy()
def show_image(self, image:Union[ImageObject, MatLike]):
if self.__my_window_name is None:
self.__my_window_name = "window"
if isinstance(image, ImageObject):
image = image.image
cv2.imshow(self.__my_window_name, image)
return self
def destroy(self):
if self.__my_window_name is not None and cv2.getWindowProperty(self.__my_window_name, cv2.WND_PROP_VISIBLE) > 0:
cv2.destroyWindow(self.__my_window_name)
return self
@property
def window_rect(self):
return cv2.getWindowImageRect(self.__my_window_name)
@window_rect.setter
def window_rect(self, rect:Tuple[float, float, float, float]):
self.set_window_rect(rect[0], rect[1], rect[2], rect[3])
def set_window_size(self, weight:int, height:int):
cv2.resizeWindow(self.__my_window_name, weight, height)
return self
def get_window_size(self) -> Tuple[float, float]:
rect = self.window_rect
return rect[2], rect[3]
def get_window_property(self, prop_id:int):
return cv2.getWindowProperty(self.__my_window_name, prop_id)
def set_window_property(self, prop_id:int, prop_value:int):
cv2.setWindowProperty(self.__my_window_name, prop_id, prop_value)
return self
def get_prop_frame_width(self):
return self.window_rect[2]
def get_prop_frame_height(self):
return self.window_rect[3]
def is_full_window(self):
return cv2.getWindowProperty(self.__my_window_name, cv2.WINDOW_FULLSCREEN) > 0
def set_full_window(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_FULLSCREEN, 1)
return self
def set_normal_window(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_FULLSCREEN, 0)
return self
def is_using_openGL(self):
return cv2.getWindowProperty(self.__my_window_name, cv2.WINDOW_OPENGL) > 0
def set_using_openGL(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_OPENGL, 1)
return self
def set_not_using_openGL(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_OPENGL, 0)
return self
def is_autosize(self):
return cv2.getWindowProperty(self.__my_window_name, cv2.WINDOW_AUTOSIZE) > 0
def set_autosize(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_AUTOSIZE, 1)
return self
def set_not_autosize(self):
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_AUTOSIZE, 0)
return self
def set_window_rect(self, x:int, y:int, weight:int, height:int):
cv2.moveWindow(self.__my_window_name, x, y)
return self.set_window_size(weight, height)
def set_window_pos(self, x:int, y:int):
cv2.moveWindow(self.__my_window_name, x, y)
return self
def wait_key(self, wait_time:int=0):
return cv2.waitKey(wait_time)
def get_haarcascade_frontalface(name_or_default:Optional[str]=None):
if name_or_default is None:
name_or_default = "haarcascade_frontalface_default"
return cv2.CascadeClassifier(cv2data.haarcascades+'haarcascade_frontalface_default.xml')
def detect_human_face(
image: ImageObject,
detecter: cv2.CascadeClassifier,
scaleFactor: float = 1.1,
minNeighbors: int = 4,
*args, **kwargs):
'''return is Rect[]'''
return detecter.detectMultiScale(image.image, scaleFactor, minNeighbors, *args, **kwargs)
class internal_detect_faces_oop(Callable[[ImageObject], None]):
def __init__(self):
self.face_cascade = get_haarcascade_frontalface()
def __call__(self, image:ImageObject):
gray = image.convert_to_grayscale()
faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
for (x,y,w,h) in faces:
image.operator_cv(cv2.rectangle,(x,y),(x+w,y+h),(255,0,0),2)
def easy_detect_faces(camera:BasicCamera):
ImageObject(camera).show_image("window", 'q', internal_detect_faces_oop())
# 示例使用
if __name__ == "__main__":
img_obj = ImageObject("path/to/your/image.jpg")
img_obj.show_image()
img_obj.resize_image(800, 600)
img_obj.rotate_image(45)
img_obj.convert_to_grayscale()
img_obj.save_image("path/to/save/image.jpg")
# Override tool_file to tool_file_ex
class tool_file_cvex(ToolFile):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@override
def load_as_image(self) -> ImageObject:
self.data = ImageObject(self.get_path())
return self.data
@override
def save(self, path = None):
image:ImageObject = self.data
image.save_image(path if path is not None else self.get_path())
return self

View File

@@ -5,35 +5,132 @@ import sys
import threading import threading
import traceback import traceback
import datetime import datetime
try:
from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle # region ansi colorful
except:
print("colorama is not installed, using default colors") # Copyright Jonathan Hartley 2013. BSD 3-Clause license
class ConsoleFrontColor: '''
RED = "" This module generates ANSI character codes to printing colors to terminals.
GREEN = "" See: http://en.wikipedia.org/wiki/ANSI_escape_code
YELLOW = "" '''
BLUE = ""
MAGENTA = "" CSI = '\033['
CYAN = "" OSC = '\033]'
WHITE = "" BEL = '\a'
RESET = ""
class ConsoleBackgroundColor:
RED = "" def code_to_chars(code):
GREEN = "" return CSI + str(code) + 'm'
YELLOW = ""
BLUE = "" def set_title(title):
MAGENTA = "" return OSC + '2;' + title + BEL
CYAN = ""
WHITE = "" def clear_screen(mode=2):
RESET = "" return CSI + str(mode) + 'J'
class ConsoleStyle:
RESET = "" def clear_line(mode=2):
BOLD = "" return CSI + str(mode) + 'K'
DIM = ""
UNDERLINE = ""
REVERSE = "" class AnsiCodes(object):
HIDDEN = "" def __init__(self):
# the subclasses declare class attributes which are numbers.
# Upon instantiation we define instance attributes, which are the same
# as the class attributes but wrapped with the ANSI escape sequence
for name in dir(self):
if not name.startswith('_'):
value = getattr(self, name)
setattr(self, name, code_to_chars(value))
class ConsoleCursor(object):
def UP(self, n=1):
return CSI + str(n) + 'A'
def DOWN(self, n=1):
return CSI + str(n) + 'B'
def FORWARD(self, n=1):
return CSI + str(n) + 'C'
def BACK(self, n=1):
return CSI + str(n) + 'D'
def POS(self, x=1, y=1):
return CSI + str(y) + ';' + str(x) + 'H'
class ConsoleFrontColorClass(AnsiCodes):
BLACK = 30
RED = 31
GREEN = 32
YELLOW = 33
BLUE = 34
MAGENTA = 35
CYAN = 36
WHITE = 37
RESET = 39
# These are fairly well supported, but not part of the standard.
LIGHTBLACK_EX = 90
LIGHTRED_EX = 91
LIGHTGREEN_EX = 92
LIGHTYELLOW_EX = 93
LIGHTBLUE_EX = 94
LIGHTMAGENTA_EX = 95
LIGHTCYAN_EX = 96
LIGHTWHITE_EX = 97
ConsoleFrontColor = ConsoleFrontColorClass()
class ConsoleBackgroundColorClass(AnsiCodes):
BLACK = 40
RED = 41
GREEN = 42
YELLOW = 43
BLUE = 44
MAGENTA = 45
CYAN = 46
WHITE = 47
RESET = 49
# These are fairly well supported, but not part of the standard.
LIGHTBLACK_EX = 100
LIGHTRED_EX = 101
LIGHTGREEN_EX = 102
LIGHTYELLOW_EX = 103
LIGHTBLUE_EX = 104
LIGHTMAGENTA_EX = 105
LIGHTCYAN_EX = 106
LIGHTWHITE_EX = 107
ConsoleBackgroundColor = ConsoleBackgroundColorClass()
class ConsoleStyleClass(AnsiCodes):
BRIGHT = 1
DIM = 2
NORMAL = 22
RESET_ALL = 0
ConsoleStyle = ConsoleStyleClass()
def PrintColorful(color:str, *args, is_reset:bool=True, **kwargs):
with lock_guard():
if is_reset:
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
else:
print(color,*args, **kwargs)
def PrintAsError(message:str):
PrintColorful(ConsoleFrontColor.RED, message)
def PrintAsWarning(message:str):
PrintColorful(ConsoleFrontColor.YELLOW, message)
def PrintAsInfo(message:str):
PrintColorful(ConsoleFrontColor.GREEN, message)
def PrintAsDebug(message:str):
PrintColorful(ConsoleFrontColor.BLUE, message)
def PrintAsSuccess(message:str):
PrintColorful(ConsoleFrontColor.GREEN, message)
def PrintAsLight(message:str):
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, message)
# endregion
class NotImplementedError(Exception): class NotImplementedError(Exception):
def __init__(self, message:Optional[str]=None) -> None: def __init__(self, message:Optional[str]=None) -> None:
@@ -60,13 +157,6 @@ def GetInternalDebug() -> bool:
global INTERNAL_DEBUG global INTERNAL_DEBUG
return INTERNAL_DEBUG return INTERNAL_DEBUG
def print_colorful(color:str, *args, is_reset:bool=True, **kwargs):
with lock_guard():
if is_reset:
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
else:
print(color,*args, **kwargs)
ImportingFailedSet:Set[str] = set() ImportingFailedSet:Set[str] = set()
def ImportingThrow( def ImportingThrow(
ex: ImportError, ex: ImportError,
@@ -328,6 +418,12 @@ class PlatformIndicator:
CompanyName : str = "DefaultCompany" CompanyName : str = "DefaultCompany"
ProductName : str = "DefaultProject" ProductName : str = "DefaultProject"
@staticmethod
def GetFileSeparator(is_not_this_platform:bool = False) -> str:
if PlatformIndicator.IsPlatformWindows and not is_not_this_platform:
return "\\"
return "/"
@staticmethod @staticmethod
def GetApplicationPath() -> str: def GetApplicationPath() -> str:
"""获取应用程序所在目录""" """获取应用程序所在目录"""

View File

@@ -180,14 +180,14 @@ class ESReader(BaseModel):
''' '''
#module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.') #module_name, _, class_name = type_label.split(",")[0].strip().rpartition('.')
#if GetInternalEasySaveDebug(): #if GetInternalEasySaveDebug():
# print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\ # PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
# f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\ # f"{ConsoleFrontColor.YELLOW}, module_name: {ConsoleFrontColor.RESET}{module_name}"\
# f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}") # f"{ConsoleFrontColor.YELLOW}, class_name: {ConsoleFrontColor.RESET}{class_name}")
#typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name) #typen_to = try_to_type(class_name, module_name=module_name) or to_type(class_name)
#return TypeManager.GetInstance().CreateOrGetRefType(typen_to) #return TypeManager.GetInstance().CreateOrGetRefType(typen_to)
typen, assembly_name = ReadAssemblyTypen(type_label) typen, assembly_name = ReadAssemblyTypen(type_label)
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"Prase __type label: {ConsoleFrontColor.RESET}{type_label}"\
f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\ f"{ConsoleFrontColor.YELLOW}, typen: {ConsoleFrontColor.RESET}{typen}"\
f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}") f"{ConsoleFrontColor.YELLOW}, assembly_name: {ConsoleFrontColor.RESET}{assembly_name}")
return TypeManager.GetInstance().CreateOrGetRefType(typen) return TypeManager.GetInstance().CreateOrGetRefType(typen)
@@ -235,7 +235,7 @@ class ESReader(BaseModel):
if rtype is None: if rtype is None:
raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}") raise ValueError(f"{ConsoleFrontColor.RED}当前层不包含类型信息: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}")
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"layer: {ConsoleFrontColor.RESET}{LimitStringLength(str(layer), 100)}"\
f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}") f"{ConsoleFrontColor.YELLOW}, rtype: {ConsoleFrontColor.RESET}{rtype.ToString()}")
# 处理值类型 # 处理值类型
@@ -278,7 +278,7 @@ class ESReader(BaseModel):
else: else:
rinstance = rtype.CreateInstance() rinstance = rtype.CreateInstance()
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"rinstance rtype target: {ConsoleFrontColor.RESET}"\
f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}") f"{rtype.Print2Str(verbose=True, flags=RefTypeFlag.Field|RefTypeFlag.Instance|RefTypeFlag.Public)}")
fields:List[FieldInfo] = self._GetFields(rtype) fields:List[FieldInfo] = self._GetFields(rtype)
for field in fields: for field in fields:
@@ -289,19 +289,19 @@ class ESReader(BaseModel):
if field.FieldType == list and field.ValueType.IsGeneric: if field.FieldType == list and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0])) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(ListIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}List<"\
f"{field_rtype.GenericArgs[0]}>") f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == set and field.ValueType.IsGeneric: elif field.FieldType == set and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0])) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(SetIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Set<"\
f"{field_rtype.GenericArgs[0]}>") f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == tuple and field.ValueType.IsGeneric: elif field.FieldType == tuple and field.ValueType.IsGeneric:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0])) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(TupleIndictaor(field.ValueType.GenericArgs[0]))
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Tuple<"\
f"{field_rtype.GenericArgs[0]}>") f"{field_rtype.GenericArgs[0]}>")
elif field.FieldType == dict and field.ValueType.IsGeneric: elif field.FieldType == dict and field.ValueType.IsGeneric:
@@ -309,13 +309,13 @@ class ESReader(BaseModel):
DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1]) DictIndictaor(field.ValueType.GenericArgs[0], field.ValueType.GenericArgs[1])
) )
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}Dict<"\
f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>") f"{field_rtype.GenericArgs[0]}, {field_rtype.GenericArgs[1]}>")
else: else:
field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType) field_rtype = TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)
if GetInternalEasySaveDebug(): if GetInternalEasySaveDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\ PrintColorful(ConsoleFrontColor.YELLOW, f"field: {ConsoleFrontColor.RESET}{field.FieldName}"\
f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\ f"{ConsoleFrontColor.YELLOW}, field_rtype: {ConsoleFrontColor.RESET}{field_rtype.RealType}"\
f"<{field_rtype.GenericArgs}>") f"<{field_rtype.GenericArgs}>")
field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName])) field.SetValue(rinstance, dfs(field_rtype, layer[field.FieldName]))

View File

@@ -1,10 +1,8 @@
import os.path
from .Config import * from .Config import *
import json import json
import shutil import shutil
import pandas as pd
import os import os
import sys
import pickle
import zipfile import zipfile
import tarfile import tarfile
import base64 import base64
@@ -14,21 +12,6 @@ import datetime
import stat import stat
from typing import * from typing import *
from pathlib import Path from pathlib import Path
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
from .String import Bytes2String
def GetExtensionName(file:str): def GetExtensionName(file:str):
return os.path.splitext(file)[1][1:] return os.path.splitext(file)[1][1:]
@@ -67,8 +50,10 @@ class PermissionError(FileOperationError):
"""权限操作异常""" """权限操作异常"""
pass pass
from pydantic import BaseModel, GetCoreSchemaHandler, Field try:
from pydantic_core import core_schema from pydantic import BaseModel
except ImportError as e:
ImportingThrow(e, "File", ["pydantic"])
class ToolFile(BaseModel): class ToolFile(BaseModel):
OriginFullPath:str OriginFullPath:str
@@ -78,7 +63,7 @@ class ToolFile(BaseModel):
filePath: Union[str, Self], filePath: Union[str, Self],
): ):
filePath = os.path.expandvars(str(filePath)) filePath = os.path.expandvars(str(filePath))
if filePath[1:].startswith(":/") or filePath[1:].startswith(":\\"): if ":" in filePath:
filePath = os.path.abspath(filePath) filePath = os.path.abspath(filePath)
super().__init__(OriginFullPath=filePath) super().__init__(OriginFullPath=filePath)
def __del__(self): def __del__(self):
@@ -92,15 +77,21 @@ class ToolFile(BaseModel):
def __or__(self, other): def __or__(self, other):
if other is None: if other is None:
return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}\\") return ToolFile(self.GetFullPath() if self.IsDir() else f"{self.GetFullPath()}{PlatformIndicator.GetFileSeparator()}")
else: else:
# 不使用os.path.join因为os.path.join存在如下机制 # 不使用os.path.join因为os.path.join存在如下机制
# 当参数路径中存在绝对路径风格时,会忽略前面的参数,例如: # 当参数路径中存在绝对路径风格时,会忽略前面的参数,例如:
# os.path.join("E:/dev", "/analyze/") = "E:/analyze/" # os.path.join("E:/dev", "/analyze/") = "E:/analyze/"
# 而我们需要的是 "E:/dev/analyze" # 而我们需要的是 "E:/dev/analyze"
first = self.GetFullPath().replace('/','\\').strip('\\') separator = PlatformIndicator.GetFileSeparator()
second = str(other).replace('/','\\') separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
return ToolFile(f"{first}\\{second}") first = self.GetFullPath().replace(separator_not_this_platform,separator).strip(separator)
second = str(other).replace(separator_not_this_platform,separator)
if first == "./":
return ToolFile(f"{second}")
elif first == "../":
first = ToolFile(f"{os.path.abspath(first)}").BackToParentDir()
return ToolFile(f"{first}{separator}{second}")
def __idiv__(self, other): def __idiv__(self, other):
temp = self.__or__(other) temp = self.__or__(other)
self.OriginFullPath = temp.GetFullPath() self.OriginFullPath = temp.GetFullPath()
@@ -123,15 +114,18 @@ class ToolFile(BaseModel):
other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other) other_path = other.GetFullPath() if isinstance(other, ToolFile) else str(other)
self_path = self.OriginFullPath self_path = self.OriginFullPath
separator = PlatformIndicator.GetFileSeparator()
separator_not_this_platform = PlatformIndicator.GetFileSeparator(True)
# 如果两个文件都存在,则直接比较路径 # 如果两个文件都存在,则直接比较路径
if self.Exists() == True and other.Exists() == True: if self.Exists() == True and other.Exists() == True:
return self_path.strip('\\/') == other_path.strip('\\/') return self_path.strip(separator_not_this_platform) == other_path.strip(separator_not_this_platform)
# 如果一个文件存在另一个不被判定为存在则一定不同 # 如果一个文件存在另一个不被判定为存在则一定不同
elif self.Exists() != other.Exists(): elif self.Exists() != other.Exists():
return False return False
# 如果两个文件都不存在,则直接比较文件名在视正反斜杠相同的情况下比较路径字符串 # 如果两个文件都不存在,则直接比较文件名在视正反斜杠相同的情况下比较路径字符串
else: else:
return self_path.replace('/','\\') == other_path.replace('/','\\') return self_path.replace(separator_not_this_platform,separator) == other_path.replace(separator_not_this_platform,separator)
def ToPath(self): def ToPath(self):
return Path(self.OriginFullPath) return Path(self.OriginFullPath)
@@ -181,7 +175,7 @@ class ToolFile(BaseModel):
if self.Exists() is False: if self.Exists() is False:
raise FileNotFoundError("file not found") raise FileNotFoundError("file not found")
newpath = str(newpath) newpath = str(newpath)
if '\\' in newpath or '/' in newpath: if PlatformIndicator.GetFileSeparator() in newpath or PlatformIndicator.GetFileSeparator(True) in newpath:
newpath = GetBaseFilename(newpath) newpath = GetBaseFilename(newpath)
new_current_path = os.path.join(self.GetDir(), newpath) new_current_path = os.path.join(self.GetDir(), newpath)
os.rename(self.OriginFullPath, new_current_path) os.rename(self.OriginFullPath, new_current_path)
@@ -192,16 +186,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r', encoding=encoding) as f: with open(self.OriginFullPath, 'r', encoding=encoding) as f:
json_data = json.load(f, **kwargs) json_data = json.load(f, **kwargs)
return json_data return json_data
def LoadAsCsv(self) -> pd.DataFrame: def LoadAsCsv(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f) return pd.read_csv(f)
def LoadAsXml(self) -> pd.DataFrame: def LoadAsXml(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_xml(f) return pd.read_xml(f)
def LoadAsDataframe(self) -> pd.DataFrame: def LoadAsDataframe(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_csv(f) return pd.read_csv(f)
def LoadAsExcel(self) -> pd.DataFrame: def LoadAsExcel(self) -> "pd.DataFrame":
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return pd.read_excel(f) return pd.read_excel(f)
def LoadAsBinary(self) -> bytes: def LoadAsBinary(self) -> bytes:
@@ -211,18 +221,103 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'r') as f: with open(self.OriginFullPath, 'r') as f:
return f.read() return f.read()
def LoadAsWav(self): def LoadAsWav(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_wav(self.OriginFullPath) return AudioSegment.from_wav(self.OriginFullPath)
def LoadAsAudio(self): def LoadAsAudio(self):
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
return AudioSegment.from_file(self.OriginFullPath) return AudioSegment.from_file(self.OriginFullPath)
def LoadAsImage(self) -> ImageFile.ImageFile: def LoadAsImage(self):
try:
from PIL import Image
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
return Image.open(self.OriginFullPath) return Image.open(self.OriginFullPath)
def LoadAsDocx(self) -> DocumentObject: def LoadAsDocx(self) -> "docx.document.Document":
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
return Document(self.OriginFullPath) return Document(self.OriginFullPath)
def LoadAsUnknown(self, suffix:str) -> Any: def LoadAsUnknown(self, suffix:str) -> Any:
return self.LoadAsText() return self.LoadAsText()
def LoadAsModel(self, model:type[BaseModel]) -> BaseModel: def LoadAsModel(self, model:type["BaseModel"]) -> "BaseModel":
return model.model_validate(self.LoadAsJson()) return model.model_validate(self.LoadAsJson())
def ReadLines(self, **kwargs):
with open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = f.readline()
if not line or line == '':
break
yield line
async def ReadLinesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'r', **kwargs) as f:
while True:
line = await f.readline()
if not line or line == '':
break
yield line
def ReadBytes(self, **kwargs):
with open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = f.read(1024)
if not data or data == '':
break
yield data
async def ReadBytesAsync(self, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'rb', **kwargs) as f:
while True:
data = await f.read(1024)
if not data or data == '':
break
yield data
def WriteBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'wb', **kwargs) as f:
f.write(data)
async def WriteBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'wb', **kwargs) as f:
await f.write(data)
def WriteLines(self, data:List[str], **kwargs):
with open(self.OriginFullPath, 'w', **kwargs) as f:
f.writelines(data)
async def WriteLinesAsync(self, data:List[str], **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'w', **kwargs) as f:
await f.writelines(data)
def AppendText(self, data:str, **kwargs):
with open(self.OriginFullPath, 'a', **kwargs) as f:
f.write(data)
async def AppendTextAsync(self, data:str, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'a', **kwargs) as f:
await f.write(data)
def AppendBytes(self, data:bytes, **kwargs):
with open(self.OriginFullPath, 'ab', **kwargs) as f:
f.write(data)
async def AppendBytesAsync(self, data:bytes, **kwargs):
import aiofiles
async with aiofiles.open(self.OriginFullPath, 'ab', **kwargs) as f:
await f.write(data)
def SaveAsJson(self, json_data): def SaveAsJson(self, json_data):
try: try:
from pydantic import BaseModel from pydantic import BaseModel
@@ -234,16 +329,40 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w', encoding='utf-8') as f: with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
json.dump(json_data, f, indent=4) json.dump(json_data, f, indent=4)
return self return self
def SaveAsCsv(self, csv_data:pd.DataFrame): def SaveAsCsv(self, csv_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
csv_data.to_csv(self.OriginFullPath) csv_data.to_csv(self.OriginFullPath)
return self return self
def SaveAsXml(self, xml_data:pd.DataFrame): def SaveAsXml(self, xml_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
xml_data.to_xml(self.OriginFullPath) xml_data.to_xml(self.OriginFullPath)
return self return self
def SaveAsDataframe(self, dataframe_data:pd.DataFrame): def SaveAsDataframe(self, dataframe_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
dataframe_data.to_csv(self.OriginFullPath) dataframe_data.to_csv(self.OriginFullPath)
return self return self
def SaveAsExcel(self, excel_data:pd.DataFrame): def SaveAsExcel(self, excel_data:"pd.DataFrame"):
'''
try:
import pandas as pd
except ImportError as e:
ImportingThrow(e, "File", ["pandas"])
'''
excel_data.to_excel(self.OriginFullPath, index=False) excel_data.to_excel(self.OriginFullPath, index=False)
return self return self
def SaveAsBinary(self, binary_data:bytes): def SaveAsBinary(self, binary_data:bytes):
@@ -254,13 +373,32 @@ class ToolFile(BaseModel):
with open(self.OriginFullPath, 'w') as f: with open(self.OriginFullPath, 'w') as f:
f.writelines(text_data) f.writelines(text_data)
return self return self
def SaveAsAudio(self, audio_data:AudioSegment): def SaveAsAudio(self, audio_data:"AudioSegment"):
'''
try:
from pydub import AudioSegment
except ImportError as e:
ImportingThrow(e, "File", ["pydub"])
'''
audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath)) audio_data.export(self.OriginFullPath, format=self.get_extension(self.OriginFullPath))
return self return self
def SaveAsImage(self, image_data:ImageFile.ImageFile): def SaveAsImage(self, image_data:"ImageFile.ImageFile"):
'''
try:
from PIL import Image, ImageFile
except ImportError as e:
ImportingThrow(e, "File", ["Pillow"])
'''
image_data.save(self.OriginFullPath) image_data.save(self.OriginFullPath)
return self return self
def SaveAsDocx(self, docx_data:DocumentObject): def SaveAsDocx(self, docx_data:"DocumentObject"):
'''
try:
from docx import Document
from docx.document import Document as DocumentObject
except ImportError as e:
ImportingThrow(e, "File", ["python-docx"])
'''
docx_data.save(self.OriginFullPath) docx_data.save(self.OriginFullPath)
return self return self
def SaveAsUnknown(self, unknown_data:Any): def SaveAsUnknown(self, unknown_data:Any):
@@ -276,6 +414,8 @@ class ToolFile(BaseModel):
return os.path.getsize(self.OriginFullPath) return os.path.getsize(self.OriginFullPath)
def GetExtension(self): def GetExtension(self):
return GetExtensionName(self.OriginFullPath) return GetExtensionName(self.OriginFullPath)
def GetAbsPath(self) -> str:
return os.path.abspath(self.OriginFullPath)
def GetFullPath(self) -> str: def GetFullPath(self) -> str:
return self.OriginFullPath return self.OriginFullPath
def GetFilename(self, is_without_extension = False): def GetFilename(self, is_without_extension = False):
@@ -285,7 +425,7 @@ class ToolFile(BaseModel):
''' '''
if is_without_extension and '.' in self.OriginFullPath: if is_without_extension and '.' in self.OriginFullPath:
return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)] return GetBaseFilename(self.OriginFullPath)[:-(len(self.GetExtension())+1)]
elif self.OriginFullPath[-1] == '/' or self.OriginFullPath[-1] == '\\': elif self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator(True):
return GetBaseFilename(self.OriginFullPath[:-1]) return GetBaseFilename(self.OriginFullPath[:-1])
else: else:
return GetBaseFilename(self.OriginFullPath) return GetBaseFilename(self.OriginFullPath)
@@ -297,7 +437,7 @@ class ToolFile(BaseModel):
return os.path.dirname(self.OriginFullPath) return os.path.dirname(self.OriginFullPath)
def IsDir(self): def IsDir(self):
if self.OriginFullPath[-1] == '\\' or self.GetFullPath()[-1] == '/': if self.OriginFullPath[-1] == PlatformIndicator.GetFileSeparator() or self.GetFullPath()[-1] == PlatformIndicator.GetFileSeparator(True):
return True return True
else: else:
return os.path.isdir(self.OriginFullPath) return os.path.isdir(self.OriginFullPath)
@@ -646,8 +786,11 @@ class ToolFile(BaseModel):
ignore_directories: 是否忽略目录事件 ignore_directories: 是否忽略目录事件
case_sensitive: 是否区分大小写 case_sensitive: 是否区分大小写
""" """
from watchdog.observers import Observer try:
from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
except ImportError as e:
ImportingThrow(e, "File", ["watchdog"])
if not self.Exists(): if not self.Exists():
raise FileNotFoundError(f"File not found: {self.GetFullPath()}") raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
@@ -1009,31 +1152,3 @@ class ToolFile(BaseModel):
""" """
return self.get_permissions()['hidden'] return self.get_permissions()['hidden']
def split_elements(
file: Union[ToolFile, str],
*,
ratios: List[float] = [1,1],
pr: Optional[Callable[[ToolFile], bool]] = None,
shuffler: Optional[Callable[[List[ToolFile]], None]] = None,
output_dirs: Optional[List[ToolFile]] = None,
output_must_exist: bool = True,
output_callback: Optional[Callable[[ToolFile], None]] = None
) -> List[List[ToolFile]]:
result: List[List[ToolFile]] = tool_split_elements(WrapperFile(file).dir_tool_file_iter(),
ratios=ratios,
pr=pr,
shuffler=shuffler)
if output_dirs is None:
return result
for i in range(min(len(output_dirs), len(result))):
output_dir: ToolFile = output_dirs[i]
if output_dir.IsDir() is False:
raise Exception("Outputs must be directory")
if output_must_exist:
output_dir.must_exists_as_new()
for file in result[i]:
current = output_dirs[i].MakeFileInside(file)
if output_callback:
output_callback(current)
return result

View File

@@ -0,0 +1,743 @@
from .Config import *
from .File import ToolFile
from .Web import ToolURL
import json
import urllib.parse
import os
from typing import *
try:
from pydantic import BaseModel, PrivateAttr, Field
except ImportError as e:
ImportingThrow(e, "Interaction", ["pydantic"])
try:
import aiofiles
except ImportError as e:
ImportingThrow(e, "Interaction", ["aiofiles"])
class InteractionError(Exception):
"""交互操作异常基类"""
pass
class PathValidationError(InteractionError):
"""路径验证异常"""
pass
class LoadError(InteractionError):
"""加载异常"""
pass
class SaveError(InteractionError):
"""保存异常"""
pass
class Interaction(BaseModel):
"""统一的文件交互类,自适应处理本地文件和网络文件"""
originPath: str
_is_url: bool = PrivateAttr(False)
_is_local: bool = PrivateAttr(False)
_tool_file: Optional[ToolFile] = PrivateAttr(None)
_tool_url: Optional[ToolURL] = PrivateAttr(None)
def __init__(self, path):
"""
从路径字符串创建对象自动识别本地文件或网络URL
Args:
path: 路径字符串或是可以转换为路径字符串的对象
"""
super().__init__(originPath=str(path))
# 自动识别路径类型
self._detect_path_type()
def _detect_path_type(self):
"""自动检测路径类型"""
path = self.originPath.strip()
# 检查是否为HTTP/HTTPS URL
if path.startswith(('http://', 'https://', 'file://')):
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(path)
return
# 检查是否为localhost URL
if path.startswith('localhost'):
# 转换为完整的HTTP URL
if not path.startswith('localhost:'):
# 默认端口80
full_url = f"http://{path}"
else:
full_url = f"http://{path}"
self._is_url = True
self._is_local = False
self._tool_url = ToolURL(full_url)
self.originPath = full_url
return
# 检查是否为绝对路径或相对路径
if (os.path.isabs(path) or
path.startswith('./') or
path.startswith('../') or
':' in path[:3]): # Windows盘符
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
return
# 默认作为相对路径处理
self._is_local = True
self._is_url = False
self._tool_file = ToolFile(path)
def __str__(self) -> str:
"""隐式字符串转换"""
return self.originPath
def __bool__(self) -> bool:
"""隐式布尔转换,检查路径是否有效"""
return self.IsValid
@property
def IsValid(self) -> bool:
"""检查路径是否有效"""
if self._is_url:
return self._tool_url.IsValid if self._tool_url else False
else:
return self._tool_file.Exists() if self._tool_file else False
@property
def IsURL(self) -> bool:
"""是否为网络URL"""
return self._is_url
@property
def IsLocal(self) -> bool:
"""是否为本地文件"""
return self._is_local
@property
def IsFile(self) -> bool:
"""是否为文件对于URL检查是否存在文件名"""
if self._is_url:
return bool(self._tool_url.GetFilename()) if self._tool_url else False
else:
return self._tool_file.IsFile() if self._tool_file else False
@property
def IsDir(self) -> bool:
"""是否为目录(仅对本地路径有效)"""
if self._is_local:
return self._tool_file.IsDir() if self._tool_file else False
return False
def GetFilename(self) -> str:
"""获取文件名"""
if self._is_url:
return self._tool_url.GetFilename() if self._tool_url else ""
else:
return self._tool_file.GetFilename() if self._tool_file else ""
def GetExtension(self) -> str:
"""获取文件扩展名"""
if self._is_url:
return self._tool_url.GetExtension() if self._tool_url else ""
else:
return self._tool_file.GetExtension() if self._tool_file else ""
def ExtensionIs(self, *extensions: str) -> bool:
"""检查扩展名是否匹配"""
if self._is_url:
return self._tool_url.ExtensionIs(*extensions) if self._tool_url else False
else:
current_ext = self.GetExtension()
return current_ext.lower() in [ext.lower().lstrip('.') for ext in extensions]
# 文件类型判断属性
@property
def IsText(self) -> bool:
"""是否为文本文件"""
return self.ExtensionIs('txt', 'html', 'htm', 'css', 'js', 'xml', 'csv', 'md', 'py', 'java', 'cpp', 'c', 'h')
@property
def IsJson(self) -> bool:
"""是否为JSON文件"""
return self.ExtensionIs('json')
@property
def IsImage(self) -> bool:
"""是否为图像文件"""
return self.ExtensionIs('jpg', 'jpeg', 'png', 'gif', 'bmp', 'svg', 'webp')
@property
def IsDocument(self) -> bool:
"""是否为文档文件"""
return self.ExtensionIs('pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx')
def Open(self, path: str) -> 'Interaction':
"""在当前对象上打开新路径"""
new_obj = Interaction(path)
self.originPath = new_obj.originPath
self._is_url = new_obj._is_url
self._is_local = new_obj._is_local
self._tool_file = new_obj._tool_file
self._tool_url = new_obj._tool_url
return self
# 同步加载方法
def LoadAsText(self) -> str:
"""
同步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsText()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsText()
def LoadAsBinary(self) -> bytes:
"""
同步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsBinary()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.LoadAsBinary()
def LoadAsJson(self, model_type: Optional[type] = None) -> Any:
"""
同步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.LoadAsJson(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
json_data = self._tool_file.LoadAsJson()
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
# 异步加载方法
async def LoadAsTextAsync(self) -> str:
"""
异步加载为文本
Returns:
文本内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsTextAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'r', encoding='utf-8') as f:
return await f.read()
async def LoadAsBinaryAsync(self) -> bytes:
"""
异步加载为字节数组
Returns:
二进制内容
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsBinaryAsync()
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地文件
async with aiofiles.open(self._tool_file.GetFullPath(), 'rb') as f:
return await f.read()
async def LoadAsJsonAsync(self, model_type: Optional[type] = None) -> Any:
"""
异步加载并反序列化JSON
Args:
model_type: 可选的Pydantic模型类型
Returns:
JSON数据或模型对象
"""
if self._is_url:
if not self._tool_url or not self._tool_url.IsValid:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.LoadAsJsonAsync(model_type)
else:
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
# 异步读取本地JSON文件
text_content = await self.LoadAsTextAsync()
try:
json_data = json.loads(text_content)
if model_type and issubclass(model_type, BaseModel):
return model_type.model_validate(json_data)
return json_data
except json.JSONDecodeError as e:
raise LoadError(f"Failed to parse JSON from {self.originPath}: {str(e)}")
# 同步保存方法
def SaveAsText(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL先下载然后保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsText(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsText(content)
return self
def SaveAsBinary(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsBinary(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsBinary(content)
return self
def SaveAsJson(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
同步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.json"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
file_obj.SaveAsJson(data)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
self._tool_file.SaveAsJson(data)
return self
# 异步保存方法
async def SaveAsTextAsync(self, content: str, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为文本
Args:
content: 文本内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.txt"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'w', encoding='utf-8') as f:
await f.write(content)
return self
async def SaveAsBinaryAsync(self, content: bytes, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为二进制
Args:
content: 二进制内容
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
if self._is_url:
# 对于URL保存到本地
if local_path is None:
local_path = self.GetFilename() or "downloaded.bin"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
async with aiofiles.open(file_obj.GetFullPath(), 'wb') as f:
await f.write(content)
else:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.TryCreateParentPath()
async with aiofiles.open(self._tool_file.GetFullPath(), 'wb') as f:
await f.write(content)
return self
async def SaveAsJsonAsync(self, data: Any, local_path: Optional[str] = None) -> 'Interaction':
"""
异步保存为JSON
Args:
data: JSON数据
local_path: 本地保存路径仅对URL有效
Returns:
保存的文件对象或Interaction对象
"""
# 序列化JSON数据
try:
from pydantic import BaseModel
if isinstance(data, BaseModel):
json_data = data.model_dump()
json_data["__type"] = f"{data.__class__.__name__}, pydantic.BaseModel"
else:
json_data = data
json_content = json.dumps(json_data, indent=4, ensure_ascii=False)
except Exception as e:
raise SaveError(f"Failed to serialize JSON data: {str(e)}")
# 保存JSON内容
return await self.SaveAsTextAsync(json_content, local_path)
# HTTP请求方法仅对URL有效
def Get(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
同步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Get(callback)
def Post(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
同步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return self._tool_url.Post(callback, form_data)
async def GetAsync(self, callback: Callable[[Optional[Any]], None]) -> bool:
"""
异步GET请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("GET method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.GetAsync(callback)
async def PostAsync(self, callback: Callable[[Optional[Any]], None], form_data: Optional[Dict[str, str]] = None) -> bool:
"""
异步POST请求仅对URL有效
Args:
callback: 响应回调函数成功时接收响应对象失败时接收None
form_data: 表单数据字典
Returns:
是否请求成功
"""
if not self._is_url:
raise InteractionError("POST method is only available for URLs")
if not self._tool_url:
callback(None)
return False
return await self._tool_url.PostAsync(callback, form_data)
# 便利方法
def Save(self, local_path: Optional[str] = None) -> 'Interaction':
"""
自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL先下载内容再保存
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
self._tool_url.Save(local_path)
return self
async def SaveAsync(self, local_path: Optional[str] = None) -> 'Interaction':
"""
异步自动选择格式保存
Args:
local_path: 本地保存路径
Returns:
保存的文件对象或Interaction对象
"""
# 对于本地文件,直接返回自身(已存在)
if self._is_url:
# 对于URL异步下载内容
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
if local_path is None:
local_path = self.GetFilename() or "downloaded_file"
file_obj = ToolFile(local_path)
file_obj.TryCreateParentPath()
try:
if self.IsText:
content = await self.LoadAsTextAsync()
await self.SaveAsTextAsync(content, local_path)
elif self.IsJson:
content = await self.LoadAsJsonAsync()
await self.SaveAsJsonAsync(content, local_path)
else:
content = await self.LoadAsBinaryAsync()
await self.SaveAsBinaryAsync(content, local_path)
except Exception as e:
raise SaveError(f"Failed to save {self.originPath}: {str(e)}")
return self
def Downloadable(self) -> bool:
"""检查是否可下载"""
return self._is_url and self._tool_url.IsValid if self._tool_url else False
def Download(self, local_path: Optional[str] = None) -> ToolFile:
"""
下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("Download method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return self._tool_url.Download(local_path)
async def DownloadAsync(self, local_path: Optional[str] = None) -> ToolFile:
"""
异步下载文件仅对URL有效
Args:
local_path: 本地保存路径
Returns:
下载的文件对象
"""
if self._is_local:
raise InteractionError("DownloadAsync method is only available for URLs")
if not self._tool_url:
raise PathValidationError(f"Invalid URL: {self.originPath}")
return await self._tool_url.DownloadAsync(local_path)
def Copy(self, target_path) -> ToolFile:
"""
复制文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
新的Interaction对象
"""
if not self._is_local:
raise InteractionError("Copy method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Copy(str(target_path))
def Move(self, target_path) -> ToolFile:
"""
移动文件(仅对本地文件有效)
Args:
target_path: 目标路径
Returns:
更新后的Interaction对象
"""
if not self._is_local:
raise InteractionError("Move method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
return self._tool_file.Move(str(target_path))
def Remove(self) -> 'Interaction':
"""
删除文件(仅对本地文件有效)
Returns:
Interaction对象本身
"""
if not self._is_local:
raise InteractionError("Remove method is only available for local files")
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
self._tool_file.Remove()
return self
def Exists(self) -> bool:
"""
检查文件是否存在
Returns:
是否存在
"""
return self.IsValid
def GetSize(self) -> int:
"""
获取文件大小(仅对本地文件有效)
Returns:
文件大小(字节)
"""
if not self._is_local:
raise InteractionError("GetSize method is only available for local files")
if not self._tool_file or not self._tool_file.Exists():
raise PathValidationError(f"File not found: {self.originPath}")
return self._tool_file.GetSize()
def GetDir(self) -> str:
"""
获取目录路径
Returns:
目录路径
"""
if self._is_local:
return self._tool_file.GetDir() if self._tool_file else ""
else:
# 对于URL返回基础URL
if self._tool_url:
parsed = urllib.parse.urlparse(self._tool_url.url)
return f"{parsed.scheme}://{parsed.netloc}"
return ""
def GetParentDir(self) -> 'Interaction':
"""
获取父目录的Interaction对象
Returns:
父目录的Interaction对象
"""
if self._is_local:
if not self._tool_file:
raise PathValidationError(f"Invalid file path: {self.originPath}")
parent_dir = self._tool_file.GetParentDir()
return Interaction(parent_dir.GetFullPath())
else:
# 对于URL返回基础URL
base_url = self.GetDir()
return Interaction(base_url)
def ToString(self) -> str:
"""获取完整路径"""
return self.originPath
def GetFullPath(self) -> str:
"""获取完整路径"""
return self.originPath

View File

@@ -241,7 +241,7 @@ def ToType(
type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None) type_module = module_name or (".".join(type_components[:-1]) if len(type_components) > 1 else None)
type_final = type_components[-1] type_final = type_components[-1]
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\ PrintColorful(ConsoleFrontColor.YELLOW, f"type_module: {type_module}, type_final: {type_final}, "\
f"typen: {typen}, type_components: {type_components}") f"typen: {typen}, type_components: {type_components}")
if type_module is not None: if type_module is not None:
return sys.modules[type_module].__dict__[type_final] return sys.modules[type_module].__dict__[type_final]
@@ -304,7 +304,7 @@ def DecayType(
return type_hint return type_hint
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}") PrintColorful(ConsoleFrontColor.YELLOW, f"Decay: {type_hint}")
result: type|List[type] = None result: type|List[type] = None
@@ -333,7 +333,7 @@ def DecayType(
raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>") raise ReflectionException(f"Invalid type: {type_hint}<{type_hint.__class__}>")
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Result: {result}") PrintColorful(ConsoleFrontColor.YELLOW, f"Result: {result}")
return result return result
def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool: def IsJustDefinedInCurrentClass(member_name:str, current_class:type) -> bool:
@@ -456,7 +456,7 @@ class ValueInfo(BaseInfo):
super().__init__(**kwargs) super().__init__(**kwargs)
self._RealType = metaType self._RealType = metaType
if GetInternalReflectionDebug() and len(generic_args) > 0: if GetInternalReflectionDebug() and len(generic_args) > 0:
print_colorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\ PrintColorful(ConsoleFrontColor.YELLOW, f"Current ValueInfo Debug Frame: "\
f"metaType={metaType}, generic_args={generic_args}") f"metaType={metaType}, generic_args={generic_args}")
self._GenericArgs = generic_args self._GenericArgs = generic_args
if not isinstance(metaType, type): if not isinstance(metaType, type):
@@ -546,7 +546,7 @@ class ValueInfo(BaseInfo):
**kwargs **kwargs
) -> 'ValueInfo': ) -> 'ValueInfo':
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\ PrintColorful(ConsoleFrontColor.BLUE, f"Current ValueInfo.Create Frame: "\
f"metaType={metaType}, SelfType={SelfType}") f"metaType={metaType}, SelfType={SelfType}")
if isinstance(metaType, type): if isinstance(metaType, type):
if metaType is list: if metaType is list:
@@ -601,7 +601,7 @@ class FieldInfo(MemberInfo):
selfType: type|Any|None = None selfType: type|Any|None = None
): ):
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\ PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current Make FieldInfo: {ctype}."\
f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ") f"{ConsoleFrontColor.RESET}{name} {ConsoleFrontColor.LIGHTBLUE_EX}{metaType} ")
super().__init__( super().__init__(
name = name, name = name,
@@ -611,7 +611,7 @@ class FieldInfo(MemberInfo):
) )
self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType) self._MetaType = ValueInfo.Create(metaType, module_name=module_name, SelfType=selfType)
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\ PrintColorful(ConsoleFrontColor.LIGHTBLUE_EX, f"Current RealType: {self.FieldType}"\
f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}") f"{f'<{self.ValueType.GenericArgs}>' if self.ValueType.IsGeneric else ''}")
@property @property
@@ -746,7 +746,7 @@ class MethodInfo(MemberInfo):
is_class_method: bool, is_class_method: bool,
): ):
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\ PrintColorful(ConsoleFrontColor.YELLOW, f"Current Make MethodInfo: "\
f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})") f"{return_type} {ctype}.{name}({', '.join([p.ParameterName for p in parameters])})")
MemberInfo.__init__(self, name, ctype, is_static, is_public) MemberInfo.__init__(self, name, ctype, is_static, is_public)
self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType) self._ReturnType = ValueInfo.Create(return_type, SelfType=self.ParentType)
@@ -1143,12 +1143,12 @@ class RefType(ValueInfo):
def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]: def dfs(currentType:RefType) -> Dict[str, Dict[str, Any]|Any]:
if currentType.IsPrimitive: if currentType.IsPrimitive:
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\ PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(IsPrimitive): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return f"{currentType.RealType}" return f"{currentType.RealType}"
elif currentType.RealType in type_set: elif currentType.RealType in type_set:
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\ PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Already): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
return { return {
"type": f"{currentType.RealType}", "type": f"{currentType.RealType}",
@@ -1156,13 +1156,13 @@ class RefType(ValueInfo):
} }
else: else:
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\ PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(New): "\
f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}") f"__type={currentType.RealType} __type.class={currentType.RealType.__class__}")
type_set.add(currentType.RealType) type_set.add(currentType.RealType)
value = {} value = {}
fields = currentType.GetFields() fields = currentType.GetFields()
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}") PrintColorful(ConsoleFrontColor.RED, f"Current Tree DFS(Fields): {[field.FieldName for field in fields]}")
for field in fields: for field in fields:
value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType)) value[field.FieldName] = dfs(TypeManager.GetInstance().CreateOrGetRefType(field.FieldType))
return { return {
@@ -1429,7 +1429,7 @@ class TypeManager(BaseModel):
if data is None: if data is None:
raise ReflectionException("data is None") raise ReflectionException("data is None")
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}") PrintColorful(ConsoleFrontColor.YELLOW, f"Try Get RefType: {ConsoleFrontColor.RESET}{data}")
# 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型 # 快速路径:如果是字符串并且在字符串缓存中,直接返回对应的类型
if isinstance(data, str) and data in self._string_to_type_cache: if isinstance(data, str) and data in self._string_to_type_cache:
@@ -1454,7 +1454,7 @@ class TypeManager(BaseModel):
# 添加到弱引用缓存 # 添加到弱引用缓存
self._weak_refs[type_id] = weakref.ref(ref_type) self._weak_refs[type_id] = weakref.ref(ref_type)
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.YELLOW, f"Get "\ PrintColorful(ConsoleFrontColor.YELLOW, f"Get "\
f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\ f"{ConsoleFrontColor.RESET}{metaType}{ConsoleFrontColor.YELLOW} RefType: "\
f"{ConsoleFrontColor.RESET}{ref_type.ToString()}") f"{ConsoleFrontColor.RESET}{ref_type.ToString()}")
return ref_type return ref_type
@@ -1507,7 +1507,7 @@ class TypeManager(BaseModel):
try: try:
ref_type = RefType(metaType) ref_type = RefType(metaType)
if GetInternalReflectionDebug(): if GetInternalReflectionDebug():
print_colorful(ConsoleFrontColor.RED, f"Create "\ PrintColorful(ConsoleFrontColor.RED, f"Create "\
f"{ConsoleFrontColor.RESET}{metaType} "\ f"{ConsoleFrontColor.RESET}{metaType} "\
f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}") f"{ConsoleFrontColor.RED}RefType: {ConsoleFrontColor.RESET}{ref_type.ToString()}")
self._RefTypes[metaType] = ref_type self._RefTypes[metaType] = ref_type

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,264 +0,0 @@
# Visual 模块
Visual模块提供了数据可视化和图像处理相关的功能包括数据图表、图像处理、词云等。
## 目录结构
- `Core.py`: 核心数据可视化功能
- `OpenCV.py`: OpenCV图像处理功能
- `WordCloud.py`: 词云生成功能
- `Manim.py`: 数学动画功能
## 功能特性
### 1. 数据可视化 (Core.py)
#### 1.1 基础图表
- 折线图
- 柱状图
- 散点图
- 直方图
- 饼图
- 箱线图
- 热力图
- 分类数据图
- 联合图
#### 1.2 数据处理
- 缺失值处理
- 重复值处理
- 数据标准化
- 数据归一化
### 2. 图像处理 (OpenCV.py)
#### 2.1 图像操作
- 图像加载
- 支持多种格式jpg, png, bmp等
- 支持从文件路径或URL加载
- 支持从内存缓冲区加载
- 图像保存
- 支持多种格式输出
- 支持质量参数设置
- 支持压缩选项
- 图像显示
- 支持窗口标题设置
- 支持窗口大小调整
- 支持键盘事件处理
- 图像转换
- RGB转灰度
- RGB转HSV
- RGB转LAB
- 支持自定义转换矩阵
- 图像缩放
- 支持多种插值方法
- 支持保持宽高比
- 支持指定目标尺寸
- 图像旋转
- 支持任意角度旋转
- 支持旋转中心点设置
- 支持旋转后尺寸调整
- 图像翻转
- 水平翻转
- 垂直翻转
- 对角线翻转
- 图像合并
- 支持多图像拼接
- 支持透明度混合
- 支持蒙版处理
#### 2.2 ImageObject类详解
ImageObject类提供了完整的图像处理功能
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 基本属性
width = image.width # 图像宽度
height = image.height # 图像高度
channels = image.channels # 通道数
dtype = image.dtype # 数据类型
# 图像处理
image.resize_image(800, 600) # 调整大小
image.convert_to_grayscale() # 转换为灰度图
image.filter_gaussian((5, 5), 1.5, 1.5) # 高斯滤波
image.rotate_image(45) # 旋转45度
image.flip_image(horizontal=True) # 水平翻转
# 图像增强
image.adjust_brightness(1.2) # 调整亮度
image.adjust_contrast(1.5) # 调整对比度
image.adjust_saturation(0.8) # 调整饱和度
image.equalize_histogram() # 直方图均衡化
# 边缘检测
image.detect_edges(threshold1=100, threshold2=200) # Canny边缘检测
image.detect_contours() # 轮廓检测
# 特征提取
keypoints = image.detect_keypoints() # 关键点检测
descriptors = image.compute_descriptors() # 描述子计算
# 图像保存
image.save_image("output.jpg", quality=95) # 保存图像
image.save_image("output.png", compression=9) # 保存PNG
# 图像显示
image.show_image("预览") # 显示图像
image.wait_key(0) # 等待按键
# 图像信息
print(image.get_info()) # 获取图像信息
print(image.get_histogram()) # 获取直方图
```
#### 2.3 图像增强
- 边缘检测
- 滤波处理
- 阈值处理
- 形态学操作
- 轮廓检测
- 特征匹配
#### 2.4 视频处理
- 视频读取
- 视频写入
- 摄像头控制
- 帧处理
### 3. 词云生成 (WordCloud.py)
#### 3.1 词云功能
- 词云创建
- 标题设置
- 渲染输出
- 样式定制
### 4. 数学动画 (Manim.py)
#### 4.1 动画功能
- 数学公式动画
- 几何图形动画
- 图表动画
- 场景管理
## 使用示例
### 1. 数据可视化示例
```python
from Convention.Visual import Core
# 创建数据可视化生成器
generator = Core.data_visual_generator("data.csv")
# 绘制折线图
generator.plot_line("x", "y", title="折线图示例")
# 绘制柱状图
generator.plot_bar("category", "value", title="柱状图示例")
# 绘制散点图
generator.plot_scatter("x", "y", title="散点图示例")
# 绘制饼图
generator.plot_pie("category", title="饼图示例")
```
### 2. 图像处理示例
```python
from Convention.Visual import OpenCV
# 创建图像对象
image = OpenCV.ImageObject("input.jpg")
# 图像处理
image.resize_image(800, 600)
image.convert_to_grayscale()
image.filter_gaussian((5, 5), 1.5, 1.5)
# 保存图像
image.save_image("output.jpg")
```
### 3. 词云生成示例
```python
from Convention.Visual import WordCloud
# 创建词云
wordcloud = WordCloud.make_word_cloud("词云", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
])
# 设置标题
WordCloud.set_title(wordcloud, "编程语言词云")
# 渲染输出
WordCloud.render_to(wordcloud, "wordcloud.html")
```
### 4. 视频处理示例
```python
from Convention.Visual import OpenCV
# 创建视频捕获对象
camera = OpenCV.light_cv_camera(0)
# 创建视频写入对象
writer = OpenCV.VideoWriterInstance(
"output.avi",
OpenCV.avi_with_Xvid_fourcc(),
30.0,
(640, 480)
)
# 录制视频
def stop_condition():
return OpenCV.is_current_key('q')
camera.recording(stop_condition, writer)
```
## 依赖项
- matplotlib: 数据可视化
- seaborn: 高级数据可视化
- opencv-python: 图像处理
- pyecharts: 词云生成
- manim: 数学动画
## 注意事项
1. 使用图像处理时注意内存占用
2. 视频处理时注意帧率设置
3. 词云生成时注意数据量
4. 动画制作时注意性能优化
## 性能优化
1. 使用图像处理时注意批量处理
2. 视频处理时使用合适的编码格式
3. 词云生成时控制词数
4. 动画制作时优化渲染设置
## 贡献指南
欢迎提交Issue和Pull Request来改进功能或添加新特性。

View File

@@ -1,66 +0,0 @@
from ..Internal import *
from pyecharts.charts import WordCloud
from pyecharts import options as opts
from pyecharts import types
#from ..File.Core import tool_file, UnWrapper as UnWrapper2Str
def make_word_cloud(
series_name: str,
data_pair: Sequence[Tuple[str, int]],
**kwargs,
):
wordcloud = WordCloud()
wordcloud.add(series_name, data_pair, **kwargs)
return wordcloud
def set_title(
wordcloud: WordCloud,
title: str
):
wordcloud.set_global_opts(
title_opts=opts.TitleOpts(title=title)
)
def render_to(
wordcloud: WordCloud,
file_name: Union[tool_file, str]
):
wordcloud.render(UnWrapper2Str(file_name))
class light_word_cloud(left_value_reference[WordCloud]):
def __init__(
self,
series_name: str,
data_pair: types.Sequence,
**kwargs,
):
super().__init__(make_word_cloud(series_name, data_pair, **kwargs))
def set_title(
self,
title: str
):
set_title(self.ref_value, title)
def render_to(
self,
file_name: Union[tool_file, str]
):
render_to(self.ref_value, file_name)
if __name__ == "__main__":
# 准备数据
wordcloud = make_word_cloud("", [
("Python", 100),
("Java", 80),
("C++", 70),
("JavaScript", 90),
("Go", 60),
("Rust", 50),
("C#", 40),
("PHP", 30),
("Swift", 20),
("Kotlin", 10),
], word_size_range=[20, 100])
set_title(wordcloud, "cloud")
render_to(wordcloud, "wordcloud.html")

View File

@@ -1,121 +0,0 @@
[返回](./Runtime-README.md)
# /Convention/Runtime/Web
---
网络工具模块提供HTTP客户端和URL操作功能
## ToolURL类
### 构造与基本信息
- `ToolURL(string url)` 从URL字符串创建对象
- `ToString()` / `GetFullURL()` / `FullURL` 获取完整URL
- `implicit operator string` 隐式字符串转换
### URL属性解析
- `GetFilename()` 获取URL中的文件名
- `GetExtension()` 获取文件扩展名
- `ExtensionIs(params string[] extensions)` 检查扩展名是否匹配
### URL验证
- `IsValid` 属性检查URL是否有效
- `ValidateURL()` 验证URL格式
- `implicit operator bool` 隐式布尔转换等同于IsValid
支持HTTP和HTTPS协议的绝对URL
### HTTP方法
#### GET请求
- `GetAsync(Action<HttpResponseMessage> callback)` 异步GET
- `Get(Action<HttpResponseMessage> callback)` 同步GET
#### POST请求
- `PostAsync(Action<HttpResponseMessage> callback, Dictionary<string, string> formData = null)` 异步POST
- `Post(Action<HttpResponseMessage> callback, Dictionary<string, string> formData = null)` 同步POST
支持表单数据提交
### 内容加载
#### 文本加载
- `LoadAsTextAsync()` 异步加载为文本
- `LoadAsText()` 同步加载为文本
#### 二进制加载
- `LoadAsBinaryAsync()` 异步加载为字节数组
- `LoadAsBinary()` 同步加载为字节数组
#### JSON加载
- `LoadAsJson<T>()` 同步加载并反序列化JSON
- `LoadAsJsonAsync<T>()` 异步加载并反序列化JSON
### 文件保存
- `Save(string localPath = null)` 自动选择格式保存到本地
- `SaveAsText(string localPath = null)` 保存为文本文件
- `SaveAsJson(string localPath = null)` 保存为JSON文件
- `SaveAsBinary(string localPath = null)` 保存为二进制文件
### 文件类型判断
- `IsText` 是否为文本文件txt, html, htm, css, js, xml, csv
- `IsJson` 是否为JSON文件
- `IsImage` 是否为图像文件jpg, jpeg, png, gif, bmp, svg
- `IsDocument` 是否为文档文件pdf, doc, docx, xls, xlsx, ppt, pptx
### 高级操作
- `Open(string url)` 在当前对象上打开新URL
- `DownloadAsync(string localPath = null)` 异步下载文件
- `Download(string localPath = null)` 同步下载文件
## 设计特点
### 统一的HTTP客户端
使用静态 `HttpClient` 实例,避免连接池耗尽
### 自动内容类型检测
基于文件扩展名自动判断内容类型,优化保存和处理策略
### 异步支持
所有网络操作都提供异步和同步两种版本
### 错误处理
网络请求失败时回调函数接收null参数方法返回false
### 文件管理集成
下载的文件自动转换为ToolFile对象与文件系统模块无缝集成
### 灵活的数据格式
支持文本、二进制、JSON等多种数据格式的加载和保存
## 使用示例
### 基本HTTP请求
```csharp
var url = new ToolURL("https://api.example.com/data");
if (url.IsValid)
{
url.Get(response => {
if (response != null && response.IsSuccessStatusCode)
{
// 处理响应
}
});
}
```
### 文件下载
```csharp
var url = new ToolURL("https://example.com/file.json");
var localFile = url.Download("./downloads/file.json");
if (localFile.Exists())
{
var data = localFile.LoadAsJson<MyDataType>();
}
```
### 类型安全的JSON加载
```csharp
var url = new ToolURL("https://api.example.com/users.json");
var users = url.LoadAsJson<List<User>>();
```

View File

@@ -1,9 +1,9 @@
import sys import sys
import os import os
from time import sleep
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Convention.Runtime.File import * from Convention.Runtime.Config import *
PrintColorful(ConsoleFrontColor.RED, "Hello, World!")
first = ToolFile("E:/dev/")
second = ToolFile("/analyze/")
print(first|second)