Files
ChangeTree/app.py

267 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from Convention.Convention.Runtime.GlobalConfig import *
from Convention.Convention.Runtime.File import *
import argparse
from pydantic import BaseModel
import hashlib
import json
class HistoryBlock(BaseModel):
    """One edit step stored inside a history object."""

    # Kind of edit: "add" puts `content` into the text, "delete" removes a range.
    mode: Literal["add", "delete"] = "add"
    # Start offset of the edited range.
    begin: int = 0
    # End offset of the edited range.
    end: int = 0
    # Text carried by the edit (the added text, or the text that was deleted).
    content: str = ""
class HistoryObject(BaseModel):
    """A stored commit: a content hash plus the edit blocks that produce it."""

    # Hex digest recorded for the content this commit represents.
    hashcode: str = ""
    # Edit blocks belonging to this commit, in application order.
    blocks: List[HistoryBlock] = []
class HistoryCommit:
    """Text obtained by replaying a chain of history objects from oldest to newest.

    The reconstructed text is available as ``content`` and through ``str()``.
    """

    def __init__(self, object_chain: "List[HistoryObject]") -> None:
        """Replay every block of every object in ``object_chain``.

        Args:
            object_chain: History objects ordered from the root commit to the
                head commit. Each object exposes ``blocks``; each block exposes
                ``mode``, ``begin``, ``end`` and ``content``.

        Raises:
            ValueError: If a block has a mode other than "add" or "delete".
        """
        self.content = ""
        for item in object_chain:
            # Offsets stored in one object's blocks refer to the text as it was
            # BEFORE that object was applied (the diff routine in this file
            # reports positions relative to its source string). `shift` tracks
            # how much earlier blocks of the same object have already moved the
            # text, so later blocks still land on the intended characters.
            # Assumes the blocks are ordered by ascending position.
            shift = 0
            for block in item.blocks:
                start = block.begin + shift
                stop = block.end + shift
                span = block.end - block.begin
                if block.mode == "add":
                    self.content = self.content[:start] + block.content + self.content[stop:]
                    shift += len(block.content) - span
                elif block.mode == "delete":
                    self.content = self.content[:start] + self.content[stop:]
                    shift -= span
                else:
                    raise ValueError(f"Invalid block mode: {block.mode}")

    def __str__(self) -> str:
        """Return the reconstructed text."""
        return self.content
class HistoryModel(BaseModel):
    """Index of stored commits and of the branches pointing at them."""

    # Commit file path -> parent commit file path (None marks a root commit).
    obj_paths: Dict[str, Optional[str]] = {}
    # Branch name -> commit file path of that branch's head.
    branches: Dict[str, str] = {}

    def ReadBranchHeadCommit(self, branch: str, parent_path: ToolFile) -> HistoryCommit:
        """Rebuild the content at the head of ``branch``.

        Args:
            branch: Name of the branch to read.
            parent_path: Directory that the commit file paths are joined onto.

        Returns:
            The commit built from every object on the path root -> head.

        Raises:
            ValueError: If ``branch`` is not a known branch.
        """
        if branch not in self.branches:
            raise ValueError(f"Branch {branch} not found")

        # Follow the parent links starting at the head; this yields newest first.
        newest_first: List[HistoryObject] = []
        cursor = self.branches[branch]
        while cursor is not None:
            raw_json = (parent_path | cursor).LoadAsText()
            newest_first.append(HistoryObject.model_validate_json(raw_json))
            cursor = self.obj_paths[cursor]

        # Replay has to run from the root towards the head.
        return HistoryCommit(newest_first[::-1])
def break_down_path(path: ToolFile | str) -> ToolFile:
    """Turn ``path`` into a ToolFile with the text before the first ':' as its leading part.

    Backslashes (doubled or single) are replaced by the platform file
    separator first. The text is then split on ':'; with no colon the whole
    text becomes the ToolFile, otherwise the first piece is joined with the
    second one. Pieces after a second ':' are not used.
    """
    normalized = f"{path}"
    for backslashes in ("\\\\", "\\"):
        normalized = normalized.replace(backslashes, PlatformIndicator.GetFileSeparator())

    pieces = normalized.split(':')
    head = ToolFile(pieces[0])
    if len(pieces) == 1:
        return head
    return head | pieces[1]
