This commit is contained in:
2025-10-20 22:35:24 +08:00
parent 072a198032
commit 983ad0c8b6

View File

@ -8,15 +8,15 @@ from time import time
from rich.logging import RichHandler from rich.logging import RichHandler
from rich.progress import Progress from rich.progress import Progress
from pickle import dumps, loads from pickle import dumps, loads
from typing import Optional,Callable from typing import Optional, Callable
import atexit import atexit
import re import re
root = None root = None
CFG_FILE = Path(sys.path[0])/"config.json" CFG_FILE = Path(sys.path[0]) / "config.json"
CFG = { CFG = {
"save_to": "single", "save_to": "single",
"crf":"18", "crf": "18",
"bitrate": None, "bitrate": None,
"codec": "h264", "codec": "h264",
"hwaccel": None, "hwaccel": None,
@ -35,18 +35,14 @@ CFG = {
} }
def get_cmd(video_path:str|Path,output_file:str|Path) -> list[str]: def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
if isinstance(video_path, Path): if isinstance(video_path, Path):
video_path = str(video_path.resolve()) video_path = str(video_path.resolve())
if isinstance(output_file, Path): if isinstance(output_file, Path):
output_file = str(output_file.resolve()) output_file = str(output_file.resolve())
if CFG["manual"] is not None: if CFG["manual"] is not None:
command=[ command = [CFG["ffmpeg"], "-hide_banner", "-i", video_path]
CFG["ffmpeg"],
"-hide_banner",
"-i", video_path
]
command.extend(CFG["manual"]) command.extend(CFG["manual"])
command.append(output_file) command.append(output_file)
return command return command
@ -56,34 +52,58 @@ def get_cmd(video_path:str|Path,output_file:str|Path) -> list[str]:
"-hide_banner", "-hide_banner",
] ]
if CFG["hwaccel"] is not None: if CFG["hwaccel"] is not None:
command.extend([ command.extend(
"-hwaccel", CFG["hwaccel"], [
]) "-hwaccel",
command.extend([ CFG["hwaccel"],
"-i", video_path, ]
]) )
command.extend(
[
"-i",
video_path,
]
)
if CFG["bitrate"] is not None: if CFG["bitrate"] is not None:
if CFG['resolution'] is not None: if CFG["resolution"] is not None:
command.extend([ command.extend(
"-vf", f"scale={CFG['resolution']}",]) [
command.extend([ "-vf",
"-c:v", CFG["codec"], f"scale={CFG['resolution']}",
"-b:v", CFG["bitrate"], ]
"-r",CFG["fps"], )
"-y", command.extend(
]) [
"-c:v",
CFG["codec"],
"-b:v",
CFG["bitrate"],
"-r",
CFG["fps"],
"-y",
]
)
else: else:
if CFG['resolution'] is not None: if CFG["resolution"] is not None:
command.extend([ command.extend(
"-vf", f"scale={CFG['resolution']}",]) [
command.extend([ "-vf",
"-c:v", CFG["codec"], f"scale={CFG['resolution']}",
"-global_quality", str(CFG["crf"]), ]
"-r",CFG["fps"], )
"-y", command.extend(
]) [
"-c:v",
CFG["codec"],
"-global_quality",
str(CFG["crf"]),
"-r",
CFG["fps"],
"-y",
]
)
command.extend(CFG["extra"]) command.extend(CFG["extra"])
command.append(output_file) command.append(output_file)
@ -91,41 +111,39 @@ def get_cmd(video_path:str|Path,output_file:str|Path) -> list[str]:
return command return command
# 配置logging # 配置logging
def setup_logging(): def setup_logging():
log_dir = Path("logs") log_dir = Path("logs")
log_dir.mkdir(exist_ok=True) log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log" log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log"
stream = RichHandler(rich_tracebacks=True,tracebacks_show_locals=True) stream = RichHandler(rich_tracebacks=True, tracebacks_show_locals=True)
stream.setLevel(logging.INFO) stream.setLevel(logging.INFO)
stream.setFormatter(logging.Formatter("%(message)s")) stream.setFormatter(logging.Formatter("%(message)s"))
file = logging.FileHandler(log_file, encoding='utf-8') file = logging.FileHandler(log_file, encoding="utf-8")
file.setLevel(logging.DEBUG) file.setLevel(logging.DEBUG)
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG, level=logging.DEBUG,
format='%(asctime)s - %(levelname) 7s - %(message)s', format="%(asctime)s - %(levelname) 7s - %(message)s",
handlers=[ handlers=[file, stream],
file,
stream
]
) )
def fmt_time(t:float|int) -> str:
if t>3600: def fmt_time(t: float | int) -> str:
if t > 3600:
return f"{t//3600}h {t//60}min {t%60}s" return f"{t//3600}h {t//60}min {t%60}s"
elif t>60: elif t > 60:
return f"{t//60}min {t%60}s" return f"{t//60}min {t%60}s"
else: else:
return f"{round(t)}s" return f"{round(t)}s"
def process_video( def process_video(
video_path: Path, video_path: Path,
compress_dir:Optional[Path]=None , compress_dir: Optional[Path] = None,
update_func:Optional[Callable[[Optional[int],Optional[str]],None]]=None): update_func: Optional[Callable[[Optional[int], Optional[str]], None]] = None,
):
if compress_dir is None: if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在) # 在视频文件所在目录下创建 compress 子目录(如果不存在)
@ -134,8 +152,8 @@ def process_video(
assert root assert root
compress_dir /= video_path.parent.relative_to(root) compress_dir /= video_path.parent.relative_to(root)
assert isinstance(compress_dir,Path) assert isinstance(compress_dir, Path)
compress_dir.mkdir(exist_ok=True,parents=True) compress_dir.mkdir(exist_ok=True, parents=True)
# 输出文件路径:与原文件同名,保存在 compress 目录下 # 输出文件路径:与原文件同名,保存在 compress 目录下
output_file = compress_dir / (video_path.stem + video_path.suffix) output_file = compress_dir / (video_path.stem + video_path.suffix)
@ -144,7 +162,7 @@ def process_video(
return False return False
video_path_str = str(video_path.absolute()) video_path_str = str(video_path.absolute())
command = get_cmd(video_path_str,output_file) command = get_cmd(video_path_str, output_file)
try: try:
result = subprocess.Popen( result = subprocess.Popen(
@ -152,7 +170,7 @@ def process_video(
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
encoding="utf-8", encoding="utf-8",
text=True text=True,
) )
total = "" total = ""
@ -160,38 +178,45 @@ def process_video(
line = " " line = " "
while result.poll() is None and line[-1:] not in "\r\n": while result.poll() is None and line[-1:] not in "\r\n":
assert result.stderr is not None assert result.stderr is not None
line+=result.stderr.read(1) line += result.stderr.read(1)
total+=line[-1] total += line[-1]
# print(line[-1]) # print(line[-1])
if 'warning' in line.lower(): if "warning" in line.lower():
logging.warning(f"[FFmpeg]({video_path_str}): {line}") logging.warning(f"[FFmpeg]({video_path_str}): {line}")
elif 'error' in line.lower(): elif "error" in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}") logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "assertion" in line.lower(): elif "assertion" in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}") logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "frame=" in line and update_func is not None: elif "frame=" in line and update_func is not None:
# print(line,end="") # print(line,end="")
match = re.search(r"frame=\s*(\d+)",line) match = re.search(r"frame=\s*(\d+)", line)
frame_number = int(match.group(1)) if match else None frame_number = int(match.group(1)) if match else None
match = re.search(r"[\d\.]+x",line) match = re.search(r"[\d\.]+x", line)
rate = match.group(0) if match else None rate = match.group(0) if match else None
update_func(frame_number,rate) update_func(frame_number, rate)
if result.returncode != 0: if result.returncode != 0:
logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode}cmd={' '.join(command)}") logging.error(
f"处理文件 {video_path_str} 失败,返回码: {result.returncode}cmd={' '.join(command)}"
)
output_file.unlink(missing_ok=True) output_file.unlink(missing_ok=True)
assert result.stdout is not None assert result.stdout is not None
logging.error(result.stdout.read()) logging.error(result.stdout.read())
logging.error(total) logging.error(total)
if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in ["h264_mediacodec","hevc_mediacodec"]: if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in [
logging.info("mediacodec硬件加速器已知在较短片段上存在异常,将禁用加速重试。") "h264_mediacodec",
"hevc_mediacodec",
]:
logging.info(
"mediacodec硬件加速器已知在较短片段上存在异常将禁用加速重试。"
)
output_file.unlink(missing_ok=True) output_file.unlink(missing_ok=True)
bak = CFG.copy() bak = CFG.copy()
CFG["hwaccel"] = None CFG["hwaccel"] = None
CFG["codec"] = "h264" if CFG["codec"]=="h264_mediacodec" else "hevc" CFG["codec"] = "h264" if CFG["codec"] == "h264_mediacodec" else "hevc"
assert not output_file.exists() assert not output_file.exists()
ret = process_video(video_path,compress_dir,update_func) ret = process_video(video_path, compress_dir, update_func)
CFG.update(bak) CFG.update(bak)
if not ret: if not ret:
logging.error("重试仍然失败。") logging.error("重试仍然失败。")
@ -203,13 +228,15 @@ def process_video(
output_file.unlink(missing_ok=True) output_file.unlink(missing_ok=True)
bak = CFG.copy() bak = CFG.copy()
CFG["hwaccel"] = None CFG["hwaccel"] = None
if CFG['codec'].endswith("_mediacodec") or \ if (
CFG['codec'].endswith("_qsv") or \ CFG["codec"].endswith("_mediacodec")
CFG['codec'].endswith("_nvenc") or\ or CFG["codec"].endswith("_qsv")
CFG['codec'].endswith("_amf"): or CFG["codec"].endswith("_nvenc")
or CFG["codec"].endswith("_amf")
):
CFG["codec"] = CFG["codec"].split("_")[0] CFG["codec"] = CFG["codec"].split("_")[0]
assert not output_file.exists() assert not output_file.exists()
ret = process_video(video_path,compress_dir,update_func) ret = process_video(video_path, compress_dir, update_func)
CFG.update(bak) CFG.update(bak)
if not ret: if not ret:
logging.error("重试仍然失败。") logging.error("重试仍然失败。")
@ -217,29 +244,36 @@ def process_video(
else: else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}") logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
except KeyboardInterrupt as e:raise e except KeyboardInterrupt as e:
raise e
except Exception as e: except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",exc_info=e) logging.error(
f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",
exc_info=e,
)
return False return False
return True return True
def traverse_directory(root_dir: Path): def traverse_directory(root_dir: Path):
video_extensions = set(CFG["video_ext"]) video_extensions = set(CFG["video_ext"])
sm=None sm = None
# 获取视频文件列表和帧数信息 # 获取视频文件列表和帧数信息
video_files = [] video_files = []
que = list(root_dir.glob("*")) que = list(root_dir.glob("*"))
while que: while que:
d = que.pop() d = que.pop()
for file in d.glob("*") if d.is_dir() else [d]: for file in d.glob("*") if d.is_dir() else [d]:
if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]: if (
file.parent.name == CFG["compress_dir_name"]
or file.name == CFG["compress_dir_name"]
):
continue continue
if file.is_file() and file.suffix.lower() in video_extensions: if file.is_file() and file.suffix.lower() in video_extensions:
video_files.append(file) video_files.append(file)
elif file.is_dir(): elif file.is_dir():
que.append(file) que.append(file)
if not video_files: if not video_files:
logging.warning("未找到需要处理的视频文件") logging.warning("未找到需要处理的视频文件")
return return
@ -256,16 +290,16 @@ def traverse_directory(root_dir: Path):
else: else:
cached_data = {} cached_data = {}
except Exception as e: except Exception as e:
logging.debug("Failed to load video info cache.",exc_info=e) logging.debug("Failed to load video info cache.", exc_info=e)
with Progress() as prog: with Progress() as prog:
task = prog.add_task("正在获取视频信息", total=len(video_files)) task = prog.add_task("正在获取视频信息", total=len(video_files))
for file in video_files: for file in video_files:
prog.advance(task) prog.advance(task)
if file in cached_data and cached_data[file]>0: if file in cached_data and cached_data[file] > 0:
frames[file] = cached_data[file] frames[file] = cached_data[file]
continue continue
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split() cmd = f"ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1".split()
cmd.append(str(file.resolve())) cmd.append(str(file.resolve()))
proc = subprocess.run(cmd, capture_output=True, text=True) proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0: if proc.returncode != 0:
@ -274,33 +308,38 @@ def traverse_directory(root_dir: Path):
continue continue
if proc.stdout.strip(): if proc.stdout.strip():
try: try:
avg_frame_rate, duration = proc.stdout.strip().split('\n') avg_frame_rate, duration = proc.stdout.strip().split("\n")
tmp = avg_frame_rate.split('/') tmp = avg_frame_rate.split("/")
avg_frame_rate = float(tmp[0]) / float(tmp[1]) avg_frame_rate = float(tmp[0]) / float(tmp[1])
if duration == "N/A": if duration == "N/A":
duration = 0 duration = 0
logging.debug(f"无法获取视频信息: {file}, 时长为N/A默认使用0s") logging.debug(
f"无法获取视频信息: {file}, 时长为N/A默认使用0s"
)
duration = float(duration) duration = float(duration)
frames[file] = duration * avg_frame_rate frames[file] = duration * avg_frame_rate
except (ValueError, IndexError) as e: except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}") logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0 frames[file] = 0
if 0 in frames.values(): if 0 in frames.values():
logging.warning(f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。") logging.warning(
f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。"
)
prog.remove_task(task) prog.remove_task(task)
try: try:
info_file.write_bytes(dumps(frames)) info_file.write_bytes(dumps(frames))
logging.debug("Saved video info to cache.") logging.debug("Saved video info to cache.")
except Exception as e: except Exception as e:
logging.debug("Failed to save video info cache.",exc_info=e) logging.debug("Failed to save video info cache.", exc_info=e)
logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件") logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件")
# 创建进度条 # 创建进度条
with Progress() as prog: with Progress() as prog:
total_frames = sum(frames.values()) total_frames = sum(frames.values())
main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames)) main_task = prog.add_task(
"总进度", total=total_frames if total_frames > 0 else len(frames)
)
# 创建文件队列 # 创建文件队列
for file in frames.keys(): for file in frames.keys():
@ -311,38 +350,51 @@ def traverse_directory(root_dir: Path):
if frames[file] == 0: if frames[file] == 0:
file_task = prog.add_task(f"{filename}") file_task = prog.add_task(f"{filename}")
else: else:
file_task = prog.add_task(f"{filename}",total=frames[file]) file_task = prog.add_task(f"{filename}", total=frames[file])
with prog._lock: with prog._lock:
completed_start = prog._tasks[main_task].completed completed_start = prog._tasks[main_task].completed
def update_progress(x, rate): def update_progress(x, rate):
if frames[file] == 0: if frames[file] == 0:
prog.update(file_task,description=f"{filename} 已处理{x}{f'速率{rate}' if rate else ''}") prog.update(
file_task,
description=f"{filename} 已处理{x}{f'速率{rate}' if rate else ''}",
)
else: else:
prog.update(file_task,completed=x,description=f"{filename} {f'速率{rate}' if rate else ''}") prog.update(
prog.update(main_task, completed=completed_start+x) file_task,
completed=x,
description=f"{filename} {f'速率{rate}' if rate else ''}",
)
prog.update(main_task, completed=completed_start + x)
if CFG["save_to"] == "single": if CFG["save_to"] == "single":
process_video(file, root_dir/CFG["compress_dir_name"], update_progress) process_video(
file, root_dir / CFG["compress_dir_name"], update_progress
)
else: else:
process_video(file, None, update_progress) process_video(file, None, update_progress)
# 移除文件级进度条 # 移除文件级进度条
prog.update(main_task, completed=completed_start+frames[file]) prog.update(main_task, completed=completed_start + frames[file])
prog.remove_task(file_task) prog.remove_task(file_task)
try: try:
info_file.unlink(missing_ok=True) info_file.unlink(missing_ok=True)
except Exception as e: except Exception as e:
logging.warning("无法删除视频信息缓存文件",exc_info=e) logging.warning("无法删除视频信息缓存文件", exc_info=e)
def test(): def test():
os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"] os.environ["PATH"] = (
Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
)
try: try:
subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode() subprocess.run(
[CFG["ffmpeg"], "-version"], stdout=-3, stderr=-3
).check_returncode()
except Exception as e: except Exception as e:
print(__file__) print(__file__)
logging.critical("无法运行ffmpeg") logging.critical("无法运行ffmpeg")
@ -352,19 +404,19 @@ def test():
f"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(), f"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True text=True,
) )
if ret.returncode != 0: if ret.returncode != 0:
logging.warning("无法生成测试视频.") logging.warning("无法生成测试视频.")
logging.debug(ret.stdout) logging.debug(ret.stdout)
logging.debug(ret.stderr) logging.debug(ret.stderr)
ret.check_returncode() ret.check_returncode()
cmd = get_cmd(CFG["test_video_input"],CFG["test_video_output"],) cmd = get_cmd(
CFG["test_video_input"],
CFG["test_video_output"],
)
ret = subprocess.run( ret = subprocess.run(
cmd, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
) )
if ret.returncode != 0: if ret.returncode != 0:
logging.error("测试视频压缩失败") logging.error("测试视频压缩失败")
@ -374,21 +426,23 @@ def test():
exit(-1) exit(-1)
os.remove("compress_video_test.mp4") os.remove("compress_video_test.mp4")
os.remove("compressed_video_test.mp4") os.remove("compressed_video_test.mp4")
except KeyboardInterrupt as e:raise e except KeyboardInterrupt as e:
raise e
except Exception as e: except Exception as e:
if os.path.exists("compress_video_test.mp4"): if os.path.exists("compress_video_test.mp4"):
os.remove("compress_video_test.mp4") os.remove("compress_video_test.mp4")
logging.warning("测试未通过,继续运行可能出现未定义行为。") logging.warning("测试未通过,继续运行可能出现未定义行为。")
logging.debug("Test error",exc_info=e) logging.debug("Test error", exc_info=e)
def exit_pause(): def exit_pause():
if os.name == 'nt': if os.name == "nt":
os.system("pause") os.system("pause")
elif os.name == 'posix': elif os.name == "posix":
os.system("read -p 'Press Enter to continue...'") os.system("read -p 'Press Enter to continue...'")
def main(_root = None):
def main(_root=None):
atexit.register(exit_pause) atexit.register(exit_pause)
@ -396,24 +450,27 @@ def main(_root = None):
setup_logging() setup_logging()
tot_bgn = time() tot_bgn = time()
logging.info("-------------------------------") logging.info("-------------------------------")
logging.info(datetime.now().strftime('Video Compress started at %Y/%m/%d %H:%M')) logging.info(datetime.now().strftime("Video Compress started at %Y/%m/%d %H:%M"))
if CFG_FILE.exists(): if CFG_FILE.exists():
try: try:
import json import json
cfg:dict = json.loads(CFG_FILE.read_text())
cfg: dict = json.loads(CFG_FILE.read_text())
CFG.update(cfg) CFG.update(cfg)
except KeyboardInterrupt as e:raise e except KeyboardInterrupt as e:
raise e
except Exception as e: except Exception as e:
logging.warning("Invalid config file, ignored.") logging.warning("Invalid config file, ignored.")
logging.debug(e) logging.debug(e)
else: else:
try: try:
import json import json
CFG_FILE.write_text(json.dumps(CFG,indent=4))
CFG_FILE.write_text(json.dumps(CFG, indent=4))
logging.info("Config file created.") logging.info("Config file created.")
except Exception as e: except Exception as e:
logging.warning("Failed to create config file.",exc_info=e) logging.warning("Failed to create config file.", exc_info=e)
if _root is not None: if _root is not None:
root = Path(_root) root = Path(_root)
@ -444,9 +501,15 @@ def main(_root = None):
logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}") logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}")
logging.info("Normal termination of Video Compress.") logging.info("Normal termination of Video Compress.")
except KeyboardInterrupt: except KeyboardInterrupt:
logging.warning("Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.") logging.warning(
"Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED."
)
except Exception as e: except Exception as e:
logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e) logging.error(
"Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",
exc_info=e,
)
if __name__ == "__main__": if __name__ == "__main__":
main() main()