正在进行编辑距离函数优化

This commit is contained in:
2025-10-23 15:09:52 +08:00
parent 25e03b4246
commit e091d846c1
4 changed files with 127 additions and 98 deletions

221
app.py
View File

@@ -1,5 +1,6 @@
from Convention.Convention.Runtime.GlobalConfig import *
from Convention.Convention.Runtime.File import *
from Convention.Convention.Runtime.String import GetEditorDistanceAndOperations, FillString
import argparse
from pydantic import BaseModel
@@ -37,12 +38,20 @@ class HistoryModel(BaseModel):
# branch name : branch head commit
branches: Dict[str,str] = {}
def ReadBranchHeadCommit(self, branch:str, parent_path:ToolFile) -> HistoryCommit:
def GetParentCommit(self, commit_name:str) -> Optional[str]:
    """
    Look up the entry recorded for ``commit_name`` in ``self.obj_paths``.

    Args:
        commit_name: Key of the commit inside ``self.obj_paths``.

    Returns:
        The value stored for that commit.
        NOTE(review): judging by the method name this is the parent commit
        name, with ``None`` for a commit that has no parent -- confirm
        against the code that fills ``obj_paths``.

    Raises:
        ValueError: If ``commit_name`` is not a key of ``self.obj_paths``.
    """
    try:
        # One lookup instead of a membership test followed by indexing.
        return self.obj_paths[commit_name]
    except KeyError:
        # Callers expect ValueError; the underlying KeyError adds nothing.
        raise ValueError(f"Commit {commit_name} not found") from None
def GetBranchHeadCommit(self, branch:str) -> str:
    """
    Return the head commit name recorded for ``branch``.

    Args:
        branch: Branch name, a key of ``self.branches``.

    Returns:
        The value stored in ``self.branches`` for that branch.

    Raises:
        ValueError: If ``branch`` is not a key of ``self.branches``.
    """
    try:
        # One lookup instead of a membership test followed by indexing.
        return self.branches[branch]
    except KeyError:
        # Callers expect ValueError; the underlying KeyError adds nothing.
        raise ValueError(f"Branch {branch} not found") from None
def ReadCommit(self, commit_name:str, parent_path:ToolFile) -> HistoryCommit:
# 从分支中读入链中(倒序的)
object_chain:List[HistoryObject] = []
current_commit_file_path = self.branches[branch]
current_commit_file_path = commit_name
while current_commit_file_path is not None:
current_commit_file = parent_path|current_commit_file_path
node = HistoryObject.model_validate_json(current_commit_file.LoadAsText())
@@ -55,6 +64,9 @@ class HistoryModel(BaseModel):
result = HistoryCommit(object_chain)
return result
def ReadBranchHeadCommit(self, branch:str, parent_path:ToolFile) -> HistoryCommit:
    """
    Read the commit that ``branch`` currently points at.

    Resolves the branch to its head commit name via ``GetBranchHeadCommit``
    and hands that name, together with ``parent_path``, to ``ReadCommit``.
    Any exception raised by those two calls propagates unchanged.
    """
    head_commit_name = self.GetBranchHeadCommit(branch)
    return self.ReadCommit(head_commit_name, parent_path)
