231 lines
9.3 KiB
Python
231 lines
9.3 KiB
Python
# NOTE: the wildcard imports stay first so that the explicit imports below
# take precedence if a wildcard happens to re-export the same name.
from Convention.Convention.Runtime.GlobalConfig import *
from Convention.Convention.Runtime.File import *

import argparse
from pydantic import BaseModel, Field
import pickle
from datetime import datetime

from tqdm import tqdm, trange
|
|
|
|
def enum_for(verbose:bool, obj:Iterable, **kwargs):
    """Return ``obj`` for iteration, wrapped in a tqdm progress bar if verbose.

    Args:
        verbose: When false, ``obj`` is returned untouched.
        obj: The iterable to walk over.
        **kwargs: Forwarded to ``tqdm`` in verbose mode; ignored otherwise.

    Returns:
        ``obj`` itself, or a ``tqdm`` wrapper around it.
    """
    if not verbose:
        return obj
    # The bar is created with position=0 and leave=False, same as enum_range.
    return tqdm(obj, position=0, leave=False, **kwargs)
|
|
|
|
def enum_range(verbose:bool, start:int, end:int) -> Iterable[int]:
    """Return the integers ``start`` .. ``end - 1``, with a progress bar if verbose.

    Args:
        verbose: When true a ``trange`` progress bar is returned instead of
            a plain ``range``.
        start: First value (inclusive).
        end: Stop value (exclusive).
    """
    if not verbose:
        return range(start, end)
    # The bar is created with position=0 and leave=False, same as enum_for.
    return trange(start, end, position=0, leave=False)
|
|
|
|
class ChangeEntry(BaseModel):
    """Text of a single line before and after one change.

    Both fields default to the empty string. Elsewhere in this file ``None``
    is stored in ``before`` for an added line and in ``after`` for a removed
    line.
    """

    # Line text prior to the change.
    before: Optional[str] = ""
    # Line text once the change has been applied.
    after: Optional[str] = ""
|
|
|
|
class DataEntry(ChangeEntry):
    """A ``ChangeEntry`` together with the line index it applies to."""

    # Zero-based line index the change refers to (defaults to the first line).
    line: int = 0
|
|
|
|
class DataEntries(BaseModel):
    """One change set: the line changes recorded together at one point in time."""

    # Line changes belonging to this change set, in the order they were found.
    data: List[DataEntry] = Field(default_factory=list)
    # Creation time of the change set. A default_factory is required here:
    # a plain ``datetime.now()`` default is evaluated only once, when the
    # class is defined, so every instance would share that stale timestamp.
    time: datetime = Field(default_factory=datetime.now)
