from Convention.Convention.Runtime.GlobalConfig import * from Convention.Convention.Runtime.File import * import argparse from pydantic import BaseModel import hashlib import json class HistoryBlock(BaseModel): mode: Literal["add","delete"] = "add" begin: int = 0 end: int = 0 content: str = "" class HistoryObject(BaseModel): hashcode: str = "" blocks: List[HistoryBlock] = [] class HistoryCommit: def __init__(self, object_chain:List[HistoryObject]) -> None: self.content = "" for item in object_chain: for block in item.blocks: if block.mode == "add": self.content = self.content[:block.begin] + block.content + self.content[block.end:] elif block.mode == "delete": self.content = self.content[:block.begin] + self.content[block.end:] else: raise ValueError(f"Invalid block mode: {block.mode}") def __str__(self) -> str: return self.content class HistoryModel(BaseModel): # child node path : parent node path obj_paths: Dict[str,Optional[str]] = {} # branch name : branch head commit branches: Dict[str,str] = {} def ReadBranchHeadCommit(self, branch:str, parent_path:ToolFile) -> HistoryCommit: if branch not in self.branches: raise ValueError(f"Branch {branch} not found") # 从分支中读入链中(倒序的) object_chain:List[HistoryObject] = [] current_commit_file_path = self.branches[branch] while current_commit_file_path is not None: current_commit_file = parent_path|current_commit_file_path node = HistoryObject.model_validate_json(current_commit_file.LoadAsText()) object_chain.append(node) current_commit_file_path = self.obj_paths[current_commit_file_path] # 反转链 object_chain.reverse() # 构建提交对象 result = HistoryCommit(object_chain) return result def break_down_path(path:ToolFile|str) -> ToolFile: temp = f"{path}"#[:-len(path.GetExtension())] temp = temp.replace("\\\\",PlatformIndicator.GetFileSeparator()) temp = temp.replace("\\",PlatformIndicator.GetFileSeparator()) temp = temp.split(':') if len(temp) == 1: return ToolFile(temp[0]) else: return ToolFile(temp[0])|temp[1] def levenshtein_distance_with_operations(s1:str, s2:str) -> Tuple[int, List[Tuple[str, int, int, str]]]: """ 计算两个字符串的编辑距离和操作序列 操作格式: (操作类型, 开始位置, 结束位置, 内容) 位置基于源字符串s1 """ m, n = len(s1), len(s2) # 使用简单的LCS算法来找到最长公共子序列 # 然后基于LCS生成操作序列 lcs = [[0] * (n + 1) for _ in range(m + 1)] # 构建LCS表 for i in range(1, m + 1): for j in range(1, n + 1): if s1[i - 1] == s2[j - 1]: lcs[i][j] = lcs[i - 1][j - 1] + 1 else: lcs[i][j] = max(lcs[i - 1][j], lcs[i][j - 1]) # 基于LCS生成操作序列 operations = [] i, j = m, n while i > 0 or j > 0: if i > 0 and j > 0 and s1[i - 1] == s2[j - 1]: # 字符匹配,不需要操作 i -= 1 j -= 1 elif j > 0 and (i == 0 or lcs[i][j - 1] >= lcs[i - 1][j]): # 需要插入s2[j-1] # 找到插入位置(在s1中的位置) insert_pos = i operations.insert(0, ("add", insert_pos, insert_pos, s2[j - 1])) j -= 1 else: # 需要删除s1[i-1] operations.insert(0, ("delete", i - 1, i, s1[i - 1])) i -= 1 # 合并连续的操作 merged_operations = [] for op in operations: if merged_operations and merged_operations[-1][0] == op[0]: last_op = merged_operations[-1] if op[0] == "add" and last_op[2] == op[1]: # 合并连续的添加操作 merged_operations[-1] = (op[0], last_op[1], op[2], last_op[3] + op[3]) elif op[0] == "delete" and last_op[2] == op[1]: # 合并连续的删除操作 merged_operations[-1] = (op[0], last_op[1], op[2], last_op[3] + op[3]) else: merged_operations.append(op) else: merged_operations.append(op) # 计算编辑距离 edit_distance = m + n - 2 * lcs[m][n] return edit_distance, merged_operations class Cli: def print_out(self) -> None: print(self.prints, ConsoleFrontColor.RESET) def print(self, *args) -> None: self.prints += "".join(args) def __init__(self, asset:str, input:str, branch:str, *, history_file:Optional[str]=None ) -> None: self.config = GlobalConfig(asset,True) self.file = ToolFile(input) self.branch = branch self.historys_file = self.config.GetFile(history_file if history_file is not None else break_down_path(self.file.GetAbsPath())|"history", False) if self.historys_file.Exists(): self.historys = HistoryModel.model_validate_json(self.historys_file.LoadAsText()) else: self.historys = HistoryModel() self.prints: str = "" def compare(self) -> None: if self.branch not in self.historys.branches: self.config.Log("Error", f"Branch {self.branch} not found") return head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile())) current_content = self.file.LoadAsText() step, operations = levenshtein_distance_with_operations(head_commit, current_content) if step == 0: self.print(f"{ConsoleFrontColor.LIGHTMAGENTA_EX}No changes") return index = 0 for operation in operations: # 显示操作前的不变内容 sp = head_commit[index:operation[1]].split("\n") if len(sp) > 3: self.print("\n".join(sp[-3:])) else: self.print("\n".join(sp)) if operation[0] == "add": self.print(f"{ConsoleFrontColor.GREEN}{operation[3]}{ConsoleFrontColor.RESET}") index = operation[2] elif operation[0] == "delete": self.print(f"{ConsoleFrontColor.RED}{operation[3]}{ConsoleFrontColor.RESET}") index = operation[2] else: raise ValueError(f"Invalid operation: {operation}") sp = head_commit[index:].split("\n") if len(sp) > 3: self.print("\n".join(sp[:3])) else: self.print("\n".join(sp)) self.print(f"\n{ConsoleFrontColor.LIGHTMAGENTA_EX}operations:\n") self.print(f"{'\n'.join([f"{ConsoleFrontColor.GREEN if item[0] == "add" else ConsoleFrontColor.RED}{item[0]}{ConsoleFrontColor.RESET} \"{item[3]}\" on [{item[1]},{item[2]}]" for item in operations])}") def save(self) -> None: content = self.file.LoadAsText() root = HistoryObject(hashcode=hashlib.md5(content.encode()).hexdigest()) if self.branch not in self.historys.branches: # 创建分支并为其创建新的树 commit_name = f"{len(self.historys.obj_paths)}" self.historys.obj_paths[commit_name] = None self.historys.branches[self.branch] = commit_name self.historys_file.MustExistsPath() root.blocks=[HistoryBlock(mode="add",begin=0,end=len(content),content=content)] else: head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile())) step, operations = levenshtein_distance_with_operations(head_commit, content) for operation in operations: if operation[0] == "add": root.blocks.append(HistoryBlock(mode="add",begin=operation[1],end=operation[2],content=operation[3])) elif operation[0] == "delete": root.blocks.append(HistoryBlock(mode="delete",begin=operation[1],end=operation[2],content=operation[3])) else: raise ValueError(f"Invalid operation: {operation}") # 创建树节点, 并链接 self.historys.obj_paths[f"{len(self.historys.obj_paths)}"] = self.historys.branches[self.branch] self.historys.branches[self.branch] = f"{len(self.historys.obj_paths)}" with open(f"{self.historys_file.GetDirToolFile()|self.historys.branches[self.branch]}", "w") as f: f.write(root.model_dump_json()) with open(self.historys_file.GetFullPath(), "w") as f: f.write(self.historys.model_dump_json()) def run() -> int: parser = argparse.ArgumentParser() # 目标文件 parser.add_argument("input", type=str, help="输入文件") parser.add_argument("--history", type=str, help="手动指定目标历史文件", default=None) # 可选的项目源 parser.add_argument("-a", "--asset", type=str, default=ProjectConfig.ProjectConfigFileFocus, help="配置文件目录") # 分支 parser.add_argument("-b", "--branch", type=str, default="main", help="分支") # 模式互斥组 mode_group = parser.add_mutually_exclusive_group(required=True) mode_group.add_argument("-c", "--compare", action="store_true", help="比较当前文件的差异") mode_group.add_argument("-s", "--save", action="store_true", help="保存当前文件的差异") args = parser.parse_args() if "help" in args: parser.print_help() return 0 cli = Cli(args.asset, args.input, args.branch, history_file=args.history) # 比较 if args.compare: try: cli.compare() finally: cli.print_out() return 0 # 保存 elif args.save: try: cli.save() finally: cli.print_out() return 0 raise NotImplementedError("Not implemented mode") if __name__ == "__main__": run()