Merge branch 'master' of flt6.top:flt/tools

This commit is contained in:
2025-12-21 14:26:07 +08:00
9 changed files with 1352 additions and 463 deletions

View File

@ -5,3 +5,4 @@ config.json
tmp tmp
build build
dist dist
video_info.cache

View File

@ -1,11 +1,23 @@
{ {
"save_to": "single", "save_to": "single",
"crf": 18, "bitrate": null,
"codec": "h264", "crf": 26,
"ffmpeg": "ffmpeg", "codec": "h264_qsv",
"hwaccel": "qsv",
"extra": [],
"ffmpeg": "C:/tools/ffmpeg/bin/ffmpeg.exe",
"ffprobe": "C:/tools/ffmpeg/bin/ffprobe",
"manual": null,
"video_ext": [ "video_ext": [
".mp4", ".mp4",
".mkv" ".mkv"
], ],
"extra": [] "compress_dir_name": "compress_qsv",
"resolution": null,
"fps": "30",
"test_video_resolution": "1920x1080",
"test_video_fps": "30",
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"disable_hwaccel_when_fail": true
} }

View File

@ -0,0 +1,12 @@
{
"save_to": "single",
"bitrate": "3M",
"codec": "h264_mediacodec",
"hwaccel": "mediacodec",
"ffmpeg": "ffmpeg",
"video_ext": [
".mp4",
".mkv"
],
"resolution": null
}

View File

