Files
ChangeTree/app.py

377 lines
17 KiB
Python
Raw Permalink Normal View History

from Convention.Convention.Runtime.GlobalConfig import *
from Convention.Convention.Runtime.File import *
2025-10-23 16:24:04 +08:00
from Convention.Convention.Runtime.String import GetDiffOperations, FillString
import argparse
from pydantic import BaseModel
import hashlib
2025-10-23 16:24:04 +08:00
import pickle
class HistoryBlock(BaseModel):
    """A single edit operation stored inside a commit.

    ``begin`` and ``end`` are the offsets used to slice the text the
    operation is applied to, and ``content`` is the text carried by the
    operation.
    """
    # Kind of edit: "add" or "delete".
    mode: Literal["add","delete"] = "add"
    # Slice offsets into the text being edited.
    begin: int = 0
    end: int = 0
    # Text carried by the operation.
    content: str = ""

    def ToOperation(self) -> Tuple[Literal["add","delete"], int, int, str]:
        """Return this block as a ``(mode, begin, end, content)`` tuple."""
        operation = (self.mode, self.begin, self.end, self.content)
        return operation

    def FromOperation(self, operation:Tuple[Literal["add","delete"], int, int, str]) -> None:
        """Overwrite this block's fields from a ``(mode, begin, end, content)`` tuple."""
        mode, begin, end, content = operation[0], operation[1], operation[2], operation[3]
        self.mode = mode
        self.begin = begin
        self.end = end
        self.content = content
class HistoryObject(BaseModel):
    """Serializable record of one commit: a content hash plus its edit blocks."""
    # Hex digest of the file content at this commit (Cli.save stores an MD5 digest here).
    hashcode: str = ""
    # Edit operations recorded for this commit, replayed on top of the parent's text.
    blocks: List[HistoryBlock] = []
class HistoryCommit:
    """Text of a commit, rebuilt by replaying a chain of history objects."""

    def __init__(self, object_chain:List[HistoryObject]) -> None:
        """Replay every object in ``object_chain`` in order, starting from an empty string.

        Raises:
            ValueError: if a block has a mode other than "add" or "delete".
        """
        self.content = ""
        for history_object in object_chain:
            # Apply the operations back to front: their offsets live in a static
            # coordinate system based on the text before any of them was applied.
            for block in reversed(history_object.blocks):
                if block.mode not in ("add", "delete"):
                    raise ValueError(f"Invalid block mode: {block.mode}")
                # "delete" splices nothing in; "add" splices in the block's text.
                inserted = block.content if block.mode == "add" else ""
                self.content = self.content[:block.begin] + inserted + self.content[block.end:]

    def __str__(self) -> str:
        """Return the reconstructed text."""
        return self.content
class HistoryModel(BaseModel):
    """Index of the commit tree: parent links between commits plus branch heads."""
    # child node path : parent node path (None marks a commit without a parent)
    obj_paths: Dict[str,Optional[str]] = {}
    # branch name : branch head commit
    branches: Dict[str,str] = {}

    def GetParentCommit(self, commit_name:str) -> Optional[str]:
        """Return the parent of ``commit_name`` (``None`` for a parentless commit).

        Raises:
            ValueError: if ``commit_name`` is not a known commit.
        """
        if commit_name not in self.obj_paths:
            raise ValueError(f"Commit {commit_name} not found")
        return self.obj_paths[commit_name]

    def GetBranchHeadCommit(self, branch:str) -> str:
        """Return the head commit name of ``branch``.

        Raises:
            ValueError: if ``branch`` is not a known branch.
        """
        if branch not in self.branches:
            raise ValueError(f"Branch {branch} not found")
        return self.branches[branch]

    def ReadCommit(self, commit_name:str, parent_path:ToolFile) -> HistoryCommit:
        """Rebuild the text of ``commit_name`` by replaying its ancestor chain.

        Each commit is stored as a file named after the commit inside
        ``parent_path``. The chain is collected from the requested commit up
        to the parentless one and then replayed oldest first.

        Raises:
            ValueError: if ``commit_name`` or one of its ancestors is not a
                known commit (checked before its file is opened).
        """
        # Collected newest first, reversed below.
        object_chain:List[HistoryObject] = []
        current_commit = commit_name
        while current_commit is not None:
            # Validate through the shared accessor first so an unknown commit is
            # reported the same way everywhere instead of as a file-load error
            # or a bare KeyError.
            parent_commit = self.GetParentCommit(current_commit)
            commit_file = parent_path|current_commit
            # NOTE(security): pickle.loads executes arbitrary code from the
            # payload; only history files from a trusted location are safe here.
            object_chain.append(pickle.loads(commit_file.LoadAsBinary()))
            current_commit = parent_commit
        # Oldest commit first.
        object_chain.reverse()
        return HistoryCommit(object_chain)

    def ReadBranchHeadCommit(self, branch:str, parent_path:ToolFile) -> HistoryCommit:
        """Rebuild the text of the head commit of ``branch``.

        Raises:
            ValueError: if ``branch`` is not a known branch.
        """
        return self.ReadCommit(self.GetBranchHeadCommit(branch), parent_path)
