From 1d203f9a7e23f119ae1d90d6a68dc55965324daa Mon Sep 17 00:00:00 2001 From: ninemine <1371605831@qq.com> Date: Mon, 20 Oct 2025 17:33:36 +0800 Subject: [PATCH] Init --- .gitignore | 4 +- .gitmodules | 3 + Convention | 1 + app.py | 231 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 .gitmodules create mode 160000 Convention create mode 100644 app.py diff --git a/.gitignore b/.gitignore index dd15ec4..5310e59 100644 --- a/.gitignore +++ b/.gitignore @@ -178,4 +178,6 @@ cython_debug/ # exclude from AI features like autocomplete and code analysis. Recommended for sensitive data # refer to https://docs.cursor.com/context/ignore-files .cursorignore -.cursorindexingignore \ No newline at end of file +.cursorindexingignore +/Assets +*.txt diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..820a10e --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "Convention"] + path = Convention + url = http://www.liubai.site:3000/ninemine/Convention-Python.git diff --git a/Convention b/Convention new file mode 160000 index 0000000..007db5a --- /dev/null +++ b/Convention @@ -0,0 +1 @@ +Subproject commit 007db5a06b0b66e5ca2c108a0f3fcafac8e3da7f diff --git a/app.py b/app.py new file mode 100644 index 0000000..b66dd28 --- /dev/null +++ b/app.py @@ -0,0 +1,231 @@ +from Convention.Convention.Runtime.GlobalConfig import * +from Convention.Convention.Runtime.File import * + +import argparse +from pydantic import BaseModel +import pickle +from datetime import datetime + +from tqdm import tqdm, trange + +def enum_for(verbose:bool, obj:Iterable, **kwargs): + if verbose: + return tqdm(obj, position=0, leave=False, **kwargs) + else: + return obj + +def enum_range(verbose:bool, start:int, end:int) -> Iterable[int]: + if verbose: + return trange(start, end, position=0, leave=False) + else: + return range(start, end) + +class ChangeEntry(BaseModel): + before: Optional[str] = "" + after: Optional[str] = "" + +class DataEntry(ChangeEntry): + line: int = 0 + +class DataEntries(BaseModel): + data: List[DataEntry] = [] + time: datetime = datetime.now() + +class DataModel(BaseModel): + data: List[DataEntries] = [] + + def enum_for_entries(self, verbose:bool, *, end_time:Optional[datetime]=None) -> Iterable[DataEntries]: + return enum_for(verbose, [entries for entries in self.data if entries.time <= end_time]) + + def pickup(self, verbose:bool, *, end_time:Optional[datetime]=None) -> List[str]: + if end_time is None: + end_time = datetime.now() + + history_content: List[str] = [] + + for entries in self.enum_for_entries(verbose, end_time=end_time): + for entry in entries.data: + if len(history_content) <= entry.line: + history_content.append(entry.after) + elif history_content[entry.line] == entry.before: + history_content[entry.line] = entry.after + else: + history_content.insert(entry.line, entry.after) + + return history_content + + def lineup(self, verbose:bool, *, end_time:Optional[datetime]=None) -> List[DataEntry]: + if end_time is None: + end_time = datetime.now() + + history_content: List[DataEntry] = [] + + for entries in self.enum_for_entries(verbose, end_time=end_time): + for entry in entries.data: + if len(history_content) <= entry.line: + history_content.append(entry) + elif history_content[entry.line].before == entry.before: + history_content[entry.line] = entry + else: + history_content.insert(entry.line, entry) + + return history_content + + def parse(self, verbose:bool, content:List[str]) -> DataEntries: + config = ProjectConfig() + group_size = config.FindItem("group_size", 10) + + history_content = self.lineup(verbose) + result_entries = DataEntries(time=datetime.now()) + + first = 0 + second = 0 + first_end = len(content) + second_end = len(history_content) + # 都未到达文件末尾时 + while first < first_end and second < second_end: + if verbose: + PrintColorful(ConsoleFrontColor.BLUE,f"Current[{first}/{first_end}] -> History[{second}/{second_end}]", end="\r") + if content[first] == history_content[second].after: + first += 1 + second += 1 + continue + # 判断状态 + stats: Literal["add", "delete", "unknown"] = "unknown" + # 假设如果当前与历史中都存在但是行不一致 + # 寻找历史中与当前相同的行 + if stats == "unknown": + for i in range(first+1, min(first_end, first+group_size)): + if content[i] == history_content[second].after: + # 中间的部分全部为新增 + stats = "add" + for index in range(first, i): + result_entries.data.append(DataEntry(line=index, before=None, after=content[index])) + first = i + second += 1 + break + # 寻找当前中与历史相同的行 + if stats == "unknown": + for i in range(second+1, min(second_end, second+group_size)): + if history_content[i].after == content[first]: + # 中间的部分全部为删除 + stats = "delete" + for index in range(second, i): + result_entries.data.append(DataEntry(line=first, before=history_content[index].after, after=None)) + first += 1 + second = i + break + # 到达此处代表历史中second处的行不存在于当前中, 当前first处的行也不存在于历史中 + if stats == "unknown": + # 本行为修改 + result_entries.data.append(DataEntry(line=first, before=history_content[second].after, after=content[first])) + first += 1 + second += 1 + continue + # 处理剩余的行 + while first < first_end: + # 当前末尾多出的行全部为新增 + result_entries.data.append(DataEntry(line=first, before=None, after=content[first])) + first += 1 + while second < second_end: + # 当前末尾缺少的行全部为删除 + result_entries.data.append(DataEntry(line=first, before=history_content[second].after, after=None)) + second += 1 + + return result_entries + +def get_line_header(head:str) -> str: + return f"{head}{" "*(10-len(head))}" + +def run_parser(file:ToolFile, history:ToolFile, *,verbose:bool=False, immediately:bool=True): + if verbose: + PrintColorful(ConsoleFrontColor.BLUE,f"Running parser for file: {file}") + PrintColorful(ConsoleFrontColor.BLUE,f"History file: {history}") + PrintColorful(ConsoleFrontColor.BLUE,f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + if history.GetSize() > 0: + with open(f"{history}", "rb") as f: + history_data: DataModel = pickle.load(f) #DataModel.model_validate(history.LoadAsJson()) + else: + history_data = DataModel() + file_content: List[str] = [] + _temp_file_reading: List[str] = [] + if verbose: + index = 0 + for line in file.ReadLines(encoding='utf-8'): + _temp_file_reading.append(line) + index += 1 + if index % 1000 == 0: + PrintColorful(ConsoleFrontColor.BLUE,f"Reading file: {index} lines", end="\r") + PrintColorful(ConsoleFrontColor.BLUE,f"Reading file: {index} lines", end="\r") + else: + _temp_file_reading = [line for line in file.ReadLines(encoding='utf-8')] + for line in enum_for(verbose, _temp_file_reading, desc="Reading file"): + file_content.append(line) + + change_content: DataEntries = history_data.parse(verbose, file_content) + + PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "Previewing change content:"," "*20) + for line in change_content.data: + if line.before is None: + PrintColorful(ConsoleFrontColor.GREEN, f"{get_line_header(f"++ {line.line}")}| {line.after}") + elif line.after is None: + PrintColorful(ConsoleFrontColor.RED, f"{get_line_header(f"-- {line.line}")}| {line.before}") + else: + PrintColorful(ConsoleFrontColor.YELLOW, f"{get_line_header(f"@+ {line.line}")}| {line.before}") + PrintColorful(ConsoleFrontColor.YELLOW, f"{get_line_header(f"@- {line.line}")}| {line.after}") + + if immediately: + if len(change_content.data) > 0: + history_data.data.append(change_content) + with open(f"{history}", "wb") as f: + pickle.dump(history_data, f) + PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX,f"History data has been saved to {history}") + else: + PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX,f"No change") + + +def run(): + config = ProjectConfig() + verbose = config.FindItem("verbose", False) + immediately = config.FindItem("immediately", True) + + parser = argparse.ArgumentParser() + parser.add_argument("-i", "--input", type=str) + parser.add_argument("--verbose", type=bool, default=verbose) + parser.add_argument("-c", "--clean", type=bool, default=False) + parser.add_argument("-im", "--immediately", type=bool, default=immediately) + parser.add_argument("-p", "--preview", type=bool, default=False) + args = parser.parse_args() + + if args.input is None: + PrintColorful(ConsoleFrontColor.RED,"Error: No input file specified") + return + + verbose = args.verbose + + file = ToolFile(args.input) + file = ToolFile(file.GetAbsPath()) + if not file.Exists(): + PrintColorful(ConsoleFrontColor.RED,f"Error: Input file {args.input} does not exist") + return + + head, path_info = f"{file}".split(":", 1) + if verbose: + PrintColorful(ConsoleFrontColor.BLUE,f"Input file: {file}") + PrintColorful(ConsoleFrontColor.BLUE,f"Head: {head}") + PrintColorful(ConsoleFrontColor.BLUE,f"Path info: {path_info}") + + history_file = config.GetFile(ToolFile(head)|f"{path_info}.history", is_must_exist=True) + + if args.clean: + history_file.Remove() + PrintColorful(ConsoleFrontColor.GREEN,f"Input file history data {args.input} has been cleaned") + return + + config.SaveProperties() + + run_parser(file, history_file, verbose=verbose, immediately=immediately and not args.preview) + +if __name__ == "__main__": + run() \ No newline at end of file