Compare commits
10 Commits
81209e85ee
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 59dfd08c54 | |||
| 61df36626c | |||
| da56b12c22 | |||
| 4d6b0e1c28 | |||
| f98fd3b4c4 | |||
| ef266f4a17 | |||
| 48a8318fe7 | |||
| 11e1aa0f86 | |||
| 007db5a06b | |||
| c11469a108 |
862
Convention/Image/OpenCV.py
Normal file
862
Convention/Image/OpenCV.py
Normal file
@@ -0,0 +1,862 @@
|
||||
from ..Runtime.Config import *
|
||||
|
||||
try:
|
||||
import cv2 as cv2
|
||||
import cv2.data as cv2data
|
||||
from cv2.typing import *
|
||||
except ImportError as e:
|
||||
ImportingThrow(e, "OpenCV", ["opencv-python", "opencv-python-headless"])
|
||||
try:
|
||||
import numpy as np
|
||||
except ImportError as e:
|
||||
ImportingThrow(e, "OpenCV", ["numpy"])
|
||||
try:
|
||||
from PIL import ImageFile as ImageFile
|
||||
from PIL import Image as Image
|
||||
except ImportError as e:
|
||||
ImportingThrow(e, "OpenCV", ["pillow"])
|
||||
|
||||
from ..Runtime.File import ToolFile
|
||||
|
||||
_Unwrapper2Str = lambda x: str(x)
|
||||
_Wrapper2File = lambda x: ToolFile(x)
|
||||
|
||||
VideoWriter = cv2.VideoWriter
|
||||
def mp4_with_MPEG4_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"mp4v")
|
||||
def avi_with_Xvid_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"XVID")
|
||||
def avi_with_DivX_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"DIVX")
|
||||
def avi_with_MJPG_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"MJPG")
|
||||
def mp4_or_avi_with_H264_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"X264")
|
||||
def avi_with_H265_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"H264")
|
||||
def wmv_with_WMV1_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"WMV1")
|
||||
def wmv_with_WMV2_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"WMV2")
|
||||
def oggTheora_with_THEO_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"THEO")
|
||||
def flv_with_FLV1_fourcc() -> int:
|
||||
return VideoWriter.fourcc(*"FLV1")
|
||||
class VideoWriterInstance(VideoWriter):
|
||||
def __init__(
|
||||
self,
|
||||
file_name: Union[ToolFile, str],
|
||||
fourcc: int,
|
||||
fps: float,
|
||||
frame_size: tuple[int, int],
|
||||
is_color: bool = True
|
||||
):
|
||||
super().__init__(_Unwrapper2Str(file_name), fourcc, fps, frame_size, is_color)
|
||||
def __del__(self):
|
||||
self.release()
|
||||
|
||||
def wait_key(delay:int):
|
||||
return cv2.waitKey(delay)
|
||||
def until_esc():
|
||||
return wait_key(0)
|
||||
|
||||
def is_current_key(key:str, *, wait_delay:int = 1):
|
||||
return wait_key(wait_delay) & 0xFF == ord(key[0])
|
||||
|
||||
class BasicViewable:
|
||||
def __init__(self, filename_or_index:Union[str, ToolFile, int]):
|
||||
self._capture: cv2.VideoCapture = None
|
||||
self.stats: bool = True
|
||||
self.Retarget(filename_or_index)
|
||||
def __del__(self):
|
||||
self.Release()
|
||||
|
||||
def __bool__(self):
|
||||
return self.stats
|
||||
|
||||
def IsOpened(self):
|
||||
return self._capture.isOpened()
|
||||
|
||||
def Release(self):
|
||||
if self._capture is not None:
|
||||
self._capture.release()
|
||||
def Retarget(self, filename_or_index:Union[str, ToolFile, int]):
|
||||
self.Release()
|
||||
if isinstance(filename_or_index, int):
|
||||
self._capture = cv2.VideoCapture(filename_or_index)
|
||||
else:
|
||||
self._capture = cv2.VideoCapture(_Unwrapper2Str(filename_or_index))
|
||||
return self
|
||||
|
||||
def NextFrame(self) -> MatLike:
|
||||
self.stats, frame =self._capture.read()
|
||||
if self.stats:
|
||||
return frame
|
||||
else:
|
||||
return None
|
||||
|
||||
def GetCaptrueInfo(self, id:int):
|
||||
return self._capture.get(id)
|
||||
def GetPropPosMsec(self):
|
||||
return self.GetCaptrueInfo(0)
|
||||
def GetPropPosFrames(self):
|
||||
return self.GetCaptrueInfo(1)
|
||||
def GetPropAviRatio(self):
|
||||
return self.GetCaptrueInfo(2)
|
||||
def GetPropFrameWidth(self):
|
||||
return self.GetCaptrueInfo(3)
|
||||
def GetPropFrameHeight(self):
|
||||
return self.GetCaptrueInfo(4)
|
||||
def GetPropFPS(self):
|
||||
return self.GetCaptrueInfo(5)
|
||||
def GetPropFourcc(self):
|
||||
return self.GetCaptrueInfo(6)
|
||||
def GetPropFrameCount(self):
|
||||
return self.GetCaptrueInfo(7)
|
||||
def GetPropFormat(self):
|
||||
return self.GetCaptrueInfo(8)
|
||||
def GetPropMode(self):
|
||||
return self.GetCaptrueInfo(9)
|
||||
def GetPropBrightness(self):
|
||||
return self.GetCaptrueInfo(10)
|
||||
def GetPropContrast(self):
|
||||
return self.GetCaptrueInfo(11)
|
||||
def GetPropSaturation(self):
|
||||
return self.GetCaptrueInfo(12)
|
||||
def GetPropHue(self):
|
||||
return self.GetCaptrueInfo(13)
|
||||
def GetPropGain(self):
|
||||
return self.GetCaptrueInfo(14)
|
||||
def GetPropExposure(self):
|
||||
return self.GetCaptrueInfo(15)
|
||||
def GetPropConvertRGB(self):
|
||||
return self.GetCaptrueInfo(16)
|
||||
|
||||
def SetupCapture(self, id:int, value):
|
||||
self._capture.set(id, value)
|
||||
return self
|
||||
def SetPropPosMsec(self, value:int):
|
||||
return self.SetupCapture(0, value)
|
||||
def SetPropPosFrames(self, value:int):
|
||||
return self.SetupCapture(1, value)
|
||||
def SetPropAviRatio(self, value:float):
|
||||
return self.SetupCapture(2, value)
|
||||
def SetPropFrameWidth(self, value:int):
|
||||
return self.SetupCapture(3, value)
|
||||
def SetPropFrameHeight(self, value:int):
|
||||
return self.SetupCapture(4, value)
|
||||
def SetPropFPS(self, value:int):
|
||||
return self.SetupCapture(5, value)
|
||||
def SetPropFourcc(self, value):
|
||||
return self.SetupCapture(6, value)
|
||||
def SetPropFrameCount(self, value):
|
||||
return self.SetupCapture(7, value)
|
||||
def SetPropFormat(self, value):
|
||||
return self.SetupCapture(8, value)
|
||||
def SetPropMode(self, value):
|
||||
return self.SetupCapture(9, value)
|
||||
def SetPropBrightness(self, value):
|
||||
return self.SetupCapture(10, value)
|
||||
def SetPropContrast(self, value):
|
||||
return self.SetupCapture(11, value)
|
||||
def SetPropSaturation(self, value):
|
||||
return self.SetupCapture(12, value)
|
||||
def SetPropHue(self, value):
|
||||
return self.SetupCapture(13, value)
|
||||
def SetPropGain(self, value):
|
||||
return self.SetupCapture(14, value)
|
||||
def SetPropExposure(self, value):
|
||||
return self.SetupCapture(15, value)
|
||||
def SetPropConvertRGB(self, value:int):
|
||||
return self.SetupCapture(16, value)
|
||||
def SetPropRectification(self, value:int):
|
||||
return self.SetupCapture(17, value)
|
||||
|
||||
@property
|
||||
def FrameSize(self) -> Tuple[float, float]:
|
||||
return self.GetPropFrameWidth(), self.GetPropFrameHeight()
|
||||
|
||||
class BasicCamera(BasicViewable):
|
||||
def __init__(self, index:int = 0):
|
||||
self.writer: VideoWriter = None
|
||||
super().__init__(int(index))
|
||||
|
||||
@override
|
||||
def Release(self):
|
||||
super().Release()
|
||||
if self.writer is not None:
|
||||
self.writer.release()
|
||||
|
||||
def CurrentFrame(self):
|
||||
return self.NextFrame()
|
||||
|
||||
def recording(
|
||||
self,
|
||||
stop_pr: Callable[[], bool],
|
||||
writer: VideoWriter,
|
||||
):
|
||||
self.writer = writer
|
||||
while self.IsOpened():
|
||||
if stop_pr():
|
||||
break
|
||||
frame = self.CurrentFrame()
|
||||
cv2.imshow("__recording__", frame)
|
||||
writer.write(frame)
|
||||
cv2.destroyWindow("__recording__")
|
||||
return self
|
||||
|
||||
class ImageObject:
|
||||
def __init__(
|
||||
self,
|
||||
image: Optional[Union[
|
||||
str,
|
||||
Self,
|
||||
BasicCamera,
|
||||
ToolFile,
|
||||
MatLike,
|
||||
np.ndarray,
|
||||
ImageFile.ImageFile,
|
||||
Image.Image
|
||||
]],
|
||||
flags: int = -1):
|
||||
self.__image: MatLike = None
|
||||
self.__camera: BasicCamera = None
|
||||
self.current: MatLike = None
|
||||
if isinstance(image, BasicCamera):
|
||||
self.lock_from_camera(image)
|
||||
else:
|
||||
self.load_image(image, flags)
|
||||
|
||||
@property
|
||||
def camera(self) -> BasicCamera:
|
||||
if self.__camera is None or self.__camera.IsOpened() is False:
|
||||
return None
|
||||
else:
|
||||
return self.__camera
|
||||
@property
|
||||
def image(self) -> MatLike:
|
||||
if self.current is not None:
|
||||
return self.current
|
||||
elif self.camera is None:
|
||||
return self.__image
|
||||
else:
|
||||
return self.__camera.CurrentFrame()
|
||||
|
||||
@image.setter
|
||||
def image(self, image: Optional[Union[
|
||||
str,
|
||||
Self,
|
||||
ToolFile,
|
||||
MatLike,
|
||||
np.ndarray,
|
||||
ImageFile.ImageFile,
|
||||
Image.Image
|
||||
]]):
|
||||
self.load_image(image)
|
||||
|
||||
def load_from_nparray(
|
||||
self,
|
||||
array_: np.ndarray,
|
||||
code: int = cv2.COLOR_RGB2BGR,
|
||||
*args, **kwargs
|
||||
):
|
||||
self.__image = cv2.cvtColor(array_, code, *args, **kwargs)
|
||||
return self
|
||||
def load_from_PIL_image(
|
||||
self,
|
||||
image: Image.Image,
|
||||
code: int = cv2.COLOR_RGB2BGR,
|
||||
*args, **kwargs
|
||||
):
|
||||
self.load_from_nparray(np.array(image), code, *args, **kwargs)
|
||||
return self
|
||||
def load_from_PIL_ImageFile(
|
||||
self,
|
||||
image: ImageFile.ImageFile,
|
||||
rect: Optional[Tuple[float, float, float, float]] = None
|
||||
):
|
||||
return self.load_from_PIL_image(image.crop(rect))
|
||||
def load_from_cv2_image(self, image: MatLike):
|
||||
self.__image = image
|
||||
return self
|
||||
def lock_from_camera(self, camera: BasicCamera):
|
||||
self.__camera = camera
|
||||
return self
|
||||
|
||||
@property
|
||||
def dimension(self) -> int:
|
||||
return self.image.ndim
|
||||
|
||||
@property
|
||||
def shape(self) -> Tuple[int, int, int]:
|
||||
'''height, width, depth'''
|
||||
return self.image.shape
|
||||
@property
|
||||
def height(self) -> int:
|
||||
return self.shape[0]
|
||||
@property
|
||||
def width(self) -> int:
|
||||
return self.shape[1]
|
||||
|
||||
def is_enable(self):
|
||||
return self.image is not None
|
||||
def is_invalid(self):
|
||||
return self.is_enable() is False
|
||||
def __bool__(self):
|
||||
return self.is_enable()
|
||||
def __MatLike__(self):
|
||||
return self.image
|
||||
|
||||
def load_image(
|
||||
self,
|
||||
image: Optional[Union[
|
||||
str,
|
||||
ToolFile,
|
||||
Self,
|
||||
MatLike,
|
||||
np.ndarray,
|
||||
ImageFile.ImageFile,
|
||||
Image.Image
|
||||
]],
|
||||
flags: int = -1
|
||||
):
|
||||
"""加载图片"""
|
||||
if image is None:
|
||||
self.__image = None
|
||||
return self
|
||||
elif isinstance(image, type(self)):
|
||||
self.__image = image.image
|
||||
elif isinstance(image, MatLike):
|
||||
self.__image = image
|
||||
elif isinstance(image, np.ndarray):
|
||||
self.load_from_nparray(image, flags)
|
||||
elif isinstance(image, ImageFile.ImageFile):
|
||||
self.load_from_PIL_ImageFile(image, flags)
|
||||
elif isinstance(image, Image.Image):
|
||||
self.load_from_PIL_image(image, flags)
|
||||
else:
|
||||
self.__image = cv2.imread(_Unwrapper2Str(image), flags)
|
||||
return self
|
||||
def save_image(self, save_path:Union[str, ToolFile], is_path_must_exist = False):
|
||||
"""保存图片"""
|
||||
if is_path_must_exist:
|
||||
_Wrapper2File(save_path).try_create_parent_path()
|
||||
if self.is_enable():
|
||||
cv2.imwrite(_Unwrapper2Str(save_path), self.image)
|
||||
return self
|
||||
|
||||
def show_image(
|
||||
self,
|
||||
window_name: str = "Image",
|
||||
delay: Union[int,str] = 0,
|
||||
image_show_func: Callable[[Self], None] = None,
|
||||
*args, **kwargs
|
||||
):
|
||||
"""显示图片"""
|
||||
if self.is_invalid():
|
||||
return self
|
||||
if self.camera is not None:
|
||||
while (wait_key(1) & 0xFF != ord(str(delay)[0])) and self.camera is not None:
|
||||
# dont delete this line, self.image is camera flame now, see<self.current = None>
|
||||
self.current = self.image
|
||||
if image_show_func is not None:
|
||||
image_show_func(self)
|
||||
if self.current is not None:
|
||||
cv2.imshow(window_name, self.current)
|
||||
# dont delete this line, see property<image>
|
||||
self.current = None
|
||||
else:
|
||||
cv2.imshow(window_name, self.image)
|
||||
cv2.waitKey(delay = int(delay), *args, **kwargs)
|
||||
if cv2.getWindowProperty(window_name, cv2.WND_PROP_VISIBLE) > 0:
|
||||
cv2.destroyWindow(window_name)
|
||||
return self
|
||||
|
||||
# 分离通道
|
||||
def split(self):
|
||||
"""分离通道"""
|
||||
return cv2.split(self.image)
|
||||
def split_to_image_object(self):
|
||||
"""分离通道"""
|
||||
return [ImageObject(channel) for channel in self.split()]
|
||||
@property
|
||||
def channels(self):
|
||||
return self.split()
|
||||
@property
|
||||
def blue_channel(self):
|
||||
return self.channels[0]
|
||||
@property
|
||||
def green_channel(self):
|
||||
return self.channels[1]
|
||||
@property
|
||||
def red_channel(self):
|
||||
return self.channels[2]
|
||||
@property
|
||||
def alpha_channel(self):
|
||||
return self.channels[3]
|
||||
def get_blue_image(self):
|
||||
return ImageObject(self.blue_channel)
|
||||
def get_green_image(self):
|
||||
return ImageObject(self.green_channel)
|
||||
def get_red_image(self):
|
||||
return ImageObject(self.red_channel)
|
||||
def get_alpha_image(self):
|
||||
return ImageObject(self.alpha_channel)
|
||||
|
||||
# 混合通道
|
||||
def merge_channels_from_list(self, channels:List[MatLike]):
|
||||
"""合并通道"""
|
||||
self.image = cv2.merge(channels)
|
||||
return self
|
||||
def merge_channels(self, blue:MatLike, green:MatLike, red:MatLike):
|
||||
"""合并通道"""
|
||||
return self.merge_channels_from_list([blue, green, red])
|
||||
def merge_channel_list(self, bgr:List[MatLike]):
|
||||
"""合并通道"""
|
||||
return self.merge_channels_from_list(bgr)
|
||||
|
||||
# Transform
|
||||
def get_resize_image(self, width:int, height:int):
|
||||
if self.is_enable():
|
||||
return cv2.resize(self.image, (width, height))
|
||||
return None
|
||||
def get_rotate_image(self, angle:float):
|
||||
if self.is_invalid():
|
||||
return None
|
||||
(h, w) = self.image.shape[:2]
|
||||
center = (w // 2, h // 2)
|
||||
M = cv2.getRotationMatrix2D(center, angle, 1.0)
|
||||
return cv2.warpAffine(self.image, M, (w, h))
|
||||
def resize_image(self, width:int, height:int):
|
||||
"""调整图片大小"""
|
||||
new_image = self.get_resize_image(width, height)
|
||||
if new_image is not None:
|
||||
self.image = new_image
|
||||
return self
|
||||
def rotate_image(self, angle:float):
|
||||
"""旋转图片"""
|
||||
new_image = self.get_rotate_image(angle)
|
||||
if new_image is not None:
|
||||
self.image = new_image
|
||||
return self
|
||||
|
||||
# 图片翻折
|
||||
def flip(self, flip_code:int):
|
||||
"""翻转图片"""
|
||||
if self.is_enable():
|
||||
self.image = cv2.flip(self.image, flip_code)
|
||||
return self
|
||||
def horizon_flip(self):
|
||||
"""水平翻转图片"""
|
||||
return self.flip(1)
|
||||
def vertical_flip(self):
|
||||
"""垂直翻转图片"""
|
||||
return self.flip(0)
|
||||
def both_flip(self):
|
||||
"""双向翻转图片"""
|
||||
return self.flip(-1)
|
||||
|
||||
# 色彩空间猜测
|
||||
def guess_color_space(self) -> Optional[str]:
|
||||
"""猜测色彩空间"""
|
||||
if self.is_invalid():
|
||||
return None
|
||||
image = self.image
|
||||
# 计算每个通道的像素值分布
|
||||
hist_b = cv2.calcHist([image], [0], None, [256], [0, 256])
|
||||
hist_g = cv2.calcHist([image], [1], None, [256], [0, 256])
|
||||
hist_r = cv2.calcHist([image], [2], None, [256], [0, 256])
|
||||
|
||||
# 计算每个通道的像素值总和
|
||||
sum_b = np.sum(hist_b)
|
||||
sum_g = np.sum(hist_g)
|
||||
sum_r = np.sum(hist_r)
|
||||
|
||||
# 根据像素值总和判断色彩空间
|
||||
if sum_b > sum_g and sum_b > sum_r:
|
||||
#print("The image might be in BGR color space.")
|
||||
return "BGR"
|
||||
elif sum_g > sum_b and sum_g > sum_r:
|
||||
#print("The image might be in GRAY color space.")
|
||||
return "GRAY"
|
||||
else:
|
||||
#print("The image might be in RGB color space.")
|
||||
return "RGB"
|
||||
|
||||
# 颜色转化
|
||||
def get_convert(self, color_convert:int):
|
||||
"""颜色转化"""
|
||||
if self.is_invalid():
|
||||
return None
|
||||
return cv2.cvtColor(self.image, color_convert)
|
||||
def convert_to(self, color_convert:int):
|
||||
"""颜色转化"""
|
||||
if self.is_invalid():
|
||||
return None
|
||||
self.image = self.get_convert(color_convert)
|
||||
|
||||
def is_grayscale(self):
|
||||
return self.dimension == 2
|
||||
def get_grayscale(self):
|
||||
if self.is_invalid():
|
||||
return None
|
||||
return cv2.cvtColor(self.image, cv2.COLOR_BGR2GRAY)
|
||||
def convert_to_grayscale(self):
|
||||
"""将图片转换为灰度图"""
|
||||
self.image = self.get_grayscale()
|
||||
return self
|
||||
|
||||
def get_convert_flag(
|
||||
self,
|
||||
targetColorTypeName:Literal[
|
||||
"BGR", "RGB", "GRAY", "YCrCb"
|
||||
]
|
||||
) -> Optional[int]:
|
||||
"""获取颜色转化标志"""
|
||||
flag = self.guess_color_space()
|
||||
if flag is None:
|
||||
return None
|
||||
|
||||
if targetColorTypeName == "BGR":
|
||||
if flag == "RGB":
|
||||
return cv2.COLOR_RGB2BGR
|
||||
elif flag == "GRAY":
|
||||
return cv2.COLOR_GRAY2BGR
|
||||
elif flag == "YCrCb":
|
||||
return cv2.COLOR_YCrCb2BGR
|
||||
elif targetColorTypeName == "RGB":
|
||||
if flag == "BGR":
|
||||
return cv2.COLOR_BGR2RGB
|
||||
elif flag == "GRAY":
|
||||
return cv2.COLOR_GRAY2RGB
|
||||
elif flag == "YCrCb":
|
||||
return cv2.COLOR_YCrCb2RGB
|
||||
elif targetColorTypeName == "GRAY":
|
||||
if flag == "RGB":
|
||||
return cv2.COLOR_RGB2GRAY
|
||||
elif flag == "RGB":
|
||||
return cv2.COLOR_BGR2GRAY
|
||||
return None
|
||||
|
||||
# 原址裁切
|
||||
def sub_image(self, x:int, y:int ,width:int ,height:int):
|
||||
"""裁剪图片"""
|
||||
if self.is_invalid():
|
||||
return self
|
||||
self.image = self.image[y:y+height, x:x+width]
|
||||
return self
|
||||
|
||||
# 直方图
|
||||
def equalizeHist(self, is_cover = False) -> MatLike:
|
||||
"""直方图均衡化"""
|
||||
if self.is_invalid():
|
||||
return self
|
||||
result:MatLike = cv2.equalizeHist(self.image)
|
||||
if is_cover:
|
||||
self.image = result
|
||||
return result
|
||||
def calcHist(
|
||||
self,
|
||||
channel: Union[List[int], int],
|
||||
mask: Optional[MatLike] = None,
|
||||
hist_size: Sequence[int] = [256],
|
||||
ranges: Sequence[float] = [0, 256]
|
||||
) -> MatLike:
|
||||
"""计算直方图"""
|
||||
if self.is_invalid():
|
||||
return None
|
||||
return cv2.calcHist(
|
||||
[self.image],
|
||||
channel if isinstance(channel, list) else [channel],
|
||||
mask,
|
||||
hist_size,
|
||||
ranges)
|
||||
|
||||
# 子集操作
|
||||
def sub_image_with_rect(self, rect:Tuple[float, float, float, float]):
|
||||
"""裁剪图片"""
|
||||
if self.is_invalid():
|
||||
return self
|
||||
self.image = self.image[rect[1]:rect[1]+rect[3], rect[0]:rect[0]+rect[2]]
|
||||
return self
|
||||
def sub_image_with_box(self, box:Tuple[float, float, float, float]):
|
||||
"""裁剪图片"""
|
||||
if self.is_invalid():
|
||||
return self
|
||||
self.image = self.image[box[1]:box[3], box[0]:box[2]]
|
||||
return self
|
||||
def sub_cover_with_rect(self, image:Union[Self, MatLike], rect:Tuple[float, float, float, float]):
|
||||
"""覆盖图片"""
|
||||
if self.is_invalid():
|
||||
raise ValueError("Real Image is none")
|
||||
if isinstance(image, MatLike):
|
||||
image = ImageObject(image)
|
||||
self.image[rect[1]:rect[1]+rect[3], rect[0]:rect[0]+rect[2]] = image.image
|
||||
return self
|
||||
def sub_cover_with_box(self, image:Union[Self, MatLike], box:Tuple[float, float, float, float]):
|
||||
"""覆盖图片"""
|
||||
if self.is_invalid():
|
||||
raise ValueError("Real Image is none")
|
||||
if isinstance(image, MatLike):
|
||||
image = ImageObject(image)
|
||||
self.image[box[1]:box[3], box[0]:box[2]] = image.image
|
||||
return self
|
||||
|
||||
def operator_cv(self, func:Callable[[MatLike], Any], *args, **kwargs):
|
||||
func(self.image, *args, **kwargs)
|
||||
return self
|
||||
|
||||
def stack(self, *args:Self, **kwargs) -> Self:
|
||||
images = [ image for image in args]
|
||||
images.append(self)
|
||||
return ImageObject(np.stack([np.uint8(image.image) for image in images], *args, **kwargs))
|
||||
def vstack(self, *args:Self) -> Self:
|
||||
images = [ image for image in args]
|
||||
images.append(self)
|
||||
return ImageObject(np.vstack([np.uint8(image.image) for image in images]))
|
||||
def hstack(self, *args:Self) -> Self:
|
||||
images = [ image for image in args]
|
||||
images.append(self)
|
||||
return ImageObject(np.hstack([np.uint8(image.image) for image in images]))
|
||||
|
||||
def merge_with_blending(self, other:Self, weights:Tuple[float, float]):
|
||||
return ImageObject(cv2.addWeighted(self.image, weights[0], other.image, weights[1], 0))
|
||||
|
||||
def add(self, image_or_value:Union[Self, int]):
|
||||
if isinstance(image_or_value, int):
|
||||
self.image = cv2.add(self.image, image_or_value)
|
||||
else:
|
||||
self.image = cv2.add(self.image, image_or_value.image)
|
||||
return self
|
||||
def __add__(self, image_or_value:Union[Self, int]):
|
||||
return ImageObject(self.image.copy()).add(image_or_value)
|
||||
def subtract(self, image_or_value:Union[Self, int]):
|
||||
if isinstance(image_or_value, int):
|
||||
self.image = cv2.subtract(self.image, image_or_value)
|
||||
else:
|
||||
self.image = cv2.subtract(self.image, image_or_value.image)
|
||||
return self
|
||||
def __sub__(self, image_or_value:Union[Self, int]):
|
||||
return ImageObject(self.image.copy()).subtract(image_or_value)
|
||||
def multiply(self, image_or_value:Union[Self, int]):
|
||||
if isinstance(image_or_value, int):
|
||||
self.image = cv2.multiply(self.image, image_or_value)
|
||||
else:
|
||||
self.image = cv2.multiply(self.image, image_or_value.image)
|
||||
return self
|
||||
def __mul__(self, image_or_value:Union[Self, int]):
|
||||
return ImageObject(self.image.copy()).multiply(image_or_value)
|
||||
def divide(self, image_or_value:Union[Self, int]):
|
||||
if isinstance(image_or_value, int):
|
||||
self.image = cv2.divide(self.image, image_or_value)
|
||||
else:
|
||||
self.image = cv2.divide(self.image, image_or_value.image)
|
||||
return self
|
||||
def __truediv__(self, image_or_value:Union[Self, int]):
|
||||
return ImageObject(self.image.copy()).divide(image_or_value)
|
||||
def bitwise_and(self, image_or_value:Union[Self, int]):
|
||||
if isinstance(image_or_value, int):
|
||||
self.image = cv2.bitwise_and(self.image, image_or_value)
|
||||
else:
|
||||
self.image = cv2.bitwise_and(self.image, image_or_value.image)
|
||||
return self
|
||||
def bitwise_or(self, image_or_value:Union[Self, int]):
|
||||
if isinstance(image_or_value, int):
|
||||
self.image = cv2.bitwise_or(self.image, image_or_value)
|
||||
else:
|
||||
self.image = cv2.bitwise_or(self.image, image_or_value.image)
|
||||
return self
|
||||
def bitwise_xor(self, image_or_value:Union[Self]):
|
||||
if isinstance(image_or_value, int):
|
||||
self.image = cv2.bitwise_xor(self.image, image_or_value)
|
||||
else:
|
||||
self.image = cv2.bitwise_xor(self.image, image_or_value.image)
|
||||
return self
|
||||
def bitwise_not(self):
|
||||
self.image = cv2.bitwise_not(self.image)
|
||||
return self
|
||||
def __neg__(self):
|
||||
return ImageObject(self.image.copy()).bitwise_not()
|
||||
|
||||
class NoiseImageObject(ImageObject):
|
||||
def __init__(
|
||||
self,
|
||||
height: int,
|
||||
weight: int,
|
||||
*,
|
||||
mean: float = 0,
|
||||
sigma: float = 25,
|
||||
dtype = np.uint8
|
||||
):
|
||||
super().__init__(NoiseImageObject.get_new_noise(
|
||||
None, height, weight, mean=mean, sigma=sigma, dtype=dtype
|
||||
))
|
||||
|
||||
@classmethod
|
||||
def get_new_noise(
|
||||
raw_image: Optional[MatLike],
|
||||
height: int,
|
||||
weight: int,
|
||||
*,
|
||||
mean: float = 0,
|
||||
sigma: float = 25,
|
||||
dtype = np.uint8
|
||||
) -> MatLike:
|
||||
noise = raw_image
|
||||
if noise is None:
|
||||
noise = np.zeros((height, weight), dtype=dtype)
|
||||
cv2.randn(noise, mean, sigma)
|
||||
return cv2.cvtColor(noise, cv2.COLOR_GRAY2BGR)
|
||||
|
||||
def Unwrapper(image:Optional[Union[
|
||||
str,
|
||||
ImageObject,
|
||||
ToolFile,
|
||||
MatLike,
|
||||
np.ndarray,
|
||||
ImageFile.ImageFile,
|
||||
Image.Image
|
||||
]]) -> MatLike:
|
||||
return image.image if isinstance(image, ImageObject) else ImageObject(image).image
|
||||
|
||||
def Wrapper(image:Optional[Union[
|
||||
str,
|
||||
ImageObject,
|
||||
ToolFile,
|
||||
MatLike,
|
||||
np.ndarray,
|
||||
ImageFile.ImageFile,
|
||||
Image.Image
|
||||
]]) -> ImageObject:
|
||||
return ImageObject(image)
|
||||
|
||||
class light_cv_window:
|
||||
def __init__(self, name:str):
|
||||
self.__my_window_name = name
|
||||
cv2.namedWindow(self.__my_window_name)
|
||||
def __del__(self):
|
||||
self.destroy()
|
||||
|
||||
def show_image(self, image:Union[ImageObject, MatLike]):
|
||||
if self.__my_window_name is None:
|
||||
self.__my_window_name = "window"
|
||||
if isinstance(image, ImageObject):
|
||||
image = image.image
|
||||
cv2.imshow(self.__my_window_name, image)
|
||||
return self
|
||||
def destroy(self):
|
||||
if self.__my_window_name is not None and cv2.getWindowProperty(self.__my_window_name, cv2.WND_PROP_VISIBLE) > 0:
|
||||
cv2.destroyWindow(self.__my_window_name)
|
||||
return self
|
||||
|
||||
@property
|
||||
def window_rect(self):
|
||||
return cv2.getWindowImageRect(self.__my_window_name)
|
||||
@window_rect.setter
|
||||
def window_rect(self, rect:Tuple[float, float, float, float]):
|
||||
self.set_window_rect(rect[0], rect[1], rect[2], rect[3])
|
||||
|
||||
def set_window_size(self, weight:int, height:int):
|
||||
cv2.resizeWindow(self.__my_window_name, weight, height)
|
||||
return self
|
||||
def get_window_size(self) -> Tuple[float, float]:
|
||||
rect = self.window_rect
|
||||
return rect[2], rect[3]
|
||||
|
||||
def get_window_property(self, prop_id:int):
|
||||
return cv2.getWindowProperty(self.__my_window_name, prop_id)
|
||||
def set_window_property(self, prop_id:int, prop_value:int):
|
||||
cv2.setWindowProperty(self.__my_window_name, prop_id, prop_value)
|
||||
return self
|
||||
def get_prop_frame_width(self):
|
||||
return self.window_rect[2]
|
||||
def get_prop_frame_height(self):
|
||||
return self.window_rect[3]
|
||||
def is_full_window(self):
|
||||
return cv2.getWindowProperty(self.__my_window_name, cv2.WINDOW_FULLSCREEN) > 0
|
||||
def set_full_window(self):
|
||||
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_FULLSCREEN, 1)
|
||||
return self
|
||||
def set_normal_window(self):
|
||||
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_FULLSCREEN, 0)
|
||||
return self
|
||||
def is_using_openGL(self):
|
||||
return cv2.getWindowProperty(self.__my_window_name, cv2.WINDOW_OPENGL) > 0
|
||||
def set_using_openGL(self):
|
||||
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_OPENGL, 1)
|
||||
return self
|
||||
def set_not_using_openGL(self):
|
||||
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_OPENGL, 0)
|
||||
return self
|
||||
def is_autosize(self):
|
||||
return cv2.getWindowProperty(self.__my_window_name, cv2.WINDOW_AUTOSIZE) > 0
|
||||
def set_autosize(self):
|
||||
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_AUTOSIZE, 1)
|
||||
return self
|
||||
def set_not_autosize(self):
|
||||
cv2.setWindowProperty(self.__my_window_name, cv2.WINDOW_AUTOSIZE, 0)
|
||||
return self
|
||||
|
||||
def set_window_rect(self, x:int, y:int, weight:int, height:int):
|
||||
cv2.moveWindow(self.__my_window_name, x, y)
|
||||
return self.set_window_size(weight, height)
|
||||
|
||||
def set_window_pos(self, x:int, y:int):
|
||||
cv2.moveWindow(self.__my_window_name, x, y)
|
||||
return self
|
||||
|
||||
def wait_key(self, wait_time:int=0):
|
||||
return cv2.waitKey(wait_time)
|
||||
|
||||
def get_haarcascade_frontalface(name_or_default:Optional[str]=None):
|
||||
if name_or_default is None:
|
||||
name_or_default = "haarcascade_frontalface_default"
|
||||
return cv2.CascadeClassifier(cv2data.haarcascades+'haarcascade_frontalface_default.xml')
|
||||
|
||||
def detect_human_face(
|
||||
image: ImageObject,
|
||||
detecter: cv2.CascadeClassifier,
|
||||
scaleFactor: float = 1.1,
|
||||
minNeighbors: int = 4,
|
||||
*args, **kwargs):
|
||||
'''return is Rect[]'''
|
||||
return detecter.detectMultiScale(image.image, scaleFactor, minNeighbors, *args, **kwargs)
|
||||
|
||||
class internal_detect_faces_oop(Callable[[ImageObject], None]):
|
||||
def __init__(self):
|
||||
self.face_cascade = get_haarcascade_frontalface()
|
||||
def __call__(self, image:ImageObject):
|
||||
gray = image.convert_to_grayscale()
|
||||
faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
|
||||
for (x,y,w,h) in faces:
|
||||
image.operator_cv(cv2.rectangle,(x,y),(x+w,y+h),(255,0,0),2)
|
||||
|
||||
def easy_detect_faces(camera:BasicCamera):
|
||||
ImageObject(camera).show_image("window", 'q', internal_detect_faces_oop())
|
||||
|
||||
# 示例使用
|
||||
if __name__ == "__main__":
|
||||
img_obj = ImageObject("path/to/your/image.jpg")
|
||||
img_obj.show_image()
|
||||
img_obj.resize_image(800, 600)
|
||||
img_obj.rotate_image(45)
|
||||
img_obj.convert_to_grayscale()
|
||||
img_obj.save_image("path/to/save/image.jpg")
|
||||
|
||||
# Override tool_file to tool_file_ex
|
||||
|
||||
class tool_file_cvex(ToolFile):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
@override
|
||||
def load_as_image(self) -> ImageObject:
|
||||
self.data = ImageObject(self.get_path())
|
||||
return self.data
|
||||
|
||||
@override
|
||||
def save(self, path = None):
|
||||
image:ImageObject = self.data
|
||||
image.save_image(path if path is not None else self.get_path())
|
||||
return self
|
||||
|
||||
0
Convention/Image/init.py
Normal file
0
Convention/Image/init.py
Normal file
@@ -5,35 +5,132 @@ import sys
|
||||
import threading
|
||||
import traceback
|
||||
import datetime
|
||||
try:
|
||||
from colorama import Fore as ConsoleFrontColor, Back as ConsoleBackgroundColor, Style as ConsoleStyle
|
||||
except:
|
||||
print("colorama is not installed")
|
||||
class ConsoleFrontColor:
|
||||
RED = ""
|
||||
GREEN = ""
|
||||
YELLOW = ""
|
||||
BLUE = ""
|
||||
MAGENTA = ""
|
||||
CYAN = ""
|
||||
WHITE = ""
|
||||
RESET = ""
|
||||
class ConsoleBackgroundColor:
|
||||
RED = ""
|
||||
GREEN = ""
|
||||
YELLOW = ""
|
||||
BLUE = ""
|
||||
MAGENTA = ""
|
||||
CYAN = ""
|
||||
WHITE = ""
|
||||
RESET = ""
|
||||
class ConsoleStyle:
|
||||
RESET = ""
|
||||
BOLD = ""
|
||||
DIM = ""
|
||||
UNDERLINE = ""
|
||||
REVERSE = ""
|
||||
HIDDEN = ""
|
||||
|
||||
# region ansi colorful
|
||||
|
||||
# Copyright Jonathan Hartley 2013. BSD 3-Clause license
|
||||
'''
|
||||
This module generates ANSI character codes to printing colors to terminals.
|
||||
See: http://en.wikipedia.org/wiki/ANSI_escape_code
|
||||
'''
|
||||
|
||||
CSI = '\033['
|
||||
OSC = '\033]'
|
||||
BEL = '\a'
|
||||
|
||||
|
||||
def code_to_chars(code):
|
||||
return CSI + str(code) + 'm'
|
||||
|
||||
def set_title(title):
|
||||
return OSC + '2;' + title + BEL
|
||||
|
||||
def clear_screen(mode=2):
|
||||
return CSI + str(mode) + 'J'
|
||||
|
||||
def clear_line(mode=2):
|
||||
return CSI + str(mode) + 'K'
|
||||
|
||||
|
||||
class AnsiCodes(object):
|
||||
def __init__(self):
|
||||
# the subclasses declare class attributes which are numbers.
|
||||
# Upon instantiation we define instance attributes, which are the same
|
||||
# as the class attributes but wrapped with the ANSI escape sequence
|
||||
for name in dir(self):
|
||||
if not name.startswith('_'):
|
||||
value = getattr(self, name)
|
||||
setattr(self, name, code_to_chars(value))
|
||||
|
||||
|
||||
class ConsoleCursor(object):
|
||||
def UP(self, n=1):
|
||||
return CSI + str(n) + 'A'
|
||||
def DOWN(self, n=1):
|
||||
return CSI + str(n) + 'B'
|
||||
def FORWARD(self, n=1):
|
||||
return CSI + str(n) + 'C'
|
||||
def BACK(self, n=1):
|
||||
return CSI + str(n) + 'D'
|
||||
def POS(self, x=1, y=1):
|
||||
return CSI + str(y) + ';' + str(x) + 'H'
|
||||
|
||||
|
||||
class ConsoleFrontColorClass(AnsiCodes):
|
||||
BLACK = 30
|
||||
RED = 31
|
||||
GREEN = 32
|
||||
YELLOW = 33
|
||||
BLUE = 34
|
||||
MAGENTA = 35
|
||||
CYAN = 36
|
||||
WHITE = 37
|
||||
RESET = 39
|
||||
|
||||
# These are fairly well supported, but not part of the standard.
|
||||
LIGHTBLACK_EX = 90
|
||||
LIGHTRED_EX = 91
|
||||
LIGHTGREEN_EX = 92
|
||||
LIGHTYELLOW_EX = 93
|
||||
LIGHTBLUE_EX = 94
|
||||
LIGHTMAGENTA_EX = 95
|
||||
LIGHTCYAN_EX = 96
|
||||
LIGHTWHITE_EX = 97
|
||||
|
||||
ConsoleFrontColor = ConsoleFrontColorClass()
|
||||
|
||||
class ConsoleBackgroundColorClass(AnsiCodes):
|
||||
BLACK = 40
|
||||
RED = 41
|
||||
GREEN = 42
|
||||
YELLOW = 43
|
||||
BLUE = 44
|
||||
MAGENTA = 45
|
||||
CYAN = 46
|
||||
WHITE = 47
|
||||
RESET = 49
|
||||
|
||||
# These are fairly well supported, but not part of the standard.
|
||||
LIGHTBLACK_EX = 100
|
||||
LIGHTRED_EX = 101
|
||||
LIGHTGREEN_EX = 102
|
||||
LIGHTYELLOW_EX = 103
|
||||
LIGHTBLUE_EX = 104
|
||||
LIGHTMAGENTA_EX = 105
|
||||
LIGHTCYAN_EX = 106
|
||||
LIGHTWHITE_EX = 107
|
||||
|
||||
ConsoleBackgroundColor = ConsoleBackgroundColorClass()
|
||||
|
||||
class ConsoleStyleClass(AnsiCodes):
|
||||
BRIGHT = 1
|
||||
DIM = 2
|
||||
NORMAL = 22
|
||||
RESET_ALL = 0
|
||||
|
||||
ConsoleStyle = ConsoleStyleClass()
|
||||
|
||||
def PrintColorful(color:str, *args, is_reset:bool=True, **kwargs):
|
||||
with lock_guard():
|
||||
if is_reset:
|
||||
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
|
||||
else:
|
||||
print(color,*args, **kwargs)
|
||||
|
||||
def PrintAsError(message:str):
|
||||
PrintColorful(ConsoleFrontColor.RED, message)
|
||||
def PrintAsWarning(message:str):
|
||||
PrintColorful(ConsoleFrontColor.YELLOW, message)
|
||||
def PrintAsInfo(message:str):
|
||||
PrintColorful(ConsoleFrontColor.GREEN, message)
|
||||
def PrintAsDebug(message:str):
|
||||
PrintColorful(ConsoleFrontColor.BLUE, message)
|
||||
def PrintAsSuccess(message:str):
|
||||
PrintColorful(ConsoleFrontColor.GREEN, message)
|
||||
def PrintAsLight(message:str):
|
||||
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, message)
|
||||
|
||||
# endregion
|
||||
|
||||
class NotImplementedError(Exception):
|
||||
def __init__(self, message:Optional[str]=None) -> None:
|
||||
@@ -60,13 +157,6 @@ def GetInternalDebug() -> bool:
|
||||
global INTERNAL_DEBUG
|
||||
return INTERNAL_DEBUG
|
||||
|
||||
def PrintColorful(color:str, *args, is_reset:bool=True, **kwargs):
|
||||
with lock_guard():
|
||||
if is_reset:
|
||||
print(color,*args,ConsoleStyle.RESET_ALL, **kwargs)
|
||||
else:
|
||||
print(color,*args, **kwargs)
|
||||
|
||||
ImportingFailedSet:Set[str] = set()
|
||||
def ImportingThrow(
|
||||
ex: ImportError,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import os.path
|
||||
from .Config import *
|
||||
import json
|
||||
import shutil
|
||||
@@ -217,7 +218,7 @@ class ToolFile(BaseModel):
|
||||
with open(self.OriginFullPath, 'rb') as f:
|
||||
return f.read()
|
||||
def LoadAsText(self) -> str:
|
||||
with open(self.OriginFullPath, 'r') as f:
|
||||
with open(self.OriginFullPath, 'r', encoding='utf-8') as f:
|
||||
return f.read()
|
||||
def LoadAsWav(self):
|
||||
try:
|
||||
@@ -369,8 +370,8 @@ class ToolFile(BaseModel):
|
||||
f.write(binary_data)
|
||||
return self
|
||||
def SaveAsText(self, text_data:str):
|
||||
with open(self.OriginFullPath, 'w') as f:
|
||||
f.writelines(text_data)
|
||||
with open(self.OriginFullPath, 'w', encoding='utf-8') as f:
|
||||
f.write(text_data)
|
||||
return self
|
||||
def SaveAsAudio(self, audio_data:"AudioSegment"):
|
||||
'''
|
||||
@@ -413,6 +414,8 @@ class ToolFile(BaseModel):
|
||||
return os.path.getsize(self.OriginFullPath)
|
||||
def GetExtension(self):
|
||||
return GetExtensionName(self.OriginFullPath)
|
||||
def GetAbsPath(self) -> str:
|
||||
return os.path.abspath(self.OriginFullPath)
|
||||
def GetFullPath(self) -> str:
|
||||
return self.OriginFullPath
|
||||
def GetFilename(self, is_without_extension = False):
|
||||
|
||||
@@ -51,3 +51,263 @@ def word_segmentation(
|
||||
return jieba.dt.cut(str(sentence), cut_all=cut_all, HMM=HMM, use_paddle=use_paddle)
|
||||
except ImportError:
|
||||
raise ValueError("jieba is not install")
|
||||
|
||||
def GetEditorDistanceAndOperations(
|
||||
s1:str,
|
||||
s2:str,
|
||||
) -> Tuple[int, List[Tuple[Literal["add","delete"], int, int, str]]]:
|
||||
"""
|
||||
计算两个字符串的编辑距离和操作序列
|
||||
操作格式: (操作类型, 开始位置, 结束位置, 内容)
|
||||
位置基于源字符串s1
|
||||
"""
|
||||
m, n = len(s1), len(s2)
|
||||
|
||||
# 使用简单的LCS算法来找到最长公共子序列
|
||||
# 然后基于LCS生成操作序列
|
||||
lcs = [[0] * (n + 1) for _ in range(m + 1)]
|
||||
|
||||
# 构建LCS表
|
||||
for i in range(1, m + 1):
|
||||
for j in range(1, n + 1):
|
||||
if s1[i - 1] == s2[j - 1]:
|
||||
lcs[i][j] = lcs[i - 1][j - 1] + 1
|
||||
else:
|
||||
lcs[i][j] = max(lcs[i - 1][j], lcs[i][j - 1])
|
||||
|
||||
# 基于LCS生成操作序列
|
||||
operations = []
|
||||
i, j = m, n
|
||||
|
||||
while i > 0 or j > 0:
|
||||
if i > 0 and j > 0 and s1[i - 1] == s2[j - 1]:
|
||||
# 字符匹配,不需要操作
|
||||
i -= 1
|
||||
j -= 1
|
||||
elif j > 0 and (i == 0 or lcs[i][j - 1] >= lcs[i - 1][j]):
|
||||
# 需要插入s2[j-1]
|
||||
# 找到插入位置(在s1中的位置)
|
||||
insert_pos = i
|
||||
operations.insert(0, ("add", insert_pos, insert_pos, s2[j - 1]))
|
||||
j -= 1
|
||||
else:
|
||||
# 需要删除s1[i-1]
|
||||
operations.insert(0, ("delete", i - 1, i, s1[i - 1]))
|
||||
i -= 1
|
||||
|
||||
# 合并连续的操作
|
||||
merged_operations = []
|
||||
for op in operations:
|
||||
if merged_operations and merged_operations[-1][0] == op[0]:
|
||||
last_op = merged_operations[-1]
|
||||
if op[0] == "add" and last_op[2] == op[1]:
|
||||
# 合并连续的添加操作
|
||||
merged_operations[-1] = (op[0], last_op[1], op[2], last_op[3] + op[3])
|
||||
elif op[0] == "delete" and last_op[2] == op[1]:
|
||||
# 合并连续的删除操作
|
||||
merged_operations[-1] = (op[0], last_op[1], op[2], last_op[3] + op[3])
|
||||
else:
|
||||
merged_operations.append(op)
|
||||
else:
|
||||
merged_operations.append(op)
|
||||
|
||||
# 计算编辑距离
|
||||
edit_distance = m + n - 2 * lcs[m][n]
|
||||
return edit_distance, merged_operations
|
||||
|
||||
def _build_line_lcs(lines1: List[str], lines2: List[str]) -> List[List[int]]:
|
||||
"""
|
||||
构建行级LCS动态规划表
|
||||
"""
|
||||
m, n = len(lines1), len(lines2)
|
||||
lcs = [[0] * (n + 1) for _ in range(m + 1)]
|
||||
|
||||
# 使用哈希加速行比较
|
||||
hash1 = [hash(line) for line in lines1]
|
||||
hash2 = [hash(line) for line in lines2]
|
||||
|
||||
for i in range(1, m + 1):
|
||||
for j in range(1, n + 1):
|
||||
if hash1[i-1] == hash2[j-1] and lines1[i-1] == lines2[j-1]:
|
||||
lcs[i][j] = lcs[i-1][j-1] + 1
|
||||
else:
|
||||
lcs[i][j] = max(lcs[i-1][j], lcs[i][j-1])
|
||||
|
||||
return lcs
|
||||
|
||||
def _extract_line_operations(lines1: List[str], lines2: List[str], lcs: List[List[int]]) -> List[Tuple[str, int, int, List[str]]]:
|
||||
"""
|
||||
从LCS表提取行级操作序列
|
||||
返回: (操作类型, 起始行号, 结束行号, 行内容列表)
|
||||
"""
|
||||
operations = []
|
||||
m, n = len(lines1), len(lines2)
|
||||
i, j = m, n
|
||||
|
||||
while i > 0 or j > 0:
|
||||
if i > 0 and j > 0 and lines1[i-1] == lines2[j-1]:
|
||||
i -= 1
|
||||
j -= 1
|
||||
elif j > 0 and (i == 0 or lcs[i][j-1] >= lcs[i-1][j]):
|
||||
operations.insert(0, ("add", i, i, [lines2[j-1]]))
|
||||
j -= 1
|
||||
else:
|
||||
operations.insert(0, ("delete", i-1, i, [lines1[i-1]]))
|
||||
i -= 1
|
||||
|
||||
# 合并连续的同类行操作
|
||||
merged = []
|
||||
for op_type, start, end, lines in operations:
|
||||
if merged and merged[-1][0] == op_type and merged[-1][2] == start:
|
||||
merged[-1] = (op_type, merged[-1][1], end, merged[-1][3] + lines)
|
||||
else:
|
||||
merged.append((op_type, start, end, lines))
|
||||
|
||||
return merged
|
||||
|
||||
def _char_diff_in_region(s1: str, s2: str) -> List[Tuple[str, int, int, str]]:
|
||||
"""
|
||||
对小范围区域进行字符级LCS比较
|
||||
返回相对于输入字符串的位置
|
||||
"""
|
||||
m, n = len(s1), len(s2)
|
||||
|
||||
# 快速路径
|
||||
if m == 0 and n == 0:
|
||||
return []
|
||||
if m == 0:
|
||||
return [("add", 0, 0, s2)]
|
||||
if n == 0:
|
||||
return [("delete", 0, m, s1)]
|
||||
if s1 == s2:
|
||||
return []
|
||||
|
||||
# 字符级LCS
|
||||
lcs = [[0] * (n + 1) for _ in range(m + 1)]
|
||||
|
||||
for i in range(1, m + 1):
|
||||
for j in range(1, n + 1):
|
||||
if s1[i-1] == s2[j-1]:
|
||||
lcs[i][j] = lcs[i-1][j-1] + 1
|
||||
else:
|
||||
lcs[i][j] = max(lcs[i-1][j], lcs[i][j-1])
|
||||
|
||||
# 回溯生成操作
|
||||
operations = []
|
||||
i, j = m, n
|
||||
|
||||
while i > 0 or j > 0:
|
||||
if i > 0 and j > 0 and s1[i-1] == s2[j-1]:
|
||||
i -= 1
|
||||
j -= 1
|
||||
elif j > 0 and (i == 0 or lcs[i][j-1] >= lcs[i-1][j]):
|
||||
operations.insert(0, ("add", i, i, s2[j-1]))
|
||||
j -= 1
|
||||
else:
|
||||
operations.insert(0, ("delete", i-1, i, s1[i-1]))
|
||||
i -= 1
|
||||
|
||||
# 合并连续操作
|
||||
merged = []
|
||||
for op_type, start, end, content in operations:
|
||||
if merged and merged[-1][0] == op_type:
|
||||
last_op = merged[-1]
|
||||
if op_type == "add" and last_op[2] == start:
|
||||
merged[-1] = (op_type, last_op[1], end, last_op[3] + content)
|
||||
elif op_type == "delete" and last_op[2] == start:
|
||||
merged[-1] = (op_type, last_op[1], end, last_op[3] + content)
|
||||
else:
|
||||
merged.append((op_type, start, end, content))
|
||||
else:
|
||||
merged.append((op_type, start, end, content))
|
||||
|
||||
return merged
|
||||
|
||||
def GetDiffOperations(
|
||||
s1:str,
|
||||
s2:str,
|
||||
) -> List[Tuple[Literal["add","delete"], int, int, str]]:
|
||||
"""
|
||||
计算两个字符串的差异操作序列(混合行级+字符级算法)
|
||||
操作格式: (操作类型, 开始位置, 结束位置, 内容)
|
||||
位置基于源字符串s1的字符偏移
|
||||
"""
|
||||
# 快速路径
|
||||
if s1 == s2:
|
||||
return []
|
||||
if not s1:
|
||||
return [("add", 0, 0, s2)]
|
||||
if not s2:
|
||||
return [("delete", 0, len(s1), s1)]
|
||||
|
||||
# 阶段1: 分行并建立位置映射
|
||||
lines1 = s1.split('\n')
|
||||
lines2 = s2.split('\n')
|
||||
|
||||
# 构建行号到字符位置的映射
|
||||
line_offsets_s1 = [0]
|
||||
for line in lines1[:-1]:
|
||||
line_offsets_s1.append(line_offsets_s1[-1] + len(line) + 1) # +1 for '\n'
|
||||
|
||||
line_offsets_s2 = [0]
|
||||
for line in lines2[:-1]:
|
||||
line_offsets_s2.append(line_offsets_s2[-1] + len(line) + 1)
|
||||
|
||||
# 阶段2: 行级LCS分析
|
||||
lcs = _build_line_lcs(lines1, lines2)
|
||||
line_operations = _extract_line_operations(lines1, lines2, lcs)
|
||||
|
||||
# 阶段3: 转换为字符级操作
|
||||
final_operations = []
|
||||
|
||||
for op_type, start_line, end_line, op_lines in line_operations:
|
||||
if op_type == "add":
|
||||
# 添加操作: 在s1的start_line位置插入
|
||||
char_pos = line_offsets_s1[start_line] if start_line < len(line_offsets_s1) else len(s1)
|
||||
content = '\n'.join(op_lines)
|
||||
|
||||
# 对于添加的行块,可以选择字符级细化或直接使用
|
||||
# 这里先直接使用行级结果
|
||||
final_operations.append(("add", char_pos, char_pos, content))
|
||||
|
||||
elif op_type == "delete":
|
||||
# 删除操作: 删除s1的[start_line, end_line)行
|
||||
char_start = line_offsets_s1[start_line]
|
||||
if end_line < len(lines1):
|
||||
char_end = line_offsets_s1[end_line]
|
||||
else:
|
||||
char_end = len(s1)
|
||||
|
||||
content = '\n'.join(op_lines)
|
||||
final_operations.append(("delete", char_start, char_end, content))
|
||||
|
||||
# 阶段4: 对于连续的删除+添加,尝试字符级精细比较
|
||||
optimized_operations = []
|
||||
i = 0
|
||||
while i < len(final_operations):
|
||||
if (i + 1 < len(final_operations) and
|
||||
final_operations[i][0] == "delete" and
|
||||
final_operations[i+1][0] == "add" and
|
||||
final_operations[i][2] == final_operations[i+1][1]):
|
||||
|
||||
# 这是一个修改操作,进行字符级细化
|
||||
del_op = final_operations[i]
|
||||
add_op = final_operations[i+1]
|
||||
|
||||
old_text = del_op[3]
|
||||
new_text = add_op[3]
|
||||
base_pos = del_op[1]
|
||||
|
||||
# 字符级比较
|
||||
char_ops = _char_diff_in_region(old_text, new_text)
|
||||
|
||||
# 调整位置到全局坐标
|
||||
for op_type, rel_start, rel_end, content in char_ops:
|
||||
optimized_operations.append((op_type, base_pos + rel_start, base_pos + rel_end, content))
|
||||
|
||||
i += 2
|
||||
else:
|
||||
optimized_operations.append(final_operations[i])
|
||||
i += 1
|
||||
|
||||
return optimized_operations
|
||||
@@ -1,121 +0,0 @@
|
||||
[返回](./Runtime-README.md)
|
||||
|
||||
# /Convention/Runtime/Web
|
||||
|
||||
---
|
||||
|
||||
网络工具模块,提供HTTP客户端和URL操作功能
|
||||
|
||||
## ToolURL类
|
||||
|
||||
### 构造与基本信息
|
||||
- `ToolURL(string url)` 从URL字符串创建对象
|
||||
- `ToString()` / `GetFullURL()` / `FullURL` 获取完整URL
|
||||
- `implicit operator string` 隐式字符串转换
|
||||
|
||||
### URL属性解析
|
||||
- `GetFilename()` 获取URL中的文件名
|
||||
- `GetExtension()` 获取文件扩展名
|
||||
- `ExtensionIs(params string[] extensions)` 检查扩展名是否匹配
|
||||
|
||||
### URL验证
|
||||
- `IsValid` 属性,检查URL是否有效
|
||||
- `ValidateURL()` 验证URL格式
|
||||
- `implicit operator bool` 隐式布尔转换,等同于IsValid
|
||||
|
||||
支持HTTP和HTTPS协议的绝对URL
|
||||
|
||||
### HTTP方法
|
||||
|
||||
#### GET请求
|
||||
- `GetAsync(Action<HttpResponseMessage> callback)` 异步GET
|
||||
- `Get(Action<HttpResponseMessage> callback)` 同步GET
|
||||
|
||||
#### POST请求
|
||||
- `PostAsync(Action<HttpResponseMessage> callback, Dictionary<string, string> formData = null)` 异步POST
|
||||
- `Post(Action<HttpResponseMessage> callback, Dictionary<string, string> formData = null)` 同步POST
|
||||
|
||||
支持表单数据提交
|
||||
|
||||
### 内容加载
|
||||
|
||||
#### 文本加载
|
||||
- `LoadAsTextAsync()` 异步加载为文本
|
||||
- `LoadAsText()` 同步加载为文本
|
||||
|
||||
#### 二进制加载
|
||||
- `LoadAsBinaryAsync()` 异步加载为字节数组
|
||||
- `LoadAsBinary()` 同步加载为字节数组
|
||||
|
||||
#### JSON加载
|
||||
- `LoadAsJson<T>()` 同步加载并反序列化JSON
|
||||
- `LoadAsJsonAsync<T>()` 异步加载并反序列化JSON
|
||||
|
||||
### 文件保存
|
||||
- `Save(string localPath = null)` 自动选择格式保存到本地
|
||||
- `SaveAsText(string localPath = null)` 保存为文本文件
|
||||
- `SaveAsJson(string localPath = null)` 保存为JSON文件
|
||||
- `SaveAsBinary(string localPath = null)` 保存为二进制文件
|
||||
|
||||
### 文件类型判断
|
||||
- `IsText` 是否为文本文件(txt, html, htm, css, js, xml, csv)
|
||||
- `IsJson` 是否为JSON文件
|
||||
- `IsImage` 是否为图像文件(jpg, jpeg, png, gif, bmp, svg)
|
||||
- `IsDocument` 是否为文档文件(pdf, doc, docx, xls, xlsx, ppt, pptx)
|
||||
|
||||
### 高级操作
|
||||
- `Open(string url)` 在当前对象上打开新URL
|
||||
- `DownloadAsync(string localPath = null)` 异步下载文件
|
||||
- `Download(string localPath = null)` 同步下载文件
|
||||
|
||||
## 设计特点
|
||||
|
||||
### 统一的HTTP客户端
|
||||
使用静态 `HttpClient` 实例,避免连接池耗尽
|
||||
|
||||
### 自动内容类型检测
|
||||
基于文件扩展名自动判断内容类型,优化保存和处理策略
|
||||
|
||||
### 异步支持
|
||||
所有网络操作都提供异步和同步两种版本
|
||||
|
||||
### 错误处理
|
||||
网络请求失败时回调函数接收null参数,方法返回false
|
||||
|
||||
### 文件管理集成
|
||||
下载的文件自动转换为ToolFile对象,与文件系统模块无缝集成
|
||||
|
||||
### 灵活的数据格式
|
||||
支持文本、二进制、JSON等多种数据格式的加载和保存
|
||||
|
||||
## 使用示例
|
||||
|
||||
### 基本HTTP请求
|
||||
```csharp
|
||||
var url = new ToolURL("https://api.example.com/data");
|
||||
if (url.IsValid)
|
||||
{
|
||||
url.Get(response => {
|
||||
if (response != null && response.IsSuccessStatusCode)
|
||||
{
|
||||
// 处理响应
|
||||
}
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
### 文件下载
|
||||
```csharp
|
||||
var url = new ToolURL("https://example.com/file.json");
|
||||
var localFile = url.Download("./downloads/file.json");
|
||||
if (localFile.Exists())
|
||||
{
|
||||
var data = localFile.LoadAsJson<MyDataType>();
|
||||
}
|
||||
```
|
||||
|
||||
### 类型安全的JSON加载
|
||||
```csharp
|
||||
var url = new ToolURL("https://api.example.com/users.json");
|
||||
var users = url.LoadAsJson<List<User>>();
|
||||
```
|
||||
@@ -3,8 +3,7 @@ import os
|
||||
from time import sleep
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from Convention.Runtime.File import *
|
||||
from Convention.Runtime.Config import *
|
||||
|
||||
file = ToolFile("[Test]")|"temp"|None
|
||||
print(file.MustExistsPath())
|
||||
PrintColorful(ConsoleFrontColor.RED, "Hello, World!")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user