From 10d7a55454206462eaa79403c8acbe813fb575ac Mon Sep 17 00:00:00 2001 From: flt6 <1404262047@qq.com> Date: Tue, 6 May 2025 22:36:59 +0800 Subject: [PATCH] VideoCompress release v1.0 --- VideoCompress/main.py | 362 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 362 insertions(+) create mode 100644 VideoCompress/main.py diff --git a/VideoCompress/main.py b/VideoCompress/main.py new file mode 100644 index 0000000..421128d --- /dev/null +++ b/VideoCompress/main.py @@ -0,0 +1,362 @@ +import subprocess +from pathlib import Path +import sys +import logging +from datetime import datetime +from time import time +from rich.logging import RichHandler +from rich.progress import Progress +from pickle import dumps, loads +import atexit + +root = None +TRAIN = False +ESTI_FILE = Path("esti.out") +CFG_FILE = Path("config.json") +CFG = { + "crf":"18", + "codec": "h264", + "extra": [], + "ffmpeg": "ffmpeg", + "manual": None, + "video_ext": [".mp4", ".mkv"], + "train": False + +} +esti=None # :tuple[list[int],list[float]] + +def get_cmd(video_path,output_file): + if CFG["manual"] is not None: + command=[ + CFG["ffmpeg"], + "-hide_banner", + "-i", video_path + ] + command.extend(CFG["manual"]) + command.append(output_file) + + command = [ + CFG["ffmpeg"], + "-hide_banner", # 隐藏 ffmpeg 的横幅信息 + "-i", video_path, + "-vf", "scale=-1:1080", # 设置视频高度为 1080,宽度按比例自动计算 + "-c:v", CFG["codec"], # 使用 Intel Quick Sync Video 编码 + "-global_quality", CFG["crf"], # 设置全局质量(数值越低质量越高) + "-r","30", + "-preset", "slow", # 设置压缩速度为慢(压缩效果较好) + "-y", + ] + command.extend(CFG["extra"]) + command.append(output_file) + return command + +def train_init(): + global esti_data,TRAIN,data_file + data_file = Path("estiminate_data.dat") + if data_file.exists(): + esti_data=loads(data_file.read_bytes()) + if not isinstance(esti_data,tuple): + esti_data=([],[]) + else: + esti_data=([],[]) + TRAIN=True + atexit.register(save_esti) + # print(esti_data) + + +# 配置logging +def setup_logging(): + log_dir = Path("logs") + log_dir.mkdir(exist_ok=True) + log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log" + stream = RichHandler(rich_tracebacks=True,tracebacks_show_locals=True) + stream.setLevel(logging.DEBUG) + stream.setFormatter(logging.Formatter("%(message)s")) + + file = logging.FileHandler(log_file, encoding='utf-8') + file.setLevel(logging.INFO) + + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(levelname) 7s - %(message)s', + handlers=[ + file, + stream + ] + ) + +def polyfit_manual(x, y, degree=2): + """手动实现二次多项式最小二乘拟合""" + n = len(x) + if n != len(y): + raise ValueError("输入的x和y长度必须相同") + + # 对于二次多项式 y = ax^2 + bx + c + # 构建矩阵方程 A * [a, b, c]^T = B + # 其中 A = [[sum(x^4), sum(x^3), sum(x^2)], + # [sum(x^3), sum(x^2), sum(x)], + # [sum(x^2), sum(x), n]] + # B = [sum(x^2 * y), sum(x * y), sum(y)] + + # 计算需要的和 + sum_x = sum(x) + sum_x2 = sum(xi**2 for xi in x) + sum_x3 = sum(xi**3 for xi in x) + sum_x4 = sum(xi**4 for xi in x) + sum_y = sum(y) + sum_xy = sum(xi*yi for xi, yi in zip(x, y)) + sum_x2y = sum(xi**2*yi for xi, yi in zip(x, y)) + + # 构建矩阵A和向量B + A = [ + [sum_x4, sum_x3, sum_x2], + [sum_x3, sum_x2, sum_x], + [sum_x2, sum_x, n] + ] + B = [sum_x2y, sum_xy, sum_y] + + # 使用高斯消元法解线性方程组 + # 将增广矩阵 [A|B] 转换为行阶梯形式 + AB = [row + [b] for row, b in zip(A, B)] + n_rows = len(AB) + + # 高斯消元 + for i in range(n_rows): + # 寻找当前列中最大元素所在的行 + max_row = i + for j in range(i + 1, n_rows): + if abs(AB[j][i]) > abs(AB[max_row][i]): + max_row = j + + # 交换行 + AB[i], AB[max_row] = AB[max_row], AB[i] + + # 将当前行主元归一化 + pivot = AB[i][i] + if pivot == 0: + raise ValueError("矩阵奇异,无法求解") + + for j in range(i, n_rows + 1): + AB[i][j] /= pivot + + # 消元 + for j in range(n_rows): + if j != i: + factor = AB[j][i] + for k in range(i, n_rows + 1): + AB[j][k] -= factor * AB[i][k] + + # 提取结果 + coeffs = [AB[i][n_rows] for i in range(n_rows)] + + return coeffs # [a, b, c] 对应 ax^2 + bx + c + +def save_esti(): + try: + if len(esti_data[0]) > 0: + coeffs = polyfit_manual(esti_data[0], esti_data[1]) + # 保存为逗号分隔的文本格式 + ESTI_FILE.write_text(','.join(map(str, coeffs))) + except Exception as e: + logging.warning("保存估算数据失败", exc_info=e) + +def fmt_time(t:int) -> str: + if t>3600: + return f"{t//3600}h {t//60}min {t%60}s" + elif t>60: + return f"{t//60}min {t%60}s" + else: + return f"{round(t)}s" + +def func(sz:int,src=False): + if TRAIN: + try: + data_file.write_bytes(dumps(esti_data)) + except KeyboardInterrupt as e:raise e + except Exception as e: + logging.warning("无法保存数据",exc_info=e) + try: + if TRAIN: + if len(esti_data[0])==0: + return -1 if src else "NaN" + coeffs = polyfit_manual(esti_data[0], esti_data[1]) + t = coeffs[0]*sz**2 + coeffs[1]*sz + coeffs[2] + elif esti is not None: + t = esti[0]*sz**2 + esti[1]*sz + esti[2] + # print(t,sz) + else: + logging.warning(f"Unexpected condition at func->TRAIN") + return -1 if src else "NaN" + t = round(t) + if src: + return t + return fmt_time(t) + except KeyboardInterrupt as e:raise e + except Exception as e: + logging.warning("无法计算预计时间",exc_info=e) + return -1 if src else "NaN" + +def process_video(video_path: Path): + global esti_data + use=None + sz=video_path.stat().st_size//(1024*1024) + if esti is not None or TRAIN: + use = func(sz,True) + logging.debug(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M,预计{fmt_time(use)}") + else: + logging.debug(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M") + + + bgn=time() + # 在视频文件所在目录下创建 compress 子目录(如果不存在) + compress_dir = video_path.parent / "compress" + compress_dir.mkdir(exist_ok=True) + + # 输出文件路径:与原文件同名,保存在 compress 目录下 + output_file = compress_dir / (video_path.stem + video_path.suffix) + if output_file.is_file(): + logging.warning(f"文件{output_file}存在,跳过") + return use + + # 4x + # command = [ + # "ffmpeg.exe", # 可以修改为 ffmpeg 的完整路径,例如:C:/ffmpeg/bin/ffmpeg.exe + # "-hide_banner", # 隐藏 ffmpeg 的横幅信息 + # "-i", str(video_path.absolute()), + # "-filter:v", "setpts=0.25*PTS", # 设置视频高度为 1080,宽度按比例自动计算 + # "-filter:a", "atempo=4.0", + # "-c:v", "h264_qsv", # 使用 Intel Quick Sync Video 编码 + # "-global_quality", "28", # 设置全局质量(数值越低质量越高) + # "-r","30", + # "-preset", "fast", # 设置压缩速度为慢(压缩效果较好) + # "-y", + # str(output_file.absolute()) + # ] + + # 1x + command = get_cmd(str(video_path.absolute()),output_file) + + try: + # 调用 ffmpeg,并捕获标准输出和错误信息 + result = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + text=True + ) + + # 检查 ffmpeg 的错误输出 + if result.stderr: + # 记录所有警告和错误信息 + for line in result.stderr.splitlines(): + if 'warning' in line.lower(): + logging.warning(f"[FFmpeg]({video_path}): {line}") + elif 'error' in line.lower(): + logging.error(f"[FFmpeg]({video_path}): {line}") + + # 检查 ffmpeg 执行的返回码 + if result.returncode != 0: + logging.error(f"处理文件 {video_path} 失败,返回码: {result.returncode},cmd={' '.join(command)}") + logging.error(result.stdout) + logging.error(result.stderr) + else: + logging.debug(f"文件处理成功: {video_path} -> {output_file}") + + end=time() + if TRAIN: + esti_data[0].append(sz) + esti_data[1].append(end-bgn) + + + except Exception as e: + logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{video_path},cmd={' '.join(command)}",exc_info=e) + return use + +def traverse_directory(root_dir: Path): + video_extensions = set(CFG["video_ext"]) + sm=None + if esti is not None: + logging.info(f"正在估算时间(当存在大量小文件时,估算值将会很离谱)") + sm = 0 + for file in root_dir.rglob("*"): + if file.parent.name == "compress":continue + if file.is_file() and file.suffix.lower() in video_extensions: + sz=file.stat().st_size//(1024*1024) + tmp = func(sz,True) + if not isinstance(tmp,int): + logging.error("无法预估时间,因为预估函数返回非整数") + elif tmp == -1: + logging.error("无法预估时间,因为预估函数返回了异常") + sm += tmp + logging.info(f"预估用时:{fmt_time(sm)}") + + + logging.debug(f"开始遍历目录: {root_dir}") + # 定义需要处理的视频后缀(忽略大小写) + + with Progress() as prog: + task = prog.add_task("压缩视频",total=sm) + # prog.print("进度条右侧时间为不精确估算(当所有文件处理时间相同时估算精确)") + # 使用 rglob 递归遍历所有文件 + for file in root_dir.rglob("*"): + if file.parent.name == "compress":continue + if file.is_file() and file.suffix.lower() in video_extensions: + t = process_video(file) + # if esti is not None: + # sm-=t + # prog.update(task,advance=1,description=f"预计剩余{fmt_time(sm)}") + if t is None: + prog.advance(task) + else: + prog.advance(task,t) + +if __name__ == "__main__": + setup_logging() + tot_bgn = time() + logging.info("-------------------------------") + logging.info(datetime.now().strftime('Video Compress started at %Y/%m/%d %H:%M')) + + if CFG_FILE.exists(): + try: + import json + cfg:dict = json.loads(CFG_FILE.read_text()) + CFG.update(cfg) + except Exception as e: + logging.warning("Invalid config file, ignored.") + logging.debug(e) + + # 通过命令行参数传入需要遍历的目录 + if len(sys.argv) < 2: + print(f"用法:python {__file__} <目标目录>") + logging.warning("Error termination via invalid input.") + sys.exit(1) + + root = Path(sys.argv[1]) + if CFG["train"]: + train_init() + else: + if ESTI_FILE.exists(): + try: + # 从文件读取系数 + coeffs_str = ESTI_FILE.read_text().strip().split(',') + esti = [float(coeff) for coeff in coeffs_str] + except Exception as e: + logging.warning(f"预测输出文件{str(ESTI_FILE)}存在但无法读取", exc_info=e) + + if not root.is_dir(): + print("提供的路径不是一个有效目录。") + logging.warning("Error termination via invalid input.") + sys.exit(1) + + try: + traverse_directory(root) + tot_end = time() + logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}") + logging.info("Normal termination of Video Compress.") + except KeyboardInterrupt: + logging.warning("Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.") + except Exception as e: + logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e) + +