|
|
|
|
class DataModel(BaseModel):
    """Change history of one tracked file, stored as ordered change sets.

    Every ``DataEntry`` inside a change set is interpreted as follows:
    ``before is None`` is an added line, ``after is None`` is a removed line,
    anything else is an in-place modification. Line numbers refer to the
    file as it looks after the change set has been applied.
    """

    # Change sets in the order they were recorded.
    data: List[DataEntries] = []

    def enum_for_entries(self, verbose:bool, *, end_time:Optional[datetime]=None) -> Iterable[DataEntries]:
        """Iterate the change sets recorded at or before ``end_time``.

        Args:
            verbose: Show a progress bar while iterating.
            end_time: Inclusive upper bound on ``DataEntries.time``. ``None``
                means "no bound" and selects every change set.
        """
        if end_time is None:
            # Comparing a datetime against None would raise TypeError.
            selected = list(self.data)
        else:
            selected = [entries for entries in self.data if entries.time <= end_time]
        return enum_for(verbose, selected)

    def pickup(self, verbose:bool, *, end_time:Optional[datetime]=None) -> List[str]:
        """Rebuild the file content (one string per line) as of ``end_time``.

        Args:
            verbose: Show a progress bar while replaying the history.
            end_time: Replay change sets up to this time; defaults to now.
        """
        # The text of a line is the ``after`` value of the entry that last
        # wrote it, so the content is a projection of ``lineup``.
        return [entry.after for entry in self.lineup(verbose, end_time=end_time)]

    def lineup(self, verbose:bool, *, end_time:Optional[datetime]=None) -> List[DataEntry]:
        """Replay the history and return, per line, the entry that produced it.

        Args:
            verbose: Show a progress bar while replaying the history.
            end_time: Replay change sets up to this time; defaults to now.

        Returns:
            One ``DataEntry`` per line of the reconstructed file; the line's
            text is ``entry.after``.
        """
        if end_time is None:
            end_time = datetime.now()

        lines: List[DataEntry] = []
        for entries in self.enum_for_entries(verbose, end_time=end_time):
            for entry in entries.data:
                in_range = entry.line < len(lines)
                # Compare against the *current* text of the line (``after`` of
                # the entry that wrote it), not against that entry's ``before``.
                matches = in_range and lines[entry.line].after == entry.before
                if entry.after is None:
                    # Removal: drop the line instead of leaving a None
                    # placeholder behind. A removal that does not match the
                    # current text is ignored.
                    if matches:
                        del lines[entry.line]
                elif not in_range:
                    lines.append(entry)
                elif matches:
                    # Modification of the existing line.
                    lines[entry.line] = entry
                else:
                    # Addition in front of the existing line.
                    lines.insert(entry.line, entry)
        return lines

    def parse(self, verbose:bool, content:List[str]) -> DataEntries:
        """Diff ``content`` against the replayed history.

        The comparison walks both line lists in parallel. On a mismatch it
        looks ahead up to ``group_size`` lines (config item, default 10) on
        each side to re-synchronise, classifying the skipped lines as added
        or removed; if neither side re-synchronises the line is a
        modification.

        Args:
            verbose: Print progress information while diffing.
            content: Current lines of the file.

        Returns:
            A new ``DataEntries`` change set (not yet appended to ``data``).
        """
        config = ProjectConfig()
        group_size = config.FindItem("group_size", 10)

        history_content = self.lineup(verbose)
        result_entries = DataEntries(time=datetime.now())
        changes = result_entries.data

        first = 0
        second = 0
        first_end = len(content)
        second_end = len(history_content)
        # While neither side has reached its end.
        while first < first_end and second < second_end:
            if verbose:
                PrintColorful(ConsoleFrontColor.BLUE,f"Current[{first}/{first_end}] -> History[{second}/{second_end}]", end="\r")
            current_line = content[first]
            history_line = history_content[second].after
            if current_line == history_line:
                first += 1
                second += 1
                continue

            resynced = False
            # Look ahead in the current content for the history line.
            for i in range(first+1, min(first_end, first+group_size)):
                if content[i] == history_line:
                    # Everything skipped in between was added.
                    for index in range(first, i):
                        changes.append(DataEntry(line=index, before=None, after=content[index]))
                    # content[i] and history_content[second] are the matched
                    # pair, so both cursors move past it.
                    first = i + 1
                    second += 1
                    resynced = True
                    break
            if resynced:
                continue

            # Look ahead in the history for the current line.
            for i in range(second+1, min(second_end, second+group_size)):
                if history_content[i].after == current_line:
                    # Everything skipped in between was removed.
                    for index in range(second, i):
                        changes.append(DataEntry(line=first, before=history_content[index].after, after=None))
                    # content[first] and history_content[i] are the matched
                    # pair, so both cursors move past it.
                    first += 1
                    second = i + 1
                    resynced = True
                    break
            if resynced:
                continue

            # Neither line was found on the other side within the look-ahead
            # window: treat this line as modified.
            changes.append(DataEntry(line=first, before=history_line, after=current_line))
            first += 1
            second += 1

        # Remaining current lines are all additions.
        while first < first_end:
            changes.append(DataEntry(line=first, before=None, after=content[first]))
            first += 1
        # Remaining history lines are all removals.
        while second < second_end:
            changes.append(DataEntry(line=first, before=history_content[second].after, after=None))
            second += 1

        return result_entries
|
|
|
|
def get_line_header(head:str) -> str:
    """Pad ``head`` with trailing spaces to a width of 10 characters.

    Strings that are already 10 characters or longer are returned unchanged.
    """
    # str.ljust avoids reusing the same quote character inside an f-string,
    # which is a SyntaxError on Python versions before 3.12.
    return head.ljust(10)
