from Convention.Convention.Runtime.GlobalConfig import *
from Convention.Convention.Runtime.File import *

import argparse
import pickle
from datetime import datetime
from typing import Iterable, List, Literal, Optional

from pydantic import BaseModel, Field
from tqdm import tqdm, trange


def enum_for(verbose: bool, obj: Iterable, **kwargs):
    """Return *obj*, wrapped in a tqdm progress bar when *verbose* is true.

    Extra keyword arguments are forwarded to ``tqdm`` (only when verbose).
    """
    if verbose:
        return tqdm(obj, position=0, leave=False, **kwargs)
    return obj


def enum_range(verbose: bool, start: int, end: int) -> Iterable[int]:
    """Return ``range(start, end)``, as a tqdm ``trange`` when *verbose* is true."""
    if verbose:
        return trange(start, end, position=0, leave=False)
    return range(start, end)


class ChangeEntry(BaseModel):
    """A single line change: the text before and after.

    ``DataModel.parse`` emits ``before=None`` for an added line and
    ``after=None`` for a deleted line.
    """

    before: Optional[str] = ""
    after: Optional[str] = ""


class DataEntry(ChangeEntry):
    """A ``ChangeEntry`` bound to a 0-based line index."""

    line: int = 0


class DataEntries(BaseModel):
    """One recorded snapshot: a list of line changes plus its timestamp."""

    data: List[DataEntry] = Field(default_factory=list)
    # default_factory so each instance gets its own creation time; a plain
    # ``datetime.now()`` default would be evaluated once at class definition.
    time: datetime = Field(default_factory=datetime.now)


class DataModel(BaseModel):
    """The full change history of one file, as an ordered list of snapshots."""

    data: List[DataEntries] = Field(default_factory=list)

    def enum_for_entries(
        self, verbose: bool, *, end_time: Optional[datetime] = None
    ) -> Iterable[DataEntries]:
        """Iterate snapshots whose time is not later than *end_time*.

        ``end_time=None`` means no upper bound (every snapshot is returned).
        The result is wrapped in a progress bar when *verbose* is true.
        """
        if end_time is None:
            selected = list(self.data)
        else:
            selected = [entries for entries in self.data if entries.time <= end_time]
        return enum_for(verbose, selected)

    def pickup(
        self, verbose: bool, *, end_time: Optional[datetime] = None
    ) -> List[str]:
        """Rebuild the file content (list of line texts) as of *end_time*.

        Derived from ``lineup`` so both reconstructions always agree.
        ``end_time=None`` means "now".
        """
        return [entry.after for entry in self.lineup(verbose, end_time=end_time)]

    def lineup(
        self, verbose: bool, *, end_time: Optional[datetime] = None
    ) -> List[DataEntry]:
        """Rebuild the file as of *end_time*, one ``DataEntry`` per live line.

        Each returned entry is the most recent change that produced that line;
        its ``after`` field is the line's current text. Snapshots are replayed
        in stored order. ``end_time=None`` means "now".
        """
        if end_time is None:
            end_time = datetime.now()
        lines: List[DataEntry] = []
        for entries in self.enum_for_entries(verbose, end_time=end_time):
            for entry in entries.data:
                # The change targets an existing line only when that line's
                # current text (``after`` of the slot) equals ``entry.before``.
                matches = (
                    entry.line < len(lines)
                    and lines[entry.line].after == entry.before
                )
                if entry.after is None:
                    # Deletion: drop the line instead of leaving a None
                    # placeholder; a deletion that does not match is skipped.
                    if matches:
                        del lines[entry.line]
                elif matches:
                    lines[entry.line] = entry
                else:
                    # Added line (or unmatched change): insert at the index;
                    # an index past the end appends.
                    lines.insert(entry.line, entry)
        return lines

    def parse(self, verbose: bool, content: List[str]) -> DataEntries:
        """Diff *content* against the reconstructed history.

        Walks both sequences with two cursors. On a mismatch it looks ahead up
        to ``group_size`` lines (config item, default 10) on each side to
        classify the gap as additions or deletions; otherwise the line is a
        modification.

        Returns a new ``DataEntries`` stamped with the current time. Entry
        ``line`` values are indices into *content* (for deletions: the index
        in *content* at which the removed line used to sit).
        """
        config = ProjectConfig()
        group_size = config.FindItem("group_size", 10)
        history_content = self.lineup(verbose)
        result_entries = DataEntries(time=datetime.now())
        first = 0
        second = 0
        first_end = len(content)
        second_end = len(history_content)

        # While neither side has reached its end.
        while first < first_end and second < second_end:
            if verbose:
                PrintColorful(
                    ConsoleFrontColor.BLUE,
                    f"Current[{first}/{first_end}] -> History[{second}/{second_end}]",
                    end="\r",
                )
            history_line = history_content[second].after
            if content[first] == history_line:
                first += 1
                second += 1
                continue

            # Look ahead in the current content for the history line; if it is
            # found, everything in between was added.
            add_stop = min(first_end, first + group_size)
            found = next(
                (i for i in range(first + 1, add_stop) if content[i] == history_line),
                None,
            )
            if found is not None:
                for index in range(first, found):
                    result_entries.data.append(
                        DataEntry(line=index, before=None, after=content[index])
                    )
                # Consume the matched pair on both sides.
                first = found + 1
                second += 1
                continue

            # Look ahead in the history for the current line; if it is found,
            # everything in between was deleted.
            delete_stop = min(second_end, second + group_size)
            found = next(
                (
                    i
                    for i in range(second + 1, delete_stop)
                    if history_content[i].after == content[first]
                ),
                None,
            )
            if found is not None:
                for index in range(second, found):
                    result_entries.data.append(
                        DataEntry(
                            line=first,
                            before=history_content[index].after,
                            after=None,
                        )
                    )
                # Consume the matched pair on both sides.
                first += 1
                second = found + 1
                continue

            # Neither line was found on the other side within the window:
            # treat this line as modified.
            result_entries.data.append(
                DataEntry(line=first, before=history_line, after=content[first])
            )
            first += 1
            second += 1

        # Remaining lines: extra trailing lines in the current content are
        # additions.
        while first < first_end:
            result_entries.data.append(
                DataEntry(line=first, before=None, after=content[first])
            )
            first += 1
        # Trailing lines left only in the history are deletions.
        while second < second_end:
            result_entries.data.append(
                DataEntry(line=first, before=history_content[second].after, after=None)
            )
            second += 1
        return result_entries