def break_down_path(path:ToolFile|str) -> ToolFile:
temp = f"{path}"#[:-len(path.GetExtension())]
temp = temp.replace("\\\\",PlatformIndicator.GetFileSeparator())
@@ -65,81 +77,50 @@ def break_down_path(path:ToolFile|str) -> ToolFile:
else:
return ToolFile(temp[0])|temp[1]
def levenshtein_distance_with_operations(s1:str, s2:str) -> Tuple[int, List[Tuple[str, int, int, str]]]:
    """
    Compute an insert/delete edit script that turns ``s1`` into ``s2``.

    The script is derived from the longest common subsequence (LCS) of the
    two strings. Only insertions and deletions are produced -- there is no
    substitution step -- so the returned distance is
    ``len(s1) + len(s2) - 2 * len(LCS)`` (e.g. ``"a" -> "b"`` costs 2).

    Args:
        s1: Source string; all positions in the result index into it.
        s2: Target string.

    Returns:
        ``(distance, operations)`` where each operation is
        ``(kind, begin, end, content)``:

        * ``("add", p, p, text)``    -- insert ``text`` before ``s1[p]``.
        * ``("delete", b, e, text)`` -- remove ``s1[b:e]`` (== ``text``).

        Operations are ordered by position in ``s1``; adjacent operations of
        the same kind are merged into one.
    """
    m, n = len(s1), len(s2)

    # lcs[i][j] holds the LCS length of s1[:i] and s2[:j].
    lcs = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if s1[i - 1] == s2[j - 1]:
                lcs[i][j] = lcs[i - 1][j - 1] + 1
            else:
                lcs[i][j] = max(lcs[i - 1][j], lcs[i][j - 1])

    # Walk back from the bottom-right corner. Operations are discovered
    # last-to-first, so append them and reverse once at the end instead of
    # paying O(k) for every insert at the front of the list.
    operations: List[Tuple[str, int, int, str]] = []
    i, j = m, n
    while i > 0 or j > 0:
        if i > 0 and j > 0 and s1[i - 1] == s2[j - 1]:
            # Characters match: nothing to record.
            i -= 1
            j -= 1
        elif j > 0 and (i == 0 or lcs[i][j - 1] >= lcs[i - 1][j]):
            # s2[j-1] has to be inserted at position i of s1.
            operations.append(("add", i, i, s2[j - 1]))
            j -= 1
        else:
            # s1[i-1] has to be removed.
            operations.append(("delete", i - 1, i, s1[i - 1]))
            i -= 1
    operations.reverse()

    # Merge runs of same-kind operations that touch each other: the previous
    # operation must end exactly where the next one begins.
    merged_operations: List[Tuple[str, int, int, str]] = []
    for kind, begin, end, text in operations:
        if merged_operations:
            last_kind, last_begin, last_end, last_text = merged_operations[-1]
            if last_kind == kind and last_end == begin:
                merged_operations[-1] = (kind, last_begin, end, last_text + text)
                continue
        merged_operations.append((kind, begin, end, text))

    edit_distance = m + n - 2 * lcs[m][n]
    return edit_distance, merged_operations