@ -0,0 +1,599 @@
# -*- coding: utf-8 -*-
"""
Fluent Design 配置界面PySide6 + QFluentWidgets
功能要点:
- 拆分「通用」与「高级」两页,符合需求
- 内置 GPU 模板CPU/libx264、NVIDIA(NVENC/cuda)、Intel(QSV)、AMD(AMF)
- 与 Pydantic Config 对接:即时校验,错误信息以 InfoBar 呈现
- 生成/预览 ffmpeg 命令可对单文件执行subprocess.run
- 可导入/导出配置JSON
依赖:
pip install PySide6 qfluentwidgets pydantic
注意:
- 按需在同目录准备 main.py导出
from pydantic import BaseModel
class Config(BaseModel): ...
def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]: ...
- 本 UI 会在运行前,把实例化后的配置对象挂到 main 模块:`main.config = cfg`
若你的 get_cmd 内部使用全局 config将能读到该对象。
"""
from __future__ import annotations
import json
import os
import re
import sys
import subprocess
from typing import Optional
from PySide6.QtCore import Qt
from PySide6.QtWidgets import (
QApplication, QFileDialog, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QTextEdit
)
# QFluentWidgets
from qfluentwidgets import (
FluentWindow, setTheme, Theme, setThemeColor, FluentIcon,
NavigationItemPosition, PrimaryPushButton, PushButton, LineEdit,
ComboBox, SpinBox, SwitchButton, InfoBar, InfoBarPosition,
CardWidget, NavigationPushButton
)
# Pydantic
from pydantic import ValidationError
# ---- 导入业务对象 -----------------------------------------------------------
try:
import main as main_module # 用于挂载 config 实例
from main import Config, get_cmd # 直接调用用户提供的方法
except Exception as e: # 允许先运行 UI 草拟界面,不阻断
main_module = None
Config = None # type: ignore
get_cmd = None # type: ignore
print("[警告] 无法从 main 导入 Config / get_cmd", e)
# --------------------------- 工具函数 ----------------------------------------
def show_error(parent: QWidget, title: str, message: str):
InfoBar.error(
title=title,
content=message,
orient=Qt.Horizontal,
isClosable=True,
position=InfoBarPosition.TOP_RIGHT,
duration=5000,
parent=parent,
)
def show_success(parent: QWidget, title: str, message: str):
InfoBar.success(
title=title,
content=message,
orient=Qt.Horizontal,
isClosable=True,
position=InfoBarPosition.TOP_RIGHT,
duration=3500,
parent=parent,
)
# --------------------------- GPU 模板 ----------------------------------------
GPU_TEMPLATES = {
"CPU (libx264)": {
"hwaccel": None,
"codec": "libx264",
"extra": ["-preset", "slow"],
"crf": 23,
"bitrate": None,
},
"NVIDIA (NVENC/cuda)": {
"hwaccel": "cuda",
"codec": "h264_nvenc",
"extra": ["-preset", "p6", "-rc", "vbr"],
"crf": None,
"bitrate": "5M",
},
"Intel (QSV)": {
"hwaccel": "qsv",
"codec": "h264_qsv",
"extra": ["-preset", "balanced"],
"crf": None,
"bitrate": "5M",
},
"AMD (AMF)": {
"hwaccel": "amf",
"codec": "h264_amf",
"extra": ["-quality", "balanced"],
"crf": None,
"bitrate": "5M",
},
}
SAVE_TO_OPTIONS = ["single", "multi"]
HWACCEL_OPTIONS: list[Optional[str]] = [None, "cuda", "qsv", "amf"]
# --------------------------- 卡片基础 ----------------------------------------
class GroupCard(CardWidget):
def __init__(self, title: str, subtitle: str = "", parent: QWidget | None = None):
super().__init__(parent)
self.setMinimumWidth(720)
self.v = QVBoxLayout(self)
self.v.setContentsMargins(16, 12, 16, 16)
t = QLabel(f"<b>{title}</b>")
t.setProperty("cssClass", "cardTitle")
s = QLabel(subtitle)
s.setProperty("cssClass", "cardSubTitle")
s.setStyleSheet("color: rgba(0,0,0,0.55);")
self.v.addWidget(t)
if subtitle:
self.v.addWidget(s)
self.body = QVBoxLayout()
self.body.setSpacing(10)
self.v.addLayout(self.body)
def add_row(self, label: str, widget: QWidget):
row = QHBoxLayout()
lab = QLabel(label)
lab.setMinimumWidth(150)
row.addWidget(lab)
row.addWidget(widget, 1)
self.body.addLayout(row)
# --------------------------- 通用设置页 --------------------------------------
class GeneralPage(QWidget):
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
layout = QVBoxLayout(self)
layout.setContentsMargins(20, 16, 20, 20)
layout.setSpacing(12)
# --- I/O 区 ---
io_card = GroupCard("输入/输出", "选择要压缩的视频与输出文件")
self.input_edit = LineEdit(self)
self.input_edit.setPlaceholderText("选择输入视频文件... (.mp4/.mkv 等)")
btn_in = PrimaryPushButton("选择文件")
btn_in.clicked.connect(self._pick_input)
io_row = QHBoxLayout()
io_row.addWidget(self.input_edit, 1)
io_row.addWidget(btn_in)
self.output_edit = LineEdit(self)
self.output_edit.setPlaceholderText("选择输出文件路径,例如:/path/compressed.mp4")
btn_out = PushButton("选择输出")
btn_out.clicked.connect(self._pick_output)
oo_row = QHBoxLayout()
oo_row.addWidget(self.output_edit, 1)
oo_row.addWidget(btn_out)
io_card.body.addLayout(io_row)
io_card.body.addLayout(oo_row)
# --- 基础编码参数 ---
base_card = GroupCard("基础编码", "最常用的编码参数")
self.save_to_combo = ComboBox()
self.save_to_combo.addItems(SAVE_TO_OPTIONS)
base_card.add_row("保存方式(save_to)", self.save_to_combo)
self.codec_combo = ComboBox()
self.codec_combo.addItems(["h264", "libx264", "h264_nvenc", "h264_qsv", "h264_amf", "hevc", "hevc_nvenc", "hevc_qsv", "hevc_amf"])
base_card.add_row("编码器(codec)", self.codec_combo)
self.hwaccel_combo = ComboBox()
self.hwaccel_combo.addItems(["(无)", "cuda", "qsv", "amf"])
base_card.add_row("GPU 加速(hwaccel)", self.hwaccel_combo)
self.resolution_edit = LineEdit()
self.resolution_edit.setPlaceholderText("如 1920:1080 或 -1:1080留空为不缩放")
base_card.add_row("分辨率(resolution)", self.resolution_edit)
self.fps_spin = SpinBox()
self.fps_spin.setRange(0, 999)
self.fps_spin.setValue(30)
base_card.add_row("帧率(fps)", self.fps_spin)
self.video_ext_edit = LineEdit()
self.video_ext_edit.setText(".mp4,.mkv")
base_card.add_row("视频后缀(video_ext)", self.video_ext_edit)
self.compress_dir_edit = LineEdit()
self.compress_dir_edit.setText("compress")
base_card.add_row("压缩目录名(compress_dir_name)", self.compress_dir_edit)
self.disable_hw_switch = SwitchButton("失败时禁用硬件加速(disable_hwaccel_when_fail)")
self.disable_hw_switch.setChecked(True)
base_card.body.addWidget(self.disable_hw_switch)
# --- 模板 ---
tpl_card = GroupCard("GPU 模板", "一键应用推荐参数(会覆盖 codec/hwaccel/crf/bitrate/extra")
self.tpl_combo = ComboBox()
self.tpl_combo.addItems(list(GPU_TEMPLATES.keys()))
btn_apply_tpl = PrimaryPushButton("应用模板")
btn_apply_tpl.clicked.connect(self.apply_template)
row_tpl = QHBoxLayout()
row_tpl.addWidget(self.tpl_combo, 1)
row_tpl.addWidget(btn_apply_tpl)
tpl_card.body.addLayout(row_tpl)
# --- 运行/预览 ---
run_card = GroupCard("执行", "根据当前配置生成并运行 ffmpeg 命令")
self.preview_text = QTextEdit()
self.preview_text.setReadOnly(True)
btn_preview = PushButton("生成命令预览")
btn_preview.clicked.connect(self.on_preview)
btn_run = PrimaryPushButton("运行压缩 (subprocess.run)")
btn_run.clicked.connect(self.on_run)
rrow = QHBoxLayout()
rrow.addWidget(btn_preview)
rrow.addWidget(btn_run)
run_card.body.addWidget(self.preview_text)
run_card.body.addLayout(rrow)
# 排版
layout.addWidget(io_card)
layout.addWidget(base_card)
layout.addWidget(tpl_card)
layout.addWidget(run_card)
layout.addStretch(1)
# 高级区联动:由 AdvancedPage 控制
self.link_advanced: Optional[AdvancedPage] = None
# ----- 文件选择 -----
def _pick_input(self):
fn, _ = QFileDialog.getOpenFileName(self, "选择输入视频", "", "视频文件 (*.mp4 *.mkv *.mov *.avi *.*)")
if fn:
self.input_edit.setText(fn)
def _pick_output(self):
fn, _ = QFileDialog.getSaveFileName(self, "选择输出文件", "compressed.mp4", "视频文件 (*.mp4 *.mkv)")
if fn:
self.output_edit.setText(fn)
# ----- 模板应用 -----
def apply_template(self):
name = self.tpl_combo.currentText()
t = GPU_TEMPLATES.get(name, {})
# 覆盖通用字段
codec = t.get("codec")
if codec:
self.codec_combo.setCurrentText(codec)
hw = t.get("hwaccel")
if hw is None:
self.hwaccel_combo.setCurrentIndex(0)
else:
self.hwaccel_combo.setCurrentText(str(hw))
# 覆盖高级字段(若可用)
if self.link_advanced:
self.link_advanced.apply_template_from_general(t)
show_success(self, "已应用模板", f"{name} 参数已填充")
# ----- 组装配置 -----
def _collect_config_payload(self) -> dict:
# CRF / bitrate 值来自高级区
adv = self.link_advanced
crf_val = adv.crf_spin.value() if adv else None
if adv and adv.mode_combo.currentText() == "CRF 模式":
bitrate_val = None
else:
bitrate_val = adv.bitrate_edit.text().strip() or None
# hwaccel
_hw_text = self.hwaccel_combo.currentText()
hwaccel_val = None if _hw_text == "(无)" else _hw_text
# video_ext 解析
exts = [x.strip() for x in self.video_ext_edit.text().split(',') if x.strip()]
payload = {
"save_to": self.save_to_combo.currentText(),
"crf": crf_val if (adv and adv.mode_combo.currentText() == "CRF 模式") else None,
"bitrate": bitrate_val if (adv and adv.mode_combo.currentText() == "比特率模式") else None,
"codec": self.codec_combo.currentText(),
"hwaccel": hwaccel_val,
"extra": adv.parse_list_field(adv.extra_edit.toPlainText()) if adv else None,
"ffmpeg": adv.ffmpeg_edit.text().strip() if adv else "ffmpeg",
"ffprobe": adv.ffprobe_edit.text().strip() if adv else "ffprobe",
"manual": adv.parse_list_field(adv.manual_edit.toPlainText()) if (adv and adv.manual_switch.isChecked()) else None,
"video_ext": exts or [".mp4", ".mkv"],
"compress_dir_name": self.compress_dir_edit.text().strip() or "compress",
"resolution": self.resolution_edit.text().strip() or None,
"fps": self.fps_spin.value(),
"test_video_resolution": adv.test_res_edit.text().strip() if adv else "1920x1080",
"test_video_fps": adv.test_fps_spin.value() if adv else 30,
"test_video_input": adv.test_in_edit.text().strip() if adv else "compress_video_test.mp4",
"test_video_output": adv.test_out_edit.text().strip() if adv else "compressed_video_test.mp4",
"disable_hwaccel_when_fail": self.disable_hw_switch.isChecked(),
}
return payload
def build_config(self) -> Config:
if Config is None:
raise RuntimeError("未能导入 Config。请确认 main.py 可用且在同目录。")
payload = self._collect_config_payload()
try:
cfg = Config(**payload)
except ValidationError as e:
show_error(self, "配置校验失败", str(e))
raise
return cfg
# ----- 预览/运行 -----
def on_preview(self):
if get_cmd is None:
show_error(self, "缺少 get_cmd", "未能导入 get_cmd请检查 main.py")
return
in_path = self.input_edit.text().strip()
out_path = self.output_edit.text().strip()
if not in_path or not out_path:
show_error(self, "缺少路径", "请先选择输入与输出文件")
return
try:
cfg = self.build_config()
if main_module is not None:
setattr(main_module, "config", cfg) # 挂载到 main 供 get_cmd 读取
cmd = get_cmd(in_path, out_path)
self.preview_text.setPlainText(" ".join([repr(c) if " " in c else c for c in cmd]))
show_success(self, "命令已生成", "如需执行请点击运行")
except Exception as e:
show_error(self, "生成失败", str(e))
def on_run(self):
if get_cmd is None:
show_error(self, "缺少 get_cmd", "未能导入 get_cmd请检查 main.py")
return
in_path = self.input_edit.text().strip()
out_path = self.output_edit.text().strip()
if not in_path or not out_path:
show_error(self, "缺少路径", "请先选择输入与输出文件")
return
try:
cfg = self.build_config()
if main_module is not None:
setattr(main_module, "config", cfg)
cmd = get_cmd(in_path, out_path)
# 使用 subprocess.run 执行
completed = subprocess.run(cmd, capture_output=True, text=True)
log = (completed.stdout or "") + "\n" + (completed.stderr or "")
self.preview_text.setPlainText(log)
if completed.returncode == 0:
show_success(self, "执行成功", "视频压缩完成")
else:
show_error(self, "执行失败", f"返回码 {completed.returncode}")
except Exception as e:
show_error(self, "执行异常", str(e))
# --------------------------- 高级设置页 --------------------------------------
class AdvancedPage(QWidget):
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
layout = QVBoxLayout(self)
layout.setContentsMargins(20, 16, 20, 20)
layout.setSpacing(12)
# 模式切换CRF / Bitrate 互斥
mode_card = GroupCard("编码模式", "CRF 与 比特率 二选一")
self.mode_combo = ComboBox()
self.mode_combo.addItems(["CRF 模式", "比特率模式"])
self.mode_combo.currentIndexChanged.connect(self._on_mode_changed)
mode_card.add_row("选择模式", self.mode_combo)
self.crf_spin = SpinBox()
self.crf_spin.setRange(0, 51)
self.crf_spin.setValue(23)
mode_card.add_row("CRF 值(0-51)", self.crf_spin)
self.bitrate_edit = LineEdit()
self.bitrate_edit.setPlaceholderText("如 1000k / 2.5M / 1500B")
mode_card.add_row("比特率", self.bitrate_edit)
# 其它高级参数
adv_card = GroupCard("高级参数", "额外 ffmpeg 参数 / 手动命令 / 可执行路径")
self.extra_edit = QTextEdit()
self.extra_edit.setPlaceholderText("每行一个参数;例如:\n-preset\np6\n-rc\nvbr")
adv_card.add_row("额外(extra)", self.extra_edit)
self.manual_switch = SwitchButton("使用手动命令 (manual)")
self.manual_switch.setChecked(False)
adv_card.body.addWidget(self.manual_switch)
self.manual_edit = QTextEdit()
self.manual_edit.setPlaceholderText("每行一个参数(会插入到 ffmpeg -i {input} {manual} {output} 中)")
self.manual_edit.setEnabled(False)
self.manual_switch.checkedChanged.connect(lambda c: self.manual_edit.setEnabled(c))
adv_card.body.addWidget(self.manual_edit)
self.ffmpeg_edit = LineEdit()
self.ffmpeg_edit.setText("ffmpeg")
adv_card.add_row("ffmpeg 路径", self.ffmpeg_edit)
self.ffprobe_edit = LineEdit()
self.ffprobe_edit.setText("ffprobe")
adv_card.add_row("ffprobe 路径", self.ffprobe_edit)
# 测试参数
test_card = GroupCard("测试", "用于快速验证配置是否能跑通")
self.test_res_edit = LineEdit(); self.test_res_edit.setText("1920x1080")
self.test_fps_spin = SpinBox(); self.test_fps_spin.setRange(0, 999); self.test_fps_spin.setValue(30)
self.test_in_edit = LineEdit(); self.test_in_edit.setText("compress_video_test.mp4")
self.test_out_edit = LineEdit(); self.test_out_edit.setText("compressed_video_test.mp4")
test_card.add_row("测试分辨率", self.test_res_edit)
test_card.add_row("测试 FPS", self.test_fps_spin)
test_card.add_row("测试输入文件名", self.test_in_edit)
test_card.add_row("测试输出文件名", self.test_out_edit)
# backup_card = GroupCard("备份配置", "导入导出当前配置")
layout.addWidget(mode_card)
layout.addWidget(adv_card)
layout.addWidget(test_card)
layout.addStretch(1)
self._on_mode_changed(0)
def _on_mode_changed(self, idx: int):
is_crf = (idx == 0)
self.crf_spin.setEnabled(is_crf)
self.bitrate_edit.setEnabled(not is_crf)
# 解析多行 -> list[str]
@staticmethod
def parse_list_field(text: str) -> Optional[list[str]]:
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
return lines or None
# 从通用模板带入高级字段
def apply_template_from_general(self, tpl: dict):
if tpl.get("crf") is not None:
self.mode_combo.setCurrentText("CRF 模式")
self.crf_spin.setValue(int(tpl["crf"]))
self.bitrate_edit.clear()
else:
self.mode_combo.setCurrentText("比特率模式")
self.bitrate_edit.setText(str(tpl.get("bitrate", "5M")))
self.extra_edit.setPlainText("\n".join(tpl.get("extra", [])))
# --------------------------- 主窗体 ------------------------------------------
class MainWindow(FluentWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("视频压缩配置 - Fluent UI")
self.resize(980, 720)
# 主题与主色
setTheme(Theme.AUTO)
setThemeColor("#2563eb") # Tailwind 蓝-600
# 页面
self.general = GeneralPage()
self.general.setObjectName("general_page")
self.advanced = AdvancedPage()
self.advanced.setObjectName("advanced_page")
self.general.link_advanced = self.advanced
# 导航
self.addSubInterface(self.general, FluentIcon.VIDEO, "通用 Config")
self.addSubInterface(self.advanced, FluentIcon.SETTING, "高级 Config")
# 顶部右侧:操作按钮
# self.navigationInterface.addWidget( # type: ignore
# routeKey="spacer",
# widget=QLabel(""),
# onClick=None,
# position=NavigationItemPosition.TOP
# )
# 导入/导出配置
export_btn = NavigationPushButton(FluentIcon.SAVE, "导出配置", False, self.navigationInterface)
export_btn.clicked.connect(self.export_config)
import_btn = NavigationPushButton(FluentIcon.FOLDER, "导入配置", False, self.navigationInterface)
import_btn.clicked.connect(self.import_config)
self.titleBar.raise_() # 确保标题栏在顶层
self.navigationInterface.addWidget( # type: ignore
routeKey="export",
widget=export_btn,
onClick=None,
position=NavigationItemPosition.BOTTOM
)
self.navigationInterface.addWidget( # type: ignore
routeKey="import",
widget=import_btn,
onClick=None,
position=NavigationItemPosition.BOTTOM
)
# ---------------- 导入/导出 -----------------
def export_config(self):
try:
cfg = self.general.build_config()
except ValidationError:
return
fn, _ = QFileDialog.getSaveFileName(self, "导出配置为 JSON", "config.json", "JSON (*.json)")
if not fn:
return
with open(fn, 'w', encoding='utf-8') as f:
json.dump(json.loads(cfg.model_dump_json()), f, ensure_ascii=False, indent=2)
show_success(self, "已导出", os.path.basename(fn))
def import_config(self):
fn, _ = QFileDialog.getOpenFileName(self, "导入配置 JSON", "", "JSON (*.json)")
if not fn:
return
try:
with open(fn, 'r', encoding='utf-8') as f:
data = json.load(f)
# 回填到界面(允许缺省)
self._fill_general(data)
self._fill_advanced(data)
show_success(self, "已导入", os.path.basename(fn))
except Exception as e:
show_error(self, "导入失败", str(e))
def _fill_general(self, d: dict):
if v := d.get("save_to"):
self.general.save_to_combo.setCurrentText(v)
if v := d.get("codec"):
self.general.codec_combo.setCurrentText(v)
hw = d.get("hwaccel")
self.general.hwaccel_combo.setCurrentIndex(0 if hw in (None, "", "None") else max(1, self.general.hwaccel_combo.findText(str(hw))))
if v := d.get("resolution"):
self.general.resolution_edit.setText(v)
if v := d.get("fps"):
self.general.fps_spin.setValue(int(v))
if v := d.get("video_ext"):
if isinstance(v, list):
self.general.video_ext_edit.setText(",".join(v))
else:
self.general.video_ext_edit.setText(str(v))
if v := d.get("compress_dir_name"):
self.general.compress_dir_edit.setText(v)
if v := d.get("disable_hwaccel_when_fail") is not None:
self.general.disable_hw_switch.setChecked(bool(v))
def _fill_advanced(self, d: dict):
crf = d.get("crf")
bitrate = d.get("bitrate")
if crf is not None and bitrate is None:
self.advanced.mode_combo.setCurrentText("CRF 模式")
self.advanced.crf_spin.setValue(int(crf))
self.advanced.bitrate_edit.clear()
else:
self.advanced.mode_combo.setCurrentText("比特率模式")
if bitrate is not None:
self.advanced.bitrate_edit.setText(str(bitrate))
if v := d.get("extra"):
self.advanced.extra_edit.setPlainText("\n".join(v if isinstance(v, list) else [str(v)]))
if v := d.get("manual"):
self.advanced.manual_switch.setChecked(True)
self.advanced.manual_edit.setEnabled(True)
self.advanced.manual_edit.setPlainText("\n".join(v if isinstance(v, list) else [str(v)]))
if v := d.get("ffmpeg"):
self.advanced.ffmpeg_edit.setText(str(v))
if v := d.get("ffprobe"):
self.advanced.ffprobe_edit.setText(str(v))
if v := d.get("test_video_resolution"):
self.advanced.test_res_edit.setText(str(v))
if v := d.get("test_video_fps"):
self.advanced.test_fps_spin.setValue(int(v))
if v := d.get("test_video_input"):
self.advanced.test_in_edit.setText(str(v))
if v := d.get("test_video_output"):
self.advanced.test_out_edit.setText(str(v))
# --------------------------- 入口 --------------------------------------------
if __name__ == "__main__":
app = QApplication(sys.argv)
win = MainWindow()
win.show()
sys.exit(app.exec())

138
VideoCompress/get_frame.py Normal file
View File

@ -0,0 +1,138 @@
import json
import logging
import subprocess
from fractions import Fraction
from decimal import Decimal
from typing import Optional, Tuple
ffprobe:str = "ffprobe"
class FFProbeError(RuntimeError):
pass
def _run_ffprobe(args: list[str]) -> dict:
"""运行 ffprobe 并以 JSON 返回,若失败抛异常。"""
# 始终要求 JSON 输出,便于稳健解析
base = [ffprobe, "-v", "error", "-print_format", "json"]
proc = subprocess.run(base + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if proc.returncode != 0:
raise FFProbeError(proc.stderr.strip() or "ffprobe 调用失败")
try:
return json.loads(proc.stdout or "{}")
except json.JSONDecodeError as e:
raise FFProbeError(f"无法解析 ffprobe 输出为 JSON: {e}")
def _try_nb_frames(path: str, stream_index: int) -> Optional[int]:
data = _run_ffprobe([
"-select_streams", f"v:{stream_index}",
"-show_entries", "stream=nb_frames",
path
])
streams = data.get("streams") or []
if not streams:
logging.debug("_try_nb_frames: failed no stream")
return None
nb = streams[0].get("nb_frames")
if nb and nb != "N/A":
try:
n = int(nb)
return n if n >= 0 else None
except ValueError:
logging.debug(f"_try_nb_frames: failed nb not positive int: {nb}")
return None
logging.debug(f"_try_nb_frames: failed nb NA: {nb}")
return None
def _try_avgfps_times_duration(path: str, stream_index: int) -> Optional[int]:
# 读 avg_frame_rate
s = _run_ffprobe([
"-select_streams", f"v:{stream_index}",
"-show_entries", "stream=avg_frame_rate",
path
])
streams = s.get("streams") or []
if not streams:
return None
afr = streams[0].get("avg_frame_rate")
if not afr or afr in ("0/0", "N/A"):
return None
# 读容器时长(单位:秒)
f = _run_ffprobe(["-show_entries", "format=duration", path])
dur_str = (f.get("format") or {}).get("duration")
if not dur_str:
logging.debug(f"_try_avgfps_times_duration: failed no dur_str, {f}")
return None
try:
fps = Fraction(afr) # 形如 "30000/1001"
dur = Decimal(dur_str)
# 四舍五入到最近整数,避免系统性低估
est = int(dur * Decimal(fps.numerator) / Decimal(fps.denominator) + Decimal("0.5"))
return est if est >= 0 else None
except Exception as e:
logging.debug("_try_avgfps_times_duration: failed",exc_info=e)
return None
def _try_count_packets(path: str, stream_index: int) -> Optional[int]:
# 统计读取到的包数(不解码)。大多容器≈帧数,但不保证 1:1
data = _run_ffprobe([
"-select_streams", f"v:{stream_index}",
"-count_packets",
"-show_entries", "stream=nb_read_packets",
path
])
streams = data.get("streams") or []
if not streams:
logging.debug("_try_count_packets: failed no stream")
return None
nbp = streams[0].get("nb_read_packets")
try:
n = int(nbp)
return n if n >= 0 else None
except Exception as e:
logging.debug("_try_count_packets: failed",exc_info=e)
return None
def get_video_frame_count(
path: str,
stream_index: int = 0,
fallback_order: Tuple[str, ...] = ("nb_frames", "avg*dur", "count_packets"),
) -> int|None:
"""
估计/获取视频总帧数(带回退)。
参数:
- path: 视频文件路径
- stream_index: 选择哪个视频流,默认 0
- allow_slow_decode: 是否允许用解码全片的方式(最慢但最准)
- fallback_order: 回退顺序,四种方法的别名可选:
"nb_frames" -> 直接读元数据中的总帧数
"avg*dur" -> 平均帧率 × 时长(最快估算)
"count_packets" -> 统计包数(较快,接近帧数但不保证)
返回:
(frame_count, method_used)
异常:
- FileNotFoundError: ffprobe 未安装
- FFProbeError: ffprobe 调用异常或无法解析
- RuntimeError: 所有方法均失败
"""
methods = {
"nb_frames": _try_nb_frames,
"avg*dur": _try_avgfps_times_duration,
"count_packets": _try_count_packets,
}
for key in fallback_order:
try:
func = methods.get(key)
if not func:
continue
n = func(path, stream_index)
if isinstance(n, int) and n >= 0:
return n
else:
logging.debug(f"Failed to get frame with {key}")
except Exception as e:
logging.debug(f"Errored to get frame with {key}.",exc_info=e)
return None
raise RuntimeError("无法获取或估计帧数:所有回退方法均失败。")

View File

@ -7,397 +7,400 @@ from datetime import datetime
from time import time from time import time
from rich.logging import RichHandler from rich.logging import RichHandler
from rich.progress import Progress from rich.progress import Progress
from typing import Optional from pickle import dumps, loads
from typing import Optional, Callable,Literal
import atexit import atexit
import re import re
import threading import get_frame
import queue from pydantic import BaseModel,Field,field_validator,model_validator
import psutil
class Config(BaseModel):
save_to:Literal["single","multi"] = Field("single",description="保存到单文件夹或者每个子文件夹创建compress_dir")
crf: Optional[int] = Field(None, ge=0, le=51, description="CRF值范围0-51")
bitrate: Optional[str] = Field(None, description="比特率,格式如: 1000k, 2.5M, 1500B")
codec: str = Field("h264",description="ffmpeg的codec如果使用GPU需要对应设置")
hwaccel:Optional[Literal["amf","qsv","cuda"]] = Field(None,description="使用GPU加速")
extra:Optional[list[str]] = Field(None,description="插入到ffmpeg输出前的自定义参数")
ffmpeg:str = "ffmpeg"
ffprobe:str = "ffprobe"
manual:Optional[list[str]] = Field(None,description=r"手动设置ffmpeg命令ffmpeg -i {input} {manual} {output}")
video_ext:list[str] = Field([".mp4", ".mkv"],description="视频文件后缀,含.")
compress_dir_name:str = Field("compress",description="压缩文件夹名称")
resolution: Optional[str] = Field(None,description="统一到特定尺寸None为不使用缩放")
fps:int = Field(30,description="fps",ge=0)
test_video_resolution:str = "1920x1080"
test_video_fps:int = Field(30,ge=0)
test_video_input:str = "compress_video_test.mp4"
test_video_output:str = "compressed_video_test.mp4"
disable_hwaccel_when_fail:bool = Field(True,description="当运行失败时,禁用硬件加速")
@field_validator('bitrate')
@classmethod
def validate_bitrate(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return v
pattern = r'^[\d\.]+[MkB]*$'
if not re.match(pattern, v):
raise ValueError('bitrate格式不正确应为数字+单位(M/k/B),如: 1000k, 2.5M')
return v
@field_validator('resolution')
@classmethod
def validate_resolution(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return v
pattern = r'^((-1)|\d+):((-1)|\d+)$'
if not re.match(pattern, v):
raise ValueError('resolution格式不正确应为{数字/-1}:{数字/-1}')
return v
@field_validator("compress_dir_name")
# @field_validator("test_video_input")
# @field_validator("test_video_output")
@classmethod
def valid_path(cls, v:str) -> str:
if re.search(r'[\\/:*?"<>|\x00-\x1F]',v):
raise ValueError("某配置不符合目录名语法")
return v
@model_validator(mode='after')
def validate_mutual_exclusive(self):
crf_none = self.crf is None
bitrate_none = self.bitrate is None
# 有且只有一者为None
if crf_none == bitrate_none:
raise ValueError('crf和bitrate必须互斥有且只有一个为None')
return self
root = None root = None
if os.environ.get("INSTALL", "0") == "1":
CFG_FILE= Path(os.getenv("APPDATA", "C:/")) / "VideoCompress" / "config.json"
else:
CFG_FILE= Path(sys.path[0]) / "config.json"
CFG = { CFG = {
"save_to": "single", "save_to": "single",
"crf":"18", "crf": "18",
"bitrate": None, "bitrate": None,
"codec": "h264", "codec": "h264",
"hwaccel": None,
"extra": [], "extra": [],
"ffmpeg": "ffmpeg", "ffmpeg": "ffmpeg",
"manual": None, "manual": None,
"video_ext": [".mp4", ".mkv"], "video_ext": [".mp4", ".mkv"],
"compress_dir_name": "compress", "compress_dir_name": "compress",
"resolution": "-1:1080", "resolution": None,
"fps": "30", "fps": "30",
"test_video_resolution": "1920x1080", "test_video_resolution": "1920x1080",
"test_video_fps": "30", "test_video_fps": "30",
"test_video_input": "compress_video_test.mp4", "test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4", "test_video_output": "compressed_video_test.mp4",
"max_concurrent_instances": 2, "disable_hwaccel_when_fail": True,
"cpu_monitor_interval": 3, # CPU监控间隔
"cpu_monitor_duration": 30, # 统计持续时间5分钟
} }
# CPU监控相关全局变量
ffmpeg_processes = {} # 存储活动的ffmpeg进程
cpu_stats = {"system": [], "ffmpeg": []} # CPU使用率统计
cpu_monitor_thread = None
cpu_monitor_lock = threading.Lock()
current_instances = 0
instance_lock = threading.Lock()
def get_config_path() -> Path: def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
"""获取配置文件路径""" if isinstance(video_path, Path):
if os.environ.get("INSTALL", "0") == "1": video_path = str(video_path.resolve())
return Path(os.getenv("APPDATA", "C:/")) / "VideoCompress" / "config.json" if isinstance(output_file, Path):
else: output_file = str(output_file.resolve())
return Path("config.json").resolve()
def get_cmd(video_path,output_file):
if CFG["manual"] is not None: if CFG["manual"] is not None:
command=[ command = [CFG["ffmpeg"], "-hide_banner", "-i", video_path]
CFG["ffmpeg"],
"-hide_banner",
"-i", video_path
]
command.extend(CFG["manual"]) command.extend(CFG["manual"])
command.append(output_file) command.append(output_file)
return command return command
command = [
CFG["ffmpeg"],
"-hide_banner",
]
if CFG["hwaccel"] is not None:
command.extend(
[
"-hwaccel",
CFG["hwaccel"],
"-hwaccel_output_format",
CFG["hwaccel"],
]
)
command.extend(
[
"-i",
video_path,
]
)
if CFG["bitrate"] is not None: if CFG["bitrate"] is not None:
command = [
CFG["ffmpeg"], if CFG["resolution"] is not None:
"-hide_banner", command.extend(
"-i", video_path, [
"-vf",
f"scale={CFG['resolution']}",
] ]
if CFG['resolution'] is not None: )
command.extend([ command.extend(
"-vf", f"scale={CFG['resolution']}",]) [
command.extend([ "-c:v",
"-c:v", CFG["codec"], CFG["codec"],
"-b:v", CFG["bitrate"], "-b:v",
"-r",CFG["fps"], CFG["bitrate"],
"-r",
str(CFG["fps"]),
"-y", "-y",
]) ]
)
else: else:
command = [ if CFG["resolution"] is not None:
CFG["ffmpeg"], command.extend(
"-hide_banner", [
"-i", video_path, "-vf",
f"scale={CFG['resolution']}",
] ]
if CFG['resolution'] is not None: )
command.extend([ command.extend(
"-vf", f"scale={CFG['resolution']}",]) [
command.extend([ "-c:v",
"-c:v", CFG["codec"], CFG["codec"],
"-global_quality", str(CFG["crf"]), "-global_quality",
"-r",CFG["fps"], str(CFG["crf"]),
"-r",
str(CFG["fps"]),
"-y", "-y",
]) ]
)
command.extend(CFG["extra"]) command.extend(CFG["extra"])
command.append(output_file) command.append(output_file)
logging.debug(f"Create CMD: {command}")
return command return command
# 配置logging # 配置logging
def setup_logging(): def setup_logging():
log_dir = Path("logs") log_dir = Path("logs")
log_dir.mkdir(exist_ok=True) log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log" log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log"
stream = RichHandler(rich_tracebacks=True,tracebacks_show_locals=True) stream = RichHandler(rich_tracebacks=True, tracebacks_show_locals=True)
stream.setLevel(logging.INFO) stream.setLevel(logging.INFO)
stream.setFormatter(logging.Formatter("%(message)s")) stream.setFormatter(logging.Formatter("%(message)s"))
file = logging.FileHandler(log_file, encoding='utf-8') file = logging.FileHandler(log_file, encoding="utf-8")
file.setLevel(logging.DEBUG) file.setLevel(logging.DEBUG)
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG, level=logging.DEBUG,
format='%(asctime)s - %(levelname) 7s - %(message)s', format="%(asctime)s - %(levelname) 7s - %(message)s",
handlers=[ handlers=[file, stream],
file,
stream
]
) )
def fmt_time(t:float|int) -> str:
if t>3600: def fmt_time(t: float | int) -> str:
if t > 3600:
return f"{t//3600}h {t//60}min {t%60}s" return f"{t//3600}h {t//60}min {t%60}s"
elif t>60: elif t > 60:
return f"{t//60}min {t%60}s" return f"{t//60}min {t%60}s"
else: else:
return f"{round(t)}s" return f"{round(t)}s"
def cpu_monitor():
"""CPU监控线程函数"""
global cpu_stats
while True: def process_video(
try: video_path: Path,
# 获取系统CPU使用率 compress_dir: Optional[Path] = None,
system_cpu = psutil.cpu_percent(interval=1) update_func: Optional[Callable[[Optional[int], Optional[str]], None]] = None,
):
# 获取所有ffmpeg进程的CPU使用率
ffmpeg_cpu_total = 0
active_processes = []
with cpu_monitor_lock:
for proc_info in ffmpeg_processes.values():
try:
proc = proc_info['process']
if proc.is_running():
# print(proc,proc.cpu_percent() / psutil.cpu_count())
ffmpeg_cpu_total += proc.cpu_percent() / psutil.cpu_count()
active_processes.append(proc_info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# 更新统计数据
with cpu_monitor_lock:
cpu_stats["system"].append(system_cpu)
cpu_stats["ffmpeg"].append(ffmpeg_cpu_total)
# 保持最近5分钟的数据
max_samples = CFG["cpu_monitor_duration"] // CFG["cpu_monitor_interval"]
if len(cpu_stats["system"]) > max_samples:
cpu_stats["system"] = cpu_stats["system"][-max_samples:]
if len(cpu_stats["ffmpeg"]) > max_samples:
cpu_stats["ffmpeg"] = cpu_stats["ffmpeg"][-max_samples:]
except KeyboardInterrupt as e:
raise e
except Exception as e:
logging.error(f"CPU监控异常: {e}")
# 等待下一次监控
threading.Event().wait(CFG["cpu_monitor_interval"])
def start_cpu_monitor():
"""启动CPU监控线程"""
global cpu_monitor_thread
if cpu_monitor_thread is None or not cpu_monitor_thread.is_alive():
cpu_monitor_thread = threading.Thread(target=cpu_monitor, daemon=True)
cpu_monitor_thread.start()
logging.info("CPU监控线程已启动")
def get_cpu_usage_stats():
"""获取CPU使用率统计"""
with cpu_monitor_lock:
if not cpu_stats["system"] or not cpu_stats["ffmpeg"]:
return None, None
system_avg = sum(cpu_stats["system"]) / len(cpu_stats["system"])
ffmpeg_avg = sum(cpu_stats["ffmpeg"]) / len(cpu_stats["ffmpeg"])
return system_avg, ffmpeg_avg
def should_increase_instances():
"""判断是否应该增加实例数"""
system_avg, ffmpeg_avg = get_cpu_usage_stats()
if system_avg is None or ffmpeg_avg is None:
return False
# 条件: 系统CPU - FFmpeg CPU > FFmpeg CPU * 2 + 0.1
available_cpu = 100 - system_avg
threshold = ffmpeg_avg # 10% = 0.1 * 100
logging.debug(f"CPU统计: 系统平均={system_avg:.1f}%, FFmpeg平均={ffmpeg_avg:.1f}%, 可用={available_cpu:.1f}%, 阈值={threshold:.1f}%")
return available_cpu > threshold
def register_ffmpeg_process(proc_id, process):
"""注册ffmpeg进程用于监控"""
with cpu_monitor_lock:
ffmpeg_processes[proc_id] = {
'process': psutil.Process(process.pid),
'start_time': time()
}
def unregister_ffmpeg_process(proc_id):
"""注销ffmpeg进程"""
with cpu_monitor_lock:
if proc_id in ffmpeg_processes:
del ffmpeg_processes[proc_id]
def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_func=None, proc_id=None):
global current_instances
use=None
sz=video_path.stat().st_size//(1024*1024)
bgn=time()
if compress_dir is None: if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在) # 在视频文件所在目录下创建 compress 子目录(如果不存在)
compress_dir = video_path.parent / CFG["compress_dir_name"] compress_dir = video_path.parent / CFG["compress_dir_name"]
else: else:
assert root
compress_dir /= video_path.parent.relative_to(root) compress_dir /= video_path.parent.relative_to(root)
assert isinstance(compress_dir,Path) assert isinstance(compress_dir, Path)
compress_dir.mkdir(exist_ok=True,parents=True) compress_dir.mkdir(exist_ok=True, parents=True)
# 输出文件路径:与原文件同名,保存在 compress 目录下 # 输出文件路径:与原文件同名,保存在 compress 目录下
output_file = compress_dir / (video_path.stem + video_path.suffix) output_file = compress_dir / (video_path.stem + video_path.suffix)
if output_file.is_file(): if output_file.is_file():
logging.warning(f"文件{output_file}存在,跳过") logging.warning(f"文件{output_file}存在,跳过")
return use return False
video_path_str = str(video_path.absolute()) video_path_str = str(video_path.absolute())
command = get_cmd(video_path_str,output_file) command = get_cmd(video_path_str, output_file)
try: try:
with instance_lock:
current_instances += 1
logging.debug(f"启动FFmpeg进程 {proc_id}: {video_path.name}")
result = subprocess.Popen( result = subprocess.Popen(
command, command,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
encoding="utf-8", encoding="utf-8",
text=True text=True,
) )
# 注册进程用于CPU监控 total = ""
if proc_id:
register_ffmpeg_process(proc_id, result)
while result.poll() is None: while result.poll() is None:
line = " " line = " "
while result.poll() is None and line[-1:] not in "\r\n": while result.poll() is None and line[-1:] not in "\r\n":
line+=result.stderr.read(1) assert result.stderr is not None
if 'warning' in line.lower(): line += result.stderr.read(1)
logging.warning(f"[FFmpeg {proc_id}]({video_path_str}): {line}") total += line[-1]
elif 'error' in line.lower(): # print(line[-1])
logging.error(f"[FFmpeg {proc_id}]({video_path_str}): {line}") if "warning" in line.lower():
elif "frame=" in line: logging.warning(f"[FFmpeg]({video_path_str}): {line}")
match = re.search(r"frame=\s*(\d+)",line) elif "error" in line.lower():
if match: logging.error(f"[FFmpeg]({video_path_str}): {line}")
frame_number = int(match.group(1)) elif "assertion" in line.lower():
if update_func is not None: logging.error(f"[FFmpeg]({video_path_str}): {line}")
update_func(frame_number) elif "frame=" in line and update_func is not None:
# print(line,end="")
match = re.search(r"frame=\s*(\d+)", line)
frame_number = int(match.group(1)) if match else None
match = re.search(r"[\d\.]+x", line)
rate = match.group(0) if match else None
update_func(frame_number, rate)
if result.returncode != 0: if result.returncode != 0:
logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode}cmd={' '.join(map(str,command))}") logging.error(
logging.error(result.stdout.read()) f"处理文件 {video_path_str} 失败"
logging.error(result.stderr.read()) )
logging.debug(f"返回码: {result.returncode}; cmd={' '.join(command)}")
output_file.unlink(missing_ok=True)
assert result.stdout is not None
logging.debug(result.stdout.read())
logging.debug(total)
if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in [
"h264_mediacodec",
"hevc_mediacodec",
]:
logging.info(
"mediacodec硬件加速器已知在较短片段上存在异常将禁用加速重试。"
)
output_file.unlink(missing_ok=True)
bak = CFG.copy()
CFG["hwaccel"] = None
CFG["codec"] = "h264" if CFG["codec"] == "h264_mediacodec" else "hevc"
assert not output_file.exists()
ret = process_video(video_path, compress_dir, update_func)
CFG.update(bak)
if not ret:
logging.error("重试仍然失败。")
return False
else:
return True
elif CFG["disable_hwaccel_when_fail"] and CFG["hwaccel"] is not None:
logging.info("正在禁用硬件加速器重试,进度条可能发生混乱。")
output_file.unlink(missing_ok=True)
bak = CFG.copy()
CFG["hwaccel"] = None
if (
CFG["codec"].endswith("_mediacodec")
or CFG["codec"].endswith("_qsv")
or CFG["codec"].endswith("_nvenc")
or CFG["codec"].endswith("_amf")
):
CFG["codec"] = CFG["codec"].split("_")[0]
assert not output_file.exists()
ret = process_video(video_path, compress_dir, update_func)
CFG.update(bak)
if not ret:
logging.error("重试仍然失败。")
return False
else: else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}") logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
except KeyboardInterrupt as e:raise e except KeyboardInterrupt as e:
raise e
except Exception as e: except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",exc_info=e) logging.error(
finally: f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",
# 注销进程监控 exc_info=e,
if proc_id: )
unregister_ffmpeg_process(proc_id) return False
return True
with instance_lock:
current_instances -= 1
logging.debug(f"FFmpeg进程 {proc_id} 已结束")
return use
def traverse_directory(root_dir: Path): def traverse_directory(root_dir: Path):
global current_instances
video_extensions = set(CFG["video_ext"]) video_extensions = set(CFG["video_ext"])
sm = None
# 获取视频文件列表和帧数信息 # 获取视频文件列表和帧数信息
video_files = [] video_files:list[Path] = []
que = [root_dir] que = list(root_dir.glob("*"))
while que: while que:
d = que.pop() d = que.pop()
for file in d.glob("*"): for file in d.glob("*") if d.is_dir() else [d]:
if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]: if (
file.parent.name == CFG["compress_dir_name"]
or file.name == CFG["compress_dir_name"]
):
continue continue
if file.is_file() and file.suffix.lower() in video_extensions: if file.is_file() and file.suffix.lower() in video_extensions:
video_files.append(file) video_files.append(file)
elif file.is_dir(): elif file.is_dir():
que.append(file) que.append(file)
# exit()
if not video_files: if not video_files:
logging.warning("未找到需要处理的视频文件") logging.warning("未找到需要处理的视频文件")
return return
# 获取视频信息 # 获取视频信息
frames: dict[Path, float] = {}
cached_data: dict[Path, float] = {}
info_file = Path("video_info.cache")
if info_file.is_file():
try:
cached_data = loads(info_file.read_bytes())
if isinstance(cached_data, dict):
logging.debug("Loaded video info from cache.")
else:
cached_data = {}
except Exception as e:
logging.debug("Failed to load video info cache.", exc_info=e)
with Progress() as prog: with Progress() as prog:
task = prog.add_task("正在获取视频信息", total=len(video_files)) task = prog.add_task("正在获取视频信息", total=len(video_files))
frames: dict[Path, float] = {}
for file in video_files: for file in video_files:
prog.advance(task) prog.advance(task)
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split() if file in cached_data and cached_data[file] > 0:
cmd.append(str(file)) frames[file] = cached_data[file]
proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if proc.returncode != 0:
logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}")
frames[file] = 0
continue continue
if proc.stdout.strip(): fr = get_frame.get_video_frame_count(str(file.resolve()))
if fr is None:
logging.debug(
f"无法获取视频信息: {file}, 时长为N/A默认使用0s"
)
frames[file] = 0 if fr is None else fr
if 0 in frames.values():
logging.warning(
f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。"
)
prog.remove_task(task)
try: try:
avg_frame_rate, duration = proc.stdout.strip().split('\n') info_file.write_bytes(dumps(frames))
tmp = avg_frame_rate.split('/') logging.debug("Saved video info to cache.")
avg_frame_rate = float(tmp[0]) / float(tmp[1]) except Exception as e:
if duration == "N/A": logging.debug("Failed to save video info cache.", exc_info=e)
duration = 0
logging.debug(f"无法获取视频信息: {file}, 时长为N/A默认使用0s")
duration = float(duration)
frames[file] = duration * avg_frame_rate
except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0
logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件") logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件")
# 启动CPU监控
start_cpu_monitor()
# 创建进度条 # 创建进度条
with Progress() as prog: with Progress() as prog:
total_frames = sum(frames.values()) total_frames = sum(frames.values())
main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames)) main_task = prog.add_task(
"总进度", total=total_frames if total_frames > 0 else len(frames)
)
# 创建文件队列 # 创建文件队列
file_queue = queue.Queue()
for file in frames.keys(): for file in frames.keys():
file_queue.put(file)
# 进度跟踪 # 进度跟踪
progress_trackers = {}
completed_files = 0
total_completed_frames = 0
def create_progress_updater(file_path, task_id):
def update_progress(frame_count):
nonlocal total_completed_frames
if file_path in progress_trackers:
old_frames = progress_trackers[file_path]
diff = frame_count - old_frames
total_completed_frames += diff
else:
total_completed_frames += frame_count
progress_trackers[file_path] = frame_count
if frames[file_path] > 0:
prog.update(task_id, completed=frame_count)
else:
prog.update(task_id, description=f"{file_path.relative_to(root_dir)} 已处理{frame_count}")
# 更新总进度
if total_frames > 0:
prog.update(main_task, completed=total_completed_frames)
return update_progress
def process_file_worker():
nonlocal completed_files
while True:
try:
file = file_queue.get(timeout=1)
except queue.Empty:
break
filename = file.relative_to(root_dir) filename = file.relative_to(root_dir)
# 创建文件级进度条 # 创建文件级进度条
@ -406,107 +409,71 @@ def traverse_directory(root_dir: Path):
else: else:
file_task = prog.add_task(f"{filename}", total=frames[file]) file_task = prog.add_task(f"{filename}", total=frames[file])
progress_updater = create_progress_updater(file, file_task) with prog._lock:
completed_start = prog._tasks[main_task].completed
# 处理视频 def update_progress(x, rate):
proc_id = f"worker_{threading.current_thread().ident}_{completed_files}" if frames[file] == 0:
prog.update(
file_task,
description=f"{filename} 已处理{x}{f'速率{rate}' if rate else ''}",
)
else:
prog.update(
file_task,
completed=x,
description=f"{filename} {f'速率{rate}' if rate else ''}",
)
prog.update(main_task, completed=completed_start + x)
if CFG["save_to"] == "single": if CFG["save_to"] == "single":
process_video(file, root_dir/"Compress", progress_updater, proc_id) process_video(
file, root_dir / CFG["compress_dir_name"], update_progress
)
else: else:
process_video(file, None, progress_updater, proc_id) process_video(file, None, update_progress)
# 更新完成计数
with instance_lock:
completed_files += 1
if total_frames == 0: # 如果没有总帧数,按文件数计算
prog.update(main_task, completed=completed_files)
# 移除文件级进度条 # 移除文件级进度条
prog.update(main_task, completed=completed_start + frames[file])
prog.remove_task(file_task) prog.remove_task(file_task)
file_queue.task_done()
# 动态管理线程数
active_threads = []
max_workers = CFG["max_concurrent_instances"]
def manage_workers():
nonlocal active_threads
while completed_files < len(frames) or any(t.is_alive() for t in active_threads):
# 清理已完成的线程
active_threads = [t for t in active_threads if t.is_alive()]
# 检查是否需要增加实例
current_worker_count = len(active_threads)
if current_worker_count < max_workers and not file_queue.empty():
# 检查CPU使用率运行5分钟后开始检查
should_add_worker = False
if len(cpu_stats["system"]) >= 10: # 至少有5分钟的数据
if current_worker_count >= 1: # 已有实例运行
should_add_worker = should_increase_instances()
if should_add_worker:
logging.info("CPU资源充足启动第二个压缩实例")
else:
should_add_worker = False
if should_add_worker:
worker_thread = threading.Thread(target=process_file_worker, daemon=True)
worker_thread.start()
active_threads.append(worker_thread)
logging.debug(f"启动新的工作线程,当前活动线程数: {len(active_threads)}")
threading.Event().wait(5) # 每5秒检查一次
# 等待所有线程完成
for thread in active_threads:
thread.join()
# 启动第一个工作线程
if not file_queue.empty():
first_worker = threading.Thread(target=process_file_worker, daemon=True)
first_worker.start()
active_threads.append(first_worker)
logging.info("启动第一个压缩实例")
# 启动线程管理器
manager_thread = threading.Thread(target=manage_workers, daemon=True)
manager_thread.start()
# 等待管理线程完成
manager_thread.join()
logging.info(f"所有视频处理完成,共处理了 {completed_files} 个文件")
def test():
os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
try: try:
subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode() info_file.unlink(missing_ok=True)
except KeyboardInterrupt as e:raise e except Exception as e:
logging.warning("无法删除视频信息缓存文件", exc_info=e)
def test():
os.environ["PATH"] = (
Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
)
try:
subprocess.run(
[CFG["ffmpeg"], "-version"], stdout=-3, stderr=-3
).check_returncode()
except Exception as e: except Exception as e:
print(__file__) print(__file__)
logging.critical("无法运行ffmpeg") logging.critical("无法运行ffmpeg")
exit(-1) exit(-1)
try: try:
ret = subprocess.run( ret = subprocess.run(
f"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(), f"{CFG['ffmpeg']} -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True text=True,
) )
if ret.returncode != 0: if ret.returncode != 0:
logging.warning("无法生成测试视频.") logging.warning("无法生成测试视频.")
logging.debug(ret.stdout) logging.debug(ret.stdout)
logging.debug(ret.stderr) logging.debug(ret.stderr)
ret.check_returncode() ret.check_returncode()
cmd = get_cmd(CFG["test_video_input"],CFG["test_video_output"],) cmd = get_cmd(
CFG["test_video_input"],
CFG["test_video_output"],
)
ret = subprocess.run( ret = subprocess.run(
cmd, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
) )
if ret.returncode != 0: if ret.returncode != 0:
logging.error("测试视频压缩失败") logging.error("测试视频压缩失败")
@ -514,23 +481,28 @@ def test():
logging.debug(ret.stderr) logging.debug(ret.stderr)
logging.error("Error termination via test failed.") logging.error("Error termination via test failed.")
exit(-1) exit(-1)
if get_frame.get_video_frame_count("compress_video_test.mp4") is None:
logging.error("测试读取帧数失败,将无法正确显示进度。")
os.remove("compress_video_test.mp4") os.remove("compress_video_test.mp4")
os.remove("compressed_video_test.mp4") os.remove("compressed_video_test.mp4")
except KeyboardInterrupt as e:raise e except KeyboardInterrupt as e:
raise e
except Exception as e: except Exception as e:
if os.path.exists("compress_video_test.mp4"): if os.path.exists("compress_video_test.mp4"):
os.remove("compress_video_test.mp4") os.remove("compress_video_test.mp4")
logging.warning("测试未通过,继续运行可能出现未定义行为。") logging.warning("测试未通过,继续运行可能出现未定义行为。")
logging.debug("Test error",exc_info=e) logging.debug("Test error", exc_info=e)
def exit_pause(): def exit_pause():
if os.name == 'nt': if os.name == "nt":
os.system("pause") os.system("pause")
elif os.name == 'posix': elif os.name == "posix":
os.system("read -p 'Press Enter to continue...'") os.system("read -p 'Press Enter to continue...'")
def main(_root = None):
def main(_root=None):
atexit.register(exit_pause) atexit.register(exit_pause)
@ -538,17 +510,31 @@ def main(_root = None):
setup_logging() setup_logging()
tot_bgn = time() tot_bgn = time()
logging.info("-------------------------------") logging.info("-------------------------------")
logging.info(datetime.now().strftime('Video Compress started at %Y/%m/%d %H:%M')) logging.info(datetime.now().strftime("Video Compress started at %Y/%m/%d %H:%M"))
if get_config_path().exists(): if CFG_FILE.exists():
try: try:
import json import json
cfg:dict = json.loads(get_config_path().read_text())
CFG.update(cfg) cfg: dict = json.loads(CFG_FILE.read_text())
except KeyboardInterrupt as e:raise e cfg_model = Config(**cfg)
CFG.update(cfg_model.model_dump())
get_frame.ffprobe = CFG["ffprobe"]
logging.debug(cfg_model)
logging.debug(CFG)
except KeyboardInterrupt as e:
raise e
except Exception as e: except Exception as e:
logging.warning("Invalid config file, ignored.") logging.warning("Invalid config file, ignored.")
logging.debug(e) logging.debug(e)
else:
try:
import json
CFG_FILE.write_text(json.dumps(CFG, indent=4))
logging.info("Config file created.")
except Exception as e:
logging.warning("Failed to create config file.", exc_info=e)
if _root is not None: if _root is not None:
root = Path(_root) root = Path(_root)
@ -579,9 +565,16 @@ def main(_root = None):
logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}") logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}")
logging.info("Normal termination of Video Compress.") logging.info("Normal termination of Video Compress.")
except KeyboardInterrupt: except KeyboardInterrupt:
logging.warning("Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.") logging.warning(
"Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED."
)
except Exception as e: except Exception as e:
logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e) logging.error(
"Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",
exc_info=e,
)
if __name__ == "__main__": if __name__ == "__main__":
sys.argv.append(r'C:\Users\flt\Documents\WeChat Files\wxid_m8h0igh8p52p22\FileStorage\Video')
main() main()

