From 26b848ca8310bd08dc040b04f9636522612879ec Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Mon, 20 Oct 2025 20:05:28 +0800
Subject: [PATCH 01/14] fix VideoCompress
---
VideoCompress/main.py | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index a3b6e5f..b1dd935 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -89,6 +89,7 @@ def get_cmd(video_path,output_file):
command.extend(CFG["extra"])
command.append(output_file)
+ logging.debug(f"Create CMD: {command}")
return command
@@ -161,7 +162,8 @@ def cpu_monitor():
except KeyboardInterrupt as e:
raise e
except Exception as e:
- logging.error(f"CPU监控异常: {e}")
+ pass
+ #logging.error(f"CPU监控异常: {e}")
# 等待下一次监控
threading.Event().wait(CFG["cpu_monitor_interval"])
@@ -302,7 +304,7 @@ def traverse_directory(root_dir: Path):
que = list(root_dir.glob("*"))
while que:
d = que.pop()
- for file in d.glob("*"):
+ for file in d.glob("*") if d.is_dir() else [d]:
if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]:
continue
if file.is_file() and file.suffix.lower() in video_extensions:
@@ -324,8 +326,8 @@ def traverse_directory(root_dir: Path):
for file in video_files:
prog.advance(task)
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split()
- cmd.append(str(file))
- proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
+ cmd.append(str(file.resolve()))
+ proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}")
frames[file] = 0
From 07bc8a63dba85f1baedaeee8b574a8a6693c33cb Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Mon, 20 Oct 2025 20:27:42 +0800
Subject: [PATCH 02/14] remove multi in VideoComprss, and fix
---
VideoCompress/main.py | 322 +++++++++---------------------------------
1 file changed, 63 insertions(+), 259 deletions(-)
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index b1dd935..5e98062 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -7,12 +7,10 @@ from datetime import datetime
from time import time
from rich.logging import RichHandler
from rich.progress import Progress
+from pickle import dumps, loads
from typing import Optional
import atexit
import re
-import threading
-import queue
-import psutil
root = None
CFG_FILE = Path(sys.path[0])/"config.json"
@@ -32,18 +30,8 @@ CFG = {
"test_video_fps": "30",
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
- "max_concurrent_instances": 2,
- "cpu_monitor_interval": 3, # CPU监控间隔(秒)
- "cpu_monitor_duration": 30, # 统计持续时间(秒,5分钟)
}
-# CPU监控相关全局变量
-ffmpeg_processes = {} # 存储活动的ffmpeg进程
-cpu_stats = {"system": [], "ffmpeg": []} # CPU使用率统计
-cpu_monitor_thread = None
-cpu_monitor_lock = threading.Lock()
-current_instances = 0
-instance_lock = threading.Lock()
def get_cmd(video_path,output_file):
if CFG["manual"] is not None:
@@ -123,105 +111,10 @@ def fmt_time(t:float|int) -> str:
else:
return f"{round(t)}s"
-def cpu_monitor():
- """CPU监控线程函数"""
- global cpu_stats
-
- while True:
- try:
- # 获取系统CPU使用率
- system_cpu = psutil.cpu_percent(interval=1)
-
- # 获取所有ffmpeg进程的CPU使用率
- ffmpeg_cpu_total = 0
- active_processes = []
-
- with cpu_monitor_lock:
- for proc_info in ffmpeg_processes.values():
- try:
- proc = proc_info['process']
- if proc.is_running():
- # print(proc,proc.cpu_percent() / psutil.cpu_count())
- ffmpeg_cpu_total += proc.cpu_percent() / psutil.cpu_count()
- active_processes.append(proc_info)
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- continue
-
- # 更新统计数据
- with cpu_monitor_lock:
- cpu_stats["system"].append(system_cpu)
- cpu_stats["ffmpeg"].append(ffmpeg_cpu_total)
-
- # 保持最近5分钟的数据
- max_samples = CFG["cpu_monitor_duration"] // CFG["cpu_monitor_interval"]
- if len(cpu_stats["system"]) > max_samples:
- cpu_stats["system"] = cpu_stats["system"][-max_samples:]
- if len(cpu_stats["ffmpeg"]) > max_samples:
- cpu_stats["ffmpeg"] = cpu_stats["ffmpeg"][-max_samples:]
-
- except KeyboardInterrupt as e:
- raise e
- except Exception as e:
- pass
- #logging.error(f"CPU监控异常: {e}")
-
- # 等待下一次监控
- threading.Event().wait(CFG["cpu_monitor_interval"])
-
-def start_cpu_monitor():
- """启动CPU监控线程"""
- global cpu_monitor_thread
- if cpu_monitor_thread is None or not cpu_monitor_thread.is_alive():
- cpu_monitor_thread = threading.Thread(target=cpu_monitor, daemon=True)
- cpu_monitor_thread.start()
- logging.info("CPU监控线程已启动")
-
-def get_cpu_usage_stats():
- """获取CPU使用率统计"""
- with cpu_monitor_lock:
- if not cpu_stats["system"] or not cpu_stats["ffmpeg"]:
- return None, None
-
- system_avg = sum(cpu_stats["system"]) / len(cpu_stats["system"])
- ffmpeg_avg = sum(cpu_stats["ffmpeg"]) / len(cpu_stats["ffmpeg"])
-
- return system_avg, ffmpeg_avg
-
-def should_increase_instances():
- """判断是否应该增加实例数"""
- system_avg, ffmpeg_avg = get_cpu_usage_stats()
-
- if system_avg is None or ffmpeg_avg is None:
- return False
-
- # 条件: 系统CPU - FFmpeg CPU > FFmpeg CPU * 2 + 0.1
- available_cpu = 100 - system_avg
- threshold = ffmpeg_avg # 10% = 0.1 * 100
-
- logging.debug(f"CPU统计: 系统平均={system_avg:.1f}%, FFmpeg平均={ffmpeg_avg:.1f}%, 可用={available_cpu:.1f}%, 阈值={threshold:.1f}%")
-
- return available_cpu > threshold
-
-def register_ffmpeg_process(proc_id, process):
- """注册ffmpeg进程用于监控"""
- with cpu_monitor_lock:
- ffmpeg_processes[proc_id] = {
- 'process': psutil.Process(process.pid),
- 'start_time': time()
- }
-
-def unregister_ffmpeg_process(proc_id):
- """注销ffmpeg进程"""
- with cpu_monitor_lock:
- if proc_id in ffmpeg_processes:
- del ffmpeg_processes[proc_id]
-
-def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_func=None, proc_id=None):
- global current_instances
+def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_func=None):
use=None
sz=video_path.stat().st_size//(1024*1024)
- bgn=time()
if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在)
compress_dir = video_path.parent / CFG["compress_dir_name"]
@@ -241,11 +134,6 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun
command = get_cmd(video_path_str,output_file)
try:
- with instance_lock:
- current_instances += 1
-
- logging.debug(f"启动FFmpeg进程 {proc_id}: {video_path.name}")
-
result = subprocess.Popen(
command,
stdout=subprocess.PIPE,
@@ -254,19 +142,17 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun
text=True
)
- # 注册进程用于CPU监控
- if proc_id:
- register_ffmpeg_process(proc_id, result)
-
while result.poll() is None:
line = " "
while result.poll() is None and line[-1:] not in "\r\n":
line+=result.stderr.read(1)
+ # print(line[-1])
if 'warning' in line.lower():
- logging.warning(f"[FFmpeg {proc_id}]({video_path_str}): {line}")
+ logging.warning(f"[FFmpeg]({video_path_str}): {line}")
elif 'error' in line.lower():
- logging.error(f"[FFmpeg {proc_id}]({video_path_str}): {line}")
+ logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "frame=" in line:
+ # print(line,end="")
match = re.search(r"frame=\s*(\d+)",line)
if match:
frame_number = int(match.group(1))
@@ -274,29 +160,18 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun
update_func(frame_number)
if result.returncode != 0:
- logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(map(str,command))}")
- logging.error(result.stdout.read())
- logging.error(result.stderr.read())
+ logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(command)}")
+ logging.error(result.stdout)
+ logging.error(result.stderr)
else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)},cmd={' '.join(map(str,command))}",exc_info=e)
- finally:
- # 注销进程监控
- if proc_id:
- unregister_ffmpeg_process(proc_id)
-
- with instance_lock:
- current_instances -= 1
-
- logging.debug(f"FFmpeg进程 {proc_id} 已结束")
-
return use
def traverse_directory(root_dir: Path):
- global current_instances
video_extensions = set(CFG["video_ext"])
sm=None
# 获取视频文件列表和帧数信息
@@ -311,20 +186,30 @@ def traverse_directory(root_dir: Path):
video_files.append(file)
elif file.is_dir():
que.append(file)
-
-
- # exit()
-
- if not video_files:
- logging.warning("未找到需要处理的视频文件")
- return
-
+
+
+ if not video_files:
+ logging.warning("未找到需要处理的视频文件")
+ return
+
# 获取视频信息
+ frames: dict[Path, float] = {}
+ info_file = Path("video_info.cache")
+ if info_file.is_file():
+ try:
+ cached_data = loads(info_file.read_bytes())
+ if isinstance(cached_data, dict):
+ frames = cached_data
+ logging.debug("Loaded video info from cache.")
+ except Exception as e:
+ logging.debug("Failed to load video info cache.",exc_info=e)
+
with Progress() as prog:
task = prog.add_task("正在获取视频信息", total=len(video_files))
- frames: dict[Path, float] = {}
for file in video_files:
prog.advance(task)
+ if file in frames and frames[file]>0:
+ continue
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split()
cmd.append(str(file.resolve()))
proc = subprocess.run(cmd, capture_output=True, text=True)
@@ -345,11 +230,16 @@ def traverse_directory(root_dir: Path):
except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0
-
+ if 0 in frames.values():
+ logging.warning(f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。")
+ try:
+ info_file.write_bytes(dumps(frames))
+ logging.debug("Saved video info to cache.")
+ except Exception as e:
+ logging.debug("Failed to save video info cache.",exc_info=e)
+
logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件")
- # 启动CPU监控
- start_cpu_monitor()
# 创建进度条
with Progress() as prog:
@@ -357,131 +247,45 @@ def traverse_directory(root_dir: Path):
main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames))
# 创建文件队列
- file_queue = queue.Queue()
for file in frames.keys():
- file_queue.put(file)
-
- # 进度跟踪
- progress_trackers = {}
- completed_files = 0
- total_completed_frames = 0
-
- def create_progress_updater(file_path, task_id):
- def update_progress(frame_count):
- nonlocal total_completed_frames
- if file_path in progress_trackers:
- old_frames = progress_trackers[file_path]
- diff = frame_count - old_frames
- total_completed_frames += diff
- else:
- total_completed_frames += frame_count
- progress_trackers[file_path] = frame_count
+ # 进度跟踪
+ filename = file.relative_to(root_dir)
+
+ # 创建文件级进度条
+ if frames[file] == 0:
+ file_task = prog.add_task(f"{filename}")
+ else:
+ file_task = prog.add_task(f"{filename}",total=frames[file])
+
+
+ with prog._lock:
+ completed_start = prog._tasks[main_task].completed
- if frames[file_path] > 0:
- prog.update(task_id, completed=frame_count)
- else:
- prog.update(task_id, description=f"{file_path.relative_to(root_dir)} 已处理{frame_count}帧")
-
- # 更新总进度
- if total_frames > 0:
- prog.update(main_task, completed=total_completed_frames)
- return update_progress
-
- def process_file_worker():
- nonlocal completed_files
- while True:
- try:
- file = file_queue.get(timeout=1)
- except queue.Empty:
- break
-
- filename = file.relative_to(root_dir)
-
- # 创建文件级进度条
+ def update_progress(x):
if frames[file] == 0:
- file_task = prog.add_task(f"{filename}")
+ prog.update(file_task,description=f"{filename} 已处理{x}帧")
else:
- file_task = prog.add_task(f"{filename}", total=frames[file])
-
- progress_updater = create_progress_updater(file, file_task)
-
- # 处理视频
- proc_id = f"worker_{threading.current_thread().ident}_{completed_files}"
-
- if CFG["save_to"] == "single":
- process_video(file, root_dir/"Compress", progress_updater, proc_id)
- else:
- process_video(file, None, progress_updater, proc_id)
-
- # 更新完成计数
- with instance_lock:
- completed_files += 1
- if total_frames == 0: # 如果没有总帧数,按文件数计算
- prog.update(main_task, completed=completed_files)
-
- # 移除文件级进度条
- prog.remove_task(file_task)
- file_queue.task_done()
-
- # 动态管理线程数
- active_threads = []
- max_workers = CFG["max_concurrent_instances"]
-
- def manage_workers():
- nonlocal active_threads
+ prog.update(file_task,completed=x)
+ prog.update(main_task, completed=completed_start+x)
- while completed_files < len(frames) or any(t.is_alive() for t in active_threads):
- # 清理已完成的线程
- active_threads = [t for t in active_threads if t.is_alive()]
-
- # 检查是否需要增加实例
- current_worker_count = len(active_threads)
-
- if current_worker_count < max_workers and not file_queue.empty():
- # 检查CPU使用率(运行5分钟后开始检查)
- should_add_worker = False
- if len(cpu_stats["system"]) >= 10: # 至少有5分钟的数据
- if current_worker_count >= 1: # 已有实例运行
- should_add_worker = should_increase_instances()
- if should_add_worker:
- logging.info("CPU资源充足,启动第二个压缩实例")
- else:
- should_add_worker = False
-
- if should_add_worker:
- worker_thread = threading.Thread(target=process_file_worker, daemon=True)
- worker_thread.start()
- active_threads.append(worker_thread)
- logging.debug(f"启动新的工作线程,当前活动线程数: {len(active_threads)}")
-
- threading.Event().wait(5) # 每5秒检查一次
-
- # 等待所有线程完成
- for thread in active_threads:
- thread.join()
-
- # 启动第一个工作线程
- if not file_queue.empty():
- first_worker = threading.Thread(target=process_file_worker, daemon=True)
- first_worker.start()
- active_threads.append(first_worker)
- logging.info("启动第一个压缩实例")
-
- # 启动线程管理器
- manager_thread = threading.Thread(target=manage_workers, daemon=True)
- manager_thread.start()
-
- # 等待管理线程完成
- manager_thread.join()
+ if CFG["save_to"] == "single":
+ process_video(file, root_dir/"Compress", update_progress)
+ else:
+ process_video(file, None, update_progress)
+
+ # 移除文件级进度条
+ prog.remove_task(file_task)
- logging.info(f"所有视频处理完成,共处理了 {completed_files} 个文件")
+ try:
+ info_file.unlink(missing_ok=True)
+ except Exception as e:
+ logging.warning("无法删除视频信息缓存文件",exc_info=e)
def test():
os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
try:
subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode()
- except KeyboardInterrupt as e:raise e
except Exception as e:
print(__file__)
logging.critical("无法运行ffmpeg")
From d197a212deee7592e0afe9572b6de81db5c13e46 Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Mon, 20 Oct 2025 20:36:27 +0800
Subject: [PATCH 03/14] enhance
---
VideoCompress/main.py | 33 ++++++++++++++++++---------------
VideoCompress/requirements.txt | 1 +
2 files changed, 19 insertions(+), 15 deletions(-)
create mode 100644 VideoCompress/requirements.txt
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index 5e98062..9f59c1c 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -8,7 +8,7 @@ from time import time
from rich.logging import RichHandler
from rich.progress import Progress
from pickle import dumps, loads
-from typing import Optional
+from typing import Optional,Callable
import atexit
import re
@@ -111,14 +111,17 @@ def fmt_time(t:float|int) -> str:
else:
return f"{round(t)}s"
-def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_func=None):
- use=None
- sz=video_path.stat().st_size//(1024*1024)
+def process_video(
+ video_path: Path,
+ compress_dir:Optional[Path]=None ,
+ update_func:Optional[Callable[[Optional[int],Optional[str]],None]]=None):
+
if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在)
compress_dir = video_path.parent / CFG["compress_dir_name"]
else:
+ assert root
compress_dir /= video_path.parent.relative_to(root)
assert isinstance(compress_dir,Path)
@@ -128,7 +131,6 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_fun
output_file = compress_dir / (video_path.stem + video_path.suffix)
if output_file.is_file():
logging.warning(f"文件{output_file}存在,跳过")
- return use
video_path_str = str(video_path.absolute())
command = get_cmd(video_path_str,output_file)
@@ -145,19 +147,21 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_fun
while result.poll() is None:
line = " "
while result.poll() is None and line[-1:] not in "\r\n":
+ assert result.stderr is not None
line+=result.stderr.read(1)
# print(line[-1])
if 'warning' in line.lower():
logging.warning(f"[FFmpeg]({video_path_str}): {line}")
elif 'error' in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}")
- elif "frame=" in line:
+ elif "frame=" in line and update_func is not None:
# print(line,end="")
match = re.search(r"frame=\s*(\d+)",line)
- if match:
- frame_number = int(match.group(1))
- if update_func is not None:
- update_func(frame_number)
+ frame_number = int(match.group(1)) if match else None
+
+ match = re.search(r"[\d\.]+x",line)
+ rate = match.group(0) if match else None
+ update_func(frame_number,rate)
if result.returncode != 0:
logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(command)}")
@@ -169,7 +173,6 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_fun
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)},cmd={' '.join(map(str,command))}",exc_info=e)
- return use
def traverse_directory(root_dir: Path):
video_extensions = set(CFG["video_ext"])
@@ -260,12 +263,12 @@ def traverse_directory(root_dir: Path):
with prog._lock:
completed_start = prog._tasks[main_task].completed
-
- def update_progress(x):
+
+ def update_progress(x, rate):
if frames[file] == 0:
- prog.update(file_task,description=f"{filename} 已处理{x}帧")
+ prog.update(file_task,description=f"{filename} 已处理{x}帧 {f'速率{rate}' if rate else ''}")
else:
- prog.update(file_task,completed=x)
+ prog.update(file_task,completed=x,description=f"{filename} {f'速率{rate}' if rate else ''}")
prog.update(main_task, completed=completed_start+x)
if CFG["save_to"] == "single":
diff --git a/VideoCompress/requirements.txt b/VideoCompress/requirements.txt
new file mode 100644
index 0000000..c94be38
--- /dev/null
+++ b/VideoCompress/requirements.txt
@@ -0,0 +1 @@
+rich
\ No newline at end of file
From 7ba40c2e9b746e752faa476102c90d26f5cdf389 Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Mon, 20 Oct 2025 21:05:02 +0800
Subject: [PATCH 04/14] fix Videocompress logging bug
---
VideoCompress/main.py | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index 9f59c1c..466b2cf 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -33,7 +33,12 @@ CFG = {
}
-def get_cmd(video_path,output_file):
+def get_cmd(video_path:str|Path,output_file:str|Path) -> list[str]:
+ if isinstance(video_path, Path):
+ video_path = str(video_path.resolve())
+ if isinstance(output_file, Path):
+ output_file = str(output_file.resolve())
+
if CFG["manual"] is not None:
command=[
CFG["ffmpeg"],
@@ -131,6 +136,7 @@ def process_video(
output_file = compress_dir / (video_path.stem + video_path.suffix)
if output_file.is_file():
logging.warning(f"文件{output_file}存在,跳过")
+ return
video_path_str = str(video_path.absolute())
command = get_cmd(video_path_str,output_file)
@@ -235,6 +241,7 @@ def traverse_directory(root_dir: Path):
frames[file] = 0
if 0 in frames.values():
logging.warning(f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。")
+ prog.remove_task(task)
try:
info_file.write_bytes(dumps(frames))
logging.debug("Saved video info to cache.")
From c23a6c00eed7d0fbbe91bc93975fe823f661684c Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Mon, 20 Oct 2025 21:08:31 +0800
Subject: [PATCH 05/14] add config for android
---
VideoCompress/config_for_Android.json | 10 ++++++++++
1 file changed, 10 insertions(+)
create mode 100644 VideoCompress/config_for_Android.json
diff --git a/VideoCompress/config_for_Android.json b/VideoCompress/config_for_Android.json
new file mode 100644
index 0000000..e781ec4
--- /dev/null
+++ b/VideoCompress/config_for_Android.json
@@ -0,0 +1,10 @@
+{
+ "save_to": "single",
+ "bitrate": "3M",
+ "codec": "h264_mediacodec",
+ "ffmpeg": "ffmpeg",
+ "video_ext": [
+ ".mp4",
+ ".mkv"
+ ],
+}
\ No newline at end of file
From ac2071709a92573c84eeabe9eb3d7f6d09b81b17 Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Mon, 20 Oct 2025 21:11:10 +0800
Subject: [PATCH 06/14] fix bug
---
VideoCompress/main.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index 466b2cf..3ad2561 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -279,11 +279,12 @@ def traverse_directory(root_dir: Path):
prog.update(main_task, completed=completed_start+x)
if CFG["save_to"] == "single":
- process_video(file, root_dir/"Compress", update_progress)
+ process_video(file, root_dir/CFG["compress_dir_name"], update_progress)
else:
process_video(file, None, update_progress)
# 移除文件级进度条
+ prog.update(main_task, completed=completed_start+frames[file])
prog.remove_task(file_task)
try:
From 072a1980329bb89e86b0f6db4086400874f94a0c Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Mon, 20 Oct 2025 22:35:03 +0800
Subject: [PATCH 07/14] Enhance Videocompress
---
VideoCompress/config.json | 17 ++++--
VideoCompress/config_for_Android.json | 2 +
VideoCompress/main.py | 85 ++++++++++++++++++++++-----
3 files changed, 84 insertions(+), 20 deletions(-)
diff --git a/VideoCompress/config.json b/VideoCompress/config.json
index f3b0f8b..acdcb9a 100644
--- a/VideoCompress/config.json
+++ b/VideoCompress/config.json
@@ -1,12 +1,21 @@
{
"save_to": "single",
- "crf": 18,
- "codec": "h264",
+ "bitrate": "3M",
+ "codec": "h264_qsv",
+ "hwaccel": "qsv",
+ "extra": [],
"ffmpeg": "ffmpeg",
+ "manual": null,
"video_ext": [
".mp4",
".mkv"
],
- "extra": [],
- "train": false
+ "compress_dir_name": "compress_qsv",
+ "resolution": null,
+ "fps": "30",
+ "test_video_resolution": "1920x1080",
+ "test_video_fps": "30",
+ "test_video_input": "compress_video_test.mp4",
+ "test_video_output": "compressed_video_test.mp4",
+ "disable_hwaccel_when_fail": true
}
\ No newline at end of file
diff --git a/VideoCompress/config_for_Android.json b/VideoCompress/config_for_Android.json
index e781ec4..cb64d79 100644
--- a/VideoCompress/config_for_Android.json
+++ b/VideoCompress/config_for_Android.json
@@ -2,9 +2,11 @@
"save_to": "single",
"bitrate": "3M",
"codec": "h264_mediacodec",
+ "hwaccel": "mediacodec",
"ffmpeg": "ffmpeg",
"video_ext": [
".mp4",
".mkv"
],
+ "resolution": null
}
\ No newline at end of file
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index 3ad2561..08a49ed 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -19,17 +19,19 @@ CFG = {
"crf":"18",
"bitrate": None,
"codec": "h264",
+ "hwaccel": None,
"extra": [],
"ffmpeg": "ffmpeg",
"manual": None,
"video_ext": [".mp4", ".mkv"],
"compress_dir_name": "compress",
- "resolution": "-1:1080",
+ "resolution": None,
"fps": "30",
"test_video_resolution": "1920x1080",
"test_video_fps": "30",
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
+ "disable_hwaccel_when_fail": True,
}
@@ -49,12 +51,20 @@ def get_cmd(video_path:str|Path,output_file:str|Path) -> list[str]:
command.append(output_file)
return command
+ command = [
+ CFG["ffmpeg"],
+ "-hide_banner",
+ ]
+ if CFG["hwaccel"] is not None:
+ command.extend([
+ "-hwaccel", CFG["hwaccel"],
+ ])
+ command.extend([
+ "-i", video_path,
+ ])
+
if CFG["bitrate"] is not None:
- command = [
- CFG["ffmpeg"],
- "-hide_banner",
- "-i", video_path,
- ]
+
if CFG['resolution'] is not None:
command.extend([
"-vf", f"scale={CFG['resolution']}",])
@@ -65,11 +75,6 @@ def get_cmd(video_path:str|Path,output_file:str|Path) -> list[str]:
"-y",
])
else:
- command = [
- CFG["ffmpeg"],
- "-hide_banner",
- "-i", video_path,
- ]
if CFG['resolution'] is not None:
command.extend([
"-vf", f"scale={CFG['resolution']}",])
@@ -136,7 +141,7 @@ def process_video(
output_file = compress_dir / (video_path.stem + video_path.suffix)
if output_file.is_file():
logging.warning(f"文件{output_file}存在,跳过")
- return
+ return False
video_path_str = str(video_path.absolute())
command = get_cmd(video_path_str,output_file)
@@ -150,16 +155,20 @@ def process_video(
text=True
)
+ total = ""
while result.poll() is None:
line = " "
while result.poll() is None and line[-1:] not in "\r\n":
assert result.stderr is not None
line+=result.stderr.read(1)
+ total+=line[-1]
# print(line[-1])
if 'warning' in line.lower():
logging.warning(f"[FFmpeg]({video_path_str}): {line}")
elif 'error' in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}")
+ elif "assertion" in line.lower():
+ logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "frame=" in line and update_func is not None:
# print(line,end="")
match = re.search(r"frame=\s*(\d+)",line)
@@ -171,14 +180,48 @@ def process_video(
if result.returncode != 0:
logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(command)}")
- logging.error(result.stdout)
- logging.error(result.stderr)
+ output_file.unlink(missing_ok=True)
+ assert result.stdout is not None
+ logging.error(result.stdout.read())
+ logging.error(total)
+ if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in ["h264_mediacodec","hevc_mediacodec"]:
+ logging.info("mediacodec硬件加速器已知在较短片段上存在异常,将禁用加速重试。")
+ output_file.unlink(missing_ok=True)
+ bak = CFG.copy()
+ CFG["hwaccel"] = None
+ CFG["codec"] = "h264" if CFG["codec"]=="h264_mediacodec" else "hevc"
+ assert not output_file.exists()
+ ret = process_video(video_path,compress_dir,update_func)
+ CFG.update(bak)
+ if not ret:
+ logging.error("重试仍然失败。")
+ return False
+ else:
+ return True
+ elif CFG["disable_hwaccel_when_fail"]:
+ logging.info("正在禁用硬件加速器重试,进度条可能发生混乱。")
+ output_file.unlink(missing_ok=True)
+ bak = CFG.copy()
+ CFG["hwaccel"] = None
+ if CFG['codec'].endswith("_mediacodec") or \
+ CFG['codec'].endswith("_qsv") or \
+ CFG['codec'].endswith("_nvenc") or\
+ CFG['codec'].endswith("_amf"):
+ CFG["codec"] = CFG["codec"].split("_")[0]
+ assert not output_file.exists()
+ ret = process_video(video_path,compress_dir,update_func)
+ CFG.update(bak)
+ if not ret:
+ logging.error("重试仍然失败。")
+ return False
else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)},cmd={' '.join(map(str,command))}",exc_info=e)
+ return False
+ return True
def traverse_directory(root_dir: Path):
video_extensions = set(CFG["video_ext"])
@@ -203,13 +246,15 @@ def traverse_directory(root_dir: Path):
# 获取视频信息
frames: dict[Path, float] = {}
+ cached_data: dict[Path, float] = {}
info_file = Path("video_info.cache")
if info_file.is_file():
try:
cached_data = loads(info_file.read_bytes())
if isinstance(cached_data, dict):
- frames = cached_data
logging.debug("Loaded video info from cache.")
+ else:
+ cached_data = {}
except Exception as e:
logging.debug("Failed to load video info cache.",exc_info=e)
@@ -217,7 +262,8 @@ def traverse_directory(root_dir: Path):
task = prog.add_task("正在获取视频信息", total=len(video_files))
for file in video_files:
prog.advance(task)
- if file in frames and frames[file]>0:
+ if file in cached_data and cached_data[file]>0:
+ frames[file] = cached_data[file]
continue
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split()
cmd.append(str(file.resolve()))
@@ -361,6 +407,13 @@ def main(_root = None):
except Exception as e:
logging.warning("Invalid config file, ignored.")
logging.debug(e)
+ else:
+ try:
+ import json
+ CFG_FILE.write_text(json.dumps(CFG,indent=4))
+ logging.info("Config file created.")
+ except Exception as e:
+ logging.warning("Failed to create config file.",exc_info=e)
if _root is not None:
root = Path(_root)
From 983ad0c8b6ef77484a8ae779b3ffcb5e272eef57 Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Mon, 20 Oct 2025 22:35:24 +0800
Subject: [PATCH 08/14] format
---
VideoCompress/main.py | 357 +++++++++++++++++++++++++-----------------
1 file changed, 210 insertions(+), 147 deletions(-)
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index 08a49ed..1988129 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -8,15 +8,15 @@ from time import time
from rich.logging import RichHandler
from rich.progress import Progress
from pickle import dumps, loads
-from typing import Optional,Callable
+from typing import Optional, Callable
import atexit
import re
root = None
-CFG_FILE = Path(sys.path[0])/"config.json"
+CFG_FILE = Path(sys.path[0]) / "config.json"
CFG = {
"save_to": "single",
- "crf":"18",
+ "crf": "18",
"bitrate": None,
"codec": "h264",
"hwaccel": None,
@@ -35,163 +35,188 @@ CFG = {
}
-def get_cmd(video_path:str|Path,output_file:str|Path) -> list[str]:
+def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
if isinstance(video_path, Path):
video_path = str(video_path.resolve())
if isinstance(output_file, Path):
output_file = str(output_file.resolve())
-
+
if CFG["manual"] is not None:
- command=[
- CFG["ffmpeg"],
- "-hide_banner",
- "-i", video_path
- ]
+ command = [CFG["ffmpeg"], "-hide_banner", "-i", video_path]
command.extend(CFG["manual"])
command.append(output_file)
return command
-
+
command = [
- CFG["ffmpeg"],
+ CFG["ffmpeg"],
"-hide_banner",
]
if CFG["hwaccel"] is not None:
- command.extend([
- "-hwaccel", CFG["hwaccel"],
- ])
- command.extend([
- "-i", video_path,
- ])
+ command.extend(
+ [
+ "-hwaccel",
+ CFG["hwaccel"],
+ ]
+ )
+ command.extend(
+ [
+ "-i",
+ video_path,
+ ]
+ )
if CFG["bitrate"] is not None:
-
- if CFG['resolution'] is not None:
- command.extend([
- "-vf", f"scale={CFG['resolution']}",])
- command.extend([
- "-c:v", CFG["codec"],
- "-b:v", CFG["bitrate"],
- "-r",CFG["fps"],
- "-y",
- ])
+
+ if CFG["resolution"] is not None:
+ command.extend(
+ [
+ "-vf",
+ f"scale={CFG['resolution']}",
+ ]
+ )
+ command.extend(
+ [
+ "-c:v",
+ CFG["codec"],
+ "-b:v",
+ CFG["bitrate"],
+ "-r",
+ CFG["fps"],
+ "-y",
+ ]
+ )
else:
- if CFG['resolution'] is not None:
- command.extend([
- "-vf", f"scale={CFG['resolution']}",])
- command.extend([
- "-c:v", CFG["codec"],
- "-global_quality", str(CFG["crf"]),
- "-r",CFG["fps"],
- "-y",
- ])
-
+ if CFG["resolution"] is not None:
+ command.extend(
+ [
+ "-vf",
+ f"scale={CFG['resolution']}",
+ ]
+ )
+ command.extend(
+ [
+ "-c:v",
+ CFG["codec"],
+ "-global_quality",
+ str(CFG["crf"]),
+ "-r",
+ CFG["fps"],
+ "-y",
+ ]
+ )
+
command.extend(CFG["extra"])
command.append(output_file)
logging.debug(f"Create CMD: {command}")
return command
-
# 配置logging
def setup_logging():
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log"
- stream = RichHandler(rich_tracebacks=True,tracebacks_show_locals=True)
+ stream = RichHandler(rich_tracebacks=True, tracebacks_show_locals=True)
stream.setLevel(logging.INFO)
stream.setFormatter(logging.Formatter("%(message)s"))
-
- file = logging.FileHandler(log_file, encoding='utf-8')
+
+ file = logging.FileHandler(log_file, encoding="utf-8")
file.setLevel(logging.DEBUG)
-
+
logging.basicConfig(
level=logging.DEBUG,
- format='%(asctime)s - %(levelname) 7s - %(message)s',
- handlers=[
- file,
- stream
- ]
+ format="%(asctime)s - %(levelname) 7s - %(message)s",
+ handlers=[file, stream],
)
-def fmt_time(t:float|int) -> str:
- if t>3600:
+
+def fmt_time(t: float | int) -> str:
+ if t > 3600:
return f"{t//3600}h {t//60}min {t%60}s"
- elif t>60:
+ elif t > 60:
return f"{t//60}min {t%60}s"
else:
return f"{round(t)}s"
+
def process_video(
- video_path: Path,
- compress_dir:Optional[Path]=None ,
- update_func:Optional[Callable[[Optional[int],Optional[str]],None]]=None):
-
-
+ video_path: Path,
+ compress_dir: Optional[Path] = None,
+ update_func: Optional[Callable[[Optional[int], Optional[str]], None]] = None,
+):
+
if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在)
compress_dir = video_path.parent / CFG["compress_dir_name"]
else:
assert root
compress_dir /= video_path.parent.relative_to(root)
-
- assert isinstance(compress_dir,Path)
- compress_dir.mkdir(exist_ok=True,parents=True)
-
+
+ assert isinstance(compress_dir, Path)
+ compress_dir.mkdir(exist_ok=True, parents=True)
+
# 输出文件路径:与原文件同名,保存在 compress 目录下
output_file = compress_dir / (video_path.stem + video_path.suffix)
if output_file.is_file():
logging.warning(f"文件{output_file}存在,跳过")
return False
-
+
video_path_str = str(video_path.absolute())
- command = get_cmd(video_path_str,output_file)
-
+ command = get_cmd(video_path_str, output_file)
+
try:
result = subprocess.Popen(
- command,
- stdout=subprocess.PIPE,
+ command,
+ stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- encoding="utf-8",
- text=True
+ encoding="utf-8",
+ text=True,
)
-
+
total = ""
while result.poll() is None:
line = " "
while result.poll() is None and line[-1:] not in "\r\n":
assert result.stderr is not None
- line+=result.stderr.read(1)
- total+=line[-1]
+ line += result.stderr.read(1)
+ total += line[-1]
# print(line[-1])
- if 'warning' in line.lower():
+ if "warning" in line.lower():
logging.warning(f"[FFmpeg]({video_path_str}): {line}")
- elif 'error' in line.lower():
+ elif "error" in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "assertion" in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "frame=" in line and update_func is not None:
# print(line,end="")
- match = re.search(r"frame=\s*(\d+)",line)
+ match = re.search(r"frame=\s*(\d+)", line)
frame_number = int(match.group(1)) if match else None
-
- match = re.search(r"[\d\.]+x",line)
- rate = match.group(0) if match else None
- update_func(frame_number,rate)
+
+ match = re.search(r"[\d\.]+x", line)
+ rate = match.group(0) if match else None
+ update_func(frame_number, rate)
if result.returncode != 0:
- logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(command)}")
+ logging.error(
+ f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(command)}"
+ )
output_file.unlink(missing_ok=True)
assert result.stdout is not None
logging.error(result.stdout.read())
logging.error(total)
- if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in ["h264_mediacodec","hevc_mediacodec"]:
- logging.info("mediacodec硬件加速器已知在较短片段上存在异常,将禁用加速重试。")
+ if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in [
+ "h264_mediacodec",
+ "hevc_mediacodec",
+ ]:
+ logging.info(
+ "mediacodec硬件加速器已知在较短片段上存在异常,将禁用加速重试。"
+ )
output_file.unlink(missing_ok=True)
bak = CFG.copy()
CFG["hwaccel"] = None
- CFG["codec"] = "h264" if CFG["codec"]=="h264_mediacodec" else "hevc"
+ CFG["codec"] = "h264" if CFG["codec"] == "h264_mediacodec" else "hevc"
assert not output_file.exists()
- ret = process_video(video_path,compress_dir,update_func)
+ ret = process_video(video_path, compress_dir, update_func)
CFG.update(bak)
if not ret:
logging.error("重试仍然失败。")
@@ -203,43 +228,52 @@ def process_video(
output_file.unlink(missing_ok=True)
bak = CFG.copy()
CFG["hwaccel"] = None
- if CFG['codec'].endswith("_mediacodec") or \
- CFG['codec'].endswith("_qsv") or \
- CFG['codec'].endswith("_nvenc") or\
- CFG['codec'].endswith("_amf"):
+ if (
+ CFG["codec"].endswith("_mediacodec")
+ or CFG["codec"].endswith("_qsv")
+ or CFG["codec"].endswith("_nvenc")
+ or CFG["codec"].endswith("_amf")
+ ):
CFG["codec"] = CFG["codec"].split("_")[0]
assert not output_file.exists()
- ret = process_video(video_path,compress_dir,update_func)
- CFG.update(bak)
+ ret = process_video(video_path, compress_dir, update_func)
+ CFG.update(bak)
if not ret:
logging.error("重试仍然失败。")
return False
else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
-
- except KeyboardInterrupt as e:raise e
+
+ except KeyboardInterrupt as e:
+ raise e
except Exception as e:
- logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)},cmd={' '.join(map(str,command))}",exc_info=e)
+ logging.error(
+ f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)},cmd={' '.join(map(str,command))}",
+ exc_info=e,
+ )
return False
return True
+
def traverse_directory(root_dir: Path):
video_extensions = set(CFG["video_ext"])
- sm=None
+ sm = None
# 获取视频文件列表和帧数信息
video_files = []
que = list(root_dir.glob("*"))
while que:
d = que.pop()
for file in d.glob("*") if d.is_dir() else [d]:
- if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]:
+ if (
+ file.parent.name == CFG["compress_dir_name"]
+ or file.name == CFG["compress_dir_name"]
+ ):
continue
if file.is_file() and file.suffix.lower() in video_extensions:
video_files.append(file)
elif file.is_dir():
que.append(file)
-
if not video_files:
logging.warning("未找到需要处理的视频文件")
return
@@ -256,16 +290,16 @@ def traverse_directory(root_dir: Path):
else:
cached_data = {}
except Exception as e:
- logging.debug("Failed to load video info cache.",exc_info=e)
-
+ logging.debug("Failed to load video info cache.", exc_info=e)
+
with Progress() as prog:
task = prog.add_task("正在获取视频信息", total=len(video_files))
for file in video_files:
prog.advance(task)
- if file in cached_data and cached_data[file]>0:
+ if file in cached_data and cached_data[file] > 0:
frames[file] = cached_data[file]
continue
- cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split()
+ cmd = f"ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1".split()
cmd.append(str(file.resolve()))
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
@@ -274,75 +308,93 @@ def traverse_directory(root_dir: Path):
continue
if proc.stdout.strip():
try:
- avg_frame_rate, duration = proc.stdout.strip().split('\n')
- tmp = avg_frame_rate.split('/')
+ avg_frame_rate, duration = proc.stdout.strip().split("\n")
+ tmp = avg_frame_rate.split("/")
avg_frame_rate = float(tmp[0]) / float(tmp[1])
if duration == "N/A":
duration = 0
- logging.debug(f"无法获取视频信息: {file}, 时长为N/A,默认使用0s")
+ logging.debug(
+ f"无法获取视频信息: {file}, 时长为N/A,默认使用0s"
+ )
duration = float(duration)
frames[file] = duration * avg_frame_rate
except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0
if 0 in frames.values():
- logging.warning(f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。")
+ logging.warning(
+ f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。"
+ )
prog.remove_task(task)
try:
info_file.write_bytes(dumps(frames))
logging.debug("Saved video info to cache.")
except Exception as e:
- logging.debug("Failed to save video info cache.",exc_info=e)
+ logging.debug("Failed to save video info cache.", exc_info=e)
logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件")
-
-
+
# 创建进度条
with Progress() as prog:
total_frames = sum(frames.values())
- main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames))
-
+ main_task = prog.add_task(
+ "总进度", total=total_frames if total_frames > 0 else len(frames)
+ )
+
# 创建文件队列
for file in frames.keys():
# 进度跟踪
filename = file.relative_to(root_dir)
-
+
# 创建文件级进度条
if frames[file] == 0:
file_task = prog.add_task(f"{filename}")
else:
- file_task = prog.add_task(f"{filename}",total=frames[file])
-
-
+ file_task = prog.add_task(f"{filename}", total=frames[file])
+
with prog._lock:
completed_start = prog._tasks[main_task].completed
def update_progress(x, rate):
if frames[file] == 0:
- prog.update(file_task,description=f"{filename} 已处理{x}帧 {f'速率{rate}' if rate else ''}")
+ prog.update(
+ file_task,
+ description=f"{filename} 已处理{x}帧 {f'速率{rate}' if rate else ''}",
+ )
else:
- prog.update(file_task,completed=x,description=f"{filename} {f'速率{rate}' if rate else ''}")
- prog.update(main_task, completed=completed_start+x)
-
+ prog.update(
+ file_task,
+ completed=x,
+ description=f"{filename} {f'速率{rate}' if rate else ''}",
+ )
+ prog.update(main_task, completed=completed_start + x)
+
if CFG["save_to"] == "single":
- process_video(file, root_dir/CFG["compress_dir_name"], update_progress)
+ process_video(
+ file, root_dir / CFG["compress_dir_name"], update_progress
+ )
else:
process_video(file, None, update_progress)
# 移除文件级进度条
- prog.update(main_task, completed=completed_start+frames[file])
+ prog.update(main_task, completed=completed_start + frames[file])
prog.remove_task(file_task)
-
+
try:
info_file.unlink(missing_ok=True)
except Exception as e:
- logging.warning("无法删除视频信息缓存文件",exc_info=e)
+ logging.warning("无法删除视频信息缓存文件", exc_info=e)
+
def test():
- os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
+ os.environ["PATH"] = (
+ Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
+ )
try:
- subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode()
+ subprocess.run(
+ [CFG["ffmpeg"], "-version"], stdout=-3, stderr=-3
+ ).check_returncode()
except Exception as e:
print(__file__)
logging.critical("无法运行ffmpeg")
@@ -352,19 +404,19 @@ def test():
f"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
- text=True
+ text=True,
)
if ret.returncode != 0:
logging.warning("无法生成测试视频.")
logging.debug(ret.stdout)
logging.debug(ret.stderr)
ret.check_returncode()
- cmd = get_cmd(CFG["test_video_input"],CFG["test_video_output"],)
+ cmd = get_cmd(
+ CFG["test_video_input"],
+ CFG["test_video_output"],
+ )
ret = subprocess.run(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- text=True
+ cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
if ret.returncode != 0:
logging.error("测试视频压缩失败")
@@ -374,47 +426,52 @@ def test():
exit(-1)
os.remove("compress_video_test.mp4")
os.remove("compressed_video_test.mp4")
- except KeyboardInterrupt as e:raise e
+ except KeyboardInterrupt as e:
+ raise e
except Exception as e:
if os.path.exists("compress_video_test.mp4"):
os.remove("compress_video_test.mp4")
logging.warning("测试未通过,继续运行可能出现未定义行为。")
- logging.debug("Test error",exc_info=e)
+ logging.debug("Test error", exc_info=e)
def exit_pause():
- if os.name == 'nt':
+ if os.name == "nt":
os.system("pause")
- elif os.name == 'posix':
+ elif os.name == "posix":
os.system("read -p 'Press Enter to continue...'")
-def main(_root = None):
-
+
+def main(_root=None):
+
atexit.register(exit_pause)
-
+
global root
setup_logging()
tot_bgn = time()
logging.info("-------------------------------")
- logging.info(datetime.now().strftime('Video Compress started at %Y/%m/%d %H:%M'))
-
+ logging.info(datetime.now().strftime("Video Compress started at %Y/%m/%d %H:%M"))
+
if CFG_FILE.exists():
try:
import json
- cfg:dict = json.loads(CFG_FILE.read_text())
+
+ cfg: dict = json.loads(CFG_FILE.read_text())
CFG.update(cfg)
- except KeyboardInterrupt as e:raise e
+ except KeyboardInterrupt as e:
+ raise e
except Exception as e:
logging.warning("Invalid config file, ignored.")
logging.debug(e)
else:
try:
import json
- CFG_FILE.write_text(json.dumps(CFG,indent=4))
+
+ CFG_FILE.write_text(json.dumps(CFG, indent=4))
logging.info("Config file created.")
except Exception as e:
- logging.warning("Failed to create config file.",exc_info=e)
-
+ logging.warning("Failed to create config file.", exc_info=e)
+
if _root is not None:
root = Path(_root)
else:
@@ -424,29 +481,35 @@ def main(_root = None):
logging.warning("Error termination via invalid input.")
sys.exit(1)
root = Path(sys.argv[1])
-
+
if root.name.lower() == CFG["compress_dir_name"].lower():
logging.critical("请修改目标目录名为非compress。")
logging.error("Error termination via invalid input.")
- sys.exit(1)
+ sys.exit(1)
logging.info("开始验证环境")
test()
-
+
if not root.is_dir():
print("提供的路径不是一个有效目录。")
logging.warning("Error termination via invalid input.")
sys.exit(1)
-
+
try:
traverse_directory(root)
tot_end = time()
logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}")
logging.info("Normal termination of Video Compress.")
except KeyboardInterrupt:
- logging.warning("Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.")
+ logging.warning(
+ "Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED."
+ )
except Exception as e:
- logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e)
+ logging.error(
+ "Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",
+ exc_info=e,
+ )
+
if __name__ == "__main__":
main()
From 4ae07c57cc2439bbd559ca6a67a49a552ad28099 Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Fri, 24 Oct 2025 22:37:18 +0800
Subject: [PATCH 09/14] fix and enhance get frame.
---
VideoCompress/config.json | 3 +-
VideoCompress/get_frame.py | 127 +++++++++++++++++++++++++++++++++++++
VideoCompress/main.py | 50 ++++++---------
3 files changed, 149 insertions(+), 31 deletions(-)
create mode 100644 VideoCompress/get_frame.py
diff --git a/VideoCompress/config.json b/VideoCompress/config.json
index acdcb9a..e028a3e 100644
--- a/VideoCompress/config.json
+++ b/VideoCompress/config.json
@@ -1,6 +1,7 @@
{
"save_to": "single",
- "bitrate": "3M",
+ "bitrate": null,
+ "crf": 26,
"codec": "h264_qsv",
"hwaccel": "qsv",
"extra": [],
diff --git a/VideoCompress/get_frame.py b/VideoCompress/get_frame.py
new file mode 100644
index 0000000..086c40c
--- /dev/null
+++ b/VideoCompress/get_frame.py
@@ -0,0 +1,127 @@
+import json
+import shutil
+import subprocess
+from fractions import Fraction
+from decimal import Decimal
+from typing import Optional, Tuple
+
+class FFProbeError(RuntimeError):
+ pass
+
+def _run_ffprobe(args: list[str]) -> dict:
+ """运行 ffprobe 并以 JSON 返回,若失败抛异常。"""
+ if not shutil.which("ffprobe"):
+ raise FileNotFoundError("未找到 ffprobe,请先安装 FFmpeg 并确保 ffprobe 在 PATH 中。")
+ # 始终要求 JSON 输出,便于稳健解析
+ base = ["ffprobe", "-v", "error", "-print_format", "json"]
+ proc = subprocess.run(base + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+ if proc.returncode != 0:
+ raise FFProbeError(proc.stderr.strip() or "ffprobe 调用失败")
+ try:
+ return json.loads(proc.stdout or "{}")
+ except json.JSONDecodeError as e:
+ raise FFProbeError(f"无法解析 ffprobe 输出为 JSON: {e}")
+
+def _try_nb_frames(path: str, stream_index: int) -> Optional[int]:
+ data = _run_ffprobe([
+ "-select_streams", f"v:{stream_index}",
+ "-show_entries", "stream=nb_frames",
+ path
+ ])
+ streams = data.get("streams") or []
+ if not streams:
+ return None
+ nb = streams[0].get("nb_frames")
+ if nb and nb != "N/A":
+ try:
+ n = int(nb)
+ return n if n >= 0 else None
+ except ValueError:
+ return None
+ return None
+
+def _try_avgfps_times_duration(path: str, stream_index: int) -> Optional[int]:
+ # 读 avg_frame_rate
+ s = _run_ffprobe([
+ "-select_streams", f"v:{stream_index}",
+ "-show_entries", "stream=avg_frame_rate",
+ path
+ ])
+ streams = s.get("streams") or []
+ if not streams:
+ return None
+ afr = streams[0].get("avg_frame_rate")
+ if not afr or afr in ("0/0", "N/A"):
+ return None
+
+ # 读容器时长(单位:秒)
+ f = _run_ffprobe(["-show_entries", "format=duration", path])
+ dur_str = (f.get("format") or {}).get("duration")
+ if not dur_str:
+ return None
+
+ try:
+ fps = Fraction(afr) # 形如 "30000/1001"
+ dur = Decimal(dur_str)
+ # 四舍五入到最近整数,避免系统性低估
+ est = int(dur * Decimal(fps.numerator) / Decimal(fps.denominator) + Decimal("0.5"))
+ return est if est >= 0 else None
+ except Exception:
+ return None
+
+def _try_count_packets(path: str, stream_index: int) -> Optional[int]:
+ # 统计读取到的包数(不解码)。大多容器≈帧数,但不保证 1:1
+ data = _run_ffprobe([
+ "-select_streams", f"v:{stream_index}",
+ "-count_packets",
+ "-show_entries", "stream=nb_read_packets",
+ path
+ ])
+ streams = data.get("streams") or []
+ if not streams:
+ return None
+ nbp = streams[0].get("nb_read_packets")
+ try:
+ n = int(nbp)
+ return n if n >= 0 else None
+ except Exception:
+ return None
+
+def get_video_frame_count(
+ path: str,
+ stream_index: int = 0,
+ fallback_order: Tuple[str, ...] = ("nb_frames", "avg*dur", "count_packets"),
+) -> int|None:
+ """
+ 估计/获取视频总帧数(带回退)。
+ 参数:
+ - path: 视频文件路径
+ - stream_index: 选择哪个视频流,默认 0
+ - allow_slow_decode: 是否允许用解码全片的方式(最慢但最准)
+ - fallback_order: 回退顺序,四种方法的别名可选:
+ "nb_frames" -> 直接读元数据中的总帧数
+ "avg*dur" -> 平均帧率 × 时长(最快估算)
+ "count_packets" -> 统计包数(较快,接近帧数但不保证)
+ 返回:
+ (frame_count, method_used)
+ 异常:
+ - FileNotFoundError: ffprobe 未安装
+ - FFProbeError: ffprobe 调用异常或无法解析
+ - RuntimeError: 所有方法均失败
+ """
+ methods = {
+ "nb_frames": _try_nb_frames,
+ "avg*dur": _try_avgfps_times_duration,
+ "count_packets": _try_count_packets,
+ }
+
+ for key in fallback_order:
+ func = methods.get(key)
+ if not func:
+ continue
+ n = func(path, stream_index)
+ if isinstance(n, int) and n >= 0:
+ return n
+
+ return None
+ raise RuntimeError("无法获取或估计帧数:所有回退方法均失败。")
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index 1988129..f2ef4a9 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -11,6 +11,7 @@ from pickle import dumps, loads
from typing import Optional, Callable
import atexit
import re
+import get_frame
root = None
CFG_FILE = Path(sys.path[0]) / "config.json"
@@ -56,6 +57,9 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
[
"-hwaccel",
CFG["hwaccel"],
+ "-hwaccel_output_format",
+ CFG["hwaccel"],
+
]
)
command.extend(
@@ -198,12 +202,13 @@ def process_video(
if result.returncode != 0:
logging.error(
- f"处理文件 {video_path_str} 失败,返回码: {result.returncode},cmd={' '.join(command)}"
+ f"处理文件 {video_path_str} 失败"
)
+ logging.debug(f"返回码: {result.returncode}; cmd={' '.join(command)}")
output_file.unlink(missing_ok=True)
assert result.stdout is not None
- logging.error(result.stdout.read())
- logging.error(total)
+ logging.debug(result.stdout.read())
+ logging.debug(total)
if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in [
"h264_mediacodec",
"hevc_mediacodec",
@@ -223,7 +228,7 @@ def process_video(
return False
else:
return True
- elif CFG["disable_hwaccel_when_fail"]:
+ elif CFG["disable_hwaccel_when_fail"] and CFG["hwaccel"] is not None:
logging.info("正在禁用硬件加速器重试,进度条可能发生混乱。")
output_file.unlink(missing_ok=True)
bak = CFG.copy()
@@ -259,7 +264,7 @@ def traverse_directory(root_dir: Path):
video_extensions = set(CFG["video_ext"])
sm = None
# 获取视频文件列表和帧数信息
- video_files = []
+ video_files:list[Path] = []
que = list(root_dir.glob("*"))
while que:
d = que.pop()
@@ -274,9 +279,9 @@ def traverse_directory(root_dir: Path):
elif file.is_dir():
que.append(file)
- if not video_files:
- logging.warning("未找到需要处理的视频文件")
- return
+ if not video_files:
+ logging.warning("未找到需要处理的视频文件")
+ return
# 获取视频信息
frames: dict[Path, float] = {}
@@ -299,28 +304,12 @@ def traverse_directory(root_dir: Path):
if file in cached_data and cached_data[file] > 0:
frames[file] = cached_data[file]
continue
- cmd = f"ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1".split()
- cmd.append(str(file.resolve()))
- proc = subprocess.run(cmd, capture_output=True, text=True)
- if proc.returncode != 0:
- logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}")
- frames[file] = 0
- continue
- if proc.stdout.strip():
- try:
- avg_frame_rate, duration = proc.stdout.strip().split("\n")
- tmp = avg_frame_rate.split("/")
- avg_frame_rate = float(tmp[0]) / float(tmp[1])
- if duration == "N/A":
- duration = 0
- logging.debug(
- f"无法获取视频信息: {file}, 时长为N/A,默认使用0s"
- )
- duration = float(duration)
- frames[file] = duration * avg_frame_rate
- except (ValueError, IndexError) as e:
- logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
- frames[file] = 0
+ fr = get_frame.get_video_frame_count(str(file.resolve()))
+ if fr is None:
+ logging.debug(
+ f"无法获取视频信息: {file}, 时长为N/A,默认使用0s"
+ )
+ frames[file] = 0 if fr is None else fr
if 0 in frames.values():
logging.warning(
f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。"
@@ -512,4 +501,5 @@ def main(_root=None):
if __name__ == "__main__":
+ sys.argv.append(r'C:\Users\flt\Documents\WeChat Files\wxid_m8h0igh8p52p22\FileStorage\Video')
main()
From f56675c486f5c880d473cd86765a89d09e00ec25 Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Wed, 29 Oct 2025 00:24:13 +0800
Subject: [PATCH 10/14] use pydantic to validate config
---
VideoCompress/config.json | 3 +-
VideoCompress/get_frame.py | 35 ++++++++++------
VideoCompress/main.py | 83 +++++++++++++++++++++++++++++++++++---
3 files changed, 103 insertions(+), 18 deletions(-)
diff --git a/VideoCompress/config.json b/VideoCompress/config.json
index e028a3e..8f32343 100644
--- a/VideoCompress/config.json
+++ b/VideoCompress/config.json
@@ -5,7 +5,8 @@
"codec": "h264_qsv",
"hwaccel": "qsv",
"extra": [],
- "ffmpeg": "ffmpeg",
+ "ffmpeg": "C:/tools/ffmpeg/bin/ffmpeg.exe",
+ "ffprobe": "C:/tools/ffmpeg/bin/ffprobe",
"manual": null,
"video_ext": [
".mp4",
diff --git a/VideoCompress/get_frame.py b/VideoCompress/get_frame.py
index 086c40c..ed99480 100644
--- a/VideoCompress/get_frame.py
+++ b/VideoCompress/get_frame.py
@@ -1,19 +1,18 @@
import json
-import shutil
+import logging
import subprocess
from fractions import Fraction
from decimal import Decimal
from typing import Optional, Tuple
+ffprobe:str = "ffprobe"
class FFProbeError(RuntimeError):
pass
def _run_ffprobe(args: list[str]) -> dict:
"""运行 ffprobe 并以 JSON 返回,若失败抛异常。"""
- if not shutil.which("ffprobe"):
- raise FileNotFoundError("未找到 ffprobe,请先安装 FFmpeg 并确保 ffprobe 在 PATH 中。")
# 始终要求 JSON 输出,便于稳健解析
- base = ["ffprobe", "-v", "error", "-print_format", "json"]
+ base = [ffprobe, "-v", "error", "-print_format", "json"]
proc = subprocess.run(base + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if proc.returncode != 0:
raise FFProbeError(proc.stderr.strip() or "ffprobe 调用失败")
@@ -30,6 +29,7 @@ def _try_nb_frames(path: str, stream_index: int) -> Optional[int]:
])
streams = data.get("streams") or []
if not streams:
+ logging.debug("_try_nb_frames: failed no stream")
return None
nb = streams[0].get("nb_frames")
if nb and nb != "N/A":
@@ -37,7 +37,9 @@ def _try_nb_frames(path: str, stream_index: int) -> Optional[int]:
n = int(nb)
return n if n >= 0 else None
except ValueError:
+ logging.debug(f"_try_nb_frames: failed nb not positive int: {nb}")
return None
+ logging.debug(f"_try_nb_frames: failed nb NA: {nb}")
return None
def _try_avgfps_times_duration(path: str, stream_index: int) -> Optional[int]:
@@ -58,6 +60,7 @@ def _try_avgfps_times_duration(path: str, stream_index: int) -> Optional[int]:
f = _run_ffprobe(["-show_entries", "format=duration", path])
dur_str = (f.get("format") or {}).get("duration")
if not dur_str:
+ logging.debug(f"_try_avgfps_times_duration: failed no dur_str, {f}")
return None
try:
@@ -66,7 +69,8 @@ def _try_avgfps_times_duration(path: str, stream_index: int) -> Optional[int]:
# 四舍五入到最近整数,避免系统性低估
est = int(dur * Decimal(fps.numerator) / Decimal(fps.denominator) + Decimal("0.5"))
return est if est >= 0 else None
- except Exception:
+ except Exception as e:
+ logging.debug("_try_avgfps_times_duration: failed",exc_info=e)
return None
def _try_count_packets(path: str, stream_index: int) -> Optional[int]:
@@ -79,12 +83,14 @@ def _try_count_packets(path: str, stream_index: int) -> Optional[int]:
])
streams = data.get("streams") or []
if not streams:
+ logging.debug("_try_count_packets: failed no stream")
return None
nbp = streams[0].get("nb_read_packets")
try:
n = int(nbp)
return n if n >= 0 else None
- except Exception:
+ except Exception as e:
+ logging.debug("_try_count_packets: failed",exc_info=e)
return None
def get_video_frame_count(
@@ -116,12 +122,17 @@ def get_video_frame_count(
}
for key in fallback_order:
- func = methods.get(key)
- if not func:
- continue
- n = func(path, stream_index)
- if isinstance(n, int) and n >= 0:
- return n
+ try:
+ func = methods.get(key)
+ if not func:
+ continue
+ n = func(path, stream_index)
+ if isinstance(n, int) and n >= 0:
+ return n
+ else:
+ logging.debug(f"Failed to get frame with {key}")
+ except Exception as e:
+ logging.debug(f"Errored to get frame with {key}.",exc_info=e)
return None
raise RuntimeError("无法获取或估计帧数:所有回退方法均失败。")
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index f2ef4a9..c1239e6 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -8,10 +8,76 @@ from time import time
from rich.logging import RichHandler
from rich.progress import Progress
from pickle import dumps, loads
-from typing import Optional, Callable
+from typing import Optional, Callable,Literal
import atexit
import re
import get_frame
+import pydantic as pyd
+from pydantic import BaseModel,Field,field_validator,model_validator
+
+class Config(BaseModel):
+ save_to:Literal["single","multi"] = Field("single",description="保存到单文件夹,或者每个子文件夹创建compress_dir")
+ crf: Optional[int] = Field(None, ge=0, le=51, description="CRF值,范围0-51")
+ bitrate: Optional[str] = Field(None, description="比特率,格式如: 1000k, 2.5M, 1500B")
+ codec: str = Field("h264",description="ffmpeg的codec,如果使用GPU需要对应设置")
+ hwaccel:Optional[Literal["amf","qsv","cuda"]] = Field(None,description="使用GPU加速")
+ extra:Optional[list[str]] = Field(None,description="插入到ffmpeg输出前的自定义参数")
+ ffmpeg:str = "ffmpeg"
+ ffprobe:str = "ffprobe"
+ manual:Optional[list[str]] = Field(None,description=r"手动设置ffmpeg,命令ffmpeg -i {input} {manual} {output}")
+ video_ext:list[str] = Field([".mp4", ".mkv"],description="视频文件后缀,含.")
+ compress_dir_name:str = Field("compress",description="压缩文件夹名称")
+ resolution: Optional[str] = Field(None,description="统一到特定尺寸,None为不使用缩放")
+ fps:int = Field(30,description="fps",ge=0)
+ test_video_resolution:str = "1920x1080"
+ test_video_fps:int = Field(30,ge=0)
+ test_video_input:str = "compress_video_test.mp4"
+ test_video_output:str = "compressed_video_test.mp4"
+ disable_hwaccel_when_fail:bool = Field(True,description="当运行失败时,禁用硬件加速")
+
+
+
+ @field_validator('bitrate')
+ @classmethod
+ def validate_bitrate(cls, v: Optional[str]) -> Optional[str]:
+ if v is None:
+ return v
+ pattern = r'^[\d\.]+[MkB]*$'
+ if not re.match(pattern, v):
+ raise ValueError('bitrate格式不正确,应为数字+单位(M/k/B),如: 1000k, 2.5M')
+ return v
+
+ @field_validator('resolution')
+ @classmethod
+ def validate_resolution(cls, v: Optional[str]) -> Optional[str]:
+ if v is None:
+ return v
+ pattern = r'^((-1)|\d+):((-1)|\d+)$'
+ if not re.match(pattern, v):
+ raise ValueError('resolution格式不正确,应为{数字/-1}:{数字/-1}')
+ return v
+
+ @field_validator("compress_dir_name")
+ # @field_validator("test_video_input")
+ # @field_validator("test_video_output")
+ @classmethod
+ def valid_path(cls, v:str) -> str:
+ if re.search(r'[\\/:*?"<>|\x00-\x1F]',v):
+ raise ValueError("某配置不符合目录名语法")
+ return v
+
+
+ @model_validator(mode='after')
+ def validate_mutual_exclusive(self):
+ crf_none = self.crf is None
+ bitrate_none = self.bitrate is None
+
+ # 有且只有一者为None
+ if crf_none == bitrate_none:
+ raise ValueError('crf和bitrate必须互斥:有且只有一个为None')
+
+ return self
+
root = None
CFG_FILE = Path(sys.path[0]) / "config.json"
@@ -85,7 +151,7 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
"-b:v",
CFG["bitrate"],
"-r",
- CFG["fps"],
+ str(CFG["fps"]),
"-y",
]
)
@@ -104,7 +170,7 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
"-global_quality",
str(CFG["crf"]),
"-r",
- CFG["fps"],
+ str(CFG["fps"]),
"-y",
]
)
@@ -390,7 +456,7 @@ def test():
exit(-1)
try:
ret = subprocess.run(
- f"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(),
+ f"{CFG['ffmpeg']} -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
@@ -413,6 +479,9 @@ def test():
logging.debug(ret.stderr)
logging.error("Error termination via test failed.")
exit(-1)
+
+ if get_frame.get_video_frame_count("compress_video_test.mp4") is None:
+ logging.error("测试读取帧数失败,将无法正确显示进度。")
os.remove("compress_video_test.mp4")
os.remove("compressed_video_test.mp4")
except KeyboardInterrupt as e:
@@ -446,7 +515,11 @@ def main(_root=None):
import json
cfg: dict = json.loads(CFG_FILE.read_text())
- CFG.update(cfg)
+ cfg_model = Config(**cfg)
+ CFG.update(cfg_model.model_dump())
+ get_frame.ffprobe = CFG["ffprobe"]
+ logging.debug(cfg_model)
+ logging.debug(CFG)
except KeyboardInterrupt as e:
raise e
except Exception as e:
From abf64d9cd60b91d7a2621b8e4ee038e50b74f2df Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Wed, 29 Oct 2025 22:45:45 +0800
Subject: [PATCH 11/14] pdf unlock: add outlines
---
pdf_unlock/main.py | 42 +++++++++++++++++++++++++++++++++++++++++-
1 file changed, 41 insertions(+), 1 deletion(-)
diff --git a/pdf_unlock/main.py b/pdf_unlock/main.py
index 14cdf4e..e8c27f1 100644
--- a/pdf_unlock/main.py
+++ b/pdf_unlock/main.py
@@ -1,8 +1,39 @@
import PyPDF2 # PyMuPDF
import sys
from pathlib import Path
+from typing import Optional
-def copy_pdf_pages(input_path: str, output_path: str) -> bool:
+def add_outlines(
+ reader_obj:PyPDF2.PdfReader,
+ writer_obj:PyPDF2.PdfWriter,
+ outlines:Optional[list] = None,
+ parent=None
+ ):
+
+ if outlines is None:
+ outlines = reader_obj.outline
+ last = None
+ for it in outlines:
+ if isinstance(it, list):
+ add_outlines(reader_obj, writer_obj ,it, last)
+ continue
+
+ title = getattr(it, 'title', None)
+ if title is None:
+ try:
+ title = str(it)
+ except Exception as e:
+ raise e
+ continue
+
+ page_num = reader_obj.get_destination_page_number(it)
+ if page_num is None:
+ continue
+
+ last = writer_obj.add_outline_item(title, page_num, parent=parent)
+
+
+def copy_pdf_pages(input_path: Path, output_path: Path) -> bool:
"""
移除PDF文件的所有限制
@@ -23,6 +54,14 @@ def copy_pdf_pages(input_path: str, output_path: str) -> bool:
# 复制所有页面
for page in reader.pages:
writer.add_page(page)
+
+ # 复制书签(如果有)
+
+ try:
+ add_outlines(reader, writer)
+ except Exception as e:
+ raise e
+ print(f"警告:{input_path.name}书签处理失败.")
# 写入新文件(不设置任何加密或限制)
with open(output_path, 'wb') as output_file:
@@ -32,6 +71,7 @@ def copy_pdf_pages(input_path: str, output_path: str) -> bool:
except Exception as e:
print(f"移除PDF限制时发生错误: {e}")
+ raise e
return False
# def copy_pdf_pages(input_file, output_file):
From d454f8c8f40830c508302c69f60a124ed47a24ed Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Wed, 29 Oct 2025 23:05:14 +0800
Subject: [PATCH 12/14] pdf_unlock: inline no recursion
---
pdf_unlock/main.py | 114 +++++++++++----------------------------------
1 file changed, 27 insertions(+), 87 deletions(-)
diff --git a/pdf_unlock/main.py b/pdf_unlock/main.py
index e8c27f1..f5704fa 100644
--- a/pdf_unlock/main.py
+++ b/pdf_unlock/main.py
@@ -1,37 +1,9 @@
import PyPDF2 # PyMuPDF
+from PyPDF2.generic import IndirectObject
import sys
from pathlib import Path
from typing import Optional
-
-def add_outlines(
- reader_obj:PyPDF2.PdfReader,
- writer_obj:PyPDF2.PdfWriter,
- outlines:Optional[list] = None,
- parent=None
- ):
-
- if outlines is None:
- outlines = reader_obj.outline
- last = None
- for it in outlines:
- if isinstance(it, list):
- add_outlines(reader_obj, writer_obj ,it, last)
- continue
-
- title = getattr(it, 'title', None)
- if title is None:
- try:
- title = str(it)
- except Exception as e:
- raise e
- continue
-
- page_num = reader_obj.get_destination_page_number(it)
- if page_num is None:
- continue
-
- last = writer_obj.add_outline_item(title, page_num, parent=parent)
-
+from itertools import repeat
def copy_pdf_pages(input_path: Path, output_path: Path) -> bool:
"""
@@ -48,19 +20,34 @@ def copy_pdf_pages(input_path: Path, output_path: Path) -> bool:
try:
with open(input_path, 'rb') as input_file:
reader = PyPDF2.PdfReader(input_file)
-
writer = PyPDF2.PdfWriter()
# 复制所有页面
for page in reader.pages:
writer.add_page(page)
- # 复制书签(如果有)
-
try:
- add_outlines(reader, writer)
+ que = list(zip(repeat(None),reader.outline))
+ last:Optional[IndirectObject] = None
+ for par, it in que:
+ if isinstance(it, list):
+ que.extend(zip(repeat(last),it))
+ continue
+
+ title = getattr(it, 'title', None)
+ if title is None:
+ try:
+ title = str(it)
+ except Exception:
+ print(f"警告:无法获取书签标题,跳过该书签.")
+ continue
+
+ page_num = reader.get_destination_page_number(it)
+ if page_num is None:
+ continue
+
+ last = writer.add_outline_item(title, page_num, parent=par)
except Exception as e:
- raise e
print(f"警告:{input_path.name}书签处理失败.")
# 写入新文件(不设置任何加密或限制)
@@ -71,53 +58,8 @@ def copy_pdf_pages(input_path: Path, output_path: Path) -> bool:
except Exception as e:
print(f"移除PDF限制时发生错误: {e}")
- raise e
return False
-# def copy_pdf_pages(input_file, output_file):
-# """
-# 读取PDF文件并逐页复制到新的PDF文件
-
-# Args:
-# input_file (str): 输入PDF文件路径
-# output_file (str): 输出PDF文件路径
-# """
-# try:
-# # 检查输入文件是否存在
-# if not os.path.exists(input_file):
-# print(f"错误:输入文件 '{input_file}' 不存在")
-# return False
-
-# # 打开输入PDF文件
-# pdf_document = fitz.open(input_file)
-
-# # 创建新的PDF文档
-# new_pdf = fitz.open()
-# new_pdf.insert_pdf(pdf_document)
-
-# # 保存输出文件
-# new_pdf.save(output_file)
-
-# # 关闭文档
-# pdf_document.close()
-# new_pdf.close()
-
-# return True
-
-# except FileNotFoundError:
-# print(f"错误:找不到文件 '{input_file}'")
-# return False
-# except PermissionError:
-# print(f"错误:权限不足,无法访问文件")
-# return False
-# except Exception as pdf_error:
-# error_msg = str(pdf_error).lower()
-# if "damaged" in error_msg or "corrupt" in error_msg:
-# print(f"错误:PDF文件 '{input_file}' 已损坏")
-# else:
-# print(f"发生错误:{str(pdf_error)}")
-# return False
-
def main():
"""主函数"""
if len(sys.argv) < 2:
@@ -129,12 +71,12 @@ def main():
else:
input_path = Path(sys.argv[1])
if input_path.is_dir():
- files = list(input_path.glob("**/*.pdf"))
+ files = list(input_path.rglob("*.pdf"))
else:
print("正在处理",input_path.name)
output_file = input_path.with_name(f"{input_path.stem}_decrypt.pdf")
- success = copy_pdf_pages(input_path, output_file)
- print("处理完成" if success else "处理失败")
+ suc = copy_pdf_pages(input_path, output_file)
+ print("处理完成" if suc else "处理失败")
return
total = len(files)
@@ -142,15 +84,13 @@ def main():
for i, pdf_file in enumerate(files, start=1):
rate= round(i/total *100)
print(f"进度: ", "-"* (rate//5)," "*(20-rate//5), f" {rate}%",sep="",end="\r")
- import time
- # time.sleep(1) # 模拟处理时间
if not pdf_file.is_file():
print(f"跳过非PDF文件:{pdf_file}")
continue
output_file = pdf_file.with_name(f"{pdf_file.stem}_decrypt.pdf")
- success = copy_pdf_pages(pdf_file, output_file)
+ suc = copy_pdf_pages(pdf_file, output_file)
- if not success:
+ if not suc:
print(f"{pdf_file.name} 处理失败")
if __name__ == "__main__":
From 1e550961d2acb057050a7fa4aa10e376a73fffbb Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Sat, 1 Nov 2025 00:12:29 +0800
Subject: [PATCH 13/14] pdf_unlock: add ui
---
VideoCompress/.gitignore | 3 +-
VideoCompress/config_ui_2.py | 599 +++++++++++++++++++++++++++++++++++
VideoCompress/main.py | 6 +-
pdf_unlock/ui.py | 148 +++++++++
4 files changed, 753 insertions(+), 3 deletions(-)
create mode 100644 VideoCompress/config_ui_2.py
create mode 100644 pdf_unlock/ui.py
diff --git a/VideoCompress/.gitignore b/VideoCompress/.gitignore
index 2e29888..293c674 100644
--- a/VideoCompress/.gitignore
+++ b/VideoCompress/.gitignore
@@ -4,4 +4,5 @@ config.json
*.xml
tmp
build
-dist
\ No newline at end of file
+dist
+video_info.cache
\ No newline at end of file
diff --git a/VideoCompress/config_ui_2.py b/VideoCompress/config_ui_2.py
new file mode 100644
index 0000000..76b6aac
--- /dev/null
+++ b/VideoCompress/config_ui_2.py
@@ -0,0 +1,599 @@
+# -*- coding: utf-8 -*-
+"""
+Fluent Design 配置界面(PySide6 + QFluentWidgets)
+
+功能要点:
+- 拆分「通用」与「高级」两页,符合需求
+- 内置 GPU 模板:CPU/libx264、NVIDIA(NVENC/cuda)、Intel(QSV)、AMD(AMF)
+- 与 Pydantic Config 对接:即时校验,错误信息以 InfoBar 呈现
+- 生成/预览 ffmpeg 命令;可对单文件执行(subprocess.run)
+- 可导入/导出配置(JSON)
+
+依赖:
+ pip install PySide6 qfluentwidgets pydantic
+
+注意:
+- 按需在同目录准备 main.py,导出:
+ from pydantic import BaseModel
+ class Config(BaseModel): ...
+ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]: ...
+- 本 UI 会在运行前,把实例化后的配置对象挂到 main 模块:`main.config = cfg`
+ 若你的 get_cmd 内部使用全局 config,将能读到该对象。
+"""
+
+from __future__ import annotations
+import json
+import os
+import re
+import sys
+import subprocess
+from typing import Optional
+
+from PySide6.QtCore import Qt
+from PySide6.QtWidgets import (
+ QApplication, QFileDialog, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
+ QTextEdit
+)
+
+# QFluentWidgets
+from qfluentwidgets import (
+ FluentWindow, setTheme, Theme, setThemeColor, FluentIcon,
+ NavigationItemPosition, PrimaryPushButton, PushButton, LineEdit,
+ ComboBox, SpinBox, SwitchButton, InfoBar, InfoBarPosition,
+ CardWidget, NavigationPushButton
+)
+
+# Pydantic
+from pydantic import ValidationError
+
+# ---- 导入业务对象 -----------------------------------------------------------
+try:
+ import main as main_module # 用于挂载 config 实例
+ from main import Config, get_cmd # 直接调用用户提供的方法
+except Exception as e: # 允许先运行 UI 草拟界面,不阻断
+ main_module = None
+ Config = None # type: ignore
+ get_cmd = None # type: ignore
+ print("[警告] 无法从 main 导入 Config / get_cmd:", e)
+
+
+# --------------------------- 工具函数 ----------------------------------------
+
+def show_error(parent: QWidget, title: str, message: str):
+ InfoBar.error(
+ title=title,
+ content=message,
+ orient=Qt.Horizontal,
+ isClosable=True,
+ position=InfoBarPosition.TOP_RIGHT,
+ duration=5000,
+ parent=parent,
+ )
+
+
+def show_success(parent: QWidget, title: str, message: str):
+ InfoBar.success(
+ title=title,
+ content=message,
+ orient=Qt.Horizontal,
+ isClosable=True,
+ position=InfoBarPosition.TOP_RIGHT,
+ duration=3500,
+ parent=parent,
+ )
+
+
+# --------------------------- GPU 模板 ----------------------------------------
+GPU_TEMPLATES = {
+ "CPU (libx264)": {
+ "hwaccel": None,
+ "codec": "libx264",
+ "extra": ["-preset", "slow"],
+ "crf": 23,
+ "bitrate": None,
+ },
+ "NVIDIA (NVENC/cuda)": {
+ "hwaccel": "cuda",
+ "codec": "h264_nvenc",
+ "extra": ["-preset", "p6", "-rc", "vbr"],
+ "crf": None,
+ "bitrate": "5M",
+ },
+ "Intel (QSV)": {
+ "hwaccel": "qsv",
+ "codec": "h264_qsv",
+ "extra": ["-preset", "balanced"],
+ "crf": None,
+ "bitrate": "5M",
+ },
+ "AMD (AMF)": {
+ "hwaccel": "amf",
+ "codec": "h264_amf",
+ "extra": ["-quality", "balanced"],
+ "crf": None,
+ "bitrate": "5M",
+ },
+}
+
+SAVE_TO_OPTIONS = ["single", "multi"]
+HWACCEL_OPTIONS: list[Optional[str]] = [None, "cuda", "qsv", "amf"]
+
+
+# --------------------------- 卡片基础 ----------------------------------------
+class GroupCard(CardWidget):
+ def __init__(self, title: str, subtitle: str = "", parent: QWidget | None = None):
+ super().__init__(parent)
+ self.setMinimumWidth(720)
+ self.v = QVBoxLayout(self)
+ self.v.setContentsMargins(16, 12, 16, 16)
+ t = QLabel(f"{title}")
+ t.setProperty("cssClass", "cardTitle")
+ s = QLabel(subtitle)
+ s.setProperty("cssClass", "cardSubTitle")
+ s.setStyleSheet("color: rgba(0,0,0,0.55);")
+ self.v.addWidget(t)
+ if subtitle:
+ self.v.addWidget(s)
+ self.body = QVBoxLayout()
+ self.body.setSpacing(10)
+ self.v.addLayout(self.body)
+
+ def add_row(self, label: str, widget: QWidget):
+ row = QHBoxLayout()
+ lab = QLabel(label)
+ lab.setMinimumWidth(150)
+ row.addWidget(lab)
+ row.addWidget(widget, 1)
+ self.body.addLayout(row)
+
+
+# --------------------------- 通用设置页 --------------------------------------
+class GeneralPage(QWidget):
+ def __init__(self, parent: QWidget | None = None):
+ super().__init__(parent)
+ layout = QVBoxLayout(self)
+ layout.setContentsMargins(20, 16, 20, 20)
+ layout.setSpacing(12)
+
+ # --- I/O 区 ---
+ io_card = GroupCard("输入/输出", "选择要压缩的视频与输出文件")
+ self.input_edit = LineEdit(self)
+ self.input_edit.setPlaceholderText("选择输入视频文件... (.mp4/.mkv 等)")
+ btn_in = PrimaryPushButton("选择文件")
+ btn_in.clicked.connect(self._pick_input)
+ io_row = QHBoxLayout()
+ io_row.addWidget(self.input_edit, 1)
+ io_row.addWidget(btn_in)
+
+ self.output_edit = LineEdit(self)
+ self.output_edit.setPlaceholderText("选择输出文件路径,例如:/path/compressed.mp4")
+ btn_out = PushButton("选择输出")
+ btn_out.clicked.connect(self._pick_output)
+ oo_row = QHBoxLayout()
+ oo_row.addWidget(self.output_edit, 1)
+ oo_row.addWidget(btn_out)
+
+ io_card.body.addLayout(io_row)
+ io_card.body.addLayout(oo_row)
+
+ # --- 基础编码参数 ---
+ base_card = GroupCard("基础编码", "最常用的编码参数")
+
+ self.save_to_combo = ComboBox()
+ self.save_to_combo.addItems(SAVE_TO_OPTIONS)
+ base_card.add_row("保存方式(save_to)", self.save_to_combo)
+
+ self.codec_combo = ComboBox()
+ self.codec_combo.addItems(["h264", "libx264", "h264_nvenc", "h264_qsv", "h264_amf", "hevc", "hevc_nvenc", "hevc_qsv", "hevc_amf"])
+ base_card.add_row("编码器(codec)", self.codec_combo)
+
+ self.hwaccel_combo = ComboBox()
+ self.hwaccel_combo.addItems(["(无)", "cuda", "qsv", "amf"])
+ base_card.add_row("GPU 加速(hwaccel)", self.hwaccel_combo)
+
+ self.resolution_edit = LineEdit()
+ self.resolution_edit.setPlaceholderText("如 1920:1080 或 -1:1080,留空为不缩放")
+ base_card.add_row("分辨率(resolution)", self.resolution_edit)
+
+ self.fps_spin = SpinBox()
+ self.fps_spin.setRange(0, 999)
+ self.fps_spin.setValue(30)
+ base_card.add_row("帧率(fps)", self.fps_spin)
+
+ self.video_ext_edit = LineEdit()
+ self.video_ext_edit.setText(".mp4,.mkv")
+ base_card.add_row("视频后缀(video_ext)", self.video_ext_edit)
+
+ self.compress_dir_edit = LineEdit()
+ self.compress_dir_edit.setText("compress")
+ base_card.add_row("压缩目录名(compress_dir_name)", self.compress_dir_edit)
+
+ self.disable_hw_switch = SwitchButton("失败时禁用硬件加速(disable_hwaccel_when_fail)")
+ self.disable_hw_switch.setChecked(True)
+ base_card.body.addWidget(self.disable_hw_switch)
+
+ # --- 模板 ---
+ tpl_card = GroupCard("GPU 模板", "一键应用推荐参数(会覆盖 codec/hwaccel/crf/bitrate/extra)")
+ self.tpl_combo = ComboBox()
+ self.tpl_combo.addItems(list(GPU_TEMPLATES.keys()))
+ btn_apply_tpl = PrimaryPushButton("应用模板")
+ btn_apply_tpl.clicked.connect(self.apply_template)
+ row_tpl = QHBoxLayout()
+ row_tpl.addWidget(self.tpl_combo, 1)
+ row_tpl.addWidget(btn_apply_tpl)
+ tpl_card.body.addLayout(row_tpl)
+
+ # --- 运行/预览 ---
+ run_card = GroupCard("执行", "根据当前配置生成并运行 ffmpeg 命令")
+ self.preview_text = QTextEdit()
+ self.preview_text.setReadOnly(True)
+ btn_preview = PushButton("生成命令预览")
+ btn_preview.clicked.connect(self.on_preview)
+ btn_run = PrimaryPushButton("运行压缩 (subprocess.run)")
+ btn_run.clicked.connect(self.on_run)
+ rrow = QHBoxLayout()
+ rrow.addWidget(btn_preview)
+ rrow.addWidget(btn_run)
+ run_card.body.addWidget(self.preview_text)
+ run_card.body.addLayout(rrow)
+
+ # 排版
+ layout.addWidget(io_card)
+ layout.addWidget(base_card)
+ layout.addWidget(tpl_card)
+ layout.addWidget(run_card)
+ layout.addStretch(1)
+
+ # 高级区联动:由 AdvancedPage 控制
+ self.link_advanced: Optional[AdvancedPage] = None
+
+ # ----- 文件选择 -----
+ def _pick_input(self):
+ fn, _ = QFileDialog.getOpenFileName(self, "选择输入视频", "", "视频文件 (*.mp4 *.mkv *.mov *.avi *.*)")
+ if fn:
+ self.input_edit.setText(fn)
+
+ def _pick_output(self):
+ fn, _ = QFileDialog.getSaveFileName(self, "选择输出文件", "compressed.mp4", "视频文件 (*.mp4 *.mkv)")
+ if fn:
+ self.output_edit.setText(fn)
+
+ # ----- 模板应用 -----
+ def apply_template(self):
+ name = self.tpl_combo.currentText()
+ t = GPU_TEMPLATES.get(name, {})
+ # 覆盖通用字段
+ codec = t.get("codec")
+ if codec:
+ self.codec_combo.setCurrentText(codec)
+ hw = t.get("hwaccel")
+ if hw is None:
+ self.hwaccel_combo.setCurrentIndex(0)
+ else:
+ self.hwaccel_combo.setCurrentText(str(hw))
+ # 覆盖高级字段(若可用)
+ if self.link_advanced:
+ self.link_advanced.apply_template_from_general(t)
+ show_success(self, "已应用模板", f"{name} 参数已填充")
+
+ # ----- 组装配置 -----
+ def _collect_config_payload(self) -> dict:
+ # CRF / bitrate 值来自高级区
+ adv = self.link_advanced
+ crf_val = adv.crf_spin.value() if adv else None
+ if adv and adv.mode_combo.currentText() == "CRF 模式":
+ bitrate_val = None
+ else:
+ bitrate_val = adv.bitrate_edit.text().strip() or None
+
+ # hwaccel
+ _hw_text = self.hwaccel_combo.currentText()
+ hwaccel_val = None if _hw_text == "(无)" else _hw_text
+
+ # video_ext 解析
+ exts = [x.strip() for x in self.video_ext_edit.text().split(',') if x.strip()]
+
+ payload = {
+ "save_to": self.save_to_combo.currentText(),
+ "crf": crf_val if (adv and adv.mode_combo.currentText() == "CRF 模式") else None,
+ "bitrate": bitrate_val if (adv and adv.mode_combo.currentText() == "比特率模式") else None,
+ "codec": self.codec_combo.currentText(),
+ "hwaccel": hwaccel_val,
+ "extra": adv.parse_list_field(adv.extra_edit.toPlainText()) if adv else None,
+ "ffmpeg": adv.ffmpeg_edit.text().strip() if adv else "ffmpeg",
+ "ffprobe": adv.ffprobe_edit.text().strip() if adv else "ffprobe",
+ "manual": adv.parse_list_field(adv.manual_edit.toPlainText()) if (adv and adv.manual_switch.isChecked()) else None,
+ "video_ext": exts or [".mp4", ".mkv"],
+ "compress_dir_name": self.compress_dir_edit.text().strip() or "compress",
+ "resolution": self.resolution_edit.text().strip() or None,
+ "fps": self.fps_spin.value(),
+ "test_video_resolution": adv.test_res_edit.text().strip() if adv else "1920x1080",
+ "test_video_fps": adv.test_fps_spin.value() if adv else 30,
+ "test_video_input": adv.test_in_edit.text().strip() if adv else "compress_video_test.mp4",
+ "test_video_output": adv.test_out_edit.text().strip() if adv else "compressed_video_test.mp4",
+ "disable_hwaccel_when_fail": self.disable_hw_switch.isChecked(),
+ }
+ return payload
+
+ def build_config(self) -> Config:
+ if Config is None:
+ raise RuntimeError("未能导入 Config。请确认 main.py 可用且在同目录。")
+ payload = self._collect_config_payload()
+ try:
+ cfg = Config(**payload)
+ except ValidationError as e:
+ show_error(self, "配置校验失败", str(e))
+ raise
+ return cfg
+
+ # ----- 预览/运行 -----
+ def on_preview(self):
+ if get_cmd is None:
+ show_error(self, "缺少 get_cmd", "未能导入 get_cmd,请检查 main.py")
+ return
+ in_path = self.input_edit.text().strip()
+ out_path = self.output_edit.text().strip()
+ if not in_path or not out_path:
+ show_error(self, "缺少路径", "请先选择输入与输出文件")
+ return
+ try:
+ cfg = self.build_config()
+ if main_module is not None:
+ setattr(main_module, "config", cfg) # 挂载到 main 供 get_cmd 读取
+ cmd = get_cmd(in_path, out_path)
+ self.preview_text.setPlainText(" ".join([repr(c) if " " in c else c for c in cmd]))
+ show_success(self, "命令已生成", "如需执行请点击运行")
+ except Exception as e:
+ show_error(self, "生成失败", str(e))
+
+ def on_run(self):
+ if get_cmd is None:
+ show_error(self, "缺少 get_cmd", "未能导入 get_cmd,请检查 main.py")
+ return
+ in_path = self.input_edit.text().strip()
+ out_path = self.output_edit.text().strip()
+ if not in_path or not out_path:
+ show_error(self, "缺少路径", "请先选择输入与输出文件")
+ return
+ try:
+ cfg = self.build_config()
+ if main_module is not None:
+ setattr(main_module, "config", cfg)
+ cmd = get_cmd(in_path, out_path)
+ # 使用 subprocess.run 执行
+ completed = subprocess.run(cmd, capture_output=True, text=True)
+ log = (completed.stdout or "") + "\n" + (completed.stderr or "")
+ self.preview_text.setPlainText(log)
+ if completed.returncode == 0:
+ show_success(self, "执行成功", "视频压缩完成")
+ else:
+ show_error(self, "执行失败", f"返回码 {completed.returncode}")
+ except Exception as e:
+ show_error(self, "执行异常", str(e))
+
+
+# --------------------------- 高级设置页 --------------------------------------
+class AdvancedPage(QWidget):
+ def __init__(self, parent: QWidget | None = None):
+ super().__init__(parent)
+ layout = QVBoxLayout(self)
+ layout.setContentsMargins(20, 16, 20, 20)
+ layout.setSpacing(12)
+
+ # 模式切换:CRF / Bitrate 互斥
+ mode_card = GroupCard("编码模式", "CRF 与 比特率 二选一")
+ self.mode_combo = ComboBox()
+ self.mode_combo.addItems(["CRF 模式", "比特率模式"])
+ self.mode_combo.currentIndexChanged.connect(self._on_mode_changed)
+ mode_card.add_row("选择模式", self.mode_combo)
+
+ self.crf_spin = SpinBox()
+ self.crf_spin.setRange(0, 51)
+ self.crf_spin.setValue(23)
+ mode_card.add_row("CRF 值(0-51)", self.crf_spin)
+
+ self.bitrate_edit = LineEdit()
+ self.bitrate_edit.setPlaceholderText("如 1000k / 2.5M / 1500B")
+ mode_card.add_row("比特率", self.bitrate_edit)
+
+ # 其它高级参数
+ adv_card = GroupCard("高级参数", "额外 ffmpeg 参数 / 手动命令 / 可执行路径")
+
+ self.extra_edit = QTextEdit()
+ self.extra_edit.setPlaceholderText("每行一个参数;例如:\n-preset\np6\n-rc\nvbr")
+ adv_card.add_row("额外(extra)", self.extra_edit)
+
+ self.manual_switch = SwitchButton("使用手动命令 (manual)")
+ self.manual_switch.setChecked(False)
+ adv_card.body.addWidget(self.manual_switch)
+
+ self.manual_edit = QTextEdit()
+ self.manual_edit.setPlaceholderText("每行一个参数(会插入到 ffmpeg -i {input} {manual} {output} 中)")
+ self.manual_edit.setEnabled(False)
+ self.manual_switch.checkedChanged.connect(lambda c: self.manual_edit.setEnabled(c))
+ adv_card.body.addWidget(self.manual_edit)
+
+ self.ffmpeg_edit = LineEdit()
+ self.ffmpeg_edit.setText("ffmpeg")
+ adv_card.add_row("ffmpeg 路径", self.ffmpeg_edit)
+ self.ffprobe_edit = LineEdit()
+ self.ffprobe_edit.setText("ffprobe")
+ adv_card.add_row("ffprobe 路径", self.ffprobe_edit)
+
+ # 测试参数
+ test_card = GroupCard("测试", "用于快速验证配置是否能跑通")
+ self.test_res_edit = LineEdit(); self.test_res_edit.setText("1920x1080")
+ self.test_fps_spin = SpinBox(); self.test_fps_spin.setRange(0, 999); self.test_fps_spin.setValue(30)
+ self.test_in_edit = LineEdit(); self.test_in_edit.setText("compress_video_test.mp4")
+ self.test_out_edit = LineEdit(); self.test_out_edit.setText("compressed_video_test.mp4")
+ test_card.add_row("测试分辨率", self.test_res_edit)
+ test_card.add_row("测试 FPS", self.test_fps_spin)
+ test_card.add_row("测试输入文件名", self.test_in_edit)
+ test_card.add_row("测试输出文件名", self.test_out_edit)
+
+ # backup_card = GroupCard("备份配置", "导入导出当前配置")
+
+ layout.addWidget(mode_card)
+ layout.addWidget(adv_card)
+ layout.addWidget(test_card)
+ layout.addStretch(1)
+
+ self._on_mode_changed(0)
+
+ def _on_mode_changed(self, idx: int):
+ is_crf = (idx == 0)
+ self.crf_spin.setEnabled(is_crf)
+ self.bitrate_edit.setEnabled(not is_crf)
+
+ # 解析多行 -> list[str]
+ @staticmethod
+ def parse_list_field(text: str) -> Optional[list[str]]:
+ lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
+ return lines or None
+
+ # 从通用模板带入高级字段
+ def apply_template_from_general(self, tpl: dict):
+ if tpl.get("crf") is not None:
+ self.mode_combo.setCurrentText("CRF 模式")
+ self.crf_spin.setValue(int(tpl["crf"]))
+ self.bitrate_edit.clear()
+ else:
+ self.mode_combo.setCurrentText("比特率模式")
+ self.bitrate_edit.setText(str(tpl.get("bitrate", "5M")))
+ self.extra_edit.setPlainText("\n".join(tpl.get("extra", [])))
+
+
+# --------------------------- 主窗体 ------------------------------------------
+class MainWindow(FluentWindow):
+ def __init__(self):
+ super().__init__()
+ self.setWindowTitle("视频压缩配置 - Fluent UI")
+ self.resize(980, 720)
+
+ # 主题与主色
+ setTheme(Theme.AUTO)
+ setThemeColor("#2563eb") # Tailwind 蓝-600
+
+ # 页面
+ self.general = GeneralPage()
+ self.general.setObjectName("general_page")
+ self.advanced = AdvancedPage()
+ self.advanced.setObjectName("advanced_page")
+ self.general.link_advanced = self.advanced
+
+ # 导航
+ self.addSubInterface(self.general, FluentIcon.VIDEO, "通用 Config")
+ self.addSubInterface(self.advanced, FluentIcon.SETTING, "高级 Config")
+
+ # 顶部右侧:操作按钮
+ # self.navigationInterface.addWidget( # type: ignore
+ # routeKey="spacer",
+ # widget=QLabel(""),
+ # onClick=None,
+ # position=NavigationItemPosition.TOP
+ # )
+
+ # 导入/导出配置
+ export_btn = NavigationPushButton(FluentIcon.SAVE, "导出配置", False, self.navigationInterface)
+ export_btn.clicked.connect(self.export_config)
+ import_btn = NavigationPushButton(FluentIcon.FOLDER, "导入配置", False, self.navigationInterface)
+ import_btn.clicked.connect(self.import_config)
+ self.titleBar.raise_() # 确保标题栏在顶层
+ self.navigationInterface.addWidget( # type: ignore
+ routeKey="export",
+ widget=export_btn,
+ onClick=None,
+ position=NavigationItemPosition.BOTTOM
+ )
+ self.navigationInterface.addWidget( # type: ignore
+ routeKey="import",
+ widget=import_btn,
+ onClick=None,
+ position=NavigationItemPosition.BOTTOM
+ )
+
+ # ---------------- 导入/导出 -----------------
+ def export_config(self):
+ try:
+ cfg = self.general.build_config()
+ except ValidationError:
+ return
+ fn, _ = QFileDialog.getSaveFileName(self, "导出配置为 JSON", "config.json", "JSON (*.json)")
+ if not fn:
+ return
+ with open(fn, 'w', encoding='utf-8') as f:
+ json.dump(json.loads(cfg.model_dump_json()), f, ensure_ascii=False, indent=2)
+ show_success(self, "已导出", os.path.basename(fn))
+
+ def import_config(self):
+ fn, _ = QFileDialog.getOpenFileName(self, "导入配置 JSON", "", "JSON (*.json)")
+ if not fn:
+ return
+ try:
+ with open(fn, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+ # 回填到界面(允许缺省)
+ self._fill_general(data)
+ self._fill_advanced(data)
+ show_success(self, "已导入", os.path.basename(fn))
+ except Exception as e:
+ show_error(self, "导入失败", str(e))
+
+ def _fill_general(self, d: dict):
+ if v := d.get("save_to"):
+ self.general.save_to_combo.setCurrentText(v)
+ if v := d.get("codec"):
+ self.general.codec_combo.setCurrentText(v)
+ hw = d.get("hwaccel")
+ self.general.hwaccel_combo.setCurrentIndex(0 if hw in (None, "", "None") else max(1, self.general.hwaccel_combo.findText(str(hw))))
+ if v := d.get("resolution"):
+ self.general.resolution_edit.setText(v)
+ if v := d.get("fps"):
+ self.general.fps_spin.setValue(int(v))
+ if v := d.get("video_ext"):
+ if isinstance(v, list):
+ self.general.video_ext_edit.setText(",".join(v))
+ else:
+ self.general.video_ext_edit.setText(str(v))
+ if v := d.get("compress_dir_name"):
+ self.general.compress_dir_edit.setText(v)
+ if v := d.get("disable_hwaccel_when_fail") is not None:
+ self.general.disable_hw_switch.setChecked(bool(v))
+
+ def _fill_advanced(self, d: dict):
+ crf = d.get("crf")
+ bitrate = d.get("bitrate")
+ if crf is not None and bitrate is None:
+ self.advanced.mode_combo.setCurrentText("CRF 模式")
+ self.advanced.crf_spin.setValue(int(crf))
+ self.advanced.bitrate_edit.clear()
+ else:
+ self.advanced.mode_combo.setCurrentText("比特率模式")
+ if bitrate is not None:
+ self.advanced.bitrate_edit.setText(str(bitrate))
+ if v := d.get("extra"):
+ self.advanced.extra_edit.setPlainText("\n".join(v if isinstance(v, list) else [str(v)]))
+ if v := d.get("manual"):
+ self.advanced.manual_switch.setChecked(True)
+ self.advanced.manual_edit.setEnabled(True)
+ self.advanced.manual_edit.setPlainText("\n".join(v if isinstance(v, list) else [str(v)]))
+ if v := d.get("ffmpeg"):
+ self.advanced.ffmpeg_edit.setText(str(v))
+ if v := d.get("ffprobe"):
+ self.advanced.ffprobe_edit.setText(str(v))
+ if v := d.get("test_video_resolution"):
+ self.advanced.test_res_edit.setText(str(v))
+ if v := d.get("test_video_fps"):
+ self.advanced.test_fps_spin.setValue(int(v))
+ if v := d.get("test_video_input"):
+ self.advanced.test_in_edit.setText(str(v))
+ if v := d.get("test_video_output"):
+ self.advanced.test_out_edit.setText(str(v))
+
+
+# --------------------------- 入口 --------------------------------------------
+if __name__ == "__main__":
+ app = QApplication(sys.argv)
+ win = MainWindow()
+ win.show()
+ sys.exit(app.exec())
diff --git a/VideoCompress/main.py b/VideoCompress/main.py
index c1239e6..a4ab31d 100644
--- a/VideoCompress/main.py
+++ b/VideoCompress/main.py
@@ -12,7 +12,6 @@ from typing import Optional, Callable,Literal
import atexit
import re
import get_frame
-import pydantic as pyd
from pydantic import BaseModel,Field,field_validator,model_validator
class Config(BaseModel):
@@ -80,7 +79,10 @@ class Config(BaseModel):
root = None
-CFG_FILE = Path(sys.path[0]) / "config.json"
+if os.environ.get("INSTALL", "0") == "1":
+ CFG_FILE= Path(os.getenv("APPDATA", "C:/")) / "VideoCompress" / "config.json"
+else:
+ CFG_FILE= Path(sys.path[0]) / "config.json"
CFG = {
"save_to": "single",
"crf": "18",
diff --git a/pdf_unlock/ui.py b/pdf_unlock/ui.py
new file mode 100644
index 0000000..6c4fe46
--- /dev/null
+++ b/pdf_unlock/ui.py
@@ -0,0 +1,148 @@
+from __future__ import annotations
+from pathlib import Path
+import threading
+import traceback
+import tkinter as tk
+from tkinter import filedialog, messagebox
+import customtkinter as ctk
+
+from tkinterdnd2 import TkinterDnD, DND_FILES # type: ignore
+DND_AVAILABLE = True
+
+from main import copy_pdf_pages # type: ignore
+
+APP_TITLE = "PDF 解锁(拖入即可)"
+SUFFIX = "_decrypt"
+
+
+def parse_dropped_paths(tcl_list: str, tk_root: tk.Misc) -> list[Path]:
+ """将 DND 的 raw 字符串解析为 Path 列表(兼容空格/花括号)。"""
+ return [Path(p) for p in tk_root.tk.splitlist(tcl_list)]
+
+
+def gather_pdfs(paths: list[Path]) -> list[Path]:
+ """根据传入路径自动识别:单文件 / 多文件 / 目录(递归),提取所有 PDF。"""
+ out: list[Path] = []
+ for p in paths:
+ if p.is_dir():
+ out.extend([f for f in p.rglob("*.pdf") if f.is_file()])
+ elif p.is_file() and p.suffix.lower() == ".pdf":
+ out.append(p)
+ # 去重并按路径排序
+ uniq = sorted({f.resolve() for f in out})
+ return list(uniq)
+
+
+def output_path_for(in_path: Path) -> Path:
+ return in_path.with_name(f"{in_path.stem}{SUFFIX}.pdf")
+
+
+class App:
+ def __init__(self, root: tk.Tk):
+ self.root = root
+ self.root.title(APP_TITLE)
+ self.root.geometry("720x360")
+ ctk.set_appearance_mode("System")
+ ctk.set_default_color_theme("blue")
+
+ # ---- 单一可点击/可拖拽区域 ----
+ self.drop = ctk.CTkFrame(self.root, height=240, corner_radius=12)
+ self.drop.pack(fill="both", expand=True, padx=20, pady=20)
+
+ self.label = ctk.CTkLabel(
+ self.drop,
+ text=(
+ "将 PDF 文件或文件夹拖入此区域即可开始解锁\n"
+ "输出在原文件同目录,文件名加上 _decrypt 后缀"
+ + ("(拖拽可用)")
+ ),
+ font=("微软雅黑", 16),
+ justify="center",
+ )
+ self.label.place(relx=0.5, rely=0.5, anchor="center")
+
+ # 点击同样可选择(依然只有这一个控件)
+ self.drop.bind("", self._on_click_select)
+ self.label.bind("", self._on_click_select)
+
+ if DND_AVAILABLE:
+ self.drop.drop_target_register(DND_FILES) # type: ignore
+ self.drop.dnd_bind("<>", self._on_drop) # type: ignore
+
+ # ---- 事件 ----
+ def _on_click_select(self, _evt=None):
+ # 仅一个简单文件选择器;若想选目录,可直接把目录拖进来
+ files = filedialog.askopenfilenames(
+ title="选择 PDF 文件(可多选)",
+ filetypes=[("PDF 文件", "*.pdf"), ("所有文件", "*.*")],
+ )
+ if files:
+ self._start_process([Path(f) for f in files])
+
+ def _on_drop(self, event):
+ try:
+ paths = parse_dropped_paths(event.data, self.root)
+ except Exception:
+ return
+ self._start_process(paths)
+
+ # ---- 核心处理 ----
+ def _start_process(self, raw_paths: list[Path]):
+ if copy_pdf_pages is None:
+ messagebox.showerror(
+ "错误",
+ "未能从 main.py 导入 copy_pdf_pages(input_path: Path, output_path: Path) -> bool",
+ )
+ return
+
+ pdfs = gather_pdfs(raw_paths)
+ if not pdfs:
+ messagebox.showwarning("提示", "未找到任何 PDF 文件。")
+ return
+
+ # 后台线程,避免 UI 卡死
+ self.label.configure(text=f"发现 {len(pdfs)} 个 PDF,开始处理…")
+ t = threading.Thread(target=self._worker, args=(pdfs,), daemon=True)
+ t.start()
+
+ def _worker(self, pdfs: list[Path]):
+ ok, fail = 0, 0
+ errors: list[str] = []
+ for f in pdfs:
+ try:
+ out_path = output_path_for(f)
+ # 简化:若已存在,直接覆盖
+ out_path.parent.mkdir(parents=True, exist_ok=True)
+ success = bool(copy_pdf_pages(f, out_path)) # type: ignore
+ if success:
+ ok += 1
+ else:
+ fail += 1
+ except Exception as e:
+ fail += 1
+ errors.append(f"{f}: {e}{traceback.format_exc()}")
+
+ summary = f"完成:成功 {ok},失败 {fail}。输出文件位于各自原目录。"
+ self._set_status(summary)
+ if errors:
+ # 仅在有错误时弹出详情
+ messagebox.showerror("部分失败", summary + "\n" + "\n".join(errors[:3]))
+ else:
+ messagebox.showinfo("完成", summary)
+
+ def _set_status(self, text: str):
+ self.label.configure(text=text)
+
+
+def main():
+ root: tk.Misc
+ if DND_AVAILABLE:
+ root = TkinterDnD.Tk() # type: ignore
+ else:
+ root = tk.Tk()
+ App(root)
+ root.mainloop()
+
+
+if __name__ == "__main__":
+ main()
From 9bb73a633f6cc2261b4f3a0a5aa1e5e2c0c69791 Mon Sep 17 00:00:00 2001
From: flt6 <1404262047@qq.com>
Date: Sun, 2 Nov 2025 17:02:32 +0800
Subject: [PATCH 14/14] single file for ui
---
pdf_unlock/ui.py | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/pdf_unlock/ui.py b/pdf_unlock/ui.py
index 6c4fe46..c26923c 100644
--- a/pdf_unlock/ui.py
+++ b/pdf_unlock/ui.py
@@ -5,11 +5,13 @@ import traceback
import tkinter as tk
from tkinter import filedialog, messagebox
import customtkinter as ctk
+import sys
from tkinterdnd2 import TkinterDnD, DND_FILES # type: ignore
DND_AVAILABLE = True
from main import copy_pdf_pages # type: ignore
+from main import main as dummy_main # to avoid linter error
APP_TITLE = "PDF 解锁(拖入即可)"
SUFFIX = "_decrypt"
@@ -145,4 +147,7 @@ def main():
if __name__ == "__main__":
- main()
+ if len(sys.argv)>=2:
+ dummy_main()
+ else:
+ main()