def levenshtein_distance_with_operations(s1: str, s2: str) -> Tuple[int, List[Tuple[str, int, int, str]]]:
    """Compute an insert/delete edit script that turns ``s1`` into ``s2``.

    The script is derived from a longest-common-subsequence table, so only
    insertions and deletions are produced (no substitutions).

    Args:
        s1: Source string.
        s2: Target string.

    Returns:
        A pair ``(distance, operations)``. ``distance`` is
        ``len(s1) + len(s2) - 2 * LCS(s1, s2)``. Each operation is a tuple
        ``(kind, begin, end, text)`` with ``kind`` being "add" or "delete".
        Positions refer to the source string ``s1``: an "add" has
        ``begin == end`` (the insertion point), a "delete" covers
        ``s1[begin:end]``. Adjacent operations of the same kind are merged and
        the list is ordered by ascending position.
    """
    m, n = len(s1), len(s2)

    # lcs[i][j] = length of the longest common subsequence of s1[:i] and s2[:j].
    lcs = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if s1[i - 1] == s2[j - 1]:
                lcs[i][j] = lcs[i - 1][j - 1] + 1
            else:
                lcs[i][j] = max(lcs[i - 1][j], lcs[i][j - 1])

    # Walk the table back from the bottom-right corner. Operations are found
    # last-to-first, so they are appended and reversed once afterwards instead
    # of being inserted at the front of the list on every step.
    operations = []
    i, j = m, n
    while i > 0 or j > 0:
        if i > 0 and j > 0 and s1[i - 1] == s2[j - 1]:
            # Characters match: nothing to record.
            i -= 1
            j -= 1
        elif j > 0 and (i == 0 or lcs[i][j - 1] >= lcs[i - 1][j]):
            # s2[j-1] has to be inserted; the insertion point in s1 is i.
            operations.append(("add", i, i, s2[j - 1]))
            j -= 1
        else:
            # s1[i-1] has to be deleted.
            operations.append(("delete", i - 1, i, s1[i - 1]))
            i -= 1
    operations.reverse()

    # Merge runs of same-kind operations that touch each other. Text pieces
    # are collected in lists and joined once per run.
    runs = []
    for kind, begin, end, text in operations:
        if runs and runs[-1][0] == kind and runs[-1][2] == begin:
            runs[-1][2] = end
            runs[-1][3].append(text)
        else:
            runs.append([kind, begin, end, [text]])
    merged_operations = [(kind, begin, end, "".join(parts)) for kind, begin, end, parts in runs]

    edit_distance = m + n - 2 * lcs[m][n]
    return edit_distance, merged_operations
