Files
ChangeHistory/app.py
2025-10-20 17:33:36 +08:00

231 lines
9.3 KiB
Python

from Convention.Convention.Runtime.GlobalConfig import *
from Convention.Convention.Runtime.File import *
import argparse
from pydantic import BaseModel
import pickle
from datetime import datetime
from tqdm import tqdm, trange
def enum_for(verbose:bool, obj:Iterable, **kwargs):
if verbose:
return tqdm(obj, position=0, leave=False, **kwargs)
else:
return obj
def enum_range(verbose:bool, start:int, end:int) -> Iterable[int]:
if verbose:
return trange(start, end, position=0, leave=False)
else:
return range(start, end)
class ChangeEntry(BaseModel):
before: Optional[str] = ""
after: Optional[str] = ""
class DataEntry(ChangeEntry):
line: int = 0
class DataEntries(BaseModel):
data: List[DataEntry] = []
time: datetime = datetime.now()
class DataModel(BaseModel):
data: List[DataEntries] = []
def enum_for_entries(self, verbose:bool, *, end_time:Optional[datetime]=None) -> Iterable[DataEntries]:
return enum_for(verbose, [entries for entries in self.data if entries.time <= end_time])
def pickup(self, verbose:bool, *, end_time:Optional[datetime]=None) -> List[str]:
if end_time is None:
end_time = datetime.now()
history_content: List[str] = []
for entries in self.enum_for_entries(verbose, end_time=end_time):
for entry in entries.data:
if len(history_content) <= entry.line:
history_content.append(entry.after)
elif history_content[entry.line] == entry.before:
history_content[entry.line] = entry.after
else:
history_content.insert(entry.line, entry.after)
return history_content
def lineup(self, verbose:bool, *, end_time:Optional[datetime]=None) -> List[DataEntry]:
if end_time is None:
end_time = datetime.now()
history_content: List[DataEntry] = []
for entries in self.enum_for_entries(verbose, end_time=end_time):
for entry in entries.data:
if len(history_content) <= entry.line:
history_content.append(entry)
elif history_content[entry.line].before == entry.before:
history_content[entry.line] = entry
else:
history_content.insert(entry.line, entry)
return history_content
def parse(self, verbose:bool, content:List[str]) -> DataEntries:
config = ProjectConfig()
group_size = config.FindItem("group_size", 10)
history_content = self.lineup(verbose)
result_entries = DataEntries(time=datetime.now())
first = 0
second = 0
first_end = len(content)
second_end = len(history_content)
# 都未到达文件末尾时
while first < first_end and second < second_end:
if verbose:
PrintColorful(ConsoleFrontColor.BLUE,f"Current[{first}/{first_end}] -> History[{second}/{second_end}]", end="\r")
if content[first] == history_content[second].after:
first += 1
second += 1
continue
# 判断状态
stats: Literal["add", "delete", "unknown"] = "unknown"
# 假设如果当前与历史中都存在但是行不一致
# 寻找历史中与当前相同的行
if stats == "unknown":
for i in range(first+1, min(first_end, first+group_size)):
if content[i] == history_content[second].after:
# 中间的部分全部为新增
stats = "add"
for index in range(first, i):
result_entries.data.append(DataEntry(line=index, before=None, after=content[index]))
first = i
second += 1
break
# 寻找当前中与历史相同的行
if stats == "unknown":
for i in range(second+1, min(second_end, second+group_size)):
if history_content[i].after == content[first]:
# 中间的部分全部为删除
stats = "delete"
for index in range(second, i):
result_entries.data.append(DataEntry(line=first, before=history_content[index].after, after=None))
first += 1
second = i
break
# 到达此处代表历史中second处的行不存在于当前中, 当前first处的行也不存在于历史中
if stats == "unknown":
# 本行为修改
result_entries.data.append(DataEntry(line=first, before=history_content[second].after, after=content[first]))
first += 1
second += 1
continue
# 处理剩余的行
while first < first_end:
# 当前末尾多出的行全部为新增
result_entries.data.append(DataEntry(line=first, before=None, after=content[first]))
first += 1
while second < second_end:
# 当前末尾缺少的行全部为删除
result_entries.data.append(DataEntry(line=first, before=history_content[second].after, after=None))
second += 1
return result_entries
def get_line_header(head:str) -> str:
return f"{head}{" "*(10-len(head))}"
def run_parser(file:ToolFile, history:ToolFile, *,verbose:bool=False, immediately:bool=True):
if verbose:
PrintColorful(ConsoleFrontColor.BLUE,f"Running parser for file: {file}")
PrintColorful(ConsoleFrontColor.BLUE,f"History file: {history}")
PrintColorful(ConsoleFrontColor.BLUE,f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if history.GetSize() > 0:
with open(f"{history}", "rb") as f:
history_data: DataModel = pickle.load(f) #DataModel.model_validate(history.LoadAsJson())
else:
history_data = DataModel()
file_content: List[str] = []
_temp_file_reading: List[str] = []
if verbose:
index = 0
for line in file.ReadLines(encoding='utf-8'):
_temp_file_reading.append(line)
index += 1
if index % 1000 == 0:
PrintColorful(ConsoleFrontColor.BLUE,f"Reading file: {index} lines", end="\r")
PrintColorful(ConsoleFrontColor.BLUE,f"Reading file: {index} lines", end="\r")
else:
_temp_file_reading = [line for line in file.ReadLines(encoding='utf-8')]
for line in enum_for(verbose, _temp_file_reading, desc="Reading file"):
file_content.append(line)
change_content: DataEntries = history_data.parse(verbose, file_content)
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "Previewing change content:"," "*20)
for line in change_content.data:
if line.before is None:
PrintColorful(ConsoleFrontColor.GREEN, f"{get_line_header(f"++ {line.line}")}| {line.after}")
elif line.after is None:
PrintColorful(ConsoleFrontColor.RED, f"{get_line_header(f"-- {line.line}")}| {line.before}")
else:
PrintColorful(ConsoleFrontColor.YELLOW, f"{get_line_header(f"@+ {line.line}")}| {line.before}")
PrintColorful(ConsoleFrontColor.YELLOW, f"{get_line_header(f"@- {line.line}")}| {line.after}")
if immediately:
if len(change_content.data) > 0:
history_data.data.append(change_content)
with open(f"{history}", "wb") as f:
pickle.dump(history_data, f)
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX,f"History data has been saved to {history}")
else:
PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX,f"No change")
def run():
config = ProjectConfig()
verbose = config.FindItem("verbose", False)
immediately = config.FindItem("immediately", True)
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", type=str)
parser.add_argument("--verbose", type=bool, default=verbose)
parser.add_argument("-c", "--clean", type=bool, default=False)
parser.add_argument("-im", "--immediately", type=bool, default=immediately)
parser.add_argument("-p", "--preview", type=bool, default=False)
args = parser.parse_args()
if args.input is None:
PrintColorful(ConsoleFrontColor.RED,"Error: No input file specified")
return
verbose = args.verbose
file = ToolFile(args.input)
file = ToolFile(file.GetAbsPath())
if not file.Exists():
PrintColorful(ConsoleFrontColor.RED,f"Error: Input file {args.input} does not exist")
return
head, path_info = f"{file}".split(":", 1)
if verbose:
PrintColorful(ConsoleFrontColor.BLUE,f"Input file: {file}")
PrintColorful(ConsoleFrontColor.BLUE,f"Head: {head}")
PrintColorful(ConsoleFrontColor.BLUE,f"Path info: {path_info}")
history_file = config.GetFile(ToolFile(head)|f"{path_info}.history", is_must_exist=True)
if args.clean:
history_file.Remove()
PrintColorful(ConsoleFrontColor.GREEN,f"Input file history data {args.input} has been cleaned")
return
config.SaveProperties()
run_parser(file, history_file, verbose=verbose, immediately=immediately and not args.preview)
if __name__ == "__main__":
run()