def break_down_path(path:ToolFile|str) -> ToolFile:
    """Turn ``path`` into a ToolFile, folding a ``head:rest`` prefix into a directory.

    Backslashes (double ones first, then single ones) are replaced by the
    platform file separator. The result is then split on ``:``; when a colon
    is present, the part before it becomes the base and the part right after
    it is joined below that base. Any further colon-separated parts are ignored.
    """
    separator = PlatformIndicator.GetFileSeparator()
    normalized = f"{path}".replace("\\\\", separator).replace("\\", separator)
    head, *rest = normalized.split(':')
    if not rest:
        return ToolFile(head)
    return ToolFile(head)|rest[0]
class Cli:
    """Command-line front end that tracks one file against a pickled history store.

    Output meant for the line-numbered viewer is buffered in ``self.prints``
    through :meth:`print` and flushed by :meth:`print_out`.
    """

    def __init__(self, asset:str, input:str, branch:str,
                 *,
                 history_file:Optional[str]=None,
                 verbose:bool=False,
                 base_branch:str="main"
                 ) -> None:
        """Load configuration and the history index for ``input``.

        Args:
            asset: passed to ``GlobalConfig`` as the configuration source.
            input: path of the tracked file.
            branch: branch every command operates on.
            history_file: explicit history file; when ``None`` a ``history``
                entry under a path derived from the tracked file is used.
            verbose: force verbose output; otherwise the ``verbose`` config
                item decides.
            base_branch: branch a not-yet-existing ``branch`` is forked from.
        """
        self.config = GlobalConfig(asset,True)
        self.file = ToolFile(input)
        self.branch = branch
        self.verbose = verbose or self.config.FindItem("verbose", False)
        self.group_size = self.config.FindItem("group_size", 3)
        self.base_branch = base_branch
        history_target = history_file if history_file is not None else break_down_path(self.file.GetAbsPath())|"history"
        self.historys_file = self.config.GetFile(history_target, False)
        if self.historys_file.Exists():
            # NOTE(security): pickle.loads executes arbitrary code from the
            # payload; only open history files from a trusted location.
            self.historys = pickle.loads(self.historys_file.LoadAsBinary())
        else:
            self.historys = HistoryModel()
        self.prints: str = ""

    def print(self, *args) -> None:
        """Append the concatenation of ``args`` (strings) to the output buffer."""
        self.prints += "".join(args)

    @staticmethod
    def _classify_line(line:str) -> Tuple[str, bool]:
        """Return the gutter marker for ``line`` and whether it carries a change.

        A line counts as changed when it contains the green (added) and/or
        red (deleted) foreground colour code.
        """
        has_add = ConsoleFrontColor.GREEN in line
        has_delete = ConsoleFrontColor.RED in line
        if has_add and has_delete:
            return f"{ConsoleFrontColor.YELLOW}@", True
        if has_add:
            return f"{ConsoleFrontColor.GREEN}+", True
        if has_delete:
            return f"{ConsoleFrontColor.RED}-", True
        return "=", False

    def print_out(self, is_show_all = False) -> None:
        """Write the buffered text to stdout with a marker and line number per line.

        Args:
            is_show_all: when true every line is printed; otherwise only
                changed lines and up to ``self.group_size`` unchanged lines
                around them are printed.
        """
        lines = self.prints.split("\n")
        number_width = len(str(len(lines)))
        last_index = len(lines) - 1
        # Remaining budget of unchanged context lines that may still be printed.
        layer = 0
        for line_index, line in enumerate(lines):
            prefix, changed = self._classify_line(line)
            if not is_show_all:
                # Peek group_size lines ahead (clamped to the last line) so the
                # context leading up to a change is shown as well.
                forward = lines[min(line_index + self.group_size, last_index)]
                if changed or self._classify_line(forward)[1]:
                    layer = self.group_size
                if not changed:
                    layer -= 1
                if layer < 0:
                    continue
            number = FillString(line_index+1, max_length=number_width, side="left")
            print(f"{prefix}{number}{ConsoleFrontColor.RESET} | {line}")
        print(ConsoleFrontColor.RESET)

    def _read_content(self, commit_name:Optional[str]=None) -> str:
        """Return the text of ``commit_name``, or of the branch head when it is empty/None."""
        history_dir = self.historys_file.GetDirToolFile()
        if not commit_name:
            return str(self.historys.ReadBranchHeadCommit(self.branch, history_dir))
        return str(self.historys.ReadCommit(commit_name, history_dir))

    def _print_operations(self, operations:List[Tuple[Literal["add","delete"], int, int, str]]) -> None:
        """Print the raw diff operations, one per line (verbose mode)."""
        PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "\noperations:")
        rendered = []
        for item in operations:
            tint = ConsoleFrontColor.GREEN if item[0] == "add" else ConsoleFrontColor.RED
            rendered.append(
                f'{tint}{item[0]}{ConsoleFrontColor.RESET} '
                f'"{ConsoleFrontColor.YELLOW}{item[3]}{ConsoleFrontColor.RESET}" '
                f'on [{item[1]},{item[2]}]'
            )
        print("\n".join(rendered))

    def show_compare_result(self, head_commit:str, operations:List[Tuple[Literal["add","delete"], int, int, str]]) -> None:
        """Render ``head_commit`` with ``operations`` highlighted and print the result.

        Added text is shown in green and deleted text in red; newlines, tabs
        and spaces inside a change get a background colour so that
        whitespace-only edits stay visible.

        Raises:
            ValueError: if an operation's mode is neither "add" nor "delete".
        """
        reset_bg = ConsoleBackgroundColor.RESET
        index = 0
        for operation in operations:
            # Unchanged text preceding this operation.
            self.print(head_commit[index:operation[1]])
            if operation[0] == "add":
                color = ConsoleFrontColor.GREEN
                line_end_bg = ConsoleBackgroundColor.LIGHTGREEN_EX
                blank_bg = ConsoleBackgroundColor.GREEN
            elif operation[0] == "delete":
                color = ConsoleFrontColor.RED
                line_end_bg = ConsoleBackgroundColor.LIGHTRED_EX
                blank_bg = ConsoleBackgroundColor.RED
            else:
                raise ValueError(f"Invalid operation: {operation}")
            # One translate pass instead of appending to the buffer per character.
            whitespace_marks = {
                # Re-emit the foreground colour after each newline so every
                # line of a multi-line change is recognised as changed.
                ord("\n"): f"{line_end_bg} {reset_bg}\n{color}",
                ord("\t"): f"{blank_bg}\t{reset_bg}",
                ord(" "): f"{blank_bg} {reset_bg}",
            }
            self.print(color, operation[3].translate(whitespace_marks), ConsoleFrontColor.RESET)
            index = operation[2]
        self.print(head_commit[index:])
        self.print_out()

    def compare(self) -> bool:
        """Diff the tracked file against the head of the current branch.

        Returns:
            True when the comparison ran and found differences; False when
            the branch does not exist or nothing changed.
        """
        if self.branch not in self.historys.branches:
            self.config.Log("Error", f"Branch {self.branch} not found")
            return False
        head_commit = self._read_content()
        current_content = self.file.LoadAsText()
        operations = GetDiffOperations(head_commit, current_content)
        if not operations:
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "No changes")
            return False
        self.show_compare_result(head_commit, operations)
        if self.verbose:
            self._print_operations(operations)
        return True

    def save(self) -> None:
        """Store the tracked file's current content as a new commit on the current branch.

        A missing branch is created: as a parentless commit holding the whole
        content when it is its own base branch, otherwise as a child of the
        base branch's head. Nothing is written when the content equals the
        parent commit.

        Raises:
            ValueError: if the branch to diff against does not exist or the
                diff yields an unknown operation mode.
        """
        content = self.file.LoadAsText()
        root = HistoryObject(hashcode=hashlib.md5(content.encode()).hexdigest())
        commit_name = f"{len(self.historys.obj_paths)}"
        branches = self.historys.branches

        if self.branch not in branches and self.base_branch == self.branch:
            # Create the branch together with a brand-new tree.
            self.historys.obj_paths[commit_name] = None
            branches[self.branch] = commit_name
            self.historys_file.MustExistsPath()
            root.blocks = [HistoryBlock(mode="add",begin=0,end=len(content),content=content)]
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "All content is new")
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"Branch {self.branch} created")
        else:
            # An existing branch continues from its own head; a new branch
            # forks from the head of the base branch.
            parent_branch = self.branch if self.branch in branches else self.base_branch
            head_commit = str(self.historys.ReadBranchHeadCommit(parent_branch, self.historys_file.GetDirToolFile()))
            operations = GetDiffOperations(head_commit, content)
            if not operations:
                PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "No changes")
                return
            for operation in operations:
                if operation[0] not in ("add", "delete"):
                    raise ValueError(f"Invalid operation: {operation}")
                root.blocks.append(HistoryBlock(mode=operation[0],begin=operation[1],end=operation[2],content=operation[3]))
            if self.verbose:
                self._print_operations(operations)
            # Create the tree node and link it to its parent.
            self.historys.obj_paths[commit_name] = branches[parent_branch]
            branches[self.branch] = commit_name

        (self.historys_file.GetDirToolFile()|commit_name).SaveAsBinary(pickle.dumps(root))
        self.historys_file.SaveAsBinary(pickle.dumps(self.historys))
        PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"Branch {self.branch} saved commit {commit_name}")

    def view(self, commit_name:Optional[str]=None) -> None:
        """Print every line of ``commit_name`` (branch head when empty/None)."""
        self.print(self._read_content(commit_name))
        self.print_out(True)

    def restore(self) -> None:
        """Overwrite the tracked file with the head commit of the current branch."""
        self.file.SaveAsText(self._read_content())
        PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"\nRestored {self.file.GetAbsPath()}")

    def take(self, commit_name:Optional[str]=None) -> None:
        """Overwrite the tracked file with ``commit_name`` (branch head when empty/None)."""
        self.file.SaveAsText(self._read_content(commit_name))
        target = commit_name if commit_name else "head"
        PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"\nTaken {self.file} to commit {target}")

    def list_branches(self) -> None:
        """Print every branch together with its head commit."""
        if not self.historys.branches:
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "No branches")
            return
        for branch, head in self.historys.branches.items():
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"{branch} -> commit \"{head}\"")

    def list_commits(self) -> None:
        """Print the commits of the current branch, newest first."""
        if not self.historys.branches:
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "No branches")
            return
        if self.branch not in self.historys.branches:
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"Branch {self.branch} not found")
            return
        commit = self.historys.branches[self.branch]
        PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"Branch {self.branch} commits:")
        # The walk ends at the parentless commit, whose parent entry is None.
        while commit in self.historys.obj_paths:
            commit_file = self.historys_file.GetDirToolFile()|commit
            # NOTE(security): see __init__ — pickle payloads must be trusted.
            commit_object = pickle.loads(commit_file.LoadAsBinary())
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, f"{commit}: {ConsoleFrontColor.YELLOW}{commit_object.hashcode}{ConsoleFrontColor.GREEN}{len(commit_object.blocks)}{ConsoleFrontColor.RESET}")
            commit = self.historys.obj_paths[commit]
