from Convention.Convention.Runtime.GlobalConfig import * from Convention.Convention.Runtime.File import * from Convention.Convention.Runtime.String import GetDiffOperations, FillString import argparse from pydantic import BaseModel import hashlib import pickle class HistoryBlock(BaseModel): mode: Literal["add","delete"] = "add" begin: int = 0 end: int = 0 content: str = "" def ToOperation(self) -> Tuple[Literal["add","delete"], int, int, str]: return (self.mode, self.begin, self.end, self.content) def FromOperation(self, operation:Tuple[Literal["add","delete"], int, int, str]) -> None: self.mode = operation[0] self.begin = operation[1] self.end = operation[2] self.content = operation[3] class HistoryObject(BaseModel): hashcode: str = "" blocks: List[HistoryBlock] = [] class HistoryCommit: def __init__(self, object_chain:List[HistoryObject]) -> None: self.content = "" for item in object_chain: # 从后往前应用操作,因为操作使用的是静态坐标系(基于原始字符串) for block in reversed(item.blocks): if block.mode == "add": self.content = self.content[:block.begin] + block.content + self.content[block.end:] elif block.mode == "delete": self.content = self.content[:block.begin] + self.content[block.end:] else: raise ValueError(f"Invalid block mode: {block.mode}") def __str__(self) -> str: return self.content class HistoryModel(BaseModel): # child node path : parent node path obj_paths: Dict[str,Optional[str]] = {} # branch name : branch head commit branches: Dict[str,str] = {} def GetParentCommit(self, commit_name:str) -> Optional[str]: if commit_name not in self.obj_paths: raise ValueError(f"Commit {commit_name} not found") return self.obj_paths[commit_name] def GetBranchHeadCommit(self, branch:str) -> str: if branch not in self.branches: raise ValueError(f"Branch {branch} not found") return self.branches[branch] def ReadCommit(self, commit_name:str, parent_path:ToolFile) -> HistoryCommit: # 从分支中读入链中(倒序的) object_chain:List[HistoryObject] = [] current_commit_file_path = commit_name while current_commit_file_path is not None: current_commit_file = parent_path|current_commit_file_path node = pickle.loads(current_commit_file.LoadAsBinary()) object_chain.append(node) current_commit_file_path = self.obj_paths[current_commit_file_path] # 反转链 object_chain.reverse() # 构建提交对象 result = HistoryCommit(object_chain) return result def ReadBranchHeadCommit(self, branch:str, parent_path:ToolFile) -> HistoryCommit: return self.ReadCommit(self.GetBranchHeadCommit(branch), parent_path) def break_down_path(path:ToolFile|str) -> ToolFile: temp = f"{path}"#[:-len(path.GetExtension())] temp = temp.replace("\\\\",PlatformIndicator.GetFileSeparator()) temp = temp.replace("\\",PlatformIndicator.GetFileSeparator()) temp = temp.split(':') if len(temp) == 1: return ToolFile(temp[0]) else: return ToolFile(temp[0])|temp[1] class Cli: def print_out(self, is_show_all = False) -> None: sp = self.prints.split("\n") status_len = len(str(len(sp))) if is_show_all: for line_index, line in enumerate(sp): if ConsoleFrontColor.GREEN in line and ConsoleFrontColor.RED in line: perfix = f"{ConsoleFrontColor.YELLOW}@" elif ConsoleFrontColor.GREEN in line: perfix = f"{ConsoleFrontColor.GREEN}+" elif ConsoleFrontColor.RED in line: perfix = f"{ConsoleFrontColor.RED}-" else: perfix = "=" print(f"{perfix}{FillString(line_index+1, max_length=status_len, side = "left")}{ConsoleFrontColor.RESET} | {line}") else: layer = 0 for line_index, line in enumerate(sp): layer -= 1 forward = sp[min(line_index+self.group_size, len(sp)-1)] if ConsoleFrontColor.GREEN in forward and ConsoleFrontColor.RED in forward: layer = self.group_size elif ConsoleFrontColor.GREEN in forward: layer = self.group_size elif ConsoleFrontColor.RED in forward: layer = self.group_size if ConsoleFrontColor.GREEN in line and ConsoleFrontColor.RED in line: perfix = f"{ConsoleFrontColor.YELLOW}@" layer = self.group_size elif ConsoleFrontColor.GREEN in line: perfix = f"{ConsoleFrontColor.GREEN}+" layer = self.group_size elif ConsoleFrontColor.RED in line: perfix = f"{ConsoleFrontColor.RED}-" layer = self.group_size else: perfix = "=" if layer <= 0: continue print(f"{perfix}{FillString(line_index+1, max_length=status_len, side = "left")}{ConsoleFrontColor.RESET} | {line}") print(ConsoleFrontColor.RESET) def print(self, *args) -> None: self.prints += "".join(args) def __init__(self, asset:str, input:str, branch:str, *, history_file:Optional[str]=None, verbose:bool=False ) -> None: self.config = GlobalConfig(asset,True) self.file = ToolFile(input) self.branch = branch self.verbose = verbose or self.config.FindItem("verbose", False) self.group_size = self.config.FindItem("group_size", 3) self.historys_file = self.config.GetFile(history_file if history_file is not None else break_down_path(self.file.GetAbsPath())|"history", False) if self.historys_file.Exists(): self.historys = pickle.loads(self.historys_file.LoadAsBinary()) else: self.historys = HistoryModel() self.prints: str = "" def show_compare_result(self, head_commit:str, operations:List[Tuple[Literal["add","delete"], int, int, str]]) -> None: index = 0 for operation in operations: # 显示操作前的不变内容 self.print(head_commit[index:operation[1]]) if operation[0] == "add": color = ConsoleFrontColor.GREEN eline = f"{ConsoleBackgroundColor.LIGHTGREEN_EX} {ConsoleBackgroundColor.RESET}" etab = f"{ConsoleBackgroundColor.GREEN}\t{ConsoleBackgroundColor.RESET}" ewrite = f"{ConsoleBackgroundColor.GREEN} {ConsoleBackgroundColor.RESET}" elif operation[0] == "delete": color = ConsoleFrontColor.RED eline = f"{ConsoleBackgroundColor.LIGHTRED_EX} {ConsoleBackgroundColor.RESET}" etab = f"{ConsoleBackgroundColor.RED}\t{ConsoleBackgroundColor.RESET}" ewrite = f"{ConsoleBackgroundColor.RED} {ConsoleBackgroundColor.RESET}" else: raise ValueError(f"Invalid operation: {operation}") self.print(color) for ch in operation[3]: if ch == '\n': self.print(f"{eline}\n{color}") elif ch == '\t': self.print(etab) elif ch == ' ': self.print(ewrite) else: self.print(ch) self.print(ConsoleFrontColor.RESET) index = operation[2] self.print(head_commit[index:]) self.print_out() def compare(self) -> None: if self.branch not in self.historys.branches: self.config.Log("Error", f"Branch {self.branch} not found") return head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile())) current_content = self.file.LoadAsText() operations = GetDiffOperations(head_commit, current_content) if len(operations) == 0: PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "No changes") return self.show_compare_result(head_commit, operations) if self.verbose: PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "\noperations:") print(f"{'\n'.join([f"{ConsoleFrontColor.GREEN if item[0] == "add" else ConsoleFrontColor.RED}{item[0]}{ConsoleFrontColor.RESET} \"{ConsoleFrontColor.YELLOW}{item[3]}{ConsoleFrontColor.RESET}\" on [{item[1]},{item[2]}]" for item in operations])}") def save(self) -> None: content = self.file.LoadAsText() root = HistoryObject(hashcode=hashlib.md5(content.encode()).hexdigest()) commit_name = f"{len(self.historys.obj_paths)}" if self.branch not in self.historys.branches: # 创建分支并为其创建新的树 self.historys.obj_paths[commit_name] = None self.historys.branches[self.branch] = commit_name self.historys_file.MustExistsPath() root.blocks=[HistoryBlock(mode="add",begin=0,end=len(content),content=content)] PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "\nAll content is new") else: head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile())) operations = GetDiffOperations(head_commit, content) for operation in operations: if operation[0] == "add": root.blocks.append(HistoryBlock(mode="add",begin=operation[1],end=operation[2],content=operation[3])) elif operation[0] == "delete": root.blocks.append(HistoryBlock(mode="delete",begin=operation[1],end=operation[2],content=operation[3])) else: raise ValueError(f"Invalid operation: {operation}") if self.verbose: PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "\noperations:") print(f"{'\n'.join([f"{ConsoleFrontColor.GREEN if item[0] == "add" else ConsoleFrontColor.RED}{item[0]}{ConsoleFrontColor.RESET} \"{ConsoleFrontColor.YELLOW}{item[3]}{ConsoleFrontColor.RESET}\" on [{item[1]},{item[2]}]" for item in operations])}") # 创建树节点, 并链接 self.historys.obj_paths[commit_name] = self.historys.branches[self.branch] self.historys.branches[self.branch] = commit_name (self.historys_file.GetDirToolFile()|commit_name).SaveAsBinary(pickle.dumps(root)) (self.historys_file).SaveAsBinary(pickle.dumps(self.historys)) def view(self, commit_name:Optional[str]=None) -> None: if commit_name is None or commit_name == "": head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile())) else: head_commit = str(self.historys.ReadCommit(commit_name, self.historys_file.GetDirToolFile())) self.print(head_commit) self.print_out(True) def restore(self) -> None: head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile())) self.file.SaveAsText(head_commit) PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"\nRestored {self.file.GetAbsPath()}") def take(self, commit_name:Optional[str]=None) -> None: if commit_name is None or commit_name == "": head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile())) else: head_commit = str(self.historys.ReadCommit(commit_name, self.historys_file.GetDirToolFile())) self.file.SaveAsText(head_commit) PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"\nTaken {self.file} to commit {"head" if commit_name is None or commit_name == "" else commit_name}") def run() -> int: parser = argparse.ArgumentParser() # 目标文件 parser.add_argument("input", type=str, help="输入文件") parser.add_argument("--history", type=str, help="手动指定目标历史文件", default=None) # 可选的项目源 parser.add_argument("-a", "--asset", type=str, default=ProjectConfig.ProjectConfigFileFocus, help="配置文件目录") # 分支 parser.add_argument("-b", "--branch", type=str, default="main", help="分支") # 是否详细信息 parser.add_argument("--verbose", action="store_true", help="是否详细信息") # 模式互斥组 mode_group = parser.add_mutually_exclusive_group(required=True) mode_group.add_argument("-c", "--compare", action="store_true", help="比较当前文件的差异") mode_group.add_argument("-s", "--save", action="store_true", help="保存当前文件的差异") mode_group.add_argument("-v", "--view", type=str,default=None, help="查看记录内容") mode_group.add_argument("-r", "--restore", action="store_true", help="恢复当前文件") mode_group.add_argument("-t", "--take", type=str,default=None, help="获取指定提交的文件") args = parser.parse_args() if "help" in args: parser.print_help() return 0 cli = Cli(args.asset, args.input, args.branch, history_file=args.history, verbose=args.verbose) # 比较 if args.compare: cli.compare() return 0 # 保存 elif args.save: cli.save() return 0 # 查看记录内容 elif args.view is not None: cli.view(args.view) return 0 # 恢复 elif args.restore: cli.restore() return 0 # 获取指定提交的文件 elif args.take is not None: cli.take(args.take) return 0 raise NotImplementedError("Not implemented mode") if __name__ == "__main__": run()