class Cli:
def print_out(self) -> None:
def print_out(self, is_show_all = False) -> None:
sp = self.prints.split("\n")
status_len = len(str(len(sp)))
for line_index, line in enumerate(sp):
if ConsoleFrontColor.GREEN in line and ConsoleFrontColor.RED in line:
perfix = f"{ConsoleFrontColor.YELLOW}@"
elif ConsoleFrontColor.GREEN in line:
perfix = f"{ConsoleFrontColor.GREEN}+"
elif ConsoleFrontColor.RED in line:
perfix = f"{ConsoleFrontColor.RED}-"
else:
perfix = "="
if is_show_all:
for line_index, line in enumerate(sp):
if ConsoleFrontColor.GREEN in line and ConsoleFrontColor.RED in line:
perfix = f"{ConsoleFrontColor.YELLOW}@"
elif ConsoleFrontColor.GREEN in line:
perfix = f"{ConsoleFrontColor.GREEN}+"
elif ConsoleFrontColor.RED in line:
perfix = f"{ConsoleFrontColor.RED}-"
else:
perfix = "="
print(f"{perfix}{line_index}{" "*max(0,status_len-len(str(line_index)))}{ConsoleFrontColor.RESET} | {line}")
print(f"{perfix}{FillString(line_index+1, max_length=status_len, side = "left")}{ConsoleFrontColor.RESET} | {line}")
else:
layer = 0
for line_index, line in enumerate(sp):
layer -= 1
forward = sp[min(line_index+self.group_size, len(sp)-1)]
if ConsoleFrontColor.GREEN in forward and ConsoleFrontColor.RED in forward:
layer = self.group_size
elif ConsoleFrontColor.GREEN in forward:
layer = self.group_size
elif ConsoleFrontColor.RED in forward:
layer = self.group_size
if ConsoleFrontColor.GREEN in line and ConsoleFrontColor.RED in line:
perfix = f"{ConsoleFrontColor.YELLOW}@"
layer = self.group_size
elif ConsoleFrontColor.GREEN in line:
perfix = f"{ConsoleFrontColor.GREEN}+"
layer = self.group_size
elif ConsoleFrontColor.RED in line:
perfix = f"{ConsoleFrontColor.RED}-"
layer = self.group_size
else:
perfix = "="
if layer <= 0:
continue
print(f"{perfix}{FillString(line_index+1, max_length=status_len, side = "left")}{ConsoleFrontColor.RESET} | {line}")
print(ConsoleFrontColor.RESET)
def print(self, *args) -> None:
@@ -147,11 +128,14 @@ class Cli:
def __init__(self, asset:str, input:str, branch:str,
*,
history_file:Optional[str]=None
history_file:Optional[str]=None,
verbose:bool=False
) -> None:
self.config = GlobalConfig(asset,True)
self.file = ToolFile(input)
self.branch = branch
self.verbose = verbose or self.config.FindItem("verbose", False)
self.group_size = self.config.FindItem("group_size", 3)
self.historys_file = self.config.GetFile(history_file if history_file is not None else break_down_path(self.file.GetAbsPath())|"history", False)
if self.historys_file.Exists():
@@ -167,7 +151,7 @@ class Cli:
return
head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile()))
current_content = self.file.LoadAsText()
step, operations = levenshtein_distance_with_operations(head_commit, current_content)
step, operations = GetEditorDistanceAndOperations(head_commit, current_content)
if step == 0:
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "No changes")
return
@@ -178,42 +162,51 @@ class Cli:
if operation[0] == "add":
color = ConsoleFrontColor.GREEN
eline = "[>>>]"
etab = "[>>]"
ewrite = "[>]"
elif operation[0] == "delete":
color = ConsoleFrontColor.RED
eline = "[<<<]"
etab = "[<<]"
ewrite = "[<]"
else:
raise ValueError(f"Invalid operation: {operation}")
sp = operation[3].split("\n")
if operation[3][0] == "\n":
self.print("\n")
for line_index, line in enumerate(sp):
self.print(f"{color}{line}{ConsoleFrontColor.RESET}")
if line_index == len(sp) - 1:
if operation[3][-1] == "\n":
self.print("\n")
self.print(color)
for ch in operation[3]:
if ch == '\n':
self.print(f"{eline}\n{color}")
elif ch == '\t':
self.print(etab)
elif ch == ' ':
self.print(ewrite)
else:
self.print("\n")
self.print(ch)
self.print(ConsoleFrontColor.RESET)
index = operation[2]
self.print(head_commit[index:])
self.print_out()
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "\noperations:")
print(f"{'\n'.join([f"{ConsoleFrontColor.GREEN if item[0] == "add" else ConsoleFrontColor.RED}{item[0]}{ConsoleFrontColor.RESET} \"{ConsoleFrontColor.YELLOW}{item[3]}{ConsoleFrontColor.RESET}\" on [{item[1]},{item[2]}]" for item in operations])}")
if self.verbose:
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "\noperations:")
print(f"{'\n'.join([f"{ConsoleFrontColor.GREEN if item[0] == "add" else ConsoleFrontColor.RED}{item[0]}{ConsoleFrontColor.RESET} \"{ConsoleFrontColor.YELLOW}{item[3]}{ConsoleFrontColor.RESET}\" on [{item[1]},{item[2]}]" for item in operations])}")
def save(self) -> None:
content = self.file.LoadAsText()
root = HistoryObject(hashcode=hashlib.md5(content.encode()).hexdigest())
commit_name = f"{len(self.historys.obj_paths)}"
if self.branch not in self.historys.branches:
# 创建分支并为其创建新的树
commit_name = f"{len(self.historys.obj_paths)}"
self.historys.obj_paths[commit_name] = None
self.historys.branches[self.branch] = commit_name
self.historys_file.MustExistsPath()
root.blocks=[HistoryBlock(mode="add",begin=0,end=len(content),content=content)]
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "\nAll content is new")
else:
head_commit = str(self.historys.ReadBranchHeadCommit(self.branch, self.historys_file.GetDirToolFile()))
step, operations = levenshtein_distance_with_operations(head_commit, content)
step, operations = GetEditorDistanceAndOperations(head_commit, content)
for operation in operations:
if operation[0] == "add":
root.blocks.append(HistoryBlock(mode="add",begin=operation[1],end=operation[2],content=operation[3]))
@@ -221,13 +214,31 @@ class Cli:
root.blocks.append(HistoryBlock(mode="delete",begin=operation[1],end=operation[2],content=operation[3]))
else:
raise ValueError(f"Invalid operation: {operation}")
if self.verbose:
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "\noperations:")
print(f"{'\n'.join([f"{ConsoleFrontColor.GREEN if item[0] == "add" else ConsoleFrontColor.RED}{item[0]}{ConsoleFrontColor.RESET} \"{ConsoleFrontColor.YELLOW}{item[3]}{ConsoleFrontColor.RESET}\" on [{item[1]},{item[2]}]" for item in operations])}")
# 创建树节点, 并链接
self.historys.obj_paths[f"{len(self.historys.obj_paths)}"] = self.historys.branches[self.branch]
self.historys.branches[self.branch] = f"{len(self.historys.obj_paths)}"
with open(f"{self.historys_file.GetDirToolFile()|self.historys.branches[self.branch]}", "w") as f:
f.write(root.model_dump_json())
with open(self.historys_file.GetFullPath(), "w") as f:
f.write(self.historys.model_dump_json())
self.historys.obj_paths[commit_name] = self.historys.branches[self.branch]
self.historys.branches[self.branch] = commit_name
(self.historys_file.GetDirToolFile()|commit_name).SaveAsText(root.model_dump_json())
(self.historys_file).SaveAsText(self.historys.model_dump_json())
def view(self, commit_name:Optional[str]=None) -> None:
    """
    Show the full text of a recorded commit.

    Args:
        commit_name: Commit to display. When it is ``None`` or the empty
            string, the head commit of ``self.branch`` is shown instead.

    The commit is read through ``self.historys``, converted with ``str()``,
    queued with ``self.print`` and flushed with ``self.print_out(True)``.
    """
    history_dir = self.historys_file.GetDirToolFile()
    if commit_name:
        commit = self.historys.ReadCommit(commit_name, history_dir)
    else:
        # No explicit commit requested: fall back to the branch head.
        commit = self.historys.ReadBranchHeadCommit(self.branch, history_dir)
    self.print(str(commit))
    self.print_out(True)
