VideoCompress V1.1

This commit is contained in:
2025-05-07 14:31:07 +08:00
parent dff3bcbd7a
commit 4cb1ab42dc
2 changed files with 389 additions and 60 deletions

286
VideoCompress/config.py Normal file
View File

@ -0,0 +1,286 @@
import json
import os
import sys
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import main as main_program
CONFIG_NAME = "config.json"
DEFAULT_CONFIG = {
"crf": 18,
"codec": "h264", # could be h264, h264_qsv, h264_nvenc … etc.
"ffmpeg": "ffmpeg",
"video_ext": [".mp4", ".mkv"],
"extra": [],
"manual": None,
"train": False,
}
HW_SUFFIXES = ["amf", "qsv", "nvenc"]
CODECS_BASE = ["h264", "hevc"]
preset_options = {
"不使用":["","ultrafast","superfast","veryfast","faster","fast","medium","slow","slower","veryslow",],
"AMD": ["","speed","balanced","quality",],
"Intel": ["","veryfast","faster","fast","medium","slow",],
"NVIDIA": ["","default","slow","medium","fast","hp","hq",]
}
def config_path() -> str:
"""Return path of config file next to the executable / script."""
if getattr(sys, "frozen", False): # PyInstaller executable
base = os.path.dirname(sys.executable)
else:
base = os.path.dirname(os.path.abspath(__file__))
return os.path.join(base, CONFIG_NAME)
def load_config() -> dict:
try:
with open(config_path(), "r", encoding="utf-8") as fp:
data = json.load(fp)
if not isinstance(data, dict):
raise ValueError("config.json root must be an object")
return {**DEFAULT_CONFIG, **data}
except FileNotFoundError:
return DEFAULT_CONFIG.copy()
except Exception as exc:
messagebox.showwarning("FFmpeg Config", f"Invalid config.json using defaults.\n{exc}")
return DEFAULT_CONFIG.copy()
def save_config(cfg: dict):
try:
with open(config_path(), "w", encoding="utf-8") as fp:
json.dump(cfg, fp, ensure_ascii=False, indent=4)
except Exception as exc:
messagebox.showerror("配置中心", f"保存失败:\n{exc}")
else:
messagebox.showinfo("配置中心", "保存成功。")
class ConfigApp(tk.Tk):
def __init__(self):
super().__init__()
# 设置现代化主题和统一内边距
style = ttk.Style(self)
style.theme_use('clam')
self.title("配置中心")
self.resizable(False, False)
self.cfg = load_config()
if "-preset" in self.cfg["extra"]:
idx = self.cfg["extra"].index("-preset")
self.preset = self.cfg["extra"][idx+1]
self.cfg["extra"].pop(idx)
self.cfg["extra"].pop(idx)
else:
self.preset = ""
self._build_ui()
# ── helper --------------------------------------------------------------
def _grid_label(self, row: int, text: str):
tk.Label(self, text=text, anchor="w").grid(row=row, column=0, sticky="w", pady=2, padx=4)
def _str_var(self, key: str):
var = tk.StringVar(value=str(self.cfg.get(key, "")))
var.trace_add("write", lambda *_: self.cfg.__setitem__(key, var.get()))
return var
def _bool_var(self, key: str):
var = tk.BooleanVar(value=bool(self.cfg.get(key)))
var.trace_add("write", lambda *_: self.cfg.__setitem__(key, var.get()))
return var
def _list_var_entry(self, key: str, width: int = 28):
"""Commaseparated list entry bound to config[key]."""
var = tk.StringVar(value=",".join(self.cfg.get(key, [])))
def _update(*_):
self.cfg[key] = [s.strip() for s in var.get().split(",") if s.strip()]
var.trace_add("write", _update)
ent = tk.Entry(self, textvariable=var, width=width)
return ent
# ── UI ------------------------------------------------------------------
def _build_ui(self):
row = 0
padx_val = 6
pady_val = 4
# 编解码器
self._grid_label(row, "编解码器 (h264 / hevc)")
codec_base = tk.StringVar()
codec_base.set(next((c for c in CODECS_BASE if self.cfg["codec"].startswith(c)), "h264"))
codec_menu = ttk.Combobox(self, textvariable=codec_base, values=CODECS_BASE, state="readonly", width=10)
codec_menu.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# 显卡品牌(硬件加速)
self._grid_label(row, "GPU加速")
accel = tk.StringVar()
brand_vals = ["不使用", "NVIDIA", "AMD", "Intel"]
def get_brand():
codec = self.cfg["codec"]
if codec.endswith("_nvenc"):
return "NVIDIA"
elif codec.endswith("_amf"):
return "AMD"
elif codec.endswith("_qsv"):
return "Intel"
return "不使用"
accel.set(get_brand())
accel_menu = ttk.Combobox(self, textvariable=accel, values=brand_vals, state="readonly", width=10)
accel_menu.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# CRF或码率选择
mode = tk.StringVar(value="crf")
if "bitrate" in self.cfg:
mode.set("bitrate")
def _switch_mode():
if mode.get() == "crf":
bitrate_ent.configure(state="disabled")
crf_ent.configure(state="normal")
else:
crf_ent.configure(state="disabled")
bitrate_ent.configure(state="normal")
tk.Radiobutton(self, text="使用 CRF", variable=mode, value="crf", command=_switch_mode).grid(row=row, column=0, sticky="w", padx=padx_val, pady=pady_val)
tk.Radiobutton(self, text="使用码率", variable=mode, value="bitrate", command=_switch_mode).grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# CRF输入框并在标签中解释参数含义
self._grid_label(row, "CRF 值(质量常数,数值越低质量越高)")
crf_var = self._str_var("crf")
crf_ent = tk.Entry(self, textvariable=crf_var, width=6)
crf_ent.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# 码率输入框 (kbit/s)
self._grid_label(row, "码率 (例如6M, 200k)")
bitrate_var = tk.StringVar(value=str(self.cfg.get("bitrate", "")))
def _update_bitrate(*_):
if mode.get() == "bitrate":
try:
self.cfg["bitrate"] = bitrate_var.get().strip()
except ValueError:
self.cfg.pop("bitrate", None)
bitrate_var.trace_add("write", _update_bitrate)
bitrate_ent = tk.Entry(self, textvariable=bitrate_var, width=8)
bitrate_ent.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# 预设选项
self._grid_label(row, "预设 (决定压缩的速度和质量)")
preset_var = tk.StringVar(value=self.preset)
def _update_preset_option(*_):
preset_menu['values'] = preset_options[accel.get()]
preset_menu.set("")
def _update_preset(*_):
print(preset_var.get())
if self.cfg["extra"].count("-preset")>0:
idx = self.cfg["extra"].index("-preset")
self.cfg["extra"].pop(idx)
self.cfg["extra"].pop(idx)
if preset_var.get():
self.cfg["extra"].extend(["-preset", preset_var.get()])
preset_var.trace_add("write", _update_preset)
accel.trace_add("write",_update_preset_option)
preset_menu = ttk.Combobox(self, textvariable=preset_var, values=preset_options[get_brand()], state="readonly", width=10)
preset_menu.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# ffmpeg路径
self._grid_label(row, "ffmpeg 可执行文件")
ffmpeg_var = self._str_var("ffmpeg")
ffmpeg_ent = tk.Entry(self, textvariable=ffmpeg_var, width=28)
ffmpeg_ent.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
ttk.Button(self, text="浏览", command=lambda: self._pick_ffmpeg(ffmpeg_var)).grid(row=row, column=2, padx=2, pady=pady_val)
row += 1
# 视频扩展名列表
self._grid_label(row, "视频扩展名 (.x,.y)")
self._list_var_entry("video_ext").grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# 额外参数列表
self._grid_label(row, "额外参数列表")
extra_entry = self._list_var_entry("extra")
extra_entry.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# 手动参数列表
self._grid_label(row, "手动参数列表")
manual_var = tk.StringVar(value="" if self.cfg.get("manual") is None else " ".join(self.cfg["manual"]))
def _update_manual(*_):
txt = manual_var.get().strip()
self.cfg["manual"] = None if not txt else txt.split()
manual_var.trace_add("write", _update_manual)
tk.Entry(self, textvariable=manual_var, width=28).grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# 训练模式复选框
train_var = self._bool_var("train")
tk.Checkbutton(self, text="启用训练(实验性)", variable=train_var).grid(row=row, column=0, columnspan=2, sticky="w", padx=padx_val, pady=pady_val)
row += 1
# 按钮
ttk.Button(self, text="保存", command=lambda: self._on_save(codec_base, accel, mode)).grid(row=row, column=0, pady=8, padx=padx_val)
ttk.Button(self, text="退出", command=self.destroy).grid(row=row, column=1, pady=8)
_switch_mode() # 初始启用/禁用
# ── callbacks -----------------------------------------------------------
def _on_save(self, codec_base_var, accel_var, mode_var):
# 重构codec字符串同时处理显卡品牌映射
base = codec_base_var.get()
brand = accel_var.get()
brand_map = {"NVIDIA": "nvenc", "AMD": "amf", "Intel": "qsv", "不使用": ""}
if brand != "不使用":
self.cfg["codec"] = f"{base}_{brand_map[brand]}"
else:
self.cfg["codec"] = base
# 处理码率和crf的配置
if mode_var.get() == "crf":
self.cfg.pop("bitrate", None)
else:
br = self.cfg.get("bitrate", "")
if not (br.endswith("M") or br.endswith("k")):
messagebox.showwarning("警告", "码率参数可能配置错误。例如3M, 500k")
tmp = self.cfg["extra"]
idx = 0
if len(tmp) == 0:idx = -1
else:
while True:
if tmp[idx] == "-preset":break
elif idx+2==len(tmp):
idx = -1
break
else: idx+=1
if idx!=-1:
preset = self.cfg["extra"][idx+1]
if preset not in preset_options[brand]:
messagebox.showwarning("警告", "预设(preset)参数可能配置错误!")
save_config(self.cfg)
def _pick_ffmpeg(self, var):
path = filedialog.askopenfilename(title="Select ffmpeg executable")
if path:
var.set(path)
def main():
if len(sys.argv) > 1:
main_program.main()
else:
app = ConfigApp()
app.mainloop()
if __name__ == "__main__":
main()