class Cli:
    """Command-line front end that compares a file with, or saves it into, its history."""

    def print_out(self) -> None:
        """Print the buffered diff text line by line with a status gutter.

        Each line is prefixed with a marker chosen from the colour codes it
        contains ("@" for both added and removed text, "+" for added, "-" for
        removed, "=" for neither), followed by its zero-based index padded to a
        common width.
        """
        lines = self.prints.split("\n")
        gutter_width = len(str(len(lines)))
        for line_index, line in enumerate(lines):
            has_added = ConsoleFrontColor.GREEN in line
            has_removed = ConsoleFrontColor.RED in line
            if has_added and has_removed:
                prefix = f"{ConsoleFrontColor.YELLOW}@"
            elif has_added:
                prefix = f"{ConsoleFrontColor.GREEN}+"
            elif has_removed:
                prefix = f"{ConsoleFrontColor.RED}-"
            else:
                prefix = "="
            number = str(line_index).ljust(gutter_width)
            print(f"{prefix}{number}{ConsoleFrontColor.RESET} | {line}")
        print(ConsoleFrontColor.RESET)

    def print(self, *args) -> None:
        """Append the given strings to the output buffer (nothing is written yet)."""
        self.prints += "".join(args)

    def __init__(self, asset: str, input: str, branch: str,
                 *,
                 history_file: Optional[str] = None
                 ) -> None:
        """Load the configuration and the history index for ``input``.

        Args:
            asset: Passed to ``GlobalConfig`` as its first argument.
            input: Path of the tracked file.
            branch: Branch to compare against or save into.
            history_file: Explicit history index location. When omitted, the
                location is derived from the absolute path of ``input``.
        """
        self.config = GlobalConfig(asset, True)
        self.file = ToolFile(input)
        self.branch = branch
        if history_file is not None:
            history_location = history_file
        else:
            history_location = break_down_path(self.file.GetAbsPath()) | "history"
        self.historys_file = self.config.GetFile(history_location, False)
        if self.historys_file.Exists():
            self.historys = HistoryModel.model_validate_json(self.historys_file.LoadAsText())
        else:
            self.historys = HistoryModel()
        self.prints: str = ""

    def compare(self) -> None:
        """Show the differences between the branch head and the current file.

        Logs an error and returns when the branch does not exist.

        Raises:
            ValueError: If the diff routine reports an unknown operation kind.
        """
        if self.branch not in self.historys.branches:
            self.config.Log("Error", f"Branch {self.branch} not found")
            return
        head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile()))
        current_content = self.file.LoadAsText()
        step, operations = levenshtein_distance_with_operations(head_commit, current_content)
        if step == 0:
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "No changes")
            return

        cursor = 0
        for operation in operations:
            kind, begin, end, text = operation
            # Unchanged text between the previous operation and this one.
            self.print(head_commit[cursor:begin])
            if kind == "add":
                color = ConsoleFrontColor.GREEN
            elif kind == "delete":
                color = ConsoleFrontColor.RED
            else:
                raise ValueError(f"Invalid operation: {operation}")
            # Colour every line of the changed text separately so that the
            # per-line markers in print_out can detect it.
            pieces = text.split("\n")
            if text[0] == "\n":
                self.print("\n")
            last_piece = len(pieces) - 1
            for piece_index, piece in enumerate(pieces):
                self.print(f"{color}{piece}{ConsoleFrontColor.RESET}")
                # NOTE(review): indentation was lost in the reviewed copy; the
                # newline is emitted after every non-final piece, and after the
                # final piece only when the text ends with a newline.
                if piece_index != last_piece or text[-1] == "\n":
                    self.print("\n")
            cursor = end
        self.print(head_commit[cursor:])
        self.print_out()

        PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "\noperations:")
        # Built without nested same-quote f-strings so the module also parses
        # on interpreters older than Python 3.12.
        summary = []
        for kind, begin, end, text in operations:
            tint = ConsoleFrontColor.GREEN if kind == "add" else ConsoleFrontColor.RED
            summary.append(
                f'{tint}{kind}{ConsoleFrontColor.RESET} '
                f'"{ConsoleFrontColor.YELLOW}{text}{ConsoleFrontColor.RESET}" '
                f'on [{begin},{end}]'
            )
        print("\n".join(summary))

    def _register_commit(self) -> str:
        """Allocate a new commit name, link it to the branch head and make it the head.

        The name is computed once, before the index is modified, so the key
        stored in ``obj_paths`` and the new branch head are always the same.
        For a branch that does not exist yet the commit gets no parent.

        Returns:
            The name of the newly registered commit.
        """
        parent = self.historys.branches.get(self.branch)
        commit_name = f"{len(self.historys.obj_paths)}"
        self.historys.obj_paths[commit_name] = parent
        self.historys.branches[self.branch] = commit_name
        return commit_name

    def save(self) -> None:
        """Store the current file content as a new commit on the branch.

        A new branch starts with a single block holding the whole content; an
        existing branch stores the diff against its head commit.

        Raises:
            ValueError: If the diff routine reports an unknown operation kind.
        """
        content = self.file.LoadAsText()
        root = HistoryObject(hashcode=hashlib.md5(content.encode()).hexdigest())
        if self.branch not in self.historys.branches:
            # New branch: start a new tree whose root carries the full content.
            self.historys_file.MustExistsPath()
            root.blocks = [HistoryBlock(mode="add", begin=0, end=len(content), content=content)]
        else:
            head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile()))
            _, operations = levenshtein_distance_with_operations(head_commit, content)
            for operation in operations:
                kind, begin, end, text = operation
                if kind not in ("add", "delete"):
                    raise ValueError(f"Invalid operation: {operation}")
                root.blocks.append(HistoryBlock(mode=kind, begin=begin, end=end, content=text))

        # Create the tree node and link it to its parent.
        commit_name = self._register_commit()

        # NOTE(review): both files are written with the platform default
        # encoding; confirm this matches what LoadAsText reads back.
        with open(f"{self.historys_file.GetDirToolFile() | commit_name}", "w") as f:
            f.write(root.model_dump_json())
        with open(self.historys_file.GetFullPath(), "w") as f:
            f.write(self.historys.model_dump_json())
def run() -> int:
    """Parse the command line and run the requested mode.

    Exactly one of ``--compare`` / ``--save`` must be given; argparse enforces
    this and also handles ``-h/--help`` on its own (it prints the help text and
    exits), so no separate help check is needed here.

    Returns:
        0 after the selected mode has run.

    Raises:
        NotImplementedError: If neither mode flag is set (defensive guard).
    """
    parser = argparse.ArgumentParser()
    # Target file
    parser.add_argument("input", type=str, help="输入文件")
    parser.add_argument("--history", type=str, help="手动指定目标历史文件", default=None)
    # Optional project source
    parser.add_argument("-a", "--asset", type=str, default=ProjectConfig.ProjectConfigFileFocus, help="配置文件目录")
    # Branch
    parser.add_argument("-b", "--branch", type=str, default="main", help="分支")
    # Mutually exclusive mode group
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument("-c", "--compare", action="store_true", help="比较当前文件的差异")
    mode_group.add_argument("-s", "--save", action="store_true", help="保存当前文件的差异")
    args = parser.parse_args()

    cli = Cli(args.asset, args.input, args.branch,
              history_file=args.history)
    # Compare
    if args.compare:
        cli.compare()
        return 0
    # Save
    if args.save:
        cli.save()
        return 0
    raise NotImplementedError("Not implemented mode")
if __name__ == "__main__":
    # Hand the status returned by run() to the interpreter as the process
    # exit code instead of discarding it.
    raise SystemExit(run())