def restore(self) -> None:
    """
    Overwrite the working file with the head commit of ``self.branch``.

    The head commit is read through ``self.historys``, converted with
    ``str()`` and written to ``self.file`` via ``SaveAsText``.
    """
    history_dir = self.historys_file.GetDirToolFile()
    head_commit = self.historys.ReadBranchHeadCommit(self.branch, history_dir)
    self.file.SaveAsText(str(head_commit))
def take(self, commit_name:str) -> None:
    """
    Overwrite the working file with the content of a specific commit.

    Args:
        commit_name: Name of the commit to read through ``self.historys``.

    The commit's ``content`` attribute is written to ``self.file`` via
    ``SaveAsText``.
    """
    history_dir = self.historys_file.GetDirToolFile()
    commit = self.historys.ReadCommit(commit_name, history_dir)
    self.file.SaveAsText(commit.content)
def run() -> int:
parser = argparse.ArgumentParser()
@@ -238,10 +249,15 @@ def run() -> int:
parser.add_argument("-a", "--asset", type=str, default=ProjectConfig.ProjectConfigFileFocus, help="配置文件目录")
# 分支
parser.add_argument("-b", "--branch", type=str, default="main", help="分支")
# 是否详细信息
parser.add_argument("--verbose", action="store_true", help="是否详细信息")
# 模式互斥组
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument("-c", "--compare", action="store_true", help="比较当前文件的差异")
mode_group.add_argument("-s", "--save", action="store_true", help="保存当前文件的差异")
mode_group.add_argument("-v", "--view", type=str,default=None, help="查看记录内容")
mode_group.add_argument("-r", "--restore", action="store_true", help="恢复当前文件")
mode_group.add_argument("-t", "--take", type=str,default=None, help="获取指定提交的文件")
args = parser.parse_args()
@@ -250,7 +266,8 @@ def run() -> int:
return 0
cli = Cli(args.asset, args.input, args.branch,
history_file=args.history)
history_file=args.history,
verbose=args.verbose)
# 比较
if args.compare:
@@ -260,6 +277,18 @@ def run() -> int:
elif args.save:
cli.save()
return 0
# 查看记录内容
elif "view" in args:
cli.view(args.view)
return 0
# 恢复
elif args.restore:
cli.restore()
return 0
# 获取指定提交的文件
elif "take" in args:
cli.take(args.take)
return 0
raise NotImplementedError("Not implemented mode")