def get_line_header(head: str) -> str:
    """Pad *head* with trailing spaces to at least 10 characters."""
    return head.ljust(10)


def run_parser(
    file: ToolFile,
    history: ToolFile,
    *,
    verbose: bool = False,
    immediately: bool = True,
):
    """Diff *file* against its *history*, preview the changes, optionally save.

    Args:
        file: The tracked file; read as UTF-8 lines.
        history: The pickled ``DataModel`` for *file*; an empty history file
            is treated as no history.
        verbose: Print progress information.
        immediately: When true, append the detected changes to the history
            and write it back; when false, only preview.
    """
    if verbose:
        PrintColorful(ConsoleFrontColor.BLUE, f"Running parser for file: {file}")
        PrintColorful(ConsoleFrontColor.BLUE, f"History file: {history}")
        PrintColorful(
            ConsoleFrontColor.BLUE,
            f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        )

    if history.GetSize() > 0:
        # SECURITY: pickle.load executes arbitrary code from the file; only
        # load history files this tool wrote itself, never untrusted ones.
        with open(f"{history}", "rb") as f:
            history_data: DataModel = pickle.load(f)
    else:
        history_data = DataModel()

    file_content: List[str] = []
    if verbose:
        for index, line in enumerate(file.ReadLines(encoding='utf-8'), start=1):
            file_content.append(line)
            if index % 1000 == 0:
                PrintColorful(
                    ConsoleFrontColor.BLUE, f"Reading file: {index} lines", end="\r"
                )
        PrintColorful(
            ConsoleFrontColor.BLUE,
            f"Reading file: {len(file_content)} lines",
            end="\r",
        )
    else:
        file_content = list(file.ReadLines(encoding='utf-8'))

    change_content: DataEntries = history_data.parse(verbose, file_content)

    # The trailing spaces overwrite any leftover "\r" progress text.
    PrintColorful(
        ConsoleFrontColor.LIGHTMAGENTA_EX, "Previewing change content:", " " * 20
    )
    for change in change_content.data:
        if change.before is None:
            header = get_line_header(f"++ {change.line}")
            PrintColorful(ConsoleFrontColor.GREEN, f"{header}| {change.after}")
        elif change.after is None:
            header = get_line_header(f"-- {change.line}")
            PrintColorful(ConsoleFrontColor.RED, f"{header}| {change.before}")
        else:
            # NOTE(review): "@+" is paired with the old text and "@-" with the
            # new text; this looks swapped, kept as-is pending confirmation.
            old_header = get_line_header(f"@+ {change.line}")
            new_header = get_line_header(f"@- {change.line}")
            PrintColorful(ConsoleFrontColor.YELLOW, f"{old_header}| {change.before}")
            PrintColorful(ConsoleFrontColor.YELLOW, f"{new_header}| {change.after}")

    if immediately:
        if len(change_content.data) > 0:
            history_data.data.append(change_content)
            with open(f"{history}", "wb") as f:
                pickle.dump(history_data, f)
            PrintColorful(
                ConsoleFrontColor.LIGHTMAGENTA_EX,
                f"History data has been saved to {history}",
            )
        else:
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "No change")