def run() -> int:
    """Parse the command line, build a ``Cli`` and execute the selected mode.

    Exactly one mode option is required (argparse enforces this through a
    mutually exclusive group); ``-h/--help`` is handled by argparse itself.

    Returns:
        0 after the selected mode has run.

    Raises:
        NotImplementedError: if no known mode is selected.
    """
    parser = argparse.ArgumentParser()
    # Target file
    parser.add_argument("input", type=str, help="输入文件")
    parser.add_argument("--history", type=str, help="手动指定目标历史文件", default=None)
    # Optional project source
    parser.add_argument("-a", "--asset", type=str, default=ProjectConfig.ProjectConfigFileFocus, help="配置文件目录")
    # Branches
    parser.add_argument("--base-branch", type=str, default="main", help="指定基分支")
    parser.add_argument("--branch", type=str, default="main", help="指定分支, 当指定的分支不存在时会新建分支, 并以指定的基分支当前的commit为父节点创建分支节点, 如果没有指定基分支则默认使用main分支")
    # Verbose output
    parser.add_argument("--verbose", action="store_true", help="是否详细信息")
    # Mutually exclusive group of modes
    mode_group = parser.add_mutually_exclusive_group(required=True)
    mode_group.add_argument("-c", "--compare", action="store_true", help="比较当前文件的差异")
    mode_group.add_argument("-s", "--save", action="store_true", help="保存当前文件的差异")
    mode_group.add_argument("-v", "--view", type=str,default=None, help="查看记录内容")
    mode_group.add_argument("-r", "--restore", action="store_true", help="恢复当前文件")
    mode_group.add_argument("-t", "--take", type=str,default=None, help="获取指定提交的文件")
    mode_group.add_argument("-lb", "--list-branches", action="store_true", help="列出所有分支")
    mode_group.add_argument("-lc", "--list-commits", action="store_true", help="列出指定分支的提交列表")
    args = parser.parse_args()

    cli = Cli(args.asset, args.input, args.branch,
              history_file=args.history,
              verbose=args.verbose,
              base_branch=args.base_branch)

    if args.compare:
        # Compare
        cli.compare()
    elif args.save:
        # Save
        cli.save()
    elif args.view is not None:
        # View recorded content
        cli.view(args.view)
    elif args.restore:
        # Restore
        cli.restore()
    elif args.take is not None:
        # Check out the file of a given commit
        cli.take(args.take)
    elif args.list_branches:
        # List all branches
        cli.list_branches()
    elif args.list_commits:
        # List the commits of the selected branch
        cli.list_commits()
    else:
        raise NotImplementedError("Not implemented mode")
    return 0
if __name__ == "__main__":
    # Propagate run()'s return value as the process exit status.
    raise SystemExit(run())