Compare commits

..

3 Commits

Author SHA1 Message Date
4ae07c57cc fix and enhance get frame. 2025-10-24 22:37:18 +08:00
983ad0c8b6 format 2025-10-20 22:35:24 +08:00
072a198032 Enhance Videocompress 2025-10-20 22:35:03 +08:00
4 changed files with 411 additions and 166 deletions

View File

@ -1,12 +1,22 @@
{
"save_to": "single",
"crf": 18,
"codec": "h264",
"bitrate": null,
"crf": 26,
"codec": "h264_qsv",
"hwaccel": "qsv",
"extra": [],
"ffmpeg": "ffmpeg",
"manual": null,
"video_ext": [
".mp4",
".mkv"
],
"extra": [],
"train": false
"compress_dir_name": "compress_qsv",
"resolution": null,
"fps": "30",
"test_video_resolution": "1920x1080",
"test_video_fps": "30",
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"disable_hwaccel_when_fail": true
}

View File

@ -2,9 +2,11 @@
"save_to": "single",
"bitrate": "3M",
"codec": "h264_mediacodec",
"hwaccel": "mediacodec",
"ffmpeg": "ffmpeg",
"video_ext": [
".mp4",
".mkv"
],
"resolution": null
}

127
VideoCompress/get_frame.py Normal file
View File

