diff --git a/VideoCompress/main.py b/VideoCompress/main.py index b1dd935..5e98062 100644 --- a/VideoCompress/main.py +++ b/VideoCompress/main.py @@ -7,12 +7,10 @@ from datetime import datetime from time import time from rich.logging import RichHandler from rich.progress import Progress +from pickle import dumps, loads from typing import Optional import atexit import re -import threading -import queue -import psutil root = None CFG_FILE = Path(sys.path[0])/"config.json" @@ -32,18 +30,8 @@ CFG = { "test_video_fps": "30", "test_video_input": "compress_video_test.mp4", "test_video_output": "compressed_video_test.mp4", - "max_concurrent_instances": 2, - "cpu_monitor_interval": 3, # CPU监控间隔(秒) - "cpu_monitor_duration": 30, # 统计持续时间(秒,5分钟) } -# CPU监控相关全局变量 -ffmpeg_processes = {} # 存储活动的ffmpeg进程 -cpu_stats = {"system": [], "ffmpeg": []} # CPU使用率统计 -cpu_monitor_thread = None -cpu_monitor_lock = threading.Lock() -current_instances = 0 -instance_lock = threading.Lock() def get_cmd(video_path,output_file): if CFG["manual"] is not None: @@ -123,105 +111,10 @@ def fmt_time(t:float|int) -> str: else: return f"{round(t)}s" -def cpu_monitor(): - """CPU监控线程函数""" - global cpu_stats - - while True: - try: - # 获取系统CPU使用率 - system_cpu = psutil.cpu_percent(interval=1) - - # 获取所有ffmpeg进程的CPU使用率 - ffmpeg_cpu_total = 0 - active_processes = [] - - with cpu_monitor_lock: - for proc_info in ffmpeg_processes.values(): - try: - proc = proc_info['process'] - if proc.is_running(): - # print(proc,proc.cpu_percent() / psutil.cpu_count()) - ffmpeg_cpu_total += proc.cpu_percent() / psutil.cpu_count() - active_processes.append(proc_info) - except (psutil.NoSuchProcess, psutil.AccessDenied): - continue - - # 更新统计数据 - with cpu_monitor_lock: - cpu_stats["system"].append(system_cpu) - cpu_stats["ffmpeg"].append(ffmpeg_cpu_total) - - # 保持最近5分钟的数据 - max_samples = CFG["cpu_monitor_duration"] // CFG["cpu_monitor_interval"] - if len(cpu_stats["system"]) > max_samples: - cpu_stats["system"] = cpu_stats["system"][-max_samples:] - if len(cpu_stats["ffmpeg"]) > max_samples: - cpu_stats["ffmpeg"] = cpu_stats["ffmpeg"][-max_samples:] - - except KeyboardInterrupt as e: - raise e - except Exception as e: - pass - #logging.error(f"CPU监控异常: {e}") - - # 等待下一次监控 - threading.Event().wait(CFG["cpu_monitor_interval"]) - -def start_cpu_monitor(): - """启动CPU监控线程""" - global cpu_monitor_thread - if cpu_monitor_thread is None or not cpu_monitor_thread.is_alive(): - cpu_monitor_thread = threading.Thread(target=cpu_monitor, daemon=True) - cpu_monitor_thread.start() - logging.info("CPU监控线程已启动") - -def get_cpu_usage_stats(): - """获取CPU使用率统计""" - with cpu_monitor_lock: - if not cpu_stats["system"] or not cpu_stats["ffmpeg"]: - return None, None - - system_avg = sum(cpu_stats["system"]) / len(cpu_stats["system"]) - ffmpeg_avg = sum(cpu_stats["ffmpeg"]) / len(cpu_stats["ffmpeg"]) - - return system_avg, ffmpeg_avg - -def should_increase_instances(): - """判断是否应该增加实例数""" - system_avg, ffmpeg_avg = get_cpu_usage_stats() - - if system_avg is None or ffmpeg_avg is None: - return False - - # 条件: 系统CPU - FFmpeg CPU > FFmpeg CPU * 2 + 0.1 - available_cpu = 100 - system_avg - threshold = ffmpeg_avg # 10% = 0.1 * 100 - - logging.debug(f"CPU统计: 系统平均={system_avg:.1f}%, FFmpeg平均={ffmpeg_avg:.1f}%, 可用={available_cpu:.1f}%, 阈值={threshold:.1f}%") - - return available_cpu > threshold - -def register_ffmpeg_process(proc_id, process): - """注册ffmpeg进程用于监控""" - with cpu_monitor_lock: - ffmpeg_processes[proc_id] = { - 'process': psutil.Process(process.pid), - 'start_time': time() - } - -def unregister_ffmpeg_process(proc_id): - """注销ffmpeg进程""" - with cpu_monitor_lock: - if proc_id in ffmpeg_processes: - del ffmpeg_processes[proc_id] - -def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_func=None, proc_id=None): - global current_instances +def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_func=None): use=None sz=video_path.stat().st_size//(1024*1024) - bgn=time() if compress_dir is None: # 在视频文件所在目录下创建 compress 子目录(如果不存在) compress_dir = video_path.parent / CFG["compress_dir_name"] @@ -241,11 +134,6 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun command = get_cmd(video_path_str,output_file) try: - with instance_lock: - current_instances += 1 - - logging.debug(f"启动FFmpeg进程 {proc_id}: {video_path.name}") - result = subprocess.Popen( command, stdout=subprocess.PIPE, @@ -254,19 +142,17 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun text=True ) - # 注册进程用于CPU监控 - if proc_id: - register_ffmpeg_process(proc_id, result) - while result.poll() is None: line = " " while result.poll() is None and line[-1:] not in "\r\n": line+=result.stderr.read(1) + # print(line[-1]) if 'warning' in line.lower(): - logging.warning(f"[FFmpeg {proc_id}]({video_path_str}): {line}") + logging.warning(f"[FFmpeg]({video_path_str}): {line}") elif 'error' in line.lower(): - logging.error(f"[FFmpeg {proc_id}]({video_path_str}): {line}") + logging.error(f"[FFmpeg]({video_path_str}): {line}") elif "frame=" in line: + # print(line,end="") match = re.search(r"frame=\s*(\d+)",line) if match: frame_number = int(match.group(1)) @@ -274,29 +160,18 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun update_func(frame_number) if result.returncode != 0: - logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(map(str,command))}") - logging.error(result.stdout.read()) - logging.error(result.stderr.read()) + logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(command)}") + logging.error(result.stdout) + logging.error(result.stderr) else: logging.debug(f"文件处理成功: {video_path_str} -> {output_file}") except KeyboardInterrupt as e:raise e except Exception as e: logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)},cmd={' '.join(map(str,command))}",exc_info=e) - finally: - # 注销进程监控 - if proc_id: - unregister_ffmpeg_process(proc_id) - - with instance_lock: - current_instances -= 1 - - logging.debug(f"FFmpeg进程 {proc_id} 已结束") - return use def traverse_directory(root_dir: Path): - global current_instances video_extensions = set(CFG["video_ext"]) sm=None # 获取视频文件列表和帧数信息 @@ -311,20 +186,30 @@ def traverse_directory(root_dir: Path): video_files.append(file) elif file.is_dir(): que.append(file) - - - # exit() - - if not video_files: - logging.warning("未找到需要处理的视频文件") - return - + + + if not video_files: + logging.warning("未找到需要处理的视频文件") + return + # 获取视频信息 + frames: dict[Path, float] = {} + info_file = Path("video_info.cache") + if info_file.is_file(): + try: + cached_data = loads(info_file.read_bytes()) + if isinstance(cached_data, dict): + frames = cached_data + logging.debug("Loaded video info from cache.") + except Exception as e: + logging.debug("Failed to load video info cache.",exc_info=e) + with Progress() as prog: task = prog.add_task("正在获取视频信息", total=len(video_files)) - frames: dict[Path, float] = {} for file in video_files: prog.advance(task) + if file in frames and frames[file]>0: + continue cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split() cmd.append(str(file.resolve())) proc = subprocess.run(cmd, capture_output=True, text=True) @@ -345,11 +230,16 @@ def traverse_directory(root_dir: Path): except (ValueError, IndexError) as e: logging.debug(f"解析视频信息失败: {file}, 错误: {e}") frames[file] = 0 - + if 0 in frames.values(): + logging.warning(f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。") + try: + info_file.write_bytes(dumps(frames)) + logging.debug("Saved video info to cache.") + except Exception as e: + logging.debug("Failed to save video info cache.",exc_info=e) + logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件") - # 启动CPU监控 - start_cpu_monitor() # 创建进度条 with Progress() as prog: @@ -357,131 +247,45 @@ def traverse_directory(root_dir: Path): main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames)) # 创建文件队列 - file_queue = queue.Queue() for file in frames.keys(): - file_queue.put(file) - - # 进度跟踪 - progress_trackers = {} - completed_files = 0 - total_completed_frames = 0 - - def create_progress_updater(file_path, task_id): - def update_progress(frame_count): - nonlocal total_completed_frames - if file_path in progress_trackers: - old_frames = progress_trackers[file_path] - diff = frame_count - old_frames - total_completed_frames += diff - else: - total_completed_frames += frame_count - progress_trackers[file_path] = frame_count + # 进度跟踪 + filename = file.relative_to(root_dir) + + # 创建文件级进度条 + if frames[file] == 0: + file_task = prog.add_task(f"{filename}") + else: + file_task = prog.add_task(f"{filename}",total=frames[file]) + + + with prog._lock: + completed_start = prog._tasks[main_task].completed - if frames[file_path] > 0: - prog.update(task_id, completed=frame_count) - else: - prog.update(task_id, description=f"{file_path.relative_to(root_dir)} 已处理{frame_count}帧") - - # 更新总进度 - if total_frames > 0: - prog.update(main_task, completed=total_completed_frames) - return update_progress - - def process_file_worker(): - nonlocal completed_files - while True: - try: - file = file_queue.get(timeout=1) - except queue.Empty: - break - - filename = file.relative_to(root_dir) - - # 创建文件级进度条 + def update_progress(x): if frames[file] == 0: - file_task = prog.add_task(f"{filename}") + prog.update(file_task,description=f"{filename} 已处理{x}帧") else: - file_task = prog.add_task(f"{filename}", total=frames[file]) - - progress_updater = create_progress_updater(file, file_task) - - # 处理视频 - proc_id = f"worker_{threading.current_thread().ident}_{completed_files}" - - if CFG["save_to"] == "single": - process_video(file, root_dir/"Compress", progress_updater, proc_id) - else: - process_video(file, None, progress_updater, proc_id) - - # 更新完成计数 - with instance_lock: - completed_files += 1 - if total_frames == 0: # 如果没有总帧数,按文件数计算 - prog.update(main_task, completed=completed_files) - - # 移除文件级进度条 - prog.remove_task(file_task) - file_queue.task_done() - - # 动态管理线程数 - active_threads = [] - max_workers = CFG["max_concurrent_instances"] - - def manage_workers(): - nonlocal active_threads + prog.update(file_task,completed=x) + prog.update(main_task, completed=completed_start+x) - while completed_files < len(frames) or any(t.is_alive() for t in active_threads): - # 清理已完成的线程 - active_threads = [t for t in active_threads if t.is_alive()] - - # 检查是否需要增加实例 - current_worker_count = len(active_threads) - - if current_worker_count < max_workers and not file_queue.empty(): - # 检查CPU使用率(运行5分钟后开始检查) - should_add_worker = False - if len(cpu_stats["system"]) >= 10: # 至少有5分钟的数据 - if current_worker_count >= 1: # 已有实例运行 - should_add_worker = should_increase_instances() - if should_add_worker: - logging.info("CPU资源充足,启动第二个压缩实例") - else: - should_add_worker = False - - if should_add_worker: - worker_thread = threading.Thread(target=process_file_worker, daemon=True) - worker_thread.start() - active_threads.append(worker_thread) - logging.debug(f"启动新的工作线程,当前活动线程数: {len(active_threads)}") - - threading.Event().wait(5) # 每5秒检查一次 - - # 等待所有线程完成 - for thread in active_threads: - thread.join() - - # 启动第一个工作线程 - if not file_queue.empty(): - first_worker = threading.Thread(target=process_file_worker, daemon=True) - first_worker.start() - active_threads.append(first_worker) - logging.info("启动第一个压缩实例") - - # 启动线程管理器 - manager_thread = threading.Thread(target=manage_workers, daemon=True) - manager_thread.start() - - # 等待管理线程完成 - manager_thread.join() + if CFG["save_to"] == "single": + process_video(file, root_dir/"Compress", update_progress) + else: + process_video(file, None, update_progress) + + # 移除文件级进度条 + prog.remove_task(file_task) - logging.info(f"所有视频处理完成,共处理了 {completed_files} 个文件") + try: + info_file.unlink(missing_ok=True) + except Exception as e: + logging.warning("无法删除视频信息缓存文件",exc_info=e) def test(): os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"] try: subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode() - except KeyboardInterrupt as e:raise e except Exception as e: print(__file__) logging.critical("无法运行ffmpeg")