diff --git a/VideoCompress/config.py b/VideoCompress/config.py new file mode 100644 index 0000000..aab88b9 --- /dev/null +++ b/VideoCompress/config.py @@ -0,0 +1,286 @@ +import json +import os +import sys +import tkinter as tk +from tkinter import ttk, messagebox, filedialog +import main as main_program + +CONFIG_NAME = "config.json" + +DEFAULT_CONFIG = { + "crf": 18, + "codec": "h264", # could be h264, h264_qsv, h264_nvenc … etc. + "ffmpeg": "ffmpeg", + "video_ext": [".mp4", ".mkv"], + "extra": [], + "manual": None, + "train": False, +} + +HW_SUFFIXES = ["amf", "qsv", "nvenc"] +CODECS_BASE = ["h264", "hevc"] +preset_options = { + "不使用":["","ultrafast","superfast","veryfast","faster","fast","medium","slow","slower","veryslow",], + "AMD": ["","speed","balanced","quality",], + "Intel": ["","veryfast","faster","fast","medium","slow",], + "NVIDIA": ["","default","slow","medium","fast","hp","hq",] +} + + +def config_path() -> str: + """Return path of config file next to the executable / script.""" + if getattr(sys, "frozen", False): # PyInstaller executable + base = os.path.dirname(sys.executable) + else: + base = os.path.dirname(os.path.abspath(__file__)) + return os.path.join(base, CONFIG_NAME) + + +def load_config() -> dict: + try: + with open(config_path(), "r", encoding="utf-8") as fp: + data = json.load(fp) + if not isinstance(data, dict): + raise ValueError("config.json root must be an object") + return {**DEFAULT_CONFIG, **data} + except FileNotFoundError: + return DEFAULT_CONFIG.copy() + except Exception as exc: + messagebox.showwarning("FFmpeg Config", f"Invalid config.json – using defaults.\n{exc}") + return DEFAULT_CONFIG.copy() + + +def save_config(cfg: dict): + try: + with open(config_path(), "w", encoding="utf-8") as fp: + json.dump(cfg, fp, ensure_ascii=False, indent=4) + except Exception as exc: + messagebox.showerror("配置中心", f"保存失败:\n{exc}") + else: + messagebox.showinfo("配置中心", "保存成功。") + + +class ConfigApp(tk.Tk): + def __init__(self): + super().__init__() + # 设置现代化主题和统一内边距 + style = ttk.Style(self) + style.theme_use('clam') + self.title("配置中心") + self.resizable(False, False) + self.cfg = load_config() + + if "-preset" in self.cfg["extra"]: + idx = self.cfg["extra"].index("-preset") + self.preset = self.cfg["extra"][idx+1] + self.cfg["extra"].pop(idx) + self.cfg["extra"].pop(idx) + else: + self.preset = "" + + self._build_ui() + # ── helper -------------------------------------------------------------- + def _grid_label(self, row: int, text: str): + tk.Label(self, text=text, anchor="w").grid(row=row, column=0, sticky="w", pady=2, padx=4) + + def _str_var(self, key: str): + var = tk.StringVar(value=str(self.cfg.get(key, ""))) + var.trace_add("write", lambda *_: self.cfg.__setitem__(key, var.get())) + return var + + def _bool_var(self, key: str): + var = tk.BooleanVar(value=bool(self.cfg.get(key))) + var.trace_add("write", lambda *_: self.cfg.__setitem__(key, var.get())) + return var + + def _list_var_entry(self, key: str, width: int = 28): + """Comma‑separated list entry bound to config[key].""" + var = tk.StringVar(value=",".join(self.cfg.get(key, []))) + + def _update(*_): + self.cfg[key] = [s.strip() for s in var.get().split(",") if s.strip()] + + var.trace_add("write", _update) + ent = tk.Entry(self, textvariable=var, width=width) + return ent + + # ── UI ------------------------------------------------------------------ + def _build_ui(self): + row = 0 + padx_val = 6 + pady_val = 4 + # 编解码器 + self._grid_label(row, "编解码器 (h264 / hevc)") + codec_base = tk.StringVar() + codec_base.set(next((c for c in CODECS_BASE if self.cfg["codec"].startswith(c)), "h264")) + codec_menu = ttk.Combobox(self, textvariable=codec_base, values=CODECS_BASE, state="readonly", width=10) + codec_menu.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + # 显卡品牌(硬件加速) + self._grid_label(row, "GPU加速") + accel = tk.StringVar() + brand_vals = ["不使用", "NVIDIA", "AMD", "Intel"] + def get_brand(): + codec = self.cfg["codec"] + if codec.endswith("_nvenc"): + return "NVIDIA" + elif codec.endswith("_amf"): + return "AMD" + elif codec.endswith("_qsv"): + return "Intel" + return "不使用" + accel.set(get_brand()) + accel_menu = ttk.Combobox(self, textvariable=accel, values=brand_vals, state="readonly", width=10) + accel_menu.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + # CRF或码率选择 + mode = tk.StringVar(value="crf") + if "bitrate" in self.cfg: + mode.set("bitrate") + def _switch_mode(): + if mode.get() == "crf": + bitrate_ent.configure(state="disabled") + crf_ent.configure(state="normal") + else: + crf_ent.configure(state="disabled") + bitrate_ent.configure(state="normal") + tk.Radiobutton(self, text="使用 CRF", variable=mode, value="crf", command=_switch_mode).grid(row=row, column=0, sticky="w", padx=padx_val, pady=pady_val) + tk.Radiobutton(self, text="使用码率", variable=mode, value="bitrate", command=_switch_mode).grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + # CRF输入框,并在标签中解释参数含义 + self._grid_label(row, "CRF 值(质量常数,数值越低质量越高)") + crf_var = self._str_var("crf") + crf_ent = tk.Entry(self, textvariable=crf_var, width=6) + crf_ent.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + # 码率输入框 (kbit/s) + self._grid_label(row, "码率 (例如6M, 200k)") + bitrate_var = tk.StringVar(value=str(self.cfg.get("bitrate", ""))) + def _update_bitrate(*_): + if mode.get() == "bitrate": + try: + self.cfg["bitrate"] = bitrate_var.get().strip() + except ValueError: + self.cfg.pop("bitrate", None) + bitrate_var.trace_add("write", _update_bitrate) + bitrate_ent = tk.Entry(self, textvariable=bitrate_var, width=8) + bitrate_ent.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + # 预设选项 + self._grid_label(row, "预设 (决定压缩的速度和质量)") + preset_var = tk.StringVar(value=self.preset) + + def _update_preset_option(*_): + preset_menu['values'] = preset_options[accel.get()] + preset_menu.set("") + def _update_preset(*_): + print(preset_var.get()) + if self.cfg["extra"].count("-preset")>0: + idx = self.cfg["extra"].index("-preset") + self.cfg["extra"].pop(idx) + self.cfg["extra"].pop(idx) + if preset_var.get(): + self.cfg["extra"].extend(["-preset", preset_var.get()]) + preset_var.trace_add("write", _update_preset) + accel.trace_add("write",_update_preset_option) + preset_menu = ttk.Combobox(self, textvariable=preset_var, values=preset_options[get_brand()], state="readonly", width=10) + preset_menu.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + # ffmpeg路径 + self._grid_label(row, "ffmpeg 可执行文件") + ffmpeg_var = self._str_var("ffmpeg") + ffmpeg_ent = tk.Entry(self, textvariable=ffmpeg_var, width=28) + ffmpeg_ent.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + ttk.Button(self, text="浏览", command=lambda: self._pick_ffmpeg(ffmpeg_var)).grid(row=row, column=2, padx=2, pady=pady_val) + row += 1 + + # 视频扩展名列表 + self._grid_label(row, "视频扩展名 (.x,.y)") + self._list_var_entry("video_ext").grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + # 额外参数列表 + self._grid_label(row, "额外参数列表") + extra_entry = self._list_var_entry("extra") + extra_entry.grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + # 手动参数列表 + self._grid_label(row, "手动参数列表") + manual_var = tk.StringVar(value="" if self.cfg.get("manual") is None else " ".join(self.cfg["manual"])) + def _update_manual(*_): + txt = manual_var.get().strip() + self.cfg["manual"] = None if not txt else txt.split() + manual_var.trace_add("write", _update_manual) + tk.Entry(self, textvariable=manual_var, width=28).grid(row=row, column=1, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + + + # 训练模式复选框 + train_var = self._bool_var("train") + tk.Checkbutton(self, text="启用训练(实验性)", variable=train_var).grid(row=row, column=0, columnspan=2, sticky="w", padx=padx_val, pady=pady_val) + row += 1 + + + # 按钮 + ttk.Button(self, text="保存", command=lambda: self._on_save(codec_base, accel, mode)).grid(row=row, column=0, pady=8, padx=padx_val) + ttk.Button(self, text="退出", command=self.destroy).grid(row=row, column=1, pady=8) + _switch_mode() # 初始启用/禁用 + + # ── callbacks ----------------------------------------------------------- + def _on_save(self, codec_base_var, accel_var, mode_var): + # 重构codec字符串,同时处理显卡品牌映射 + base = codec_base_var.get() + brand = accel_var.get() + brand_map = {"NVIDIA": "nvenc", "AMD": "amf", "Intel": "qsv", "不使用": ""} + if brand != "不使用": + self.cfg["codec"] = f"{base}_{brand_map[brand]}" + else: + self.cfg["codec"] = base + # 处理码率和crf的配置 + if mode_var.get() == "crf": + self.cfg.pop("bitrate", None) + else: + br = self.cfg.get("bitrate", "") + if not (br.endswith("M") or br.endswith("k")): + messagebox.showwarning("警告", "码率参数可能配置错误。例如:3M, 500k") + tmp = self.cfg["extra"] + idx = 0 + if len(tmp) == 0:idx = -1 + else: + while True: + if tmp[idx] == "-preset":break + elif idx+2==len(tmp): + idx = -1 + break + else: idx+=1 + if idx!=-1: + preset = self.cfg["extra"][idx+1] + if preset not in preset_options[brand]: + messagebox.showwarning("警告", "预设(preset)参数可能配置错误!") + + save_config(self.cfg) + + def _pick_ffmpeg(self, var): + path = filedialog.askopenfilename(title="Select ffmpeg executable") + if path: + var.set(path) + + +def main(): + if len(sys.argv) > 1: + main_program.main() + else: + app = ConfigApp() + app.mainloop() + + +if __name__ == "__main__": + main() diff --git a/VideoCompress/main.py b/VideoCompress/main.py index 421128d..e5da80f 100644 --- a/VideoCompress/main.py +++ b/VideoCompress/main.py @@ -1,6 +1,7 @@ import subprocess from pathlib import Path import sys +import os import logging from datetime import datetime from time import time @@ -15,6 +16,7 @@ ESTI_FILE = Path("esti.out") CFG_FILE = Path("config.json") CFG = { "crf":"18", + "bitrate": None, "codec": "h264", "extra": [], "ffmpeg": "ffmpeg", @@ -34,18 +36,31 @@ def get_cmd(video_path,output_file): ] command.extend(CFG["manual"]) command.append(output_file) + return command + + if CFG["bitrate"] is not None: + command = [ + CFG["ffmpeg"], + "-hide_banner", + "-i", video_path, + "-vf", "scale=-1:1080", + "-c:v", CFG["codec"], + "-b:v", CFG["bitrate"], + "-r","30", + "-y", + ] + else: + command = [ + CFG["ffmpeg"], + "-hide_banner", + "-i", video_path, + "-vf", "scale=-1:1080", + "-c:v", CFG["codec"], + "-global_quality", str(CFG["crf"]), + "-r","30", + "-y", + ] - command = [ - CFG["ffmpeg"], - "-hide_banner", # 隐藏 ffmpeg 的横幅信息 - "-i", video_path, - "-vf", "scale=-1:1080", # 设置视频高度为 1080,宽度按比例自动计算 - "-c:v", CFG["codec"], # 使用 Intel Quick Sync Video 编码 - "-global_quality", CFG["crf"], # 设置全局质量(数值越低质量越高) - "-r","30", - "-preset", "slow", # 设置压缩速度为慢(压缩效果较好) - "-y", - ] command.extend(CFG["extra"]) command.append(output_file) return command @@ -70,11 +85,11 @@ def setup_logging(): log_dir.mkdir(exist_ok=True) log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log" stream = RichHandler(rich_tracebacks=True,tracebacks_show_locals=True) - stream.setLevel(logging.DEBUG) + stream.setLevel(logging.INFO) stream.setFormatter(logging.Formatter("%(message)s")) file = logging.FileHandler(log_file, encoding='utf-8') - file.setLevel(logging.INFO) + file.setLevel(logging.DEBUG) logging.basicConfig( level=logging.DEBUG, @@ -158,7 +173,8 @@ def save_esti(): # 保存为逗号分隔的文本格式 ESTI_FILE.write_text(','.join(map(str, coeffs))) except Exception as e: - logging.warning("保存估算数据失败", exc_info=e) + logging.warning("保存估算数据失败") + logging.debug("error at save_esti",exc_info=e) def fmt_time(t:int) -> str: if t>3600: @@ -193,7 +209,8 @@ def func(sz:int,src=False): return fmt_time(t) except KeyboardInterrupt as e:raise e except Exception as e: - logging.warning("无法计算预计时间",exc_info=e) + logging.warning("无法计算预计时间") + logging.debug("esti time exception", exc_info=e) return -1 if src else "NaN" def process_video(video_path: Path): @@ -202,9 +219,9 @@ def process_video(video_path: Path): sz=video_path.stat().st_size//(1024*1024) if esti is not None or TRAIN: use = func(sz,True) - logging.debug(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M,预计{fmt_time(use)}") + logging.info(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M,预计{fmt_time(use)}") else: - logging.debug(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M") + logging.info(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M") bgn=time() @@ -218,26 +235,9 @@ def process_video(video_path: Path): logging.warning(f"文件{output_file}存在,跳过") return use - # 4x - # command = [ - # "ffmpeg.exe", # 可以修改为 ffmpeg 的完整路径,例如:C:/ffmpeg/bin/ffmpeg.exe - # "-hide_banner", # 隐藏 ffmpeg 的横幅信息 - # "-i", str(video_path.absolute()), - # "-filter:v", "setpts=0.25*PTS", # 设置视频高度为 1080,宽度按比例自动计算 - # "-filter:a", "atempo=4.0", - # "-c:v", "h264_qsv", # 使用 Intel Quick Sync Video 编码 - # "-global_quality", "28", # 设置全局质量(数值越低质量越高) - # "-r","30", - # "-preset", "fast", # 设置压缩速度为慢(压缩效果较好) - # "-y", - # str(output_file.absolute()) - # ] - - # 1x command = get_cmd(str(video_path.absolute()),output_file) try: - # 调用 ffmpeg,并捕获标准输出和错误信息 result = subprocess.run( command, stdout=subprocess.PIPE, @@ -246,16 +246,13 @@ def process_video(video_path: Path): text=True ) - # 检查 ffmpeg 的错误输出 if result.stderr: - # 记录所有警告和错误信息 for line in result.stderr.splitlines(): if 'warning' in line.lower(): logging.warning(f"[FFmpeg]({video_path}): {line}") elif 'error' in line.lower(): logging.error(f"[FFmpeg]({video_path}): {line}") - # 检查 ffmpeg 执行的返回码 if result.returncode != 0: logging.error(f"处理文件 {video_path} 失败,返回码: {result.returncode},cmd={' '.join(command)}") logging.error(result.stdout) @@ -297,21 +294,69 @@ def traverse_directory(root_dir: Path): with Progress() as prog: task = prog.add_task("压缩视频",total=sm) - # prog.print("进度条右侧时间为不精确估算(当所有文件处理时间相同时估算精确)") - # 使用 rglob 递归遍历所有文件 for file in root_dir.rglob("*"): if file.parent.name == "compress":continue if file.is_file() and file.suffix.lower() in video_extensions: t = process_video(file) - # if esti is not None: - # sm-=t - # prog.update(task,advance=1,description=f"预计剩余{fmt_time(sm)}") if t is None: prog.advance(task) else: prog.advance(task,t) -if __name__ == "__main__": +def test(): + try: + subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode() + except Exception as e: + logging.critical("无法运行ffmpeg") + exit(-1) + try: + ret = subprocess.run( + "ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size=1920x1080:rate=30 -c:v libx264 -y -pix_fmt yuv420p compress_video_test.mp4", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + if ret.returncode != 0: + logging.warning("无法生成测试视频.") + logging.debug(ret.stdout) + logging.debug(ret.stderr) + ret.check_returncode() + cmd = get_cmd("compress_video_test.mp4","compressed_video_test.mp4",) + ret = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + if ret.returncode != 0: + logging.error("测试视频压缩失败") + logging.debug(ret.stdout) + logging.debug(ret.stderr) + logging.error("Error termination via test failed.") + exit(-1) + os.remove("compress_video_test.mp4") + os.remove("compressed_video_test.mp4") + except Exception as e: + if os.path.exists("compress_video_test.mp4"): + os.remove("compress_video_test.mp4") + logging.warning("测试未通过,继续运行可能出现未定义行为。") + logging.debug("Test error",exc_info=e) + +def init_train(): + global esti + if CFG["train"]: + train_init() + else: + if ESTI_FILE.exists(): + try: + # 从文件读取系数 + coeffs_str = ESTI_FILE.read_text().strip().split(',') + esti = [float(coeff) for coeff in coeffs_str] + except Exception as e: + logging.warning(f"预测输出文件{str(ESTI_FILE)}存在但无法读取", exc_info=e) + +def main(_root = None): + global root, esti setup_logging() tot_bgn = time() logging.info("-------------------------------") @@ -326,23 +371,20 @@ if __name__ == "__main__": logging.warning("Invalid config file, ignored.") logging.debug(e) - # 通过命令行参数传入需要遍历的目录 - if len(sys.argv) < 2: - print(f"用法:python {__file__} <目标目录>") - logging.warning("Error termination via invalid input.") - sys.exit(1) - - root = Path(sys.argv[1]) - if CFG["train"]: - train_init() + if _root is not None: + root = Path(_root) else: - if ESTI_FILE.exists(): - try: - # 从文件读取系数 - coeffs_str = ESTI_FILE.read_text().strip().split(',') - esti = [float(coeff) for coeff in coeffs_str] - except Exception as e: - logging.warning(f"预测输出文件{str(ESTI_FILE)}存在但无法读取", exc_info=e) + # 通过命令行参数传入需要遍历的目录 + if len(sys.argv) < 2: + print(f"用法:python {__file__} <目标目录>") + logging.warning("Error termination via invalid input.") + sys.exit(1) + root = Path(sys.argv[1]) + + logging.info("开始验证环境") + test() + + init_train() if not root.is_dir(): print("提供的路径不是一个有效目录。") @@ -359,4 +401,5 @@ if __name__ == "__main__": except Exception as e: logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e) - +if __name__ == "__main__": + main()