@ -0,0 +1,127 @@
import json
import shutil
import subprocess
from fractions import Fraction
from decimal import Decimal
from typing import Optional, Tuple
class FFProbeError(RuntimeError):
pass
def _run_ffprobe(args: list[str]) -> dict:
"""运行 ffprobe 并以 JSON 返回,若失败抛异常。"""
if not shutil.which("ffprobe"):
raise FileNotFoundError("未找到 ffprobe请先安装 FFmpeg 并确保 ffprobe 在 PATH 中。")
# 始终要求 JSON 输出,便于稳健解析
base = ["ffprobe", "-v", "error", "-print_format", "json"]
proc = subprocess.run(base + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if proc.returncode != 0:
raise FFProbeError(proc.stderr.strip() or "ffprobe 调用失败")
try:
return json.loads(proc.stdout or "{}")
except json.JSONDecodeError as e:
raise FFProbeError(f"无法解析 ffprobe 输出为 JSON: {e}")
def _try_nb_frames(path: str, stream_index: int) -> Optional[int]:
data = _run_ffprobe([
"-select_streams", f"v:{stream_index}",
"-show_entries", "stream=nb_frames",
path
])
streams = data.get("streams") or []
if not streams:
return None
nb = streams[0].get("nb_frames")
if nb and nb != "N/A":
try:
n = int(nb)
return n if n >= 0 else None
except ValueError:
return None
return None
def _try_avgfps_times_duration(path: str, stream_index: int) -> Optional[int]:
# 读 avg_frame_rate
s = _run_ffprobe([
"-select_streams", f"v:{stream_index}",
"-show_entries", "stream=avg_frame_rate",
path
])
streams = s.get("streams") or []
if not streams:
return None
afr = streams[0].get("avg_frame_rate")
if not afr or afr in ("0/0", "N/A"):
return None
# 读容器时长(单位:秒)
f = _run_ffprobe(["-show_entries", "format=duration", path])
dur_str = (f.get("format") or {}).get("duration")
if not dur_str:
return None
try:
fps = Fraction(afr) # 形如 "30000/1001"
dur = Decimal(dur_str)
# 四舍五入到最近整数,避免系统性低估
est = int(dur * Decimal(fps.numerator) / Decimal(fps.denominator) + Decimal("0.5"))
return est if est >= 0 else None
except Exception:
return None
def _try_count_packets(path: str, stream_index: int) -> Optional[int]:
# 统计读取到的包数(不解码)。大多容器≈帧数,但不保证 1:1
data = _run_ffprobe([
"-select_streams", f"v:{stream_index}",
"-count_packets",
"-show_entries", "stream=nb_read_packets",
path
])
streams = data.get("streams") or []
if not streams:
return None
nbp = streams[0].get("nb_read_packets")
try:
n = int(nbp)
return n if n >= 0 else None
except Exception:
return None
def get_video_frame_count(
path: str,
stream_index: int = 0,
fallback_order: Tuple[str, ...] = ("nb_frames", "avg*dur", "count_packets"),
) -> int|None:
"""
估计/获取视频总帧数(带回退)。
参数:
- path: 视频文件路径
- stream_index: 选择哪个视频流,默认 0
- allow_slow_decode: 是否允许用解码全片的方式(最慢但最准)
- fallback_order: 回退顺序,四种方法的别名可选:
"nb_frames" -> 直接读元数据中的总帧数
"avg*dur" -> 平均帧率 × 时长(最快估算)
"count_packets" -> 统计包数(较快,接近帧数但不保证)
返回:
(frame_count, method_used)
异常:
- FileNotFoundError: ffprobe 未安装
- FFProbeError: ffprobe 调用异常或无法解析
- RuntimeError: 所有方法均失败
"""
methods = {
"nb_frames": _try_nb_frames,
"avg*dur": _try_avgfps_times_duration,
"count_packets": _try_count_packets,
}
for key in fallback_order:
func = methods.get(key)
if not func:
continue
n = func(path, stream_index)
if isinstance(n, int) and n >= 0:
return n
return None
raise RuntimeError("无法获取或估计帧数:所有回退方法均失败。")

View File

@ -8,295 +8,382 @@ from time import time
from rich.logging import RichHandler
from rich.progress import Progress
from pickle import dumps, loads
from typing import Optional,Callable
from typing import Optional, Callable
import atexit
import re
import get_frame
root = None
CFG_FILE = Path(sys.path[0])/"config.json"
CFG_FILE = Path(sys.path[0]) / "config.json"
CFG = {
"save_to": "single",
"crf":"18",
"crf": "18",
"bitrate": None,
"codec": "h264",
"hwaccel": None,
"extra": [],
"ffmpeg": "ffmpeg",
"manual": None,
"video_ext": [".mp4", ".mkv"],
"compress_dir_name": "compress",
"resolution": "-1:1080",
"resolution": None,
"fps": "30",
"test_video_resolution": "1920x1080",
"test_video_fps": "30",
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"disable_hwaccel_when_fail": True,
}
def get_cmd(video_path:str|Path,output_file:str|Path) -> list[str]:
def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
if isinstance(video_path, Path):
video_path = str(video_path.resolve())
if isinstance(output_file, Path):
output_file = str(output_file.resolve())
if CFG["manual"] is not None:
command=[
CFG["ffmpeg"],
"-hide_banner",
"-i", video_path
]
command = [CFG["ffmpeg"], "-hide_banner", "-i", video_path]
command.extend(CFG["manual"])
command.append(output_file)
return command
if CFG["bitrate"] is not None:
command = [
CFG["ffmpeg"],
"-hide_banner",
"-i", video_path,
]
if CFG['resolution'] is not None:
command.extend([
"-vf", f"scale={CFG['resolution']}",])
command.extend([
"-c:v", CFG["codec"],
"-b:v", CFG["bitrate"],
"-r",CFG["fps"],
"-y",
])
else:
command = [
CFG["ffmpeg"],
"-hide_banner",
"-i", video_path,
command = [
CFG["ffmpeg"],
"-hide_banner",
]
if CFG["hwaccel"] is not None:
command.extend(
[
"-hwaccel",
CFG["hwaccel"],
"-hwaccel_output_format",
CFG["hwaccel"],
]
if CFG['resolution'] is not None:
command.extend([
"-vf", f"scale={CFG['resolution']}",])
command.extend([
"-c:v", CFG["codec"],
"-global_quality", str(CFG["crf"]),
"-r",CFG["fps"],
"-y",
])
)
command.extend(
[
"-i",
video_path,
]
)
if CFG["bitrate"] is not None:
if CFG["resolution"] is not None:
command.extend(
[
"-vf",
f"scale={CFG['resolution']}",
]
)
command.extend(
[
"-c:v",
CFG["codec"],
"-b:v",
CFG["bitrate"],
"-r",
CFG["fps"],
"-y",
]
)
else:
if CFG["resolution"] is not None:
command.extend(
[
"-vf",
f"scale={CFG['resolution']}",
]
)
command.extend(
[
"-c:v",
CFG["codec"],
"-global_quality",
str(CFG["crf"]),
"-r",
CFG["fps"],
"-y",
]
)
command.extend(CFG["extra"])
command.append(output_file)
logging.debug(f"Create CMD: {command}")
return command
# 配置logging
def setup_logging():
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log"
stream = RichHandler(rich_tracebacks=True,tracebacks_show_locals=True)
stream = RichHandler(rich_tracebacks=True, tracebacks_show_locals=True)
stream.setLevel(logging.INFO)
stream.setFormatter(logging.Formatter("%(message)s"))
file = logging.FileHandler(log_file, encoding='utf-8')
file = logging.FileHandler(log_file, encoding="utf-8")
file.setLevel(logging.DEBUG)
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname) 7s - %(message)s',
handlers=[
file,
stream
]
format="%(asctime)s - %(levelname) 7s - %(message)s",
handlers=[file, stream],
)
def fmt_time(t:float|int) -> str:
if t>3600:
def fmt_time(t: float | int) -> str:
if t > 3600:
return f"{t//3600}h {t//60}min {t%60}s"
elif t>60:
elif t > 60:
return f"{t//60}min {t%60}s"
else:
return f"{round(t)}s"
def process_video(
video_path: Path,
compress_dir:Optional[Path]=None ,
update_func:Optional[Callable[[Optional[int],Optional[str]],None]]=None):
video_path: Path,
compress_dir: Optional[Path] = None,
update_func: Optional[Callable[[Optional[int], Optional[str]], None]] = None,
):
if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在)
compress_dir = video_path.parent / CFG["compress_dir_name"]
else:
assert root
compress_dir /= video_path.parent.relative_to(root)
assert isinstance(compress_dir,Path)
compress_dir.mkdir(exist_ok=True,parents=True)
assert isinstance(compress_dir, Path)
compress_dir.mkdir(exist_ok=True, parents=True)
# 输出文件路径:与原文件同名,保存在 compress 目录下
output_file = compress_dir / (video_path.stem + video_path.suffix)
if output_file.is_file():
logging.warning(f"文件{output_file}存在,跳过")
return
return False
video_path_str = str(video_path.absolute())
command = get_cmd(video_path_str,output_file)
command = get_cmd(video_path_str, output_file)
try:
result = subprocess.Popen(
command,
stdout=subprocess.PIPE,
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
text=True
encoding="utf-8",
text=True,
)
total = ""
while result.poll() is None:
line = " "
while result.poll() is None and line[-1:] not in "\r\n":
assert result.stderr is not None
line+=result.stderr.read(1)
line += result.stderr.read(1)
total += line[-1]
# print(line[-1])
if 'warning' in line.lower():
if "warning" in line.lower():
logging.warning(f"[FFmpeg]({video_path_str}): {line}")
elif 'error' in line.lower():
elif "error" in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "assertion" in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "frame=" in line and update_func is not None:
# print(line,end="")
match = re.search(r"frame=\s*(\d+)",line)
match = re.search(r"frame=\s*(\d+)", line)
frame_number = int(match.group(1)) if match else None
match = re.search(r"[\d\.]+x",line)
rate = match.group(0) if match else None
update_func(frame_number,rate)
match = re.search(r"[\d\.]+x", line)
rate = match.group(0) if match else None
update_func(frame_number, rate)
if result.returncode != 0:
logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode}cmd={' '.join(command)}")
logging.error(result.stdout)
logging.error(result.stderr)
logging.error(
f"处理文件 {video_path_str} 失败"
)
logging.debug(f"返回码: {result.returncode}; cmd={' '.join(command)}")
output_file.unlink(missing_ok=True)
assert result.stdout is not None
logging.debug(result.stdout.read())
logging.debug(total)
if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in [
"h264_mediacodec",
"hevc_mediacodec",
]:
logging.info(
"mediacodec硬件加速器已知在较短片段上存在异常将禁用加速重试。"
)
output_file.unlink(missing_ok=True)
bak = CFG.copy()
CFG["hwaccel"] = None
CFG["codec"] = "h264" if CFG["codec"] == "h264_mediacodec" else "hevc"
assert not output_file.exists()
ret = process_video(video_path, compress_dir, update_func)
CFG.update(bak)
if not ret:
logging.error("重试仍然失败。")
return False
else:
return True
elif CFG["disable_hwaccel_when_fail"] and CFG["hwaccel"] is not None:
logging.info("正在禁用硬件加速器重试,进度条可能发生混乱。")
output_file.unlink(missing_ok=True)
bak = CFG.copy()
CFG["hwaccel"] = None
if (
CFG["codec"].endswith("_mediacodec")
or CFG["codec"].endswith("_qsv")
or CFG["codec"].endswith("_nvenc")
or CFG["codec"].endswith("_amf")
):
CFG["codec"] = CFG["codec"].split("_")[0]
assert not output_file.exists()
ret = process_video(video_path, compress_dir, update_func)
CFG.update(bak)
if not ret:
logging.error("重试仍然失败。")
return False
else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
except KeyboardInterrupt as e:raise e
except KeyboardInterrupt as e:
raise e
except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",exc_info=e)
logging.error(
f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",
exc_info=e,
)
return False
return True
def traverse_directory(root_dir: Path):
video_extensions = set(CFG["video_ext"])
sm=None
sm = None
# 获取视频文件列表和帧数信息
video_files = []
video_files:list[Path] = []
que = list(root_dir.glob("*"))
while que:
d = que.pop()
for file in d.glob("*") if d.is_dir() else [d]:
if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]:
if (
file.parent.name == CFG["compress_dir_name"]
or file.name == CFG["compress_dir_name"]
):
continue
if file.is_file() and file.suffix.lower() in video_extensions:
video_files.append(file)
elif file.is_dir():
que.append(file)
if not video_files:
logging.warning("未找到需要处理的视频文件")
return
if not video_files:
logging.warning("未找到需要处理的视频文件")
return
# 获取视频信息
frames: dict[Path, float] = {}
cached_data: dict[Path, float] = {}
info_file = Path("video_info.cache")
if info_file.is_file():
try:
cached_data = loads(info_file.read_bytes())
if isinstance(cached_data, dict):
frames = cached_data
logging.debug("Loaded video info from cache.")
else:
cached_data = {}
except Exception as e:
logging.debug("Failed to load video info cache.",exc_info=e)
logging.debug("Failed to load video info cache.", exc_info=e)
with Progress() as prog:
task = prog.add_task("正在获取视频信息", total=len(video_files))
for file in video_files:
prog.advance(task)
if file in frames and frames[file]>0:
if file in cached_data and cached_data[file] > 0:
frames[file] = cached_data[file]
continue
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split()
cmd.append(str(file.resolve()))
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}")
frames[file] = 0
continue
if proc.stdout.strip():
try:
avg_frame_rate, duration = proc.stdout.strip().split('\n')
tmp = avg_frame_rate.split('/')
avg_frame_rate = float(tmp[0]) / float(tmp[1])
if duration == "N/A":
duration = 0
logging.debug(f"无法获取视频信息: {file}, 时长为N/A默认使用0s")
duration = float(duration)
frames[file] = duration * avg_frame_rate
except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0
fr = get_frame.get_video_frame_count(str(file.resolve()))
if fr is None:
logging.debug(
f"无法获取视频信息: {file}, 时长为N/A默认使用0s"
)
frames[file] = 0 if fr is None else fr
if 0 in frames.values():
logging.warning(f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。")
logging.warning(
f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。"
)
prog.remove_task(task)
try:
info_file.write_bytes(dumps(frames))
logging.debug("Saved video info to cache.")
except Exception as e:
logging.debug("Failed to save video info cache.",exc_info=e)
logging.debug("Failed to save video info cache.", exc_info=e)
logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件")
# 创建进度条
with Progress() as prog:
total_frames = sum(frames.values())
main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames))
main_task = prog.add_task(
"总进度", total=total_frames if total_frames > 0 else len(frames)
)
# 创建文件队列
for file in frames.keys():
# 进度跟踪
filename = file.relative_to(root_dir)
# 创建文件级进度条
if frames[file] == 0:
file_task = prog.add_task(f"{filename}")
else:
file_task = prog.add_task(f"{filename}",total=frames[file])
file_task = prog.add_task(f"{filename}", total=frames[file])
with prog._lock:
completed_start = prog._tasks[main_task].completed
def update_progress(x, rate):
if frames[file] == 0:
prog.update(file_task,description=f"{filename} 已处理{x}{f'速率{rate}' if rate else ''}")
prog.update(
file_task,
description=f"{filename} 已处理{x}{f'速率{rate}' if rate else ''}",
)
else:
prog.update(file_task,completed=x,description=f"{filename} {f'速率{rate}' if rate else ''}")
prog.update(main_task, completed=completed_start+x)
prog.update(
file_task,
completed=x,
description=f"{filename} {f'速率{rate}' if rate else ''}",
)
prog.update(main_task, completed=completed_start + x)
if CFG["save_to"] == "single":
process_video(file, root_dir/CFG["compress_dir_name"], update_progress)
process_video(
file, root_dir / CFG["compress_dir_name"], update_progress
)
else:
process_video(file, None, update_progress)
# 移除文件级进度条
prog.update(main_task, completed=completed_start+frames[file])
prog.update(main_task, completed=completed_start + frames[file])
prog.remove_task(file_task)
try:
info_file.unlink(missing_ok=True)
except Exception as e:
logging.warning("无法删除视频信息缓存文件",exc_info=e)
logging.warning("无法删除视频信息缓存文件", exc_info=e)
def test():
os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
os.environ["PATH"] = (
Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
)
try:
subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode()
subprocess.run(
[CFG["ffmpeg"], "-version"], stdout=-3, stderr=-3
).check_returncode()
except Exception as e:
print(__file__)
logging.critical("无法运行ffmpeg")
@ -306,19 +393,19 @@ def test():
f"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
text=True,
)
if ret.returncode != 0:
logging.warning("无法生成测试视频.")
logging.debug(ret.stdout)
logging.debug(ret.stderr)
ret.check_returncode()
cmd = get_cmd(CFG["test_video_input"],CFG["test_video_output"],)
cmd = get_cmd(
CFG["test_video_input"],
CFG["test_video_output"],
)
ret = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if ret.returncode != 0:
logging.error("测试视频压缩失败")
@ -328,40 +415,52 @@ def test():
exit(-1)
os.remove("compress_video_test.mp4")
os.remove("compressed_video_test.mp4")
except KeyboardInterrupt as e:raise e
except KeyboardInterrupt as e:
raise e
except Exception as e:
if os.path.exists("compress_video_test.mp4"):
os.remove("compress_video_test.mp4")
logging.warning("测试未通过,继续运行可能出现未定义行为。")
logging.debug("Test error",exc_info=e)
logging.debug("Test error", exc_info=e)
def exit_pause():
if os.name == 'nt':
if os.name == "nt":
os.system("pause")
elif os.name == 'posix':
elif os.name == "posix":
os.system("read -p 'Press Enter to continue...'")
def main(_root = None):
def main(_root=None):
atexit.register(exit_pause)
global root
setup_logging()
tot_bgn = time()
logging.info("-------------------------------")
logging.info(datetime.now().strftime('Video Compress started at %Y/%m/%d %H:%M'))
logging.info(datetime.now().strftime("Video Compress started at %Y/%m/%d %H:%M"))
if CFG_FILE.exists():
try:
import json
cfg:dict = json.loads(CFG_FILE.read_text())
cfg: dict = json.loads(CFG_FILE.read_text())
CFG.update(cfg)
except KeyboardInterrupt as e:raise e
except KeyboardInterrupt as e:
raise e
except Exception as e:
logging.warning("Invalid config file, ignored.")
logging.debug(e)
else:
try:
import json
CFG_FILE.write_text(json.dumps(CFG, indent=4))
logging.info("Config file created.")
except Exception as e:
logging.warning("Failed to create config file.", exc_info=e)
if _root is not None:
root = Path(_root)
else:
@ -371,29 +470,36 @@ def main(_root = None):
logging.warning("Error termination via invalid input.")
sys.exit(1)
root = Path(sys.argv[1])
if root.name.lower() == CFG["compress_dir_name"].lower():
logging.critical("请修改目标目录名为非compress。")
logging.error("Error termination via invalid input.")
sys.exit(1)
sys.exit(1)
logging.info("开始验证环境")
test()
if not root.is_dir():
print("提供的路径不是一个有效目录。")
logging.warning("Error termination via invalid input.")
sys.exit(1)
try:
traverse_directory(root)
tot_end = time()
logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}")
logging.info("Normal termination of Video Compress.")
except KeyboardInterrupt:
logging.warning("Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.")
logging.warning(
"Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED."
)
except Exception as e:
logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e)
logging.error(
"Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",
exc_info=e,
)
if __name__ == "__main__":
sys.argv.append(r'C:\Users\flt\Documents\WeChat Files\wxid_m8h0igh8p52p22\FileStorage\Video')
main()