def _str_to_bool(value: str) -> bool:
    """Parse a command-line boolean; ``type=bool`` would treat "False" as True."""
    lowered = value.strip().lower()
    if lowered in ("1", "true", "t", "yes", "y", "on"):
        return True
    if lowered in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def run():
    """Command-line entry point: parse arguments and run the history parser.

    Boolean options accept an explicit value (``--verbose false``) or no
    value at all (``--verbose``, meaning true). Defaults for ``--verbose``
    and ``--immediately`` come from the project config.
    """
    config = ProjectConfig()
    verbose = config.FindItem("verbose", False)
    immediately = config.FindItem("immediately", True)

    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str)
    parser.add_argument(
        "--verbose", type=_str_to_bool, nargs="?", const=True, default=verbose
    )
    parser.add_argument(
        "-c", "--clean", type=_str_to_bool, nargs="?", const=True, default=False
    )
    parser.add_argument(
        "-im",
        "--immediately",
        type=_str_to_bool,
        nargs="?",
        const=True,
        default=immediately,
    )
    parser.add_argument(
        "-p", "--preview", type=_str_to_bool, nargs="?", const=True, default=False
    )
    args = parser.parse_args()

    if args.input is None:
        PrintColorful(ConsoleFrontColor.RED, "Error: No input file specified")
        return

    verbose = args.verbose
    file = ToolFile(args.input)
    file = ToolFile(file.GetAbsPath())
    if not file.Exists():
        PrintColorful(
            ConsoleFrontColor.RED, f"Error: Input file {args.input} does not exist"
        )
        return

    # NOTE(review): this unpacking raises ValueError when the path text has no
    # ":" (it appears to assume a drive-letter style path) - confirm intent.
    head, path_info = f"{file}".split(":", 1)
    if verbose:
        PrintColorful(ConsoleFrontColor.BLUE, f"Input file: {file}")
        PrintColorful(ConsoleFrontColor.BLUE, f"Head: {head}")
        PrintColorful(ConsoleFrontColor.BLUE, f"Path info: {path_info}")

    history_file = config.GetFile(
        ToolFile(head) | f"{path_info}.history", is_must_exist=True
    )
    if args.clean:
        history_file.Remove()
        PrintColorful(
            ConsoleFrontColor.GREEN,
            f"Input file history data {args.input} has been cleaned",
        )
        return

    config.SaveProperties()
    # Honor the parsed --immediately value (which defaults to the config
    # value); --preview always suppresses saving.
    run_parser(
        file,
        history_file,
        verbose=verbose,
        immediately=args.immediately and not args.preview,
    )


if __name__ == "__main__":
    run()