|
|
|
|
def run_parser(file:ToolFile, history:ToolFile, *,verbose:bool=False, immediately:bool=True):
    """Diff ``file`` against its stored history, preview the changes, optionally save.

    Args:
        file: The file whose current content is compared.
        history: Pickled ``DataModel`` history file; a size of 0 means no
            history has been recorded yet.
        verbose: Print progress information.
        immediately: When true, a non-empty change set is appended to the
            history and written back to ``history``.
    """
    if verbose:
        PrintColorful(ConsoleFrontColor.BLUE,f"Running parser for file: {file}")
        PrintColorful(ConsoleFrontColor.BLUE,f"History file: {history}")
        PrintColorful(ConsoleFrontColor.BLUE,f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    if history.GetSize() > 0:
        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file. Only load history files that this tool wrote itself.
        # NOTE(review): consider a JSON-based format for the history.
        with open(f"{history}", "rb") as f:
            history_data: DataModel = pickle.load(f)
    else:
        history_data = DataModel()

    raw_lines: List[str] = []
    if verbose:
        index = 0
        for line in file.ReadLines(encoding='utf-8'):
            raw_lines.append(line)
            index += 1
            if index % 1000 == 0:
                PrintColorful(ConsoleFrontColor.BLUE,f"Reading file: {index} lines", end="\r")
        PrintColorful(ConsoleFrontColor.BLUE,f"Reading file: {index} lines", end="\r")
    else:
        raw_lines = list(file.ReadLines(encoding='utf-8'))
    file_content: List[str] = list(enum_for(verbose, raw_lines, desc="Reading file"))

    change_content: DataEntries = history_data.parse(verbose, file_content)

    PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX, "Previewing change content:"," "*20)
    for change in change_content.data:
        # Headers are built separately so no f-string nests the same quote
        # character (a SyntaxError before Python 3.12).
        if change.before is None:
            header = get_line_header(f"++ {change.line}")
            PrintColorful(ConsoleFrontColor.GREEN, f"{header}| {change.after}")
        elif change.after is None:
            header = get_line_header(f"-- {change.line}")
            PrintColorful(ConsoleFrontColor.RED, f"{header}| {change.before}")
        else:
            # A modification shows the old text with "-" and the new text
            # with "+", matching the "--" / "++" markers above.
            old_header = get_line_header(f"@- {change.line}")
            new_header = get_line_header(f"@+ {change.line}")
            PrintColorful(ConsoleFrontColor.YELLOW, f"{old_header}| {change.before}")
            PrintColorful(ConsoleFrontColor.YELLOW, f"{new_header}| {change.after}")

    if immediately:
        if len(change_content.data) > 0:
            history_data.data.append(change_content)
            with open(f"{history}", "wb") as f:
                pickle.dump(history_data, f)
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX,f"History data has been saved to {history}")
        else:
            PrintColorful(ConsoleFrontColor.LIGHTMAGENTA_EX,f"No change")
|
|
|
|
|
|
def _parse_bool_flag(value:str) -> bool:
    """Convert a command-line token such as ``true`` / ``0`` / ``no`` to a bool."""
    token = value.strip().lower()
    if token in ("1", "true", "t", "yes", "y", "on"):
        return True
    if token in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


def run():
    """Command-line entry point: parse arguments and run the parser on one file.

    Options default to the ``verbose`` / ``immediately`` items of the project
    config. Boolean options accept an explicit value (``--verbose false``)
    or no value at all (``--verbose``, meaning true).
    """
    config = ProjectConfig()
    verbose = config.FindItem("verbose", False)
    immediately = config.FindItem("immediately", True)

    # ``type=bool`` would turn every non-empty string, including "False",
    # into True; parse the text explicitly instead.
    bool_option = {"type": _parse_bool_flag, "nargs": "?", "const": True}
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str)
    parser.add_argument("--verbose", default=verbose, **bool_option)
    parser.add_argument("-c", "--clean", default=False, **bool_option)
    parser.add_argument("-im", "--immediately", default=immediately, **bool_option)
    parser.add_argument("-p", "--preview", default=False, **bool_option)
    args = parser.parse_args()

    if args.input is None:
        PrintColorful(ConsoleFrontColor.RED,"Error: No input file specified")
        return

    verbose = args.verbose
    # Honour the command-line value; it already defaults to the config value.
    immediately = args.immediately

    file = ToolFile(args.input)
    file = ToolFile(file.GetAbsPath())
    if not file.Exists():
        PrintColorful(ConsoleFrontColor.RED,f"Error: Input file {args.input} does not exist")
        return

    # The history location is derived from "<head>:<rest>" of the absolute
    # path. NOTE(review): this presumably expects a drive-letter path; paths
    # without ":" are reported instead of failing on tuple unpacking.
    head, separator, path_info = f"{file}".partition(":")
    if not separator:
        PrintColorful(ConsoleFrontColor.RED,f"Error: Cannot derive a history path from {file}")
        return
    if verbose:
        PrintColorful(ConsoleFrontColor.BLUE,f"Input file: {file}")
        PrintColorful(ConsoleFrontColor.BLUE,f"Head: {head}")
        PrintColorful(ConsoleFrontColor.BLUE,f"Path info: {path_info}")

    history_file = config.GetFile(ToolFile(head)|f"{path_info}.history", is_must_exist=True)

    if args.clean:
        history_file.Remove()
        PrintColorful(ConsoleFrontColor.GREEN,f"Input file history data {args.input} has been cleaned")
        return

    config.SaveProperties()

    # Preview mode never writes the history back.
    run_parser(file, history_file, verbose=verbose, immediately=immediately and not args.preview)
|
|
|
|
# Script entry point: only run the CLI when executed directly, not on import.
if __name__ == "__main__":
    run()