View File

@ -1,6 +1,7 @@
import subprocess
from pathlib import Path
import sys
import os
import logging
from datetime import datetime
from time import time
@ -15,6 +16,7 @@ ESTI_FILE = Path("esti.out")
CFG_FILE = Path("config.json")
CFG = {
"crf":"18",
"bitrate": None,
"codec": "h264",
"extra": [],
"ffmpeg": "ffmpeg",
@ -34,18 +36,31 @@ def get_cmd(video_path,output_file):
]
command.extend(CFG["manual"])
command.append(output_file)
return command
if CFG["bitrate"] is not None:
command = [
CFG["ffmpeg"],
"-hide_banner",
"-i", video_path,
"-vf", "scale=-1:1080",
"-c:v", CFG["codec"],
"-b:v", CFG["bitrate"],
"-r","30",
"-y",
]
else:
command = [
CFG["ffmpeg"],
"-hide_banner",
"-i", video_path,
"-vf", "scale=-1:1080",
"-c:v", CFG["codec"],
"-global_quality", str(CFG["crf"]),
"-r","30",
"-y",
]
command = [
CFG["ffmpeg"],
"-hide_banner", # 隐藏 ffmpeg 的横幅信息
"-i", video_path,
"-vf", "scale=-1:1080", # 设置视频高度为 1080宽度按比例自动计算
"-c:v", CFG["codec"], # 使用 Intel Quick Sync Video 编码
"-global_quality", CFG["crf"], # 设置全局质量(数值越低质量越高)
"-r","30",
"-preset", "slow", # 设置压缩速度为慢(压缩效果较好)
"-y",
]
command.extend(CFG["extra"])
command.append(output_file)
return command
@ -70,11 +85,11 @@ def setup_logging():
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log"
stream = RichHandler(rich_tracebacks=True,tracebacks_show_locals=True)
stream.setLevel(logging.DEBUG)
stream.setLevel(logging.INFO)
stream.setFormatter(logging.Formatter("%(message)s"))
file = logging.FileHandler(log_file, encoding='utf-8')
file.setLevel(logging.INFO)
file.setLevel(logging.DEBUG)
logging.basicConfig(
level=logging.DEBUG,
@ -158,7 +173,8 @@ def save_esti():
# 保存为逗号分隔的文本格式
ESTI_FILE.write_text(','.join(map(str, coeffs)))
except Exception as e:
logging.warning("保存估算数据失败", exc_info=e)
logging.warning("保存估算数据失败")
logging.debug("error at save_esti",exc_info=e)
def fmt_time(t:int) -> str:
if t>3600:
@ -193,7 +209,8 @@ def func(sz:int,src=False):
return fmt_time(t)
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.warning("无法计算预计时间",exc_info=e)
logging.warning("无法计算预计时间")
logging.debug("esti time exception", exc_info=e)
return -1 if src else "NaN"
def process_video(video_path: Path):
@ -202,9 +219,9 @@ def process_video(video_path: Path):
sz=video_path.stat().st_size//(1024*1024)
if esti is not None or TRAIN:
use = func(sz,True)
logging.debug(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M预计{fmt_time(use)}")
logging.info(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M预计{fmt_time(use)}")
else:
logging.debug(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M")
logging.info(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M")
bgn=time()
@ -218,26 +235,9 @@ def process_video(video_path: Path):
logging.warning(f"文件{output_file}存在,跳过")
return use
# 4x
# command = [
# "ffmpeg.exe", # 可以修改为 ffmpeg 的完整路径例如C:/ffmpeg/bin/ffmpeg.exe
# "-hide_banner", # 隐藏 ffmpeg 的横幅信息
# "-i", str(video_path.absolute()),
# "-filter:v", "setpts=0.25*PTS", # 设置视频高度为 1080宽度按比例自动计算
# "-filter:a", "atempo=4.0",
# "-c:v", "h264_qsv", # 使用 Intel Quick Sync Video 编码
# "-global_quality", "28", # 设置全局质量(数值越低质量越高)
# "-r","30",
# "-preset", "fast", # 设置压缩速度为慢(压缩效果较好)
# "-y",
# str(output_file.absolute())
# ]
# 1x
command = get_cmd(str(video_path.absolute()),output_file)
try:
# 调用 ffmpeg并捕获标准输出和错误信息
result = subprocess.run(
command,
stdout=subprocess.PIPE,
@ -246,16 +246,13 @@ def process_video(video_path: Path):
text=True
)
# 检查 ffmpeg 的错误输出
if result.stderr:
# 记录所有警告和错误信息
for line in result.stderr.splitlines():
if 'warning' in line.lower():
logging.warning(f"[FFmpeg]({video_path}): {line}")
elif 'error' in line.lower():
logging.error(f"[FFmpeg]({video_path}): {line}")
# 检查 ffmpeg 执行的返回码
if result.returncode != 0:
logging.error(f"处理文件 {video_path} 失败,返回码: {result.returncode}cmd={' '.join(command)}")
logging.error(result.stdout)
@ -297,21 +294,69 @@ def traverse_directory(root_dir: Path):
with Progress() as prog:
task = prog.add_task("压缩视频",total=sm)
# prog.print("进度条右侧时间为不精确估算(当所有文件处理时间相同时估算精确)")
# 使用 rglob 递归遍历所有文件
for file in root_dir.rglob("*"):
if file.parent.name == "compress":continue
if file.is_file() and file.suffix.lower() in video_extensions:
t = process_video(file)
# if esti is not None:
# sm-=t
# prog.update(task,advance=1,description=f"预计剩余{fmt_time(sm)}")
if t is None:
prog.advance(task)
else:
prog.advance(task,t)
if __name__ == "__main__":
def test():
try:
subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode()
except Exception as e:
logging.critical("无法运行ffmpeg")
exit(-1)
try:
ret = subprocess.run(
"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size=1920x1080:rate=30 -c:v libx264 -y -pix_fmt yuv420p compress_video_test.mp4",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if ret.returncode != 0:
logging.warning("无法生成测试视频.")
logging.debug(ret.stdout)
logging.debug(ret.stderr)
ret.check_returncode()
cmd = get_cmd("compress_video_test.mp4","compressed_video_test.mp4",)
ret = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if ret.returncode != 0:
logging.error("测试视频压缩失败")
logging.debug(ret.stdout)
logging.debug(ret.stderr)
logging.error("Error termination via test failed.")
exit(-1)
os.remove("compress_video_test.mp4")
os.remove("compressed_video_test.mp4")
except Exception as e:
if os.path.exists("compress_video_test.mp4"):
os.remove("compress_video_test.mp4")
logging.warning("测试未通过,继续运行可能出现未定义行为。")
logging.debug("Test error",exc_info=e)
def init_train():
global esti
if CFG["train"]:
train_init()
else:
if ESTI_FILE.exists():
try:
# 从文件读取系数
coeffs_str = ESTI_FILE.read_text().strip().split(',')
esti = [float(coeff) for coeff in coeffs_str]
except Exception as e:
logging.warning(f"预测输出文件{str(ESTI_FILE)}存在但无法读取", exc_info=e)
def main(_root = None):
global root, esti
setup_logging()
tot_bgn = time()
logging.info("-------------------------------")
@ -326,23 +371,20 @@ if __name__ == "__main__":
logging.warning("Invalid config file, ignored.")
logging.debug(e)
# 通过命令行参数传入需要遍历的目录
if len(sys.argv) < 2:
print(f"用法python {__file__} <目标目录>")
logging.warning("Error termination via invalid input.")
sys.exit(1)
root = Path(sys.argv[1])
if CFG["train"]:
train_init()
if _root is not None:
root = Path(_root)
else:
if ESTI_FILE.exists():
try:
# 从文件读取系数
coeffs_str = ESTI_FILE.read_text().strip().split(',')
esti = [float(coeff) for coeff in coeffs_str]
except Exception as e:
logging.warning(f"预测输出文件{str(ESTI_FILE)}存在但无法读取", exc_info=e)
# 通过命令行参数传入需要遍历的目录
if len(sys.argv) < 2:
print(f"用法python {__file__} <目标目录>")
logging.warning("Error termination via invalid input.")
sys.exit(1)
root = Path(sys.argv[1])
logging.info("开始验证环境")
test()
init_train()
if not root.is_dir():
print("提供的路径不是一个有效目录。")
@ -359,4 +401,5 @@ if __name__ == "__main__":
except Exception as e:
logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e)
if __name__ == "__main__":
main()