import subprocess from pathlib import Path import sys import os import logging from datetime import datetime from time import time from rich.logging import RichHandler from rich.progress import Progress from pickle import dumps, loads from typing import Optional import atexit import re import threading import queue import psutil from concurrent.futures import ThreadPoolExecutor, as_completed root = None TRAIN = False ESTI_FILE = Path(sys.path[0])/"esti.out" CFG_FILE = Path(sys.path[0])/"config.json" CFG = { "save_to": "single", "crf":"18", "bitrate": None, "codec": "h264", "extra": [], "ffmpeg": "ffmpeg", "manual": None, "video_ext": [".mp4", ".mkv"], "train": False, "compress_dir_name": "compress", "resolution": "-1:1080", "fps": "30", "esti_data_file": "estiminate_data.dat", "test_video_resolution": "1920x1080", "test_video_fps": "30", "test_video_input": "compress_video_test.mp4", "test_video_output": "compressed_video_test.mp4", "max_concurrent_instances": 2, "cpu_monitor_interval": 3, # CPU监控间隔(秒) "cpu_monitor_duration": 30, # 统计持续时间(秒,5分钟) } esti=None # :tuple[list[int],list[float]] # CPU监控相关全局变量 ffmpeg_processes = {} # 存储活动的ffmpeg进程 cpu_stats = {"system": [], "ffmpeg": []} # CPU使用率统计 cpu_monitor_thread = None cpu_monitor_lock = threading.Lock() current_instances = 0 instance_lock = threading.Lock() def get_cmd(video_path,output_file): if CFG["manual"] is not None: command=[ CFG["ffmpeg"], "-hide_banner", "-i", video_path ] command.extend(CFG["manual"]) command.append(output_file) return command if CFG["bitrate"] is not None: command = [ CFG["ffmpeg"], "-hide_banner", "-i", video_path, ] if CFG['resolution'] is not None: command.extend([ "-vf", f"scale={CFG['resolution']}",]) command.extend([ "-c:v", CFG["codec"], "-b:v", CFG["bitrate"], "-r",CFG["fps"], "-y", ]) else: command = [ CFG["ffmpeg"], "-hide_banner", "-i", video_path, ] if CFG['resolution'] is not None: command.extend([ "-vf", f"scale={CFG['resolution']}",]) command.extend([ "-c:v", CFG["codec"], "-global_quality", str(CFG["crf"]), "-r",CFG["fps"], "-y", ]) command.extend(CFG["extra"]) command.append(output_file) return command def train_init(): global esti_data,TRAIN,data_file data_file = Path(CFG["esti_data_file"]) if data_file.exists(): esti_data=loads(data_file.read_bytes()) if not isinstance(esti_data,tuple): esti_data=([],[]) else: esti_data=([],[]) TRAIN=True atexit.register(save_esti) # print(esti_data) # 配置logging def setup_logging(): log_dir = Path("logs") log_dir.mkdir(exist_ok=True) log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log" stream = RichHandler(rich_tracebacks=True,tracebacks_show_locals=True) stream.setLevel(logging.INFO) stream.setFormatter(logging.Formatter("%(message)s")) file = logging.FileHandler(log_file, encoding='utf-8') file.setLevel(logging.DEBUG) logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname) 7s - %(message)s', handlers=[ file, stream ] ) def polyfit_manual(x, y, degree=2): """手动实现二次多项式最小二乘拟合""" n = len(x) if n != len(y): raise ValueError("输入的x和y长度必须相同") # 对于二次多项式 y = ax^2 + bx + c # 构建矩阵方程 A * [a, b, c]^T = B # 其中 A = [[sum(x^4), sum(x^3), sum(x^2)], # [sum(x^3), sum(x^2), sum(x)], # [sum(x^2), sum(x), n]] # B = [sum(x^2 * y), sum(x * y), sum(y)] # 计算需要的和 sum_x = sum(x) sum_x2 = sum(xi**2 for xi in x) sum_x3 = sum(xi**3 for xi in x) sum_x4 = sum(xi**4 for xi in x) sum_y = sum(y) sum_xy = sum(xi*yi for xi, yi in zip(x, y)) sum_x2y = sum(xi**2*yi for xi, yi in zip(x, y)) # 构建矩阵A和向量B A = [ [sum_x4, sum_x3, sum_x2], [sum_x3, sum_x2, sum_x], [sum_x2, sum_x, n] ] B = [sum_x2y, sum_xy, sum_y] # 使用高斯消元法解线性方程组 # 将增广矩阵 [A|B] 转换为行阶梯形式 AB = [row + [b] for row, b in zip(A, B)] n_rows = len(AB) # 高斯消元 for i in range(n_rows): # 寻找当前列中最大元素所在的行 max_row = i for j in range(i + 1, n_rows): if abs(AB[j][i]) > abs(AB[max_row][i]): max_row = j # 交换行 AB[i], AB[max_row] = AB[max_row], AB[i] # 将当前行主元归一化 pivot = AB[i][i] if pivot == 0: raise ValueError("矩阵奇异,无法求解") for j in range(i, n_rows + 1): AB[i][j] /= pivot # 消元 for j in range(n_rows): if j != i: factor = AB[j][i] for k in range(i, n_rows + 1): AB[j][k] -= factor * AB[i][k] # 提取结果 coeffs = [AB[i][n_rows] for i in range(n_rows)] return coeffs # [a, b, c] 对应 ax^2 + bx + c def save_esti(): try: if len(esti_data[0]) > 0: coeffs = polyfit_manual(esti_data[0], esti_data[1]) # 保存为逗号分隔的文本格式 ESTI_FILE.write_text(','.join(map(str, coeffs))) except Exception as e: logging.warning("保存估算数据失败") logging.debug("error at save_esti",exc_info=e) def fmt_time(t:float|int) -> str: if t>3600: return f"{t//3600}h {t//60}min {t%60}s" elif t>60: return f"{t//60}min {t%60}s" else: return f"{round(t)}s" def cpu_monitor(): """CPU监控线程函数""" global cpu_stats while True: try: # 获取系统CPU使用率 system_cpu = psutil.cpu_percent(interval=1) # 获取所有ffmpeg进程的CPU使用率 ffmpeg_cpu_total = 0 active_processes = [] with cpu_monitor_lock: for proc_info in ffmpeg_processes.values(): try: proc = proc_info['process'] if proc.is_running(): # print(proc,proc.cpu_percent() / psutil.cpu_count()) ffmpeg_cpu_total += proc.cpu_percent() / psutil.cpu_count() active_processes.append(proc_info) except (psutil.NoSuchProcess, psutil.AccessDenied): continue # 更新统计数据 with cpu_monitor_lock: cpu_stats["system"].append(system_cpu) cpu_stats["ffmpeg"].append(ffmpeg_cpu_total) # 保持最近5分钟的数据 max_samples = CFG["cpu_monitor_duration"] // CFG["cpu_monitor_interval"] if len(cpu_stats["system"]) > max_samples: cpu_stats["system"] = cpu_stats["system"][-max_samples:] if len(cpu_stats["ffmpeg"]) > max_samples: cpu_stats["ffmpeg"] = cpu_stats["ffmpeg"][-max_samples:] logging.debug(f"CPU监控: 系统={system_cpu:.1f}%, FFmpeg总计={ffmpeg_cpu_total:.1f}%, 活动进程={len(active_processes)}") except KeyboardInterrupt as e: raise e except Exception as e: logging.debug(f"CPU监控异常: {e}") # 等待下一次监控 threading.Event().wait(CFG["cpu_monitor_interval"]) def start_cpu_monitor(): """启动CPU监控线程""" global cpu_monitor_thread if cpu_monitor_thread is None or not cpu_monitor_thread.is_alive(): cpu_monitor_thread = threading.Thread(target=cpu_monitor, daemon=True) cpu_monitor_thread.start() logging.info("CPU监控线程已启动") def get_cpu_usage_stats(): """获取CPU使用率统计""" with cpu_monitor_lock: if not cpu_stats["system"] or not cpu_stats["ffmpeg"]: return None, None system_avg = sum(cpu_stats["system"]) / len(cpu_stats["system"]) ffmpeg_avg = sum(cpu_stats["ffmpeg"]) / len(cpu_stats["ffmpeg"]) return system_avg, ffmpeg_avg def should_increase_instances(): """判断是否应该增加实例数""" system_avg, ffmpeg_avg = get_cpu_usage_stats() if system_avg is None or ffmpeg_avg is None: return False # 条件: 系统CPU - FFmpeg CPU > FFmpeg CPU * 2 + 0.1 available_cpu = 100 - system_avg threshold = ffmpeg_avg # 10% = 0.1 * 100 logging.debug(f"CPU统计: 系统平均={system_avg:.1f}%, FFmpeg平均={ffmpeg_avg:.1f}%, 可用={available_cpu:.1f}%, 阈值={threshold:.1f}%") return available_cpu > threshold def register_ffmpeg_process(proc_id, process): """注册ffmpeg进程用于监控""" with cpu_monitor_lock: ffmpeg_processes[proc_id] = { 'process': psutil.Process(process.pid), 'start_time': time() } def unregister_ffmpeg_process(proc_id): """注销ffmpeg进程""" with cpu_monitor_lock: if proc_id in ffmpeg_processes: del ffmpeg_processes[proc_id] def func(sz:int,src=False): if TRAIN: try: data_file.write_bytes(dumps(esti_data)) except KeyboardInterrupt as e:raise e except Exception as e: logging.warning("无法保存数据",exc_info=e) try: if TRAIN: if len(esti_data[0])==0: return -1 if src else "NaN" coeffs = polyfit_manual(esti_data[0], esti_data[1]) t = coeffs[0]*sz**2 + coeffs[1]*sz + coeffs[2] elif esti is not None: t = esti[0]*sz**2 + esti[1]*sz + esti[2] # print(t,sz) else: logging.warning(f"Unexpected condition at func->TRAIN") return -1 if src else "NaN" t = round(t) if src: return t return fmt_time(t) except KeyboardInterrupt as e:raise e except Exception as e: logging.warning("无法计算预计时间") logging.debug("esti time exception", exc_info=e) return -1 if src else "NaN" def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_func=None, proc_id=None): global esti_data, current_instances use=None sz=video_path.stat().st_size//(1024*1024) bgn=time() if compress_dir is None: # 在视频文件所在目录下创建 compress 子目录(如果不存在) compress_dir = video_path.parent / CFG["compress_dir_name"] else: compress_dir /= video_path.parent.relative_to(root) compress_dir.mkdir(exist_ok=True,parents=True) # 输出文件路径:与原文件同名,保存在 compress 目录下 output_file = compress_dir / (video_path.stem + video_path.suffix) if output_file.is_file(): logging.warning(f"文件{output_file}存在,跳过") return use video_path_str = str(video_path.absolute()) command = get_cmd(video_path_str,output_file) try: with instance_lock: current_instances += 1 logging.debug(f"启动FFmpeg进程 {proc_id}: {video_path.name}") result = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", text=True ) # 注册进程用于CPU监控 if proc_id: register_ffmpeg_process(proc_id, result) while result.poll() is None: line = " " while result.poll() is None and line[-1:] not in "\r\n": line+=result.stderr.read(1) if 'warning' in line.lower(): logging.warning(f"[FFmpeg {proc_id}]({video_path_str}): {line}") elif 'error' in line.lower(): logging.error(f"[FFmpeg {proc_id}]({video_path_str}): {line}") elif "frame=" in line: match = re.search(r"frame=\s*(\d+)",line) if match: frame_number = int(match.group(1)) if update_func is not None: update_func(frame_number) if result.returncode != 0: logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(map(str,command))}") logging.error(result.stdout.read()) logging.error(result.stderr.read()) else: logging.debug(f"文件处理成功: {video_path_str} -> {output_file}") end=time() if TRAIN: esti_data[0].append(sz) esti_data[1].append(end-bgn) except KeyboardInterrupt as e:raise e except Exception as e: logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)},cmd={' '.join(map(str,command))}",exc_info=e) finally: # 注销进程监控 if proc_id: unregister_ffmpeg_process(proc_id) with instance_lock: current_instances -= 1 logging.debug(f"FFmpeg进程 {proc_id} 已结束") return use def traverse_directory(root_dir: Path): global current_instances video_extensions = set(CFG["video_ext"]) sm=None if esti is not None: raise DeprecationWarning("不再支持训练模式") logging.info(f"正在估算时间(当存在大量小文件时,估算值将会很离谱)") sm = 0 for file in root_dir.rglob("*"): if file.parent.name.lower() == CFG["compress_dir_name"].lower():continue if file.is_file() and file.suffix.lower() in video_extensions: sz=file.stat().st_size//(1024*1024) tmp = func(sz,True) if not isinstance(tmp,int): logging.error("无法预估时间,因为预估函数返回非整数") elif tmp == -1: logging.error("无法预估时间,因为预估函数返回了异常") sm += tmp logging.info(f"预估用时:{fmt_time(sm)}") else: # 获取视频文件列表和帧数信息 video_files = [] que = list(root_dir.glob("*")) while que: d = que.pop() for file in d.glob("*"): if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]: continue if file.is_file() and file.suffix.lower() in video_extensions: video_files.append(file) elif file.is_dir(): que.append(file) # exit() if not video_files: logging.warning("未找到需要处理的视频文件") return # 获取视频信息 with Progress() as prog: task = prog.add_task("正在获取视频信息", total=len(video_files)) frames: dict[Path, float] = {} for file in video_files: prog.advance(task) cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split() cmd.append(str(file)) proc = subprocess.run(cmd, shell=True, capture_output=True, text=True) if proc.returncode != 0: logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}") frames[file] = 0 continue if proc.stdout.strip(): try: avg_frame_rate, duration = proc.stdout.strip().split('\n') tmp = avg_frame_rate.split('/') avg_frame_rate = float(tmp[0]) / float(tmp[1]) if duration == "N/A": duration = 0 logging.debug(f"无法获取视频信息: {file}, 时长为N/A,默认使用0s") duration = float(duration) frames[file] = duration * avg_frame_rate except (ValueError, IndexError) as e: logging.debug(f"解析视频信息失败: {file}, 错误: {e}") frames[file] = 0 logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件") # 启动CPU监控 start_cpu_monitor() # 创建进度条 with Progress() as prog: total_frames = sum(frames.values()) main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames)) # 创建文件队列 file_queue = queue.Queue() for file in frames.keys(): file_queue.put(file) # 进度跟踪 progress_trackers = {} completed_files = 0 total_completed_frames = 0 def create_progress_updater(file_path, task_id): def update_progress(frame_count): nonlocal total_completed_frames if file_path in progress_trackers: old_frames = progress_trackers[file_path] diff = frame_count - old_frames total_completed_frames += diff else: total_completed_frames += frame_count progress_trackers[file_path] = frame_count if frames[file_path] > 0: prog.update(task_id, completed=frame_count) else: prog.update(task_id, description=f"{file_path.relative_to(root_dir)} 已处理{frame_count}帧") # 更新总进度 if total_frames > 0: prog.update(main_task, completed=total_completed_frames) return update_progress def process_file_worker(): nonlocal completed_files while True: try: file = file_queue.get(timeout=1) except queue.Empty: break filename = file.relative_to(root_dir) # 创建文件级进度条 if frames[file] == 0: file_task = prog.add_task(f"{filename}") else: file_task = prog.add_task(f"{filename}", total=frames[file]) progress_updater = create_progress_updater(file, file_task) # 处理视频 proc_id = f"worker_{threading.current_thread().ident}_{completed_files}" if CFG["save_to"] == "single": process_video(file, root_dir/"Compress", progress_updater, proc_id) else: process_video(file, None, progress_updater, proc_id) # 更新完成计数 with instance_lock: completed_files += 1 if total_frames == 0: # 如果没有总帧数,按文件数计算 prog.update(main_task, completed=completed_files) # 移除文件级进度条 prog.remove_task(file_task) file_queue.task_done() # 动态管理线程数 active_threads = [] max_workers = CFG["max_concurrent_instances"] def manage_workers(): nonlocal active_threads while completed_files < len(frames) or any(t.is_alive() for t in active_threads): # 清理已完成的线程 active_threads = [t for t in active_threads if t.is_alive()] # 检查是否需要增加实例 current_worker_count = len(active_threads) if current_worker_count < max_workers and not file_queue.empty(): # 检查CPU使用率(运行5分钟后开始检查) should_add_worker = False if len(cpu_stats["system"]) >= 10: # 至少有5分钟的数据 if current_worker_count >= 1: # 已有实例运行 should_add_worker = should_increase_instances() if should_add_worker: logging.info("CPU资源充足,启动第二个压缩实例") else: should_add_worker = False if should_add_worker: worker_thread = threading.Thread(target=process_file_worker, daemon=True) worker_thread.start() active_threads.append(worker_thread) logging.debug(f"启动新的工作线程,当前活动线程数: {len(active_threads)}") threading.Event().wait(5) # 每5秒检查一次 # 等待所有线程完成 for thread in active_threads: thread.join() # 启动第一个工作线程 if not file_queue.empty(): first_worker = threading.Thread(target=process_file_worker, daemon=True) first_worker.start() active_threads.append(first_worker) logging.info("启动第一个压缩实例") # 启动线程管理器 manager_thread = threading.Thread(target=manage_workers, daemon=True) manager_thread.start() # 等待管理线程完成 manager_thread.join() logging.info(f"所有视频处理完成,共处理了 {completed_files} 个文件") def test(): os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"] try: subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode() except KeyboardInterrupt as e:raise e except Exception as e: print(__file__) logging.critical("无法运行ffmpeg") exit(-1) try: ret = subprocess.run( f"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if ret.returncode != 0: logging.warning("无法生成测试视频.") logging.debug(ret.stdout) logging.debug(ret.stderr) ret.check_returncode() cmd = get_cmd(CFG["test_video_input"],CFG["test_video_output"],) ret = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) if ret.returncode != 0: logging.error("测试视频压缩失败") logging.debug(ret.stdout) logging.debug(ret.stderr) logging.error("Error termination via test failed.") exit(-1) os.remove("compress_video_test.mp4") os.remove("compressed_video_test.mp4") except KeyboardInterrupt as e:raise e except Exception as e: if os.path.exists("compress_video_test.mp4"): os.remove("compress_video_test.mp4") logging.warning("测试未通过,继续运行可能出现未定义行为。") logging.debug("Test error",exc_info=e) def init_train(): global esti if CFG["train"]: train_init() else: if ESTI_FILE.exists(): try: # 从文件读取系数 coeffs_str = ESTI_FILE.read_text().strip().split(',') esti = [float(coeff) for coeff in coeffs_str] except KeyboardInterrupt as e:raise e except Exception as e: logging.warning(f"预测输出文件{str(ESTI_FILE)}存在但无法读取", exc_info=e) def exit_pause(): if os.name == 'nt': os.system("pause") elif os.name == 'posix': os.system("read -p 'Press Enter to continue...'") def main(_root = None): atexit.register(exit_pause) global root, esti setup_logging() tot_bgn = time() logging.info("-------------------------------") logging.info(datetime.now().strftime('Video Compress started at %Y/%m/%d %H:%M')) if CFG_FILE.exists(): try: import json cfg:dict = json.loads(CFG_FILE.read_text()) CFG.update(cfg) except KeyboardInterrupt as e:raise e except Exception as e: logging.warning("Invalid config file, ignored.") logging.debug(e) if _root is not None: root = Path(_root) else: # 通过命令行参数传入需要遍历的目录 if len(sys.argv) < 2: print(f"用法:python {__file__} <目标目录>") logging.warning("Error termination via invalid input.") sys.exit(1) root = Path(sys.argv[1]) if root.name.lower() == CFG["compress_dir_name"].lower(): logging.critical("请修改目标目录名为非compress。") logging.error("Error termination via invalid input.") sys.exit(1) logging.info("开始验证环境") test() init_train() if not root.is_dir(): print("提供的路径不是一个有效目录。") logging.warning("Error termination via invalid input.") sys.exit(1) try: traverse_directory(root) tot_end = time() logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}") logging.info("Normal termination of Video Compress.") except KeyboardInterrupt: logging.warning("Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.") except Exception as e: logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e) if __name__ == "__main__": main()