remove multi in VideoComprss, and fix

This commit is contained in:
2025-10-20 20:27:42 +08:00
parent 26b848ca83
commit 07bc8a63db

View File

@ -7,12 +7,10 @@ from datetime import datetime
from time import time from time import time
from rich.logging import RichHandler from rich.logging import RichHandler
from rich.progress import Progress from rich.progress import Progress
from pickle import dumps, loads
from typing import Optional from typing import Optional
import atexit import atexit
import re import re
import threading
import queue
import psutil
root = None root = None
CFG_FILE = Path(sys.path[0])/"config.json" CFG_FILE = Path(sys.path[0])/"config.json"
@ -32,18 +30,8 @@ CFG = {
"test_video_fps": "30", "test_video_fps": "30",
"test_video_input": "compress_video_test.mp4", "test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4", "test_video_output": "compressed_video_test.mp4",
"max_concurrent_instances": 2,
"cpu_monitor_interval": 3, # CPU监控间隔
"cpu_monitor_duration": 30, # 统计持续时间5分钟
} }
# CPU监控相关全局变量
ffmpeg_processes = {} # 存储活动的ffmpeg进程
cpu_stats = {"system": [], "ffmpeg": []} # CPU使用率统计
cpu_monitor_thread = None
cpu_monitor_lock = threading.Lock()
current_instances = 0
instance_lock = threading.Lock()
def get_cmd(video_path,output_file): def get_cmd(video_path,output_file):
if CFG["manual"] is not None: if CFG["manual"] is not None:
@ -123,105 +111,10 @@ def fmt_time(t:float|int) -> str:
else: else:
return f"{round(t)}s" return f"{round(t)}s"
def cpu_monitor(): def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_func=None):
"""CPU监控线程函数"""
global cpu_stats
while True:
try:
# 获取系统CPU使用率
system_cpu = psutil.cpu_percent(interval=1)
# 获取所有ffmpeg进程的CPU使用率
ffmpeg_cpu_total = 0
active_processes = []
with cpu_monitor_lock:
for proc_info in ffmpeg_processes.values():
try:
proc = proc_info['process']
if proc.is_running():
# print(proc,proc.cpu_percent() / psutil.cpu_count())
ffmpeg_cpu_total += proc.cpu_percent() / psutil.cpu_count()
active_processes.append(proc_info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# 更新统计数据
with cpu_monitor_lock:
cpu_stats["system"].append(system_cpu)
cpu_stats["ffmpeg"].append(ffmpeg_cpu_total)
# 保持最近5分钟的数据
max_samples = CFG["cpu_monitor_duration"] // CFG["cpu_monitor_interval"]
if len(cpu_stats["system"]) > max_samples:
cpu_stats["system"] = cpu_stats["system"][-max_samples:]
if len(cpu_stats["ffmpeg"]) > max_samples:
cpu_stats["ffmpeg"] = cpu_stats["ffmpeg"][-max_samples:]
except KeyboardInterrupt as e:
raise e
except Exception as e:
pass
#logging.error(f"CPU监控异常: {e}")
# 等待下一次监控
threading.Event().wait(CFG["cpu_monitor_interval"])
def start_cpu_monitor():
"""启动CPU监控线程"""
global cpu_monitor_thread
if cpu_monitor_thread is None or not cpu_monitor_thread.is_alive():
cpu_monitor_thread = threading.Thread(target=cpu_monitor, daemon=True)
cpu_monitor_thread.start()
logging.info("CPU监控线程已启动")
def get_cpu_usage_stats():
"""获取CPU使用率统计"""
with cpu_monitor_lock:
if not cpu_stats["system"] or not cpu_stats["ffmpeg"]:
return None, None
system_avg = sum(cpu_stats["system"]) / len(cpu_stats["system"])
ffmpeg_avg = sum(cpu_stats["ffmpeg"]) / len(cpu_stats["ffmpeg"])
return system_avg, ffmpeg_avg
def should_increase_instances():
"""判断是否应该增加实例数"""
system_avg, ffmpeg_avg = get_cpu_usage_stats()
if system_avg is None or ffmpeg_avg is None:
return False
# 条件: 系统CPU - FFmpeg CPU > FFmpeg CPU * 2 + 0.1
available_cpu = 100 - system_avg
threshold = ffmpeg_avg # 10% = 0.1 * 100
logging.debug(f"CPU统计: 系统平均={system_avg:.1f}%, FFmpeg平均={ffmpeg_avg:.1f}%, 可用={available_cpu:.1f}%, 阈值={threshold:.1f}%")
return available_cpu > threshold
def register_ffmpeg_process(proc_id, process):
"""注册ffmpeg进程用于监控"""
with cpu_monitor_lock:
ffmpeg_processes[proc_id] = {
'process': psutil.Process(process.pid),
'start_time': time()
}
def unregister_ffmpeg_process(proc_id):
"""注销ffmpeg进程"""
with cpu_monitor_lock:
if proc_id in ffmpeg_processes:
del ffmpeg_processes[proc_id]
def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_func=None, proc_id=None):
global current_instances
use=None use=None
sz=video_path.stat().st_size//(1024*1024) sz=video_path.stat().st_size//(1024*1024)
bgn=time()
if compress_dir is None: if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在) # 在视频文件所在目录下创建 compress 子目录(如果不存在)
compress_dir = video_path.parent / CFG["compress_dir_name"] compress_dir = video_path.parent / CFG["compress_dir_name"]
@ -241,11 +134,6 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun
command = get_cmd(video_path_str,output_file) command = get_cmd(video_path_str,output_file)
try: try:
with instance_lock:
current_instances += 1
logging.debug(f"启动FFmpeg进程 {proc_id}: {video_path.name}")
result = subprocess.Popen( result = subprocess.Popen(
command, command,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@ -254,19 +142,17 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun
text=True text=True
) )
# 注册进程用于CPU监控
if proc_id:
register_ffmpeg_process(proc_id, result)
while result.poll() is None: while result.poll() is None:
line = " " line = " "
while result.poll() is None and line[-1:] not in "\r\n": while result.poll() is None and line[-1:] not in "\r\n":
line+=result.stderr.read(1) line+=result.stderr.read(1)
# print(line[-1])
if 'warning' in line.lower(): if 'warning' in line.lower():
logging.warning(f"[FFmpeg {proc_id}]({video_path_str}): {line}") logging.warning(f"[FFmpeg]({video_path_str}): {line}")
elif 'error' in line.lower(): elif 'error' in line.lower():
logging.error(f"[FFmpeg {proc_id}]({video_path_str}): {line}") logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "frame=" in line: elif "frame=" in line:
# print(line,end="")
match = re.search(r"frame=\s*(\d+)",line) match = re.search(r"frame=\s*(\d+)",line)
if match: if match:
frame_number = int(match.group(1)) frame_number = int(match.group(1))
@ -274,29 +160,18 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun
update_func(frame_number) update_func(frame_number)
if result.returncode != 0: if result.returncode != 0:
logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode}cmd={' '.join(map(str,command))}") logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode}cmd={' '.join(command)}")
logging.error(result.stdout.read()) logging.error(result.stdout)
logging.error(result.stderr.read()) logging.error(result.stderr)
else: else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}") logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
except KeyboardInterrupt as e:raise e except KeyboardInterrupt as e:raise e
except Exception as e: except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",exc_info=e) logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",exc_info=e)
finally:
# 注销进程监控
if proc_id:
unregister_ffmpeg_process(proc_id)
with instance_lock:
current_instances -= 1
logging.debug(f"FFmpeg进程 {proc_id} 已结束")
return use return use
def traverse_directory(root_dir: Path): def traverse_directory(root_dir: Path):
global current_instances
video_extensions = set(CFG["video_ext"]) video_extensions = set(CFG["video_ext"])
sm=None sm=None
# 获取视频文件列表和帧数信息 # 获取视频文件列表和帧数信息
@ -311,20 +186,30 @@ def traverse_directory(root_dir: Path):
video_files.append(file) video_files.append(file)
elif file.is_dir(): elif file.is_dir():
que.append(file) que.append(file)
# exit() if not video_files:
logging.warning("未找到需要处理的视频文件")
if not video_files: return
logging.warning("未找到需要处理的视频文件")
return
# 获取视频信息 # 获取视频信息
frames: dict[Path, float] = {}
info_file = Path("video_info.cache")
if info_file.is_file():
try:
cached_data = loads(info_file.read_bytes())
if isinstance(cached_data, dict):
frames = cached_data
logging.debug("Loaded video info from cache.")
except Exception as e:
logging.debug("Failed to load video info cache.",exc_info=e)
with Progress() as prog: with Progress() as prog:
task = prog.add_task("正在获取视频信息", total=len(video_files)) task = prog.add_task("正在获取视频信息", total=len(video_files))
frames: dict[Path, float] = {}
for file in video_files: for file in video_files:
prog.advance(task) prog.advance(task)
if file in frames and frames[file]>0:
continue
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split() cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split()
cmd.append(str(file.resolve())) cmd.append(str(file.resolve()))
proc = subprocess.run(cmd, capture_output=True, text=True) proc = subprocess.run(cmd, capture_output=True, text=True)
@ -345,11 +230,16 @@ def traverse_directory(root_dir: Path):
except (ValueError, IndexError) as e: except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}") logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0 frames[file] = 0
if 0 in frames.values():
logging.warning(f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。")
try:
info_file.write_bytes(dumps(frames))
logging.debug("Saved video info to cache.")
except Exception as e:
logging.debug("Failed to save video info cache.",exc_info=e)
logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件") logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件")
# 启动CPU监控
start_cpu_monitor()
# 创建进度条 # 创建进度条
with Progress() as prog: with Progress() as prog:
@ -357,131 +247,45 @@ def traverse_directory(root_dir: Path):
main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames)) main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames))
# 创建文件队列 # 创建文件队列
file_queue = queue.Queue()
for file in frames.keys(): for file in frames.keys():
file_queue.put(file) # 进度跟踪
filename = file.relative_to(root_dir)
# 进度跟踪
progress_trackers = {} # 创建文件级进度条
completed_files = 0 if frames[file] == 0:
total_completed_frames = 0 file_task = prog.add_task(f"{filename}")
else:
def create_progress_updater(file_path, task_id): file_task = prog.add_task(f"{filename}",total=frames[file])
def update_progress(frame_count):
nonlocal total_completed_frames
if file_path in progress_trackers: with prog._lock:
old_frames = progress_trackers[file_path] completed_start = prog._tasks[main_task].completed
diff = frame_count - old_frames
total_completed_frames += diff
else:
total_completed_frames += frame_count
progress_trackers[file_path] = frame_count
if frames[file_path] > 0: def update_progress(x):
prog.update(task_id, completed=frame_count)
else:
prog.update(task_id, description=f"{file_path.relative_to(root_dir)} 已处理{frame_count}")
# 更新总进度
if total_frames > 0:
prog.update(main_task, completed=total_completed_frames)
return update_progress
def process_file_worker():
nonlocal completed_files
while True:
try:
file = file_queue.get(timeout=1)
except queue.Empty:
break
filename = file.relative_to(root_dir)
# 创建文件级进度条
if frames[file] == 0: if frames[file] == 0:
file_task = prog.add_task(f"{filename}") prog.update(file_task,description=f"{filename} 已处理{x}")
else: else:
file_task = prog.add_task(f"{filename}", total=frames[file]) prog.update(file_task,completed=x)
prog.update(main_task, completed=completed_start+x)
progress_updater = create_progress_updater(file, file_task)
# 处理视频
proc_id = f"worker_{threading.current_thread().ident}_{completed_files}"
if CFG["save_to"] == "single":
process_video(file, root_dir/"Compress", progress_updater, proc_id)
else:
process_video(file, None, progress_updater, proc_id)
# 更新完成计数
with instance_lock:
completed_files += 1
if total_frames == 0: # 如果没有总帧数,按文件数计算
prog.update(main_task, completed=completed_files)
# 移除文件级进度条
prog.remove_task(file_task)
file_queue.task_done()
# 动态管理线程数
active_threads = []
max_workers = CFG["max_concurrent_instances"]
def manage_workers():
nonlocal active_threads
while completed_files < len(frames) or any(t.is_alive() for t in active_threads): if CFG["save_to"] == "single":
# 清理已完成的线程 process_video(file, root_dir/"Compress", update_progress)
active_threads = [t for t in active_threads if t.is_alive()] else:
process_video(file, None, update_progress)
# 检查是否需要增加实例
current_worker_count = len(active_threads) # 移除文件级进度条
prog.remove_task(file_task)
if current_worker_count < max_workers and not file_queue.empty():
# 检查CPU使用率运行5分钟后开始检查
should_add_worker = False
if len(cpu_stats["system"]) >= 10: # 至少有5分钟的数据
if current_worker_count >= 1: # 已有实例运行
should_add_worker = should_increase_instances()
if should_add_worker:
logging.info("CPU资源充足启动第二个压缩实例")
else:
should_add_worker = False
if should_add_worker:
worker_thread = threading.Thread(target=process_file_worker, daemon=True)
worker_thread.start()
active_threads.append(worker_thread)
logging.debug(f"启动新的工作线程,当前活动线程数: {len(active_threads)}")
threading.Event().wait(5) # 每5秒检查一次
# 等待所有线程完成
for thread in active_threads:
thread.join()
# 启动第一个工作线程
if not file_queue.empty():
first_worker = threading.Thread(target=process_file_worker, daemon=True)
first_worker.start()
active_threads.append(first_worker)
logging.info("启动第一个压缩实例")
# 启动线程管理器
manager_thread = threading.Thread(target=manage_workers, daemon=True)
manager_thread.start()
# 等待管理线程完成
manager_thread.join()
logging.info(f"所有视频处理完成,共处理了 {completed_files} 个文件") try:
info_file.unlink(missing_ok=True)
except Exception as e:
logging.warning("无法删除视频信息缓存文件",exc_info=e)
def test(): def test():
os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"] os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
try: try:
subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode() subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode()
except KeyboardInterrupt as e:raise e
except Exception as e: except Exception as e:
print(__file__) print(__file__)
logging.critical("无法运行ffmpeg") logging.critical("无法运行ffmpeg")