View File

@ -0,0 +1 @@
rich

View File

@ -1,8 +1,11 @@
import PyPDF2 # PyMuPDF import PyPDF2 # PyMuPDF
from PyPDF2.generic import IndirectObject
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Optional
from itertools import repeat
def copy_pdf_pages(input_path: str, output_path: str) -> bool: def copy_pdf_pages(input_path: Path, output_path: Path) -> bool:
""" """
移除PDF文件的所有限制 移除PDF文件的所有限制
@ -17,13 +20,36 @@ def copy_pdf_pages(input_path: str, output_path: str) -> bool:
try: try:
with open(input_path, 'rb') as input_file: with open(input_path, 'rb') as input_file:
reader = PyPDF2.PdfReader(input_file) reader = PyPDF2.PdfReader(input_file)
writer = PyPDF2.PdfWriter() writer = PyPDF2.PdfWriter()
# 复制所有页面 # 复制所有页面
for page in reader.pages: for page in reader.pages:
writer.add_page(page) writer.add_page(page)
try:
que = list(zip(repeat(None),reader.outline))
last:Optional[IndirectObject] = None
for par, it in que:
if isinstance(it, list):
que.extend(zip(repeat(last),it))
continue
title = getattr(it, 'title', None)
if title is None:
try:
title = str(it)
except Exception:
print(f"警告:无法获取书签标题,跳过该书签.")
continue
page_num = reader.get_destination_page_number(it)
if page_num is None:
continue
last = writer.add_outline_item(title, page_num, parent=par)
except Exception as e:
print(f"警告:{input_path.name}书签处理失败.")
# 写入新文件(不设置任何加密或限制) # 写入新文件(不设置任何加密或限制)
with open(output_path, 'wb') as output_file: with open(output_path, 'wb') as output_file:
writer.write(output_file) writer.write(output_file)
@ -34,50 +60,6 @@ def copy_pdf_pages(input_path: str, output_path: str) -> bool:
print(f"移除PDF限制时发生错误: {e}") print(f"移除PDF限制时发生错误: {e}")
return False return False
# def copy_pdf_pages(input_file, output_file):
# """
# 读取PDF文件并逐页复制到新的PDF文件
# Args:
# input_file (str): 输入PDF文件路径
# output_file (str): 输出PDF文件路径
# """
# try:
# # 检查输入文件是否存在
# if not os.path.exists(input_file):
# print(f"错误:输入文件 '{input_file}' 不存在")
# return False
# # 打开输入PDF文件
# pdf_document = fitz.open(input_file)
# # 创建新的PDF文档
# new_pdf = fitz.open()
# new_pdf.insert_pdf(pdf_document)
# # 保存输出文件
# new_pdf.save(output_file)
# # 关闭文档
# pdf_document.close()
# new_pdf.close()
# return True
# except FileNotFoundError:
# print(f"错误:找不到文件 '{input_file}'")
# return False
# except PermissionError:
# print(f"错误:权限不足,无法访问文件")
# return False
# except Exception as pdf_error:
# error_msg = str(pdf_error).lower()
# if "damaged" in error_msg or "corrupt" in error_msg:
# print(f"错误PDF文件 '{input_file}' 已损坏")
# else:
# print(f"发生错误:{str(pdf_error)}")
# return False
def main(): def main():
"""主函数""" """主函数"""
if len(sys.argv) < 2: if len(sys.argv) < 2:
@ -89,12 +71,12 @@ def main():
else: else:
input_path = Path(sys.argv[1]) input_path = Path(sys.argv[1])
if input_path.is_dir(): if input_path.is_dir():
files = list(input_path.glob("**/*.pdf")) files = list(input_path.rglob("*.pdf"))
else: else:
print("正在处理",input_path.name) print("正在处理",input_path.name)
output_file = input_path.with_name(f"{input_path.stem}_decrypt.pdf") output_file = input_path.with_name(f"{input_path.stem}_decrypt.pdf")
success = copy_pdf_pages(input_path, output_file) suc = copy_pdf_pages(input_path, output_file)
print("处理完成" if success else "处理失败") print("处理完成" if suc else "处理失败")
return return
total = len(files) total = len(files)
@ -102,15 +84,13 @@ def main():
for i, pdf_file in enumerate(files, start=1): for i, pdf_file in enumerate(files, start=1):
rate= round(i/total *100) rate= round(i/total *100)
print(f"进度: ", "-"* (rate//5)," "*(20-rate//5), f" {rate}%",sep="",end="\r") print(f"进度: ", "-"* (rate//5)," "*(20-rate//5), f" {rate}%",sep="",end="\r")
import time
# time.sleep(1) # 模拟处理时间
if not pdf_file.is_file(): if not pdf_file.is_file():
print(f"跳过非PDF文件{pdf_file}") print(f"跳过非PDF文件{pdf_file}")
continue continue
output_file = pdf_file.with_name(f"{pdf_file.stem}_decrypt.pdf") output_file = pdf_file.with_name(f"{pdf_file.stem}_decrypt.pdf")
success = copy_pdf_pages(pdf_file, output_file) suc = copy_pdf_pages(pdf_file, output_file)
if not success: if not suc:
print(f"{pdf_file.name} 处理失败") print(f"{pdf_file.name} 处理失败")
if __name__ == "__main__": if __name__ == "__main__":

153
pdf_unlock/ui.py Normal file
View File

@ -0,0 +1,153 @@
from __future__ import annotations
from pathlib import Path
import threading
import traceback
import tkinter as tk
from tkinter import filedialog, messagebox
import customtkinter as ctk
import sys
from tkinterdnd2 import TkinterDnD, DND_FILES # type: ignore
DND_AVAILABLE = True
from main import copy_pdf_pages # type: ignore
from main import main as dummy_main # to avoid linter error
APP_TITLE = "PDF 解锁(拖入即可)"
SUFFIX = "_decrypt"
def parse_dropped_paths(tcl_list: str, tk_root: tk.Misc) -> list[Path]:
"""将 DND 的 raw 字符串解析为 Path 列表(兼容空格/花括号)。"""
return [Path(p) for p in tk_root.tk.splitlist(tcl_list)]
def gather_pdfs(paths: list[Path]) -> list[Path]:
"""根据传入路径自动识别:单文件 / 多文件 / 目录(递归),提取所有 PDF。"""
out: list[Path] = []
for p in paths:
if p.is_dir():
out.extend([f for f in p.rglob("*.pdf") if f.is_file()])
elif p.is_file() and p.suffix.lower() == ".pdf":
out.append(p)
# 去重并按路径排序
uniq = sorted({f.resolve() for f in out})
return list(uniq)
def output_path_for(in_path: Path) -> Path:
return in_path.with_name(f"{in_path.stem}{SUFFIX}.pdf")
class App:
def __init__(self, root: tk.Tk):
self.root = root
self.root.title(APP_TITLE)
self.root.geometry("720x360")
ctk.set_appearance_mode("System")
ctk.set_default_color_theme("blue")
# ---- 单一可点击/可拖拽区域 ----
self.drop = ctk.CTkFrame(self.root, height=240, corner_radius=12)
self.drop.pack(fill="both", expand=True, padx=20, pady=20)
self.label = ctk.CTkLabel(
self.drop,
text=(
"将 PDF 文件或文件夹拖入此区域即可开始解锁\n"
"输出在原文件同目录,文件名加上 _decrypt 后缀"
+ ("(拖拽可用)")
),
font=("微软雅黑", 16),
justify="center",
)
self.label.place(relx=0.5, rely=0.5, anchor="center")
# 点击同样可选择(依然只有这一个控件)
self.drop.bind("<Button-1>", self._on_click_select)
self.label.bind("<Button-1>", self._on_click_select)
if DND_AVAILABLE:
self.drop.drop_target_register(DND_FILES) # type: ignore
self.drop.dnd_bind("<<Drop>>", self._on_drop) # type: ignore
# ---- 事件 ----
def _on_click_select(self, _evt=None):
# 仅一个简单文件选择器;若想选目录,可直接把目录拖进来
files = filedialog.askopenfilenames(
title="选择 PDF 文件(可多选)",
filetypes=[("PDF 文件", "*.pdf"), ("所有文件", "*.*")],
)
if files:
self._start_process([Path(f) for f in files])
def _on_drop(self, event):
try:
paths = parse_dropped_paths(event.data, self.root)
except Exception:
return
self._start_process(paths)
# ---- 核心处理 ----
def _start_process(self, raw_paths: list[Path]):
if copy_pdf_pages is None:
messagebox.showerror(
"错误",
"未能从 main.py 导入 copy_pdf_pages(input_path: Path, output_path: Path) -> bool",
)
return
pdfs = gather_pdfs(raw_paths)
if not pdfs:
messagebox.showwarning("提示", "未找到任何 PDF 文件。")
return
# 后台线程,避免 UI 卡死
self.label.configure(text=f"发现 {len(pdfs)} 个 PDF开始处理…")
t = threading.Thread(target=self._worker, args=(pdfs,), daemon=True)
t.start()
def _worker(self, pdfs: list[Path]):
ok, fail = 0, 0
errors: list[str] = []
for f in pdfs:
try:
out_path = output_path_for(f)
# 简化:若已存在,直接覆盖
out_path.parent.mkdir(parents=True, exist_ok=True)
success = bool(copy_pdf_pages(f, out_path)) # type: ignore
if success:
ok += 1
else:
fail += 1
except Exception as e:
fail += 1
errors.append(f"{f}: {e}{traceback.format_exc()}")
summary = f"完成:成功 {ok},失败 {fail}。输出文件位于各自原目录。"
self._set_status(summary)
if errors:
# 仅在有错误时弹出详情
messagebox.showerror("部分失败", summary + "\n" + "\n".join(errors[:3]))
else:
messagebox.showinfo("完成", summary)
def _set_status(self, text: str):
self.label.configure(text=text)
def main():
root: tk.Misc
if DND_AVAILABLE:
root = TkinterDnD.Tk() # type: ignore
else:
root = tk.Tk()
App(root)
root.mainloop()
if __name__ == "__main__":
if len(sys.argv)>=2:
dummy_main()
else:
main()