From 4ae07c57cc2439bbd559ca6a67a49a552ad28099 Mon Sep 17 00:00:00 2001 From: flt6 <1404262047@qq.com> Date: Fri, 24 Oct 2025 22:37:18 +0800 Subject: [PATCH] fix and enhance get frame. --- VideoCompress/config.json | 3 +- VideoCompress/get_frame.py | 127 +++++++++++++++++++++++++++++++++++++ VideoCompress/main.py | 50 ++++++--------- 3 files changed, 149 insertions(+), 31 deletions(-) create mode 100644 VideoCompress/get_frame.py diff --git a/VideoCompress/config.json b/VideoCompress/config.json index acdcb9a..e028a3e 100644 --- a/VideoCompress/config.json +++ b/VideoCompress/config.json @@ -1,6 +1,7 @@ { "save_to": "single", - "bitrate": "3M", + "bitrate": null, + "crf": 26, "codec": "h264_qsv", "hwaccel": "qsv", "extra": [], diff --git a/VideoCompress/get_frame.py b/VideoCompress/get_frame.py new file mode 100644 index 0000000..086c40c --- /dev/null +++ b/VideoCompress/get_frame.py @@ -0,0 +1,127 @@ +import json +import shutil +import subprocess +from fractions import Fraction +from decimal import Decimal +from typing import Optional, Tuple + +class FFProbeError(RuntimeError): + pass + +def _run_ffprobe(args: list[str]) -> dict: + """运行 ffprobe 并以 JSON 返回,若失败抛异常。""" + if not shutil.which("ffprobe"): + raise FileNotFoundError("未找到 ffprobe,请先安装 FFmpeg 并确保 ffprobe 在 PATH 中。") + # 始终要求 JSON 输出,便于稳健解析 + base = ["ffprobe", "-v", "error", "-print_format", "json"] + proc = subprocess.run(base + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if proc.returncode != 0: + raise FFProbeError(proc.stderr.strip() or "ffprobe 调用失败") + try: + return json.loads(proc.stdout or "{}") + except json.JSONDecodeError as e: + raise FFProbeError(f"无法解析 ffprobe 输出为 JSON: {e}") + +def _try_nb_frames(path: str, stream_index: int) -> Optional[int]: + data = _run_ffprobe([ + "-select_streams", f"v:{stream_index}", + "-show_entries", "stream=nb_frames", + path + ]) + streams = data.get("streams") or [] + if not streams: + return None + nb = streams[0].get("nb_frames") + if nb and nb != "N/A": + try: + n = int(nb) + return n if n >= 0 else None + except ValueError: + return None + return None + +def _try_avgfps_times_duration(path: str, stream_index: int) -> Optional[int]: + # 读 avg_frame_rate + s = _run_ffprobe([ + "-select_streams", f"v:{stream_index}", + "-show_entries", "stream=avg_frame_rate", + path + ]) + streams = s.get("streams") or [] + if not streams: + return None + afr = streams[0].get("avg_frame_rate") + if not afr or afr in ("0/0", "N/A"): + return None + + # 读容器时长(单位:秒) + f = _run_ffprobe(["-show_entries", "format=duration", path]) + dur_str = (f.get("format") or {}).get("duration") + if not dur_str: + return None + + try: + fps = Fraction(afr) # 形如 "30000/1001" + dur = Decimal(dur_str) + # 四舍五入到最近整数,避免系统性低估 + est = int(dur * Decimal(fps.numerator) / Decimal(fps.denominator) + Decimal("0.5")) + return est if est >= 0 else None + except Exception: + return None + +def _try_count_packets(path: str, stream_index: int) -> Optional[int]: + # 统计读取到的包数(不解码)。大多容器≈帧数,但不保证 1:1 + data = _run_ffprobe([ + "-select_streams", f"v:{stream_index}", + "-count_packets", + "-show_entries", "stream=nb_read_packets", + path + ]) + streams = data.get("streams") or [] + if not streams: + return None + nbp = streams[0].get("nb_read_packets") + try: + n = int(nbp) + return n if n >= 0 else None + except Exception: + return None + +def get_video_frame_count( + path: str, + stream_index: int = 0, + fallback_order: Tuple[str, ...] = ("nb_frames", "avg*dur", "count_packets"), +) -> int|None: + """ + 估计/获取视频总帧数(带回退)。 + 参数: + - path: 视频文件路径 + - stream_index: 选择哪个视频流,默认 0 + - allow_slow_decode: 是否允许用解码全片的方式(最慢但最准) + - fallback_order: 回退顺序,四种方法的别名可选: + "nb_frames" -> 直接读元数据中的总帧数 + "avg*dur" -> 平均帧率 × 时长(最快估算) + "count_packets" -> 统计包数(较快,接近帧数但不保证) + 返回: + (frame_count, method_used) + 异常: + - FileNotFoundError: ffprobe 未安装 + - FFProbeError: ffprobe 调用异常或无法解析 + - RuntimeError: 所有方法均失败 + """ + methods = { + "nb_frames": _try_nb_frames, + "avg*dur": _try_avgfps_times_duration, + "count_packets": _try_count_packets, + } + + for key in fallback_order: + func = methods.get(key) + if not func: + continue + n = func(path, stream_index) + if isinstance(n, int) and n >= 0: + return n + + return None + raise RuntimeError("无法获取或估计帧数:所有回退方法均失败。") diff --git a/VideoCompress/main.py b/VideoCompress/main.py index 1988129..f2ef4a9 100644 --- a/VideoCompress/main.py +++ b/VideoCompress/main.py @@ -11,6 +11,7 @@ from pickle import dumps, loads from typing import Optional, Callable import atexit import re +import get_frame root = None CFG_FILE = Path(sys.path[0]) / "config.json" @@ -56,6 +57,9 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]: [ "-hwaccel", CFG["hwaccel"], + "-hwaccel_output_format", + CFG["hwaccel"], + ] ) command.extend( @@ -198,12 +202,13 @@ def process_video( if result.returncode != 0: logging.error( - f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(command)}" + f"处理文件 {video_path_str} 失败" ) + logging.debug(f"返回码: {result.returncode}; cmd={' '.join(command)}") output_file.unlink(missing_ok=True) assert result.stdout is not None - logging.error(result.stdout.read()) - logging.error(total) + logging.debug(result.stdout.read()) + logging.debug(total) if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in [ "h264_mediacodec", "hevc_mediacodec", @@ -223,7 +228,7 @@ def process_video( return False else: return True - elif CFG["disable_hwaccel_when_fail"]: + elif CFG["disable_hwaccel_when_fail"] and CFG["hwaccel"] is not None: logging.info("正在禁用硬件加速器重试,进度条可能发生混乱。") output_file.unlink(missing_ok=True) bak = CFG.copy() @@ -259,7 +264,7 @@ def traverse_directory(root_dir: Path): video_extensions = set(CFG["video_ext"]) sm = None # 获取视频文件列表和帧数信息 - video_files = [] + video_files:list[Path] = [] que = list(root_dir.glob("*")) while que: d = que.pop() @@ -274,9 +279,9 @@ def traverse_directory(root_dir: Path): elif file.is_dir(): que.append(file) - if not video_files: - logging.warning("未找到需要处理的视频文件") - return + if not video_files: + logging.warning("未找到需要处理的视频文件") + return # 获取视频信息 frames: dict[Path, float] = {} @@ -299,28 +304,12 @@ def traverse_directory(root_dir: Path): if file in cached_data and cached_data[file] > 0: frames[file] = cached_data[file] continue - cmd = f"ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1".split() - cmd.append(str(file.resolve())) - proc = subprocess.run(cmd, capture_output=True, text=True) - if proc.returncode != 0: - logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}") - frames[file] = 0 - continue - if proc.stdout.strip(): - try: - avg_frame_rate, duration = proc.stdout.strip().split("\n") - tmp = avg_frame_rate.split("/") - avg_frame_rate = float(tmp[0]) / float(tmp[1]) - if duration == "N/A": - duration = 0 - logging.debug( - f"无法获取视频信息: {file}, 时长为N/A,默认使用0s" - ) - duration = float(duration) - frames[file] = duration * avg_frame_rate - except (ValueError, IndexError) as e: - logging.debug(f"解析视频信息失败: {file}, 错误: {e}") - frames[file] = 0 + fr = get_frame.get_video_frame_count(str(file.resolve())) + if fr is None: + logging.debug( + f"无法获取视频信息: {file}, 时长为N/A,默认使用0s" + ) + frames[file] = 0 if fr is None else fr if 0 in frames.values(): logging.warning( f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。" @@ -512,4 +501,5 @@ def main(_root=None): if __name__ == "__main__": + sys.argv.append(r'C:\Users\flt\Documents\WeChat Files\wxid_m8h0igh8p52p22\FileStorage\Video') main()