Compare commits

...

16 Commits

Author SHA1 Message Date
db56f1da62 update VideoCompress 2026-01-11 13:19:56 +08:00
9ba34f8d2e optimize CLI and config 2026-01-11 13:05:35 +08:00
cae41d9bb0 update VideoCompress 2026-01-11 12:40:07 +08:00
6f304a634c add README in calc utils 2026-01-11 11:43:35 +08:00
5e94b202b5 calc utils 2026-01-11 11:37:11 +08:00
5fca3520f6 Merge branch 'master' of flt6.top:flt/tools 2025-12-21 14:26:07 +08:00
6dd2501fec upgrade VideoCompress 2025-12-21 14:23:24 +08:00
a1b26632e9 github_down 2025-12-21 14:21:12 +08:00
9bb73a633f single file for ui 2025-11-02 17:02:32 +08:00
1e550961d2 pdf_unlock: add ui 2025-11-01 00:12:29 +08:00
d454f8c8f4 pdf_unlock: inline no recursion 2025-10-29 23:05:14 +08:00
abf64d9cd6 pdf unlock: add outlines 2025-10-29 22:45:45 +08:00
f56675c486 use pydantic to validate config 2025-10-29 00:24:13 +08:00
4ae07c57cc fix and enhance get frame. 2025-10-24 22:37:18 +08:00
983ad0c8b6 format 2025-10-20 22:35:24 +08:00
072a198032 Enhance Videocompress 2025-10-20 22:35:03 +08:00
21 changed files with 2641 additions and 6027 deletions

View File

@ -4,4 +4,5 @@ config.json
*.xml *.xml
tmp tmp
build build
dist dist
video_info.cache

View File

@ -1,12 +1,23 @@
{ {
"save_to": "single", "save_to": "single",
"crf": 18, "crf": null,
"codec": "h264", "bitrate": "15M",
"codec": "h264_qsv",
"hwaccel": "qsv",
"extra": [],
"ffmpeg": "ffmpeg", "ffmpeg": "ffmpeg",
"ffprobe": "ffprobe",
"manual": null,
"video_ext": [ "video_ext": [
".mp4", ".mp4",
".mkv" ".mkv"
], ],
"extra": [], "compress_dir_name": "compress",
"train": false "resolution": null,
"fps": 30,
"test_video_resolution": "1920x1080",
"test_video_fps": 30,
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"disable_hwaccel_when_fail": true
} }

View File

@ -2,9 +2,20 @@
"save_to": "single", "save_to": "single",
"bitrate": "3M", "bitrate": "3M",
"codec": "h264_mediacodec", "codec": "h264_mediacodec",
"hwaccel": "mediacodec",
"ffmpeg": "ffmpeg", "ffmpeg": "ffmpeg",
"video_ext": [ "video_ext": [
".mp4", ".mp4",
".mkv" ".mkv"
], ],
"resolution": "1920x1080",
"extra": [],
"manual": null,
"compress_dir_name": "compress",
"fps": 30,
"test_video_resolution": "1920x1080",
"test_video_fps": 30,
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"disable_hwaccel_when_fail": true
} }

View File

@ -0,0 +1,599 @@
# -*- coding: utf-8 -*-
"""
Fluent Design 配置界面PySide6 + QFluentWidgets
功能要点:
- 拆分「通用」与「高级」两页,符合需求
- 内置 GPU 模板CPU/libx264、NVIDIA(NVENC/cuda)、Intel(QSV)、AMD(AMF)
- 与 Pydantic Config 对接:即时校验,错误信息以 InfoBar 呈现
- 生成/预览 ffmpeg 命令可对单文件执行subprocess.run
- 可导入/导出配置JSON
依赖:
pip install PySide6 qfluentwidgets pydantic
注意:
- 按需在同目录准备 main.py导出
from pydantic import BaseModel
class Config(BaseModel): ...
def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]: ...
- 本 UI 会在运行前,把实例化后的配置对象挂到 main 模块:`main.config = cfg`
若你的 get_cmd 内部使用全局 config将能读到该对象。
"""
from __future__ import annotations
import json
import os
import re
import sys
import subprocess
from typing import Optional
from PySide6.QtCore import Qt
from PySide6.QtWidgets import (
QApplication, QFileDialog, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QTextEdit
)
# QFluentWidgets
from qfluentwidgets import (
FluentWindow, setTheme, Theme, setThemeColor, FluentIcon,
NavigationItemPosition, PrimaryPushButton, PushButton, LineEdit,
ComboBox, SpinBox, SwitchButton, InfoBar, InfoBarPosition,
CardWidget, NavigationPushButton
)
# Pydantic
from pydantic import ValidationError
# ---- 导入业务对象 -----------------------------------------------------------
try:
import main as main_module # 用于挂载 config 实例
from main import Config, get_cmd # 直接调用用户提供的方法
except Exception as e: # 允许先运行 UI 草拟界面,不阻断
main_module = None
Config = None # type: ignore
get_cmd = None # type: ignore
print("[警告] 无法从 main 导入 Config / get_cmd", e)
# --------------------------- 工具函数 ----------------------------------------
def show_error(parent: QWidget, title: str, message: str):
InfoBar.error(
title=title,
content=message,
orient=Qt.Horizontal,
isClosable=True,
position=InfoBarPosition.TOP_RIGHT,
duration=5000,
parent=parent,
)
def show_success(parent: QWidget, title: str, message: str):
InfoBar.success(
title=title,
content=message,
orient=Qt.Horizontal,
isClosable=True,
position=InfoBarPosition.TOP_RIGHT,
duration=3500,
parent=parent,
)
# --------------------------- GPU 模板 ----------------------------------------
GPU_TEMPLATES = {
"CPU (libx264)": {
"hwaccel": None,
"codec": "libx264",
"extra": ["-preset", "slow"],
"crf": 23,
"bitrate": None,
},
"NVIDIA (NVENC/cuda)": {
"hwaccel": "cuda",
"codec": "h264_nvenc",
"extra": ["-preset", "p6", "-rc", "vbr"],
"crf": None,
"bitrate": "5M",
},
"Intel (QSV)": {
"hwaccel": "qsv",
"codec": "h264_qsv",
"extra": ["-preset", "balanced"],
"crf": None,
"bitrate": "5M",
},
"AMD (AMF)": {
"hwaccel": "amf",
"codec": "h264_amf",
"extra": ["-quality", "balanced"],
"crf": None,
"bitrate": "5M",
},
}
SAVE_TO_OPTIONS = ["single", "multi"]
HWACCEL_OPTIONS: list[Optional[str]] = [None, "cuda", "qsv", "amf"]
# --------------------------- 卡片基础 ----------------------------------------
class GroupCard(CardWidget):
def __init__(self, title: str, subtitle: str = "", parent: QWidget | None = None):
super().__init__(parent)
self.setMinimumWidth(720)
self.v = QVBoxLayout(self)
self.v.setContentsMargins(16, 12, 16, 16)
t = QLabel(f"<b>{title}</b>")
t.setProperty("cssClass", "cardTitle")
s = QLabel(subtitle)
s.setProperty("cssClass", "cardSubTitle")
s.setStyleSheet("color: rgba(0,0,0,0.55);")
self.v.addWidget(t)
if subtitle:
self.v.addWidget(s)
self.body = QVBoxLayout()
self.body.setSpacing(10)
self.v.addLayout(self.body)
def add_row(self, label: str, widget: QWidget):
row = QHBoxLayout()
lab = QLabel(label)
lab.setMinimumWidth(150)
row.addWidget(lab)
row.addWidget(widget, 1)
self.body.addLayout(row)
# --------------------------- 通用设置页 --------------------------------------
class GeneralPage(QWidget):
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
layout = QVBoxLayout(self)
layout.setContentsMargins(20, 16, 20, 20)
layout.setSpacing(12)
# --- I/O 区 ---
io_card = GroupCard("输入/输出", "选择要压缩的视频与输出文件")
self.input_edit = LineEdit(self)
self.input_edit.setPlaceholderText("选择输入视频文件... (.mp4/.mkv 等)")
btn_in = PrimaryPushButton("选择文件")
btn_in.clicked.connect(self._pick_input)
io_row = QHBoxLayout()
io_row.addWidget(self.input_edit, 1)
io_row.addWidget(btn_in)
self.output_edit = LineEdit(self)
self.output_edit.setPlaceholderText("选择输出文件路径,例如:/path/compressed.mp4")
btn_out = PushButton("选择输出")
btn_out.clicked.connect(self._pick_output)
oo_row = QHBoxLayout()
oo_row.addWidget(self.output_edit, 1)
oo_row.addWidget(btn_out)
io_card.body.addLayout(io_row)
io_card.body.addLayout(oo_row)
# --- 基础编码参数 ---
base_card = GroupCard("基础编码", "最常用的编码参数")
self.save_to_combo = ComboBox()
self.save_to_combo.addItems(SAVE_TO_OPTIONS)
base_card.add_row("保存方式(save_to)", self.save_to_combo)
self.codec_combo = ComboBox()
self.codec_combo.addItems(["h264", "libx264", "h264_nvenc", "h264_qsv", "h264_amf", "hevc", "hevc_nvenc", "hevc_qsv", "hevc_amf"])
base_card.add_row("编码器(codec)", self.codec_combo)
self.hwaccel_combo = ComboBox()
self.hwaccel_combo.addItems(["(无)", "cuda", "qsv", "amf"])
base_card.add_row("GPU 加速(hwaccel)", self.hwaccel_combo)
self.resolution_edit = LineEdit()
self.resolution_edit.setPlaceholderText("如 1920:1080 或 -1:1080留空为不缩放")
base_card.add_row("分辨率(resolution)", self.resolution_edit)
self.fps_spin = SpinBox()
self.fps_spin.setRange(0, 999)
self.fps_spin.setValue(30)
base_card.add_row("帧率(fps)", self.fps_spin)
self.video_ext_edit = LineEdit()
self.video_ext_edit.setText(".mp4,.mkv")
base_card.add_row("视频后缀(video_ext)", self.video_ext_edit)
self.compress_dir_edit = LineEdit()
self.compress_dir_edit.setText("compress")
base_card.add_row("压缩目录名(compress_dir_name)", self.compress_dir_edit)
self.disable_hw_switch = SwitchButton("失败时禁用硬件加速(disable_hwaccel_when_fail)")
self.disable_hw_switch.setChecked(True)
base_card.body.addWidget(self.disable_hw_switch)
# --- 模板 ---
tpl_card = GroupCard("GPU 模板", "一键应用推荐参数(会覆盖 codec/hwaccel/crf/bitrate/extra")
self.tpl_combo = ComboBox()
self.tpl_combo.addItems(list(GPU_TEMPLATES.keys()))
btn_apply_tpl = PrimaryPushButton("应用模板")
btn_apply_tpl.clicked.connect(self.apply_template)
row_tpl = QHBoxLayout()
row_tpl.addWidget(self.tpl_combo, 1)
row_tpl.addWidget(btn_apply_tpl)
tpl_card.body.addLayout(row_tpl)
# --- 运行/预览 ---
run_card = GroupCard("执行", "根据当前配置生成并运行 ffmpeg 命令")
self.preview_text = QTextEdit()
self.preview_text.setReadOnly(True)
btn_preview = PushButton("生成命令预览")
btn_preview.clicked.connect(self.on_preview)
btn_run = PrimaryPushButton("运行压缩 (subprocess.run)")
btn_run.clicked.connect(self.on_run)
rrow = QHBoxLayout()
rrow.addWidget(btn_preview)
rrow.addWidget(btn_run)
run_card.body.addWidget(self.preview_text)
run_card.body.addLayout(rrow)
# 排版
layout.addWidget(io_card)
layout.addWidget(base_card)
layout.addWidget(tpl_card)
layout.addWidget(run_card)
layout.addStretch(1)
# 高级区联动:由 AdvancedPage 控制
self.link_advanced: Optional[AdvancedPage] = None
# ----- 文件选择 -----
def _pick_input(self):
fn, _ = QFileDialog.getOpenFileName(self, "选择输入视频", "", "视频文件 (*.mp4 *.mkv *.mov *.avi *.*)")
if fn:
self.input_edit.setText(fn)
def _pick_output(self):
fn, _ = QFileDialog.getSaveFileName(self, "选择输出文件", "compressed.mp4", "视频文件 (*.mp4 *.mkv)")
if fn:
self.output_edit.setText(fn)
# ----- 模板应用 -----
def apply_template(self):
name = self.tpl_combo.currentText()
t = GPU_TEMPLATES.get(name, {})
# 覆盖通用字段
codec = t.get("codec")
if codec:
self.codec_combo.setCurrentText(codec)
hw = t.get("hwaccel")
if hw is None:
self.hwaccel_combo.setCurrentIndex(0)
else:
self.hwaccel_combo.setCurrentText(str(hw))
# 覆盖高级字段(若可用)
if self.link_advanced:
self.link_advanced.apply_template_from_general(t)
show_success(self, "已应用模板", f"{name} 参数已填充")
# ----- 组装配置 -----
def _collect_config_payload(self) -> dict:
# CRF / bitrate 值来自高级区
adv = self.link_advanced
crf_val = adv.crf_spin.value() if adv else None
if adv and adv.mode_combo.currentText() == "CRF 模式":
bitrate_val = None
else:
bitrate_val = adv.bitrate_edit.text().strip() or None
# hwaccel
_hw_text = self.hwaccel_combo.currentText()
hwaccel_val = None if _hw_text == "(无)" else _hw_text
# video_ext 解析
exts = [x.strip() for x in self.video_ext_edit.text().split(',') if x.strip()]
payload = {
"save_to": self.save_to_combo.currentText(),
"crf": crf_val if (adv and adv.mode_combo.currentText() == "CRF 模式") else None,
"bitrate": bitrate_val if (adv and adv.mode_combo.currentText() == "比特率模式") else None,
"codec": self.codec_combo.currentText(),
"hwaccel": hwaccel_val,
"extra": adv.parse_list_field(adv.extra_edit.toPlainText()) if adv else None,
"ffmpeg": adv.ffmpeg_edit.text().strip() if adv else "ffmpeg",
"ffprobe": adv.ffprobe_edit.text().strip() if adv else "ffprobe",
"manual": adv.parse_list_field(adv.manual_edit.toPlainText()) if (adv and adv.manual_switch.isChecked()) else None,
"video_ext": exts or [".mp4", ".mkv"],
"compress_dir_name": self.compress_dir_edit.text().strip() or "compress",
"resolution": self.resolution_edit.text().strip() or None,
"fps": self.fps_spin.value(),
"test_video_resolution": adv.test_res_edit.text().strip() if adv else "1920x1080",
"test_video_fps": adv.test_fps_spin.value() if adv else 30,
"test_video_input": adv.test_in_edit.text().strip() if adv else "compress_video_test.mp4",
"test_video_output": adv.test_out_edit.text().strip() if adv else "compressed_video_test.mp4",
"disable_hwaccel_when_fail": self.disable_hw_switch.isChecked(),
}
return payload
def build_config(self) -> Config:
if Config is None:
raise RuntimeError("未能导入 Config。请确认 main.py 可用且在同目录。")
payload = self._collect_config_payload()
try:
cfg = Config(**payload)
except ValidationError as e:
show_error(self, "配置校验失败", str(e))
raise
return cfg
# ----- 预览/运行 -----
def on_preview(self):
if get_cmd is None:
show_error(self, "缺少 get_cmd", "未能导入 get_cmd请检查 main.py")
return
in_path = self.input_edit.text().strip()
out_path = self.output_edit.text().strip()
if not in_path or not out_path:
show_error(self, "缺少路径", "请先选择输入与输出文件")
return
try:
cfg = self.build_config()
if main_module is not None:
setattr(main_module, "config", cfg) # 挂载到 main 供 get_cmd 读取
cmd = get_cmd(in_path, out_path)
self.preview_text.setPlainText(" ".join([repr(c) if " " in c else c for c in cmd]))
show_success(self, "命令已生成", "如需执行请点击运行")
except Exception as e:
show_error(self, "生成失败", str(e))
def on_run(self):
if get_cmd is None:
show_error(self, "缺少 get_cmd", "未能导入 get_cmd请检查 main.py")
return
in_path = self.input_edit.text().strip()
out_path = self.output_edit.text().strip()
if not in_path or not out_path:
show_error(self, "缺少路径", "请先选择输入与输出文件")
return
try:
cfg = self.build_config()
if main_module is not None:
setattr(main_module, "config", cfg)
cmd = get_cmd(in_path, out_path)
# 使用 subprocess.run 执行
completed = subprocess.run(cmd, capture_output=True, text=True)
log = (completed.stdout or "") + "\n" + (completed.stderr or "")
self.preview_text.setPlainText(log)
if completed.returncode == 0:
show_success(self, "执行成功", "视频压缩完成")
else:
show_error(self, "执行失败", f"返回码 {completed.returncode}")
except Exception as e:
show_error(self, "执行异常", str(e))
# --------------------------- 高级设置页 --------------------------------------
class AdvancedPage(QWidget):
def __init__(self, parent: QWidget | None = None):
super().__init__(parent)
layout = QVBoxLayout(self)
layout.setContentsMargins(20, 16, 20, 20)
layout.setSpacing(12)
# 模式切换CRF / Bitrate 互斥
mode_card = GroupCard("编码模式", "CRF 与 比特率 二选一")
self.mode_combo = ComboBox()
self.mode_combo.addItems(["CRF 模式", "比特率模式"])
self.mode_combo.currentIndexChanged.connect(self._on_mode_changed)
mode_card.add_row("选择模式", self.mode_combo)
self.crf_spin = SpinBox()
self.crf_spin.setRange(0, 51)
self.crf_spin.setValue(23)
mode_card.add_row("CRF 值(0-51)", self.crf_spin)
self.bitrate_edit = LineEdit()
self.bitrate_edit.setPlaceholderText("如 1000k / 2.5M / 1500B")
mode_card.add_row("比特率", self.bitrate_edit)
# 其它高级参数
adv_card = GroupCard("高级参数", "额外 ffmpeg 参数 / 手动命令 / 可执行路径")
self.extra_edit = QTextEdit()
self.extra_edit.setPlaceholderText("每行一个参数;例如:\n-preset\np6\n-rc\nvbr")
adv_card.add_row("额外(extra)", self.extra_edit)
self.manual_switch = SwitchButton("使用手动命令 (manual)")
self.manual_switch.setChecked(False)
adv_card.body.addWidget(self.manual_switch)
self.manual_edit = QTextEdit()
self.manual_edit.setPlaceholderText("每行一个参数(会插入到 ffmpeg -i {input} {manual} {output} 中)")
self.manual_edit.setEnabled(False)
self.manual_switch.checkedChanged.connect(lambda c: self.manual_edit.setEnabled(c))
adv_card.body.addWidget(self.manual_edit)
self.ffmpeg_edit = LineEdit()
self.ffmpeg_edit.setText("ffmpeg")
adv_card.add_row("ffmpeg 路径", self.ffmpeg_edit)
self.ffprobe_edit = LineEdit()
self.ffprobe_edit.setText("ffprobe")
adv_card.add_row("ffprobe 路径", self.ffprobe_edit)
# 测试参数
test_card = GroupCard("测试", "用于快速验证配置是否能跑通")
self.test_res_edit = LineEdit(); self.test_res_edit.setText("1920x1080")
self.test_fps_spin = SpinBox(); self.test_fps_spin.setRange(0, 999); self.test_fps_spin.setValue(30)
self.test_in_edit = LineEdit(); self.test_in_edit.setText("compress_video_test.mp4")
self.test_out_edit = LineEdit(); self.test_out_edit.setText("compressed_video_test.mp4")
test_card.add_row("测试分辨率", self.test_res_edit)
test_card.add_row("测试 FPS", self.test_fps_spin)
test_card.add_row("测试输入文件名", self.test_in_edit)
test_card.add_row("测试输出文件名", self.test_out_edit)
# backup_card = GroupCard("备份配置", "导入导出当前配置")
layout.addWidget(mode_card)
layout.addWidget(adv_card)
layout.addWidget(test_card)
layout.addStretch(1)
self._on_mode_changed(0)
def _on_mode_changed(self, idx: int):
is_crf = (idx == 0)
self.crf_spin.setEnabled(is_crf)
self.bitrate_edit.setEnabled(not is_crf)
# 解析多行 -> list[str]
@staticmethod
def parse_list_field(text: str) -> Optional[list[str]]:
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
return lines or None
# 从通用模板带入高级字段
def apply_template_from_general(self, tpl: dict):
if tpl.get("crf") is not None:
self.mode_combo.setCurrentText("CRF 模式")
self.crf_spin.setValue(int(tpl["crf"]))
self.bitrate_edit.clear()
else:
self.mode_combo.setCurrentText("比特率模式")
self.bitrate_edit.setText(str(tpl.get("bitrate", "5M")))
self.extra_edit.setPlainText("\n".join(tpl.get("extra", [])))
# --------------------------- 主窗体 ------------------------------------------
class MainWindow(FluentWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("视频压缩配置 - Fluent UI")
self.resize(980, 720)
# 主题与主色
setTheme(Theme.AUTO)
setThemeColor("#2563eb") # Tailwind 蓝-600
# 页面
self.general = GeneralPage()
self.general.setObjectName("general_page")
self.advanced = AdvancedPage()
self.advanced.setObjectName("advanced_page")
self.general.link_advanced = self.advanced
# 导航
self.addSubInterface(self.general, FluentIcon.VIDEO, "通用 Config")
self.addSubInterface(self.advanced, FluentIcon.SETTING, "高级 Config")
# 顶部右侧:操作按钮
# self.navigationInterface.addWidget( # type: ignore
# routeKey="spacer",
# widget=QLabel(""),
# onClick=None,
# position=NavigationItemPosition.TOP
# )
# 导入/导出配置
export_btn = NavigationPushButton(FluentIcon.SAVE, "导出配置", False, self.navigationInterface)
export_btn.clicked.connect(self.export_config)
import_btn = NavigationPushButton(FluentIcon.FOLDER, "导入配置", False, self.navigationInterface)
import_btn.clicked.connect(self.import_config)
self.titleBar.raise_() # 确保标题栏在顶层
self.navigationInterface.addWidget( # type: ignore
routeKey="export",
widget=export_btn,
onClick=None,
position=NavigationItemPosition.BOTTOM
)
self.navigationInterface.addWidget( # type: ignore
routeKey="import",
widget=import_btn,
onClick=None,
position=NavigationItemPosition.BOTTOM
)
# ---------------- 导入/导出 -----------------
def export_config(self):
try:
cfg = self.general.build_config()
except ValidationError:
return
fn, _ = QFileDialog.getSaveFileName(self, "导出配置为 JSON", "config.json", "JSON (*.json)")
if not fn:
return
with open(fn, 'w', encoding='utf-8') as f:
json.dump(json.loads(cfg.model_dump_json()), f, ensure_ascii=False, indent=2)
show_success(self, "已导出", os.path.basename(fn))
def import_config(self):
fn, _ = QFileDialog.getOpenFileName(self, "导入配置 JSON", "", "JSON (*.json)")
if not fn:
return
try:
with open(fn, 'r', encoding='utf-8') as f:
data = json.load(f)
# 回填到界面(允许缺省)
self._fill_general(data)
self._fill_advanced(data)
show_success(self, "已导入", os.path.basename(fn))
except Exception as e:
show_error(self, "导入失败", str(e))
def _fill_general(self, d: dict):
if v := d.get("save_to"):
self.general.save_to_combo.setCurrentText(v)
if v := d.get("codec"):
self.general.codec_combo.setCurrentText(v)
hw = d.get("hwaccel")
self.general.hwaccel_combo.setCurrentIndex(0 if hw in (None, "", "None") else max(1, self.general.hwaccel_combo.findText(str(hw))))
if v := d.get("resolution"):
self.general.resolution_edit.setText(v)
if v := d.get("fps"):
self.general.fps_spin.setValue(int(v))
if v := d.get("video_ext"):
if isinstance(v, list):
self.general.video_ext_edit.setText(",".join(v))
else:
self.general.video_ext_edit.setText(str(v))
if v := d.get("compress_dir_name"):
self.general.compress_dir_edit.setText(v)
if v := d.get("disable_hwaccel_when_fail") is not None:
self.general.disable_hw_switch.setChecked(bool(v))
def _fill_advanced(self, d: dict):
crf = d.get("crf")
bitrate = d.get("bitrate")
if crf is not None and bitrate is None:
self.advanced.mode_combo.setCurrentText("CRF 模式")
self.advanced.crf_spin.setValue(int(crf))
self.advanced.bitrate_edit.clear()
else:
self.advanced.mode_combo.setCurrentText("比特率模式")
if bitrate is not None:
self.advanced.bitrate_edit.setText(str(bitrate))
if v := d.get("extra"):
self.advanced.extra_edit.setPlainText("\n".join(v if isinstance(v, list) else [str(v)]))
if v := d.get("manual"):
self.advanced.manual_switch.setChecked(True)
self.advanced.manual_edit.setEnabled(True)
self.advanced.manual_edit.setPlainText("\n".join(v if isinstance(v, list) else [str(v)]))
if v := d.get("ffmpeg"):
self.advanced.ffmpeg_edit.setText(str(v))
if v := d.get("ffprobe"):
self.advanced.ffprobe_edit.setText(str(v))
if v := d.get("test_video_resolution"):
self.advanced.test_res_edit.setText(str(v))
if v := d.get("test_video_fps"):
self.advanced.test_fps_spin.setValue(int(v))
if v := d.get("test_video_input"):
self.advanced.test_in_edit.setText(str(v))
if v := d.get("test_video_output"):
self.advanced.test_out_edit.setText(str(v))
# --------------------------- 入口 --------------------------------------------
if __name__ == "__main__":
app = QApplication(sys.argv)
win = MainWindow()
win.show()
sys.exit(app.exec())

142
VideoCompress/get_frame.py Normal file
View File

@ -0,0 +1,142 @@
import json
import logging
import subprocess
from fractions import Fraction
from decimal import Decimal
from typing import Optional, Tuple
ffprobe:str = "ffprobe"
class FFProbeError(RuntimeError):
pass
def _run_ffprobe(args: list[str]) -> dict:
"""运行 ffprobe 并以 JSON 返回,若失败抛异常。"""
# 始终要求 JSON 输出,便于稳健解析
base = [ffprobe, "-v", "error", "-print_format", "json"]
proc = subprocess.run(base + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if proc.returncode != 0:
raise FFProbeError(proc.stderr.strip() or "ffprobe 调用失败")
try:
return json.loads(proc.stdout or "{}")
except json.JSONDecodeError as e:
raise FFProbeError(f"无法解析 ffprobe 输出为 JSON: {e}")
def _try_nb_frames(path: str, stream_index: int) -> Optional[int]:
data = _run_ffprobe([
"-select_streams", f"v:{stream_index}",
"-show_entries", "stream=nb_frames",
path
])
streams = data.get("streams") or []
if not streams:
logging.debug("_try_nb_frames: failed no stream")
return None
nb = streams[0].get("nb_frames")
if nb and nb != "N/A":
try:
n = int(nb)
return n if n >= 0 else None
except ValueError:
logging.debug(f"_try_nb_frames: failed nb not positive int: {nb}")
return None
logging.debug(f"_try_nb_frames: failed nb NA: {nb}")
return None
def _try_avgfps_times_duration(path: str, stream_index: int) -> Optional[int]:
# 读 avg_frame_rate
s = _run_ffprobe([
"-select_streams", f"v:{stream_index}",
"-show_entries", "stream=avg_frame_rate",
path
])
streams = s.get("streams") or []
if not streams:
return None
afr = streams[0].get("avg_frame_rate")
if not afr or afr in ("0/0", "N/A"):
return None
# 读容器时长(单位:秒)
f = _run_ffprobe(["-show_entries", "format=duration", path])
dur_str = (f.get("format") or {}).get("duration")
if not dur_str:
logging.debug(f"_try_avgfps_times_duration: failed no dur_str, {f}")
return None
try:
fps = Fraction(afr) # 形如 "30000/1001"
dur = Decimal(dur_str)
# 四舍五入到最近整数,避免系统性低估
est = int(dur * Decimal(fps.numerator) / Decimal(fps.denominator) + Decimal("0.5"))
return est if est >= 0 else None
except Exception as e:
logging.debug("_try_avgfps_times_duration: failed",exc_info=e)
return None
def _try_count_packets(path: str, stream_index: int) -> Optional[int]:
# 统计读取到的包数(不解码)。大多容器≈帧数,但不保证 1:1
data = _run_ffprobe([
"-select_streams", f"v:{stream_index}",
"-count_packets",
"-show_entries", "stream=nb_read_packets",
path
])
streams = data.get("streams") or []
if not streams:
logging.debug("_try_count_packets: failed no stream")
return None
nbp = streams[0].get("nb_read_packets")
try:
n = int(nbp)
return n if n >= 0 else None
except Exception as e:
logging.debug("_try_count_packets: failed",exc_info=e)
return None
def get_video_frame_count(
path: str,
stream_index: int = 0,
fallback_order: Tuple[str, ...] = ("nb_frames", "avg*dur", "count_packets"),
) -> int|None:
"""
估计/获取视频总帧数(带回退)。
参数:
- path: 视频文件路径
- stream_index: 选择哪个视频流,默认 0
- allow_slow_decode: 是否允许用解码全片的方式(最慢但最准)
- fallback_order: 回退顺序,四种方法的别名可选:
"nb_frames" -> 直接读元数据中的总帧数
"avg*dur" -> 平均帧率 × 时长(最快估算)
"count_packets" -> 统计包数(较快,接近帧数但不保证)
返回:
(frame_count, method_used)
异常:
- FileNotFoundError: ffprobe 未安装
- FFProbeError: ffprobe 调用异常或无法解析
- RuntimeError: 所有方法均失败
"""
methods = {
"nb_frames": _try_nb_frames,
"avg*dur": _try_avgfps_times_duration,
"count_packets": _try_count_packets,
}
for key in fallback_order:
try:
try:
func = methods.get(key)
if not func:
continue
n = func(path, stream_index)
except Exception:
logging.debug(f"Errored to get frame with {key}.",exc_info=True)
continue
if isinstance(n, int) and n >= 0:
return n
else:
logging.debug(f"Failed to get frame with {key}")
except Exception as e:
logging.debug(f"Errored to get frame with {key}.",exc_info=e)
return None
raise RuntimeError("无法获取或估计帧数:所有回退方法均失败。")

View File

@ -8,317 +8,565 @@ from time import time
from rich.logging import RichHandler from rich.logging import RichHandler
from rich.progress import Progress from rich.progress import Progress
from pickle import dumps, loads from pickle import dumps, loads
from typing import Optional,Callable from typing import Optional, Callable, Literal, List, Any, TYPE_CHECKING
import atexit import atexit
import re import re
import get_frame
import json
import argparse
import shutil
try:
from pydantic import BaseModel, Field, field_validator, model_validator
HAS_PYDANTIC = True
class Config(BaseModel):
save_to: Literal["single", "multi"] = Field("single", description="保存到单文件夹或者每个子文件夹创建compress_dir")
crf: Optional[int] = Field(18, ge=0, le=51, description="CRF值范围0-51")
bitrate: Optional[str] = Field(None, description="比特率,格式如: 1000k, 2.5M, 1500B")
codec: str = Field("h264", description="ffmpeg的codec如果使用GPU需要对应设置")
hwaccel: Optional[Literal["amf", "qsv", "cuda"]] = Field(None, description="使用GPU加速")
extra: List[str] = Field([], description="插入到ffmpeg输出前的自定义参数")
ffmpeg: str = "ffmpeg"
ffprobe: str = "ffprobe"
manual: Optional[List[str]] = Field(None, description=r"手动设置ffmpeg命令ffmpeg -i {input} {manual} {output}")
video_ext: List[str] = Field([".mp4", ".mkv"], description="视频文件后缀,含.")
compress_dir_name: str = Field("compress", description="压缩文件夹名称")
resolution: Optional[str] = Field(None, description="统一到特定尺寸None为不使用缩放")
fps: int = Field(30, description="fps", ge=0)
test_video_resolution: str = "1920x1080"
test_video_fps: int = Field(30, ge=0)
test_video_input: str = "compress_video_test.mp4"
test_video_output: str = "compressed_video_test.mp4"
disable_hwaccel_when_fail: bool = Field(True, description="当运行失败时,禁用硬件加速")
@field_validator('bitrate')
@classmethod
def validate_bitrate(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return v
pattern = r'^[\d\.]+[MkB]*$'
if not re.match(pattern, v):
raise ValueError('bitrate格式不正确应为数字+单位(M/k/B),如: 1000k, 2.5M')
return v
@field_validator('resolution')
@classmethod
def validate_resolution(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return v
pattern = r'^((-1)|\d+):((-1)|\d+)$'
if not re.match(pattern, v):
raise ValueError('resolution格式不正确应为{数字/-1}:{数字/-1}')
return v
@field_validator("compress_dir_name")
@classmethod
def valid_path(cls, v: str) -> str:
if re.search(r'[\\/:*?"<>|\x00-\x1F]', v):
raise ValueError("某配置不符合目录名语法")
return v
@model_validator(mode='after')
def validate_mutual_exclusive(self):
crf_none = self.crf is None
bitrate_none = self.bitrate is None
# 有且只有一者为None
if crf_none == bitrate_none:
raise ValueError('crf和bitrate必须互斥有且只有一个为None')
return self
def dump(self):
return self.model_dump()
except ImportError:
HAS_PYDANTIC = False
from dataclasses import dataclass, asdict
import copy
@dataclass
class Config:
save_to: str = "single"
crf: Optional[int] = 18
bitrate: Optional[str] = None
codec: str = "h264"
hwaccel: Optional[str] = None
extra: List[str] = []
ffmpeg: str = "ffmpeg"
ffprobe: str = "ffprobe"
manual: Optional[List[str]] = None
video_ext: List[str] = [".mp4", ".mkv"]
compress_dir_name: str = "compress"
resolution: Optional[str] = None
fps: int = 30
test_video_resolution: str = "1920x1080"
test_video_fps: int = 30
test_video_input: str = "compress_video_test.mp4"
test_video_output: str = "compressed_video_test.mp4"
disable_hwaccel_when_fail: bool = True
def update(self, other):
if isinstance(other, dict):
d = other
elif isinstance(other, Config):
d = asdict(other)
else:
return
for k, v in d.items():
if hasattr(self, k):
setattr(self, k, v)
def copy(self):
return copy.deepcopy(self)
def dump(self):
return asdict(self)
root = None root = None
CFG_FILE = Path(sys.path[0])/"config.json" if os.environ.get("INSTALL", "0") == "1":
CFG = { CFG_FILE = Path(os.getenv("APPDATA", "C:/")) / "VideoCompress" / "config.json"
"save_to": "single", else:
"crf":"18", CFG_FILE = Path(sys.path[0]) / "config.json"
"bitrate": None,
"codec": "h264",
"extra": [],
"ffmpeg": "ffmpeg",
"manual": None,
"video_ext": [".mp4", ".mkv"],
"compress_dir_name": "compress",
"resolution": "-1:1080",
"fps": "30",
"test_video_resolution": "1920x1080",
"test_video_fps": "30",
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
}
if CFG_FILE.exists():
try:
import json
def get_cmd(video_path:str|Path,output_file:str|Path) -> list[str]: if HAS_PYDANTIC:
assert BaseModel # type: ignore
assert issubclass(Config, BaseModel)
CFG = Config.model_validate_json(CFG_FILE.read_text())
else:
assert Config
cfg:dict[str, Any] = json.loads(CFG_FILE.read_text())
CFG = Config(**cfg)
get_frame.ffprobe = CFG.ffprobe
logging.debug(CFG)
except KeyboardInterrupt as e:
raise e
except Exception as e:
logging.warning("Invalid config file, ignored.")
logging.debug(e)
else:
try:
if HAS_PYDANTIC:
if TYPE_CHECKING:
assert BaseModel # type: ignore
assert issubclass(Config, BaseModel)
CFG = Config() # type: ignore
CFG_FILE.write_text(CFG.model_dump_json(indent=4))
else:
import json
if TYPE_CHECKING:
assert Config
assert asdict # type: ignore
CFG = Config() # type: ignore
CFG_FILE.write_text(json.dumps(asdict(CFG), indent=4))
logging.info("Config file created.")
except Exception as e:
logging.warning("Failed to create config file.", exc_info=e)
current_running_file:Optional[Path] = None
def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
if isinstance(video_path, Path): if isinstance(video_path, Path):
video_path = str(video_path.resolve()) video_path = str(video_path.resolve())
if isinstance(output_file, Path): if isinstance(output_file, Path):
output_file = str(output_file.resolve()) output_file = str(output_file.resolve())
if CFG["manual"] is not None: if CFG.manual is not None:
command=[ command = [CFG.ffmpeg, "-hide_banner", "-i", video_path]
CFG["ffmpeg"], command.extend(CFG.manual)
"-hide_banner",
"-i", video_path
]
command.extend(CFG["manual"])
command.append(output_file) command.append(output_file)
return command return command
if CFG["bitrate"] is not None: command = [
command = [ CFG.ffmpeg,
CFG["ffmpeg"], "-hide_banner",
"-hide_banner", ]
"-i", video_path, if CFG.hwaccel is not None:
] command.extend(
if CFG['resolution'] is not None: [
command.extend([ "-hwaccel",
"-vf", f"scale={CFG['resolution']}",]) CFG.hwaccel,
command.extend([ "-hwaccel_output_format",
"-c:v", CFG["codec"], CFG.hwaccel,
"-b:v", CFG["bitrate"],
"-r",CFG["fps"],
"-y",
])
else:
command = [
CFG["ffmpeg"],
"-hide_banner",
"-i", video_path,
] ]
if CFG['resolution'] is not None: )
command.extend([ command.extend(
"-vf", f"scale={CFG['resolution']}",]) [
command.extend([ "-i",
"-c:v", CFG["codec"], video_path,
"-global_quality", str(CFG["crf"]), ]
"-r",CFG["fps"], )
"-y",
]) if CFG.bitrate is not None:
command.extend(CFG["extra"]) if CFG.resolution is not None:
command.extend(
[
"-vf",
f"scale={CFG.resolution}",
]
)
command.extend(
[
"-c:v",
CFG.codec,
"-b:v",
CFG.bitrate,
"-r",
str(CFG.fps),
"-y",
]
)
else:
if CFG.resolution is not None:
command.extend(
[
"-vf",
f"scale={CFG.resolution}",
]
)
command.extend(
[
"-c:v",
CFG.codec,
"-global_quality",
str(CFG.crf),
"-r",
str(CFG.fps),
"-y",
]
)
command.extend(CFG.extra)
command.append(output_file) command.append(output_file)
logging.debug(f"Create CMD: {command}") logging.debug(f"Create CMD: {command}")
return command return command
# 配置logging # 配置logging
def setup_logging(): def setup_logging(verbose: bool = False):
log_dir = Path("logs") log_dir = Path("logs")
log_dir.mkdir(exist_ok=True) log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log" log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log"
stream = RichHandler(rich_tracebacks=True,tracebacks_show_locals=True) stream = RichHandler(level=logging.DEBUG if verbose else logging.INFO,
stream.setLevel(logging.INFO) rich_tracebacks=True, tracebacks_show_locals=True)
# stream.setLevel(logging.DEBUG if verbose else logging.INFO)
stream.setFormatter(logging.Formatter("%(message)s")) stream.setFormatter(logging.Formatter("%(message)s"))
file = logging.FileHandler(log_file, encoding='utf-8') file = logging.FileHandler(log_file, encoding="utf-8")
file.setLevel(logging.DEBUG) file.setLevel(logging.DEBUG)
# 清除现有的handlers避免多次调用basicConfig无效
logging.getLogger().handlers.clear()
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG, level=logging.DEBUG,
format='%(asctime)s - %(levelname) 7s - %(message)s', format="%(asctime)s - %(levelname) 7s - %(message)s",
handlers=[ handlers=[stream, file],
file,
stream
]
) )
logging.debug("Logging is set up.")
def fmt_time(t:float|int) -> str:
if t>3600: def fmt_time(t: float | int) -> str:
if t > 3600:
return f"{t//3600}h {t//60}min {t%60}s" return f"{t//3600}h {t//60}min {t%60}s"
elif t>60: elif t > 60:
return f"{t//60}min {t%60}s" return f"{t//60}min {t%60}s"
else: else:
return f"{round(t)}s" return f"{round(t)}s"
def process_video( def process_video(
video_path: Path, video_path: Path,
compress_dir:Optional[Path]=None , compress_dir: Optional[Path] = None,
update_func:Optional[Callable[[Optional[int],Optional[str]],None]]=None): update_func: Optional[Callable[[Optional[int], Optional[str]], None]] = None,
):
global current_running_file
if compress_dir is None: if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在) # 在视频文件所在目录下创建 compress 子目录(如果不存在)
compress_dir = video_path.parent / CFG["compress_dir_name"] compress_dir = video_path.parent / CFG.compress_dir_name
else: else:
assert root assert root
compress_dir /= video_path.parent.relative_to(root) compress_dir /= video_path.parent.relative_to(root)
assert isinstance(compress_dir,Path) assert isinstance(compress_dir, Path)
compress_dir.mkdir(exist_ok=True,parents=True) compress_dir.mkdir(exist_ok=True, parents=True)
# 输出文件路径:与原文件同名,保存在 compress 目录下 # 输出文件路径:与原文件同名,保存在 compress 目录下
output_file = compress_dir / (video_path.stem + video_path.suffix) output_file = compress_dir / (video_path.stem + video_path.suffix)
if output_file.is_file(): if output_file.is_file():
logging.warning(f"文件{output_file}存在,跳过") logging.warning(f"文件{output_file}存在,跳过")
return return False
video_path_str = str(video_path.absolute()) video_path_str = str(video_path.absolute())
command = get_cmd(video_path_str,output_file) command = get_cmd(video_path_str, output_file)
current_running_file = output_file
try: try:
result = subprocess.Popen( result = subprocess.Popen(
command, command,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
encoding="utf-8", encoding="utf-8",
text=True text=True,
) )
total = ""
while result.poll() is None: while result.poll() is None:
line = " " line = " "
while result.poll() is None and line[-1:] not in "\r\n": while result.poll() is None and line[-1:] not in "\r\n":
assert result.stderr is not None assert result.stderr is not None
line+=result.stderr.read(1) line += result.stderr.read(1)
total += line[-1]
# print(line[-1]) # print(line[-1])
if 'warning' in line.lower(): if "warning" in line.lower():
logging.warning(f"[FFmpeg]({video_path_str}): {line}") logging.warning(f"[FFmpeg]({video_path_str}): {line}")
elif 'error' in line.lower(): elif "error" in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "assertion" in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}") logging.error(f"[FFmpeg]({video_path_str}): {line}")
elif "frame=" in line and update_func is not None: elif "frame=" in line and update_func is not None:
# print(line,end="") # print(line,end="")
match = re.search(r"frame=\s*(\d+)",line) match = re.search(r"frame=\s*(\d+)", line)
frame_number = int(match.group(1)) if match else None frame_number = int(match.group(1)) if match else None
match = re.search(r"[\d\.]+x",line) match = re.search(r"[\d\.]+x", line)
rate = match.group(0) if match else None rate = match.group(0) if match else None
update_func(frame_number,rate) update_func(frame_number, rate)
current_running_file = None
if result.returncode != 0: if result.returncode != 0:
logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode}cmd={' '.join(command)}") logging.error(
logging.error(result.stdout) f"处理文件 {video_path_str} 失败"
logging.error(result.stderr) )
logging.debug(f"返回码: {result.returncode}; cmd={' '.join(command)}")
output_file.unlink(missing_ok=True)
assert result.stdout is not None
logging.debug(result.stdout.read())
logging.debug(total)
if CFG.hwaccel == "mediacodec" and CFG.codec in [
"h264_mediacodec",
"hevc_mediacodec",
]:
logging.info(
"mediacodec硬件加速器已知在较短片段上存在异常将禁用加速重试。"
)
output_file.unlink(missing_ok=True)
bak = CFG.codec, CFG.hwaccel
CFG.hwaccel = None
CFG.codec = "h264" if CFG.codec == "h264_mediacodec" else "hevc"
assert not output_file.exists()
ret = process_video(video_path, compress_dir, update_func)
CFG.codec, CFG.hwaccel = bak
if not ret:
logging.error("重试仍然失败。")
return False
else:
return True
elif CFG.disable_hwaccel_when_fail and CFG.hwaccel is not None:
logging.info("正在禁用硬件加速器重试,进度条可能发生混乱。")
output_file.unlink(missing_ok=True)
if TYPE_CHECKING:
assert BaseModel # type: ignore
assert isinstance(CFG, BaseModel)
bak = CFG.codec, CFG.hwaccel
CFG.hwaccel = None
if (
CFG.codec.endswith("_mediacodec")
or CFG.codec.endswith("_qsv")
or CFG.codec.endswith("_nvenc")
or CFG.codec.endswith("_amf")
):
CFG.codec = CFG.codec.split("_")[0]
assert not output_file.exists()
ret = process_video(video_path, compress_dir, update_func)
CFG.codec, CFG.hwaccel = bak
if not ret:
logging.error("重试仍然失败。")
return False
else: else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}") if video_path.stat().st_size <= output_file.stat().st_size:
logging.info(
except KeyboardInterrupt as e:raise e f"压缩后文件比原文件大,直接复制原文件: {video_path_str}"
)
output_file.unlink(missing_ok=True)
shutil.copy2(video_path, output_file)
return True
else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
except KeyboardInterrupt as e:
raise e
except Exception as e: except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",exc_info=e) logging.error(
f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",
exc_info=e,
)
if current_running_file is not None:
current_running_file.unlink(missing_ok=True)
current_running_file = None
return False
return True
def traverse_directory(root_dir: Path): def traverse_directory(root_dir: Path):
video_extensions = set(CFG["video_ext"]) video_extensions = set(CFG.video_ext)
sm=None sm = None
# 获取视频文件列表和帧数信息 # 获取视频文件列表和帧数信息
video_files = [] video_files:list[Path] = []
que = list(root_dir.glob("*")) que = list(root_dir.glob("*"))
while que: while que:
d = que.pop() d = que.pop()
for file in d.glob("*") if d.is_dir() else [d]: for file in d.glob("*") if d.is_dir() else [d]:
if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]: if (
file.parent.name == CFG.compress_dir_name
or file.name == CFG.compress_dir_name
):
continue continue
if file.is_file() and file.suffix.lower() in video_extensions: if file.is_file() and file.suffix.lower() in video_extensions:
video_files.append(file) video_files.append(file)
elif file.is_dir(): elif file.is_dir():
que.append(file) que.append(file)
if not video_files:
if not video_files: logging.warning("未找到需要处理的视频文件")
logging.warning("未找到需要处理的视频文件") return
return
# 获取视频信息 # 获取视频信息
frames: dict[Path, float] = {} frames: dict[Path, float] = {}
cached_data: dict[Path, float] = {}
info_file = Path("video_info.cache") info_file = Path("video_info.cache")
if info_file.is_file(): if info_file.is_file():
try: try:
cached_data = loads(info_file.read_bytes()) cached_data = loads(info_file.read_bytes())
if isinstance(cached_data, dict): if isinstance(cached_data, dict):
frames = cached_data
logging.debug("Loaded video info from cache.") logging.debug("Loaded video info from cache.")
else:
cached_data = {}
except Exception as e: except Exception as e:
logging.debug("Failed to load video info cache.",exc_info=e) logging.debug("Failed to load video info cache.", exc_info=e)
with Progress() as prog: with Progress() as prog:
task = prog.add_task("正在获取视频信息", total=len(video_files)) task = prog.add_task("正在获取视频信息", total=len(video_files))
for file in video_files: for file in video_files:
prog.advance(task) prog.advance(task)
if file in frames and frames[file]>0: if file in cached_data and cached_data[file] > 0:
frames[file] = cached_data[file]
continue continue
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split() fr = get_frame.get_video_frame_count(str(file.resolve()))
cmd.append(str(file.resolve())) if fr is None:
proc = subprocess.run(cmd, capture_output=True, text=True) logging.debug(
if proc.returncode != 0: f"无法获取视频信息: {file}, 时长为N/A默认使用0s"
logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}") )
frames[file] = 0 frames[file] = 0 if fr is None else fr
continue
if proc.stdout.strip():
try:
avg_frame_rate, duration = proc.stdout.strip().split('\n')
tmp = avg_frame_rate.split('/')
avg_frame_rate = float(tmp[0]) / float(tmp[1])
if duration == "N/A":
duration = 0
logging.debug(f"无法获取视频信息: {file}, 时长为N/A默认使用0s")
duration = float(duration)
frames[file] = duration * avg_frame_rate
except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0
if 0 in frames.values(): if 0 in frames.values():
logging.warning(f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。") logging.warning(
f"视频{', '.join([f.name for f,frames in frames.items() if frames==0])}文件帧数信息获取失败。总进度估计将不准确。"
)
prog.remove_task(task) prog.remove_task(task)
try: try:
info_file.write_bytes(dumps(frames)) info_file.write_bytes(dumps(frames))
logging.debug("Saved video info to cache.") logging.debug("Saved video info to cache.")
except Exception as e: except Exception as e:
logging.debug("Failed to save video info cache.",exc_info=e) logging.debug("Failed to save video info cache.", exc_info=e)
logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件") logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件")
# 创建进度条 # 创建进度条
with Progress() as prog: with Progress() as prog:
total_frames = sum(frames.values()) total_frames = sum(frames.values())
main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames)) main_task = prog.add_task(
"总进度", total=total_frames if total_frames > 0 else len(frames)
)
# 创建文件队列 # 创建文件队列
for file in frames.keys(): for file in frames.keys():
# 进度跟踪 # 进度跟踪
filename = file.relative_to(root_dir) filename = file.relative_to(root_dir)
# 创建文件级进度条 # 创建文件级进度条
if frames[file] == 0: if frames[file] == 0:
file_task = prog.add_task(f"{filename}") file_task = prog.add_task(f"{filename}")
else: else:
file_task = prog.add_task(f"{filename}",total=frames[file]) file_task = prog.add_task(f"{filename}", total=frames[file])
with prog._lock: with prog._lock:
completed_start = prog._tasks[main_task].completed completed_start = prog._tasks[main_task].completed
def update_progress(x, rate): def update_progress(x, rate):
if frames[file] == 0: if frames[file] == 0:
prog.update(file_task,description=f"{filename} 已处理{x}{f'速率{rate}' if rate else ''}") prog.update(
file_task,
description=f"{filename} 已处理{x}{f'速率{rate}' if rate else ''}",
)
else: else:
prog.update(file_task,completed=x,description=f"{filename} {f'速率{rate}' if rate else ''}") prog.update(
prog.update(main_task, completed=completed_start+x) file_task,
completed=x,
if CFG["save_to"] == "single": description=f"{filename} {f'速率{rate}' if rate else ''}",
process_video(file, root_dir/CFG["compress_dir_name"], update_progress) )
prog.update(main_task, completed=completed_start + x)
if CFG.save_to == "single":
process_video(
file, root_dir / CFG.compress_dir_name, update_progress
)
else: else:
process_video(file, None, update_progress) process_video(file, None, update_progress)
# 移除文件级进度条 # 移除文件级进度条
prog.update(main_task, completed=completed_start+frames[file]) prog.update(main_task, completed=completed_start + frames[file])
prog.remove_task(file_task) prog.remove_task(file_task)
try: try:
info_file.unlink(missing_ok=True) info_file.unlink(missing_ok=True)
except Exception as e: except Exception as e:
logging.warning("无法删除视频信息缓存文件",exc_info=e) logging.warning("无法删除视频信息缓存文件", exc_info=e)
def test(): def test():
os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"] os.environ["PATH"] = (
Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
)
try: try:
subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode() subprocess.run(
[CFG.ffmpeg, "-version"], stdout=-3, stderr=-3
).check_returncode()
except Exception as e: except Exception as e:
print(__file__) print(__file__)
logging.critical("无法运行ffmpeg") logging.critical("无法运行ffmpeg")
exit(-1) exit(-1)
try: try:
ret = subprocess.run( ret = subprocess.run(
f"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(), f"{CFG.ffmpeg} -hide_banner -f lavfi -i testsrc=duration=1:size={CFG.test_video_resolution}:rate={CFG.test_video_fps} -c:v libx264 -y -pix_fmt yuv420p {CFG.test_video_input}".split(),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True text=True,
) )
if ret.returncode != 0: if ret.returncode != 0:
logging.warning("无法生成测试视频.") logging.warning("无法生成测试视频.")
logging.debug(ret.stdout) logging.debug(ret.stdout)
logging.debug(ret.stderr) logging.debug(ret.stderr)
ret.check_returncode() ret.check_returncode()
cmd = get_cmd(CFG["test_video_input"],CFG["test_video_output"],) cmd = get_cmd(
CFG.test_video_input,
CFG.test_video_output,
)
ret = subprocess.run( ret = subprocess.run(
cmd, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
) )
if ret.returncode != 0: if ret.returncode != 0:
logging.error("测试视频压缩失败") logging.error("测试视频压缩失败")
@ -326,74 +574,95 @@ def test():
logging.debug(ret.stderr) logging.debug(ret.stderr)
logging.error("Error termination via test failed.") logging.error("Error termination via test failed.")
exit(-1) exit(-1)
if get_frame.get_video_frame_count("compress_video_test.mp4") is None:
logging.error("测试读取帧数失败,将无法正确显示进度。")
os.remove("compress_video_test.mp4") os.remove("compress_video_test.mp4")
os.remove("compressed_video_test.mp4") os.remove("compressed_video_test.mp4")
except KeyboardInterrupt as e:raise e except KeyboardInterrupt as e:
raise e
except Exception as e: except Exception as e:
if os.path.exists("compress_video_test.mp4"): if os.path.exists("compress_video_test.mp4"):
os.remove("compress_video_test.mp4") os.remove("compress_video_test.mp4")
logging.warning("测试未通过,继续运行可能出现未定义行为。") logging.warning("测试未通过,继续运行可能出现未定义行为。")
logging.debug("Test error",exc_info=e) logging.debug("Test error", exc_info=e)
def exit_pause(): def exit_pause():
if os.name == 'nt': if os.name == "nt":
os.system("pause") os.system("pause")
elif os.name == 'posix': elif os.name == "posix":
os.system("read -p 'Press Enter to continue...'") os.system("read -p 'Press Enter to continue...'")
def main(_root = None): def finalize():
global current_running_file
atexit.register(exit_pause) if current_running_file is not None:
global root
setup_logging()
tot_bgn = time()
logging.info("-------------------------------")
logging.info(datetime.now().strftime('Video Compress started at %Y/%m/%d %H:%M'))
if CFG_FILE.exists():
try: try:
import json current_running_file.unlink(missing_ok=True)
cfg:dict = json.loads(CFG_FILE.read_text())
CFG.update(cfg)
except KeyboardInterrupt as e:raise e
except Exception as e: except Exception as e:
logging.warning("Invalid config file, ignored.") try:
logging.debug(e) logging.error(
"Failed to delete incomplete output file after keyboard interrupt. CHECK IF LAST PROCSSING VIDEO IS COMPLETED",
exc_info=e,
)
except Exception:
print("Failed to delete incomplete output file after keyboard interrupt. CHECK IF LAST PROCSSING VIDEO IS COMPLETED")
current_running_file = None
def main(_root=None):
atexit.register(exit_pause)
atexit.register(finalize)
global root, current_running_file
if _root is not None: if _root is not None:
setup_logging()
root = Path(_root) root = Path(_root)
else: else:
# 通过命令行参数传入需要遍历的目录 parser = argparse.ArgumentParser()
if len(sys.argv) < 2: parser.add_argument("directory", nargs="?", help="目标目录路径")
print(f"用法python {__file__} <目标目录>") parser.add_argument("--verbose", "-v", action="store_true", help="启用详细日志记录")
logging.warning("Error termination via invalid input.") args = parser.parse_args()
if not args.directory:
print("Error termination via invalid input.")
sys.exit(1) sys.exit(1)
root = Path(sys.argv[1]) root = Path(args.directory)
setup_logging(args.verbose)
if root.name.lower() == CFG["compress_dir_name"].lower(): tot_bgn = time()
logging.info("-------------------------------")
logging.info(datetime.now().strftime("Video Compress started at %Y/%m/%d %H:%M"))
if root.name.lower() == CFG.compress_dir_name.lower():
logging.critical("请修改目标目录名为非compress。") logging.critical("请修改目标目录名为非compress。")
logging.error("Error termination via invalid input.") logging.error("Error termination via invalid input.")
sys.exit(1) sys.exit(1)
logging.info("开始验证环境") logging.info("开始验证环境")
test() test()
if not root.is_dir(): if not root.is_dir():
print("提供的路径不是一个有效目录。") logging.critical("提供的路径不是一个有效目录。")
logging.warning("Error termination via invalid input.") logging.warning("Error termination via invalid input.")
sys.exit(1) sys.exit(1)
try: try:
traverse_directory(root) traverse_directory(root)
tot_end = time() tot_end = time()
logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}") logging.info(f"Elapsed time: {fmt_time(tot_end-tot_bgn)}")
logging.info("Normal termination of Video Compress.") logging.info("Normal termination of Video Compress.")
except KeyboardInterrupt: except KeyboardInterrupt:
logging.warning("Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.") logging.warning(
"Error termination via keyboard interrupt."
)
except Exception as e: except Exception as e:
logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e) logging.error(
"Error termination via unhandled error",
exc_info=e,
)
if __name__ == "__main__": if __name__ == "__main__":
# sys.argv.append(r'C:\Users\flt\Documents\WeChat Files\wxid_m8h0igh8p52p22\FileStorage\Video')
main() main()

File diff suppressed because it is too large Load Diff

4
calc_utils/.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
dist
__pycache__
*.egg-info
.venv

73
calc_utils/README.md Normal file
View File

@ -0,0 +1,73 @@
# calc-utils
个人使用的计算化学工具集,主要基于 [ASE (Atomic Simulation Environment)](https://wiki.fysik.dtu.dk/ase/) 和 [RDKit](https://www.rdkit.org/)。
包含了一些方便的转换工具以及针对特定服务器环境PBS/Slurm/Custom定制的 `ase.calculators.gaussian` 补丁。
专用软件,`futils.gaussian`在不同服务器环境中无法直接运行,必须予以修改。
## 安装
需要 Python 3.12+。
```bash
git clone https://github.com/your-repo/calc-utils.git
cd calc-utils
pip install .
```
## 功能模块
### 1. `futils.gaussian` (**Breaking Change**)
这是一个对 `ase.calculators.gaussian` 的深度定制和 Monkey Patch。
**注意:导入此模块会直接修改 `ase.calculators.gaussian` 中的类定义。**
主要修改内容:
- **强制任务提交脚本**:计算器的 `command` 默认被设置为调用 `gsub_wait` 脚本。
- 默认路径硬编码为 `/home/fanhj/calcs/lele/tools/gsub_wait`(需要在 `futils/gaussian.py` 中按需修改 `GSUB` 变量)。
- **文件后缀变更**:输入文件使用 `.gin` 而非 `.gjf`,输出文件默认读取 `.out`
- **参数增强**`__init__` 方法提供了更详细的 Type Hinting 和默认参数(如 `mem="30GB"`, `proc=32`)。
- **辅助方法**:增加了 `mod()` 方法用于快速复制并修改计算器参数。
```python
from futils.gaussian import Gaussian
from ase import Atoms
# 使用定制后的 Gaussian 计算器
# 注意:这会尝试调用 gsub_wait 提交任务
calc = Gaussian(label='test_calc', method='B3LYP', basis='6-31G(d)')
```
### 2. `futils.rdkit2ase`
提供 RDKit 分子对象 (`rdkit.Chem.Mol`) 与 ASE 原子对象 (`ase.Atoms`) 之间的无缝转换,**保留 3D 坐标**。
```python
from futils.rdkit2ase import MolToAtoms, AtomsToMol
from rdkit import Chem
# RDKit -> ASE
mol = Chem.MolFromMolFile("molecule.mol")
atoms = MolToAtoms(mol)
# ASE -> RDKit
new_mol = AtomsToMol(atoms)
```
### 3. `futils.rdkit_utils`
一些 RDKit 绘图辅助函数。
- `draw2D(mol)`: 生成 SVG 格式的 2D 分子图。
- `draw3D(mol)`: 使用 IPythonConsole 绘制 3D 分子图。
## 脚本工具 (`bin/`)
本项目包含了一些用于任务提交管理的 Shell 脚本,适用于特定的集群环境。
- **`gsub`**: 任务提交脚本。支持本地或通过 SSH 远程提交到名为 `cluster` 的主机。
- **`gsub_wait`**: 提交任务并阻塞等待完成,用于 ASE Calculator 的 `command` 调用,以便实现 Python 脚本的同步执行。
**配置说明**
使用前请检查 `bin/` 下的脚本以及 `futils/gaussian.py` 中的 `GSUB` 路径,根据您的服务器环境进行调整。

176
calc_utils/bin/gsub Normal file
View File

@ -0,0 +1,176 @@
#!/bin/bash
set -u
# Usage: gsub <jobname>
job=${1:-}
if [[ -z "$job" ]]; then
echo "Usage: $0 <jobname-without-extension>"
exit 1
fi
# ==========================================
# 0. 安全检测函数 (Safety Check)
# ==========================================
check_dangerous_path() {
local path="${1:-}"
# 1. Empty check
if [[ -z "$path" ]]; then
echo "Error: Empty path is dangerous for deletion." >&2
return 1
fi
# 2. Root check
if [[ "$path" == "/" ]]; then
echo "Error: Root path '/' is dangerous for deletion." >&2
return 1
fi
# 3. Space check (optional, but good for safety)
if [[ "$path" =~ ^[[:space:]]+$ ]]; then
echo "Error: Whitespace path is dangerous." >&2
return 1
fi
return 0
}
# ==========================================
# 1. 检查运行环境 (Check Host)
# ==========================================
# 如果不是 cluster尝试通过 SSH 远程调用
host_short=$(hostname -s 2>/dev/null || hostname)
if [[ "$host_short" != "cluster" ]]; then
# 假设本地挂载路径 /mnt/home 对应远程 /home (根据原脚本逻辑调整)
cur_dir=$(pwd)
remote_dir="${cur_dir//\/mnt\/home/\/home}"
# 定位当前脚本并转换为远程路径
# 获取脚本所在目录的绝对路径
script_dir=$(cd "$(dirname "$0")" && pwd)
script_name=$(basename "$0")
local_script="$script_dir/$script_name"
# 同样对脚本路径进行替换
remote_script="${local_script//\/mnt\/home/\/home}"
# 尝试在远程执行自己
echo "Running remotely on cluster: $remote_script" >&2
ssh cluster "cd '$remote_dir' && '$remote_script' '$job'"
exit $?
fi
# ==========================================
# 2. 准备作业 (Prepare Job)
# ==========================================
gin_file="$job.gin"
if [[ ! -f "$gin_file" ]]; then
echo "Error: $gin_file not found in $(pwd)"
exit 2
fi
# 解析配置确定资源 (Parse Proc)
# 查找 %NProcShared=XX
proc=$(sed -n 's/^%NProcShared=\([0-9]\+\).*$/\1/pI' "$gin_file" | head -n 1)
queue=""
ppn=""
if [[ "$proc" == "32" ]]; then
queue="n32"
ppn="32"
elif [[ "$proc" == "20" ]]; then
queue="n20"
ppn="20"
else
echo "Error: Unsupported NProcShared=$proc in $gin_file. Only 20 or 32 allowed."
exit 1
fi
# 清理旧文件 (Clean up old output)
if [[ -f "$job.out" ]]; then
# 原脚本逻辑:休眠并删除
# echo "Warning: $job.out exists. Deleting..." >&2
# 使用安全检查
if check_dangerous_path "$job.out"; then
rm "$job.out"
else
echo "Skipping deletion of unsafe path: $job.out" >&2
exit 1
fi
fi
# ==========================================
# 3. 生成作业脚本 (.job)
# ==========================================
job_file="$job.job"
# 使用 heredoc 动态生成 PBS 脚本
# 整合了原 g16_32.pbs 的内容和 gsub32 的追加内容
cat > "$job_file" <<EOF
#!/bin/sh
#PBS -l nodes=1:ppn=$ppn
#PBS -q $queue
#PBS -j oe
#PBS -N $job
cd \$PBS_O_WORKDIR
# Define Safety Check Function in Job Script
check_rm_path() {
p="\$1"
# Empty check
if [ -z "\$p" ]; then
echo "Refusing to delete empty path"
return 1
fi
# Root check
if [ "\$p" = "/" ]; then
echo "Refusing to delete root path"
return 1
fi
return 0
}
export g16root=/share/apps/soft
source \$g16root/g16/bsd/g16.profile
# Create Scratch Directory
if [ -n "\$USER" ] && [ -n "\$PBS_JOBID" ]; then
mkdir -p /scr/\$USER/\$PBS_JOBID
export GAUSS_SCRDIR=/scr/\$USER/\$PBS_JOBID
else
echo "Error: USER or PBS_JOBID not set. Cannot setup scratch."
exit 1
fi
NODES=\`cat \$PBS_NODEFILE | uniq\`
echo "--------------------------------------------------------"
echo " JOBID: \$PBS_JOBID"
echo " The job was started at \`date\`"
echo " The job was running at \$NODES."
echo "--------------------------------------------------------"
# Run G16 Job
echo "Executing: g16 < $gin_file > $job.out"
g16 < $gin_file > $job.out
echo "--------------------------------------------------------"
echo " The job was finished at \`date\`"
echo "--------------------------------------------------------"
# Delete the tmp File (Cleanup Scratch)
echo "Cleaning up \$GAUSS_SCRDIR"
if check_rm_path "\$GAUSS_SCRDIR"; then
rm -rf "\$GAUSS_SCRDIR"
fi
EOF
# ==========================================
# 4. 提交作业 (Submit)
# ==========================================
# qsub 会输出 Job ID例如 12345.cluster
qsub "$job_file"

116
calc_utils/bin/gsub_wait Normal file
View File

@ -0,0 +1,116 @@
#!/bin/bash
set -u
# Usage: gsub_wait <jobname>
job=${1:-}
if [[ -z "$job" ]]; then
echo "Usage: $0 <jobname-without-extension>"
exit 1
fi
# ==========================================
# 1. 提交任务 (Submit Job)
# ==========================================
# 确定 gsub 命令位置
# 优先查找当前目录下的 gsub否则查找 PATH
if [[ -x "./gsub" ]]; then
GSUB_CMD="./gsub"
else
GSUB_CMD="gsub"
fi
# 调用 gsub 并捕获输出
# 注意gsub 内部可能通过 SSH 在远程执行,最终返回 qsub 的输出
output=$($GSUB_CMD "$job")
echo "$output"
# ==========================================
# 2. 检查是否需要等待 (Check Silent Mode)
# ==========================================
# 如果 GSUB_SILENT 为 1则不进行监控直接退出
if [[ "${GSUB_SILENT:-0}" == "1" ]]; then
exit 0
fi
# ==========================================
# 3. 监控任务进度 (Monitor Progress)
# ==========================================
# 尝试提取 Job ID (例如: 67147.cluster -> 67147)
jobid_full=$(echo "$output" | grep -oE '[0-9]+\.cluster|[0-9]+' | head -n 1 || true)
if [[ -n "$jobid_full" ]]; then
jobid=${jobid_full%%.*}
# 准备参数
out_file="$job.out"
gin_file="$job.gin"
end_file="$job.job.o$jobid"
if [[ ! -f "$gin_file" ]]; then
# 如果 gin 文件找不到(可能是远程路径问题?),跳过监控
echo "Warning: $gin_file not found nearby. Skipping monitor."
exit 0
fi
# 计算 Total Steps: (--link1-- 数量) + 1
link_count=$(grep -c -- "--link1--" "$gin_file" || true)
total=$((link_count + 1))
cntDone=0
cntSCF=0
last_lines=0
echo "Monitoring Job $jobid..."
while true; do
# A. 检查 PBS 结束文件 (Job 完成标志)
if [[ -f "$end_file" ]]; then
echo "Job finished (found $end_file)."
break
fi
# B. 检查并读取 .out 输出文件
if [[ -f "$out_file" ]]; then
curr_lines=$(wc -l < "$out_file" 2>/dev/null || echo 0)
# 如果文件变小(被截断或重新生成),重置读取位置
if (( curr_lines < last_lines )); then last_lines=0; fi
if (( curr_lines > last_lines )); then
# 逐行处理新增内容
# 使用进程替换 < <(...) 避免管道导致的子shell变量丢失问题
while IFS= read -r line; do
# 检查 SCF Done
# 正则匹配: SCF Done: ... E ... = (数值) A.U.
if [[ "$line" =~ SCF[[:space:]]Done:.*E.*=[[:space:]]*([-0-9.]+)[[:space:]]*A\.U\. ]]; then
energy="${BASH_REMATCH[1]}"
cntSCF=$((cntSCF + 1))
echo "$job: SCF Done: $energy [$cntSCF] ($cntDone/$total)"
fi
# 检查 Termination
if [[ "$line" == *"termination of Gaussian"* ]]; then
cntDone=$((cntDone + 1))
echo "$job: task done ($cntDone/$total)"
fi
done < <(tail -n "+$((last_lines + 1))" "$out_file")
last_lines=$curr_lines
fi
fi
sleep 2
done
# C. 最终校验
if (( cntDone != total )); then
echo "Warning: cntDone ($cntDone) != total ($total)"
fi
else
echo "Could not parse Job ID from output. Monitor skipped."
fi

View File

@ -0,0 +1,19 @@
# from .patch_gaussian import GSUB
from .rdkit2ase import MolToAtoms,AtomsToMol
from .rdkit_utils import draw2D,draw3D
from rdkit.Chem import AllChem
from rdkit import Chem
from ase import atoms
# import patch_gaussian as gaussian
__all__ = [
# 'GSUB',
'MolToAtoms',
'AtomsToMol',
'draw2D',
'draw3D',
'Chem',
'AllChem',
'atoms',
'gaussian'
]

View File

@ -0,0 +1,367 @@
from ase.calculators import gaussian
from ase.calculators.calculator import FileIOCalculator
from ase.io import read, write
from typing import TYPE_CHECKING, Optional, Literal
from copy import deepcopy
GSUB = "/home/fanhj/calcs/lele/tools/gsub_wait"
gau_src= gaussian.Gaussian
gau_dyn_src = gaussian.GaussianDynamics
methodType = Optional[
str
| Literal[
"HF",
"MP2",
"MP3",
"MP4",
"MP4(DQ)",
"MP4(SDQ)",
"MP5",
"CCSD",
"CCSDT",
"QCISD",
"CID",
"CISD",
"CIS",
"B3LYP",
"B3PW91",
"BLYP",
"PBE",
"PBE0",
"M06",
"M062X",
"M06L",
"M06HF",
"CAM-B3LYP",
"wb97xd",
"wb97xd3",
"LC-wPBE",
"HSE06",
"LSDA",
"SVWN",
"PW91",
"mPW1PW91",
"HCTH",
"HCTH147",
"HCTH407",
"TPSSh",
"TPSS",
"revPBE",
"PBEPBE",
"B2PLYP",
"mPW2PLYP",
"B2PLYPD3",
"PBE0DH",
"PBEQIDH",
]
]
basisType = Optional[
str
| Literal[
"STO-3G",
"3-21G",
"6-31G",
"6-31G(d)",
"6-31G(d,p)",
"6-31+G(d)",
"6-31+G(d,p)",
"6-31++G(d,p)",
"6-311G",
"6-311G(d)",
"6-311G(d,p)",
"6-311+G(d)",
"6-311+G(d,p)",
"6-311++G(d,p)",
"cc-pVDZ",
"cc-pVTZ",
"cc-pVQZ",
"cc-pV5Z",
"cc-pV6Z",
"aug-cc-pVDZ",
"aug-cc-pVTZ",
"aug-cc-pVQZ",
"aug-cc-pV5Z",
"def2SVP",
"def2SVPD",
"def2TZVP",
"def2TZVPD",
"def2QZVP",
"def2QZVPP",
"LANL2DZ",
"LANL2MB",
"SDD",
"CEP-4G",
"CEP-31G",
"CEP-121G",
"DGDZVP",
"DGDZVP2",
"Gen",
"GenECP",
]
]
scrfSolventType = Optional[
str
| Literal['Water', 'Acetone', 'Acetonitrile', 'Aniline', 'Benzene', 'Bromoform', 'Butanol',
'CarbonDisulfide', 'CarbonTetrachloride', 'Chlorobenzene', 'Chloroform', 'Cyclohexane',
'Dichloroethane', 'Dichloromethane', 'Diethylether', 'Dimethylformamide', 'Dimethylsulfoxide',
'Ethanol', 'Ethylacetate', 'Heptane', 'Hexane', 'Methanol', 'Nitromethane', 'Octanol',
'Pyridine', 'Tetrahydrofuran', 'Toluene', 'Xylene'
]
]
class Gaussian(gau_src):
mem:int
nprocshared:Literal[20,32]
charge: int
mult: Optional[int]
def __init__(self,
proc:Literal[20,32]=32,
charge:int=0,
mult:Optional[int]=None,
mem="30GB",
label='Gaussian',
method:methodType=None,
basis:basisType=None,
fitting_basis:Optional[str]=None,
output_type:Literal['N','P']='P',
basisfile:Optional[str]=None,
basis_set:Optional[str]=None,
xc:Optional[str]=None,
extra:Optional[str]=None,
ioplist:Optional[list[str]]=None,
addsec=None,
spinlist=None,
zefflist=None,
qmomlist=None,
nmagmlist=None,
znuclist=None,
radnuclearlist=None,
chk:Optional[str]=None,
oldchk:Optional[str]=None,
nprocshared:Optional[int]=None,
scrfSolvent:scrfSolventType=None,
scrf:Optional[str]=None,
em:Optional[Literal["GD2","GD3","GD4","GD3BJ"]|str]=None,
**kwargs):
'''
Parameters
-----------
proc: int
A short name for nprocshared
method: str
Level of theory to use, e.g. ``hf``, ``ccsd``, ``mp2``, or ``b3lyp``.
Overrides ``xc`` (see below).
xc: str
Level of theory to use. Translates several XC functionals from
their common name (e.g. ``PBE``) to their internal Gaussian name
(e.g. ``PBEPBE``).
basis: str
The basis set to use. If not provided, no basis set will be requested,
which usually results in ``STO-3G``. Maybe omitted if basisfile is set
(see below).
fitting_basis: str
The name of the fitting basis set to use.
output_type: str
Level of output to record in the Gaussian
output file - this may be ``N``- normal or ``P`` -
additional.
basisfile: str
The name of the basis file to use. If a value is provided, basis may
be omitted (it will be automatically set to 'gen')
basis_set: str
The basis set definition to use. This is an alternative
to basisfile, and would be the same as the contents
of such a file.
charge: int
The system charge. If not provided, it will be automatically
determined from the ``Atoms`` objects initial_charges.
mult: int
The system multiplicity (``spin + 1``). If not provided, it will be
automatically determined from the ``Atoms`` objects
``initial_magnetic_moments``.
extra: str
Extra lines to be included in the route section verbatim.
It should not be necessary to use this, but it is included for
backwards compatibility.
ioplist: list
A collection of IOPs definitions to be included in the route line.
addsec: str
Text to be added after the molecular geometry specification, e.g. for
defining masses with ``freq=ReadIso``.
spinlist: list
A list of nuclear spins to be added into the nuclear
propeties section of the molecule specification.
zefflist: list
A list of effective charges to be added into the nuclear
propeties section of the molecule specification.
qmomlist: list
A list of nuclear quadropole moments to be added into
the nuclear propeties section of the molecule
specification.
nmagmlist: list
A list of nuclear magnetic moments to be added into
the nuclear propeties section of the molecule
specification.
znuclist: list
A list of nuclear charges to be added into the nuclear
propeties section of the molecule specification.
radnuclearlist: list
A list of nuclear radii to be added into the nuclear
propeties section of the molecule specification.
params: dict
Contains any extra keywords and values that will be included in either
the link0 section or route section of the gaussian input file.
To be included in the link0 section, the keyword must be one of the
following: ``mem``, ``chk``, ``oldchk``, ``schk``, ``rwf``,
``oldmatrix``, ``oldrawmatrix``, ``int``, ``d2e``, ``save``,
``nosave``, ``errorsave``, ``cpu``, ``nprocshared``, ``gpucpu``,
``lindaworkers``, ``usessh``, ``ssh``, ``debuglinda``.
Any other keywords will be placed (along with their values) in the
route section.
'''
if nprocshared is not None and proc is not None:
if nprocshared == proc:print("Providing both nprocshared and proc is not recomanded")
else:
raise ValueError("both nprocshared and proc provided, and inequal.")
if scrfSolvent is not None and scrf is not None:
raise ValueError("scrfSolvent and scrf both not None")
if scrfSolvent is not None:
scrf = "Solvent="+scrfSolvent
optional_keys = ['chk','oldchk','scrf', 'geom', 'integral', 'density', 'nosymm', 'symmetry', 'units',
'temperature', 'pressure', 'counterpoise', 'gfinput', 'gfprint', 'test',
'output', 'punch', 'prop', 'pseudo', 'restart', 'scale', 'sparse', 'window', 'em']
for key in optional_keys:
val = locals().get(key,None)
if val is not None:
kwargs[key] = val
super().__init__(
nprocshared = proc,
mem=mem,
label=label,
command=GSUB+" "+label,
charge = charge,
mult = mult,
method = method,
basis = basis,
fitting_basis = fitting_basis,
output_type = output_type,
basisfile = basisfile,
basis_set = basis_set,
xc = xc,
extra = extra,
ioplist = ioplist,
addsec = addsec,
spinlist = spinlist,
zefflist = zefflist,
qmomlist = qmomlist,
nmagmlist = nmagmlist,
znuclist = znuclist,
radnuclearlist = radnuclearlist,
**kwargs
)
assert self.fileio_rules
self.fileio_rules.stdin_name = '{prefix}.gin'
def mod(self,charge:int=0, mult:int=1) -> "Gaussian":
new = deepcopy(self)
new.charge = charge
new.mult = mult
return new
def write_input(self, atoms, properties=None, system_changes=None):
FileIOCalculator.write_input(self, atoms, properties, system_changes)
if TYPE_CHECKING:
assert isinstance(self.label, str)
assert self.parameters
write(self.label + '.gin', atoms, properties=properties,
format='gaussian-in', parallel=False, **self.parameters)
def read_results(self):
if TYPE_CHECKING:
assert isinstance(self.label, str)
output = read(self.label + '.out', format='gaussian-out')
assert output
self.calc = output.calc
self.results = output.calc.results
class GaussianDynamics(gau_dyn_src):
def __init__(self, atoms, calc=None):
super().__init__(atoms, calc=calc)
def run(self, **kwargs):
calc_old = self.atoms.calc
params_old = deepcopy(self.calc.parameters)
self.delete_keywords(kwargs)
self.delete_keywords(self.calc.parameters)
self.set_keywords(kwargs)
self.calc.set(**kwargs)
self.atoms.calc = self.calc
try:
self.atoms.get_potential_energy()
except OSError:
converged = False
else:
converged = True
atoms = read(self.calc.label + '.out')
self.atoms.cell = atoms.cell
self.atoms.positions = atoms.positions
self.atoms.calc = atoms.calc
self.calc.parameters = params_old
self.calc.reset()
if calc_old is not None:
self.atoms.calc = calc_old
return converged
class GaussianOptimizer(GaussianDynamics):
keyword = 'opt'
special_keywords = {
'fmax': '{}',
'steps': 'maxcycle={}',
}
class GaussianIRC(GaussianDynamics):
keyword = 'irc'
special_keywords = {
'direction': '{}',
'steps': 'maxpoints={}',
}
gaussian.Gaussian = Gaussian
gaussian.GaussianDynamics = GaussianDynamics
gaussian.GaussianOptimizer = GaussianOptimizer
gaussian.GaussianIRC = GaussianIRC
__all__ = [
'Gaussian',
'GaussianDynamics',
'GaussianOptimizer',
'GaussianIRC',
'GSUB'
]

View File

@ -0,0 +1,145 @@
from rdkit import Chem
from ase import atoms
from typing import Optional
import numpy as np
from rdkit.Geometry import Point3D
def MolToAtoms(mol: Chem.Mol, confID=-1) -> atoms.Atoms:
conf = mol.GetConformer(confID)
symbols = [atom.GetSymbol() for atom in mol.GetAtoms()]
positions = [
(conf.GetAtomPosition(i).x,
conf.GetAtomPosition(i).y,
conf.GetAtomPosition(i).z)
for i in range(mol.GetNumAtoms())
]
aseAtoms = atoms.Atoms(symbols=symbols, positions=positions)
charges = [atom.GetFormalCharge() for atom in mol.GetAtoms()]
aseAtoms.set_initial_charges(charges)
return aseAtoms
def AtomsToMol(
atoms: atoms.Atoms,
mol: Optional[Chem.Mol] = None,
conf_id: int = 0,
charge: Optional[int] = None,
allow_reorder: bool = False,
inplace = False
) -> Chem.Mol:
"""
Convert ASE Atoms -> RDKit Mol.
If mol is provided:
- verify natoms and element symbols match (unless allow_reorder=True)
- update (or add) conformer coordinates so mol matches atoms
If mol is None:
- create a new Mol with atoms only, set 3D coords
- Determine bonds from geometry using rdDetermineBonds.DetermineBonds()
Parameters
----------
atoms : ase.Atoms
Must have positions (Å).
mol : rdkit.Chem.Mol | None
Optional template mol.
conf_id : int
Conformer id to update/use. If mol has no conformer, one will be added.
charge : int | None
Total molecular charge used by DetermineBonds (recommended if known).
If None, will try to infer from formal charges when mol is provided;
otherwise defaults to 0 for new mol.
allow_reorder : bool
If True, will not enforce symbol-by-index equality (only checks counts).
Most of the time you want False to guarantee consistency.
inplace : bool
If True, will modify input mol instead of make a copy.
Returns
-------
rdkit.Chem.Mol
"""
positions = np.asarray(atoms.get_positions(), dtype=float)
symbols = list(atoms.get_chemical_symbols())
n = len(symbols)
# -------- case 1: update existing mol --------
if mol is not None:
if mol.GetNumAtoms() != n:
raise ValueError(f"mol has {mol.GetNumAtoms()} atoms but ASE atoms has {n}.")
mol_symbols = [a.GetSymbol() for a in mol.GetAtoms()]
if not allow_reorder:
if mol_symbols != symbols:
raise ValueError(
"Element symbols mismatch by index between mol and atoms.\n"
f"mol: {mol_symbols}\n"
f"atoms: {symbols}\n"
"If you REALLY know what you're doing, set allow_reorder=True "
"and handle mapping yourself."
)
else:
# only check multiset counts
if sorted(mol_symbols) != sorted(symbols):
raise ValueError("Element symbol counts differ between mol and atoms.")
if not inplace:
mol = Chem.Mol(mol) # copy
if mol.GetNumConformers() == 0:
conf = Chem.Conformer(n)
conf.Set3D(True)
mol.AddConformer(conf, assignId=True)
# pick conformer
try:
conf = mol.GetConformer(conf_id)
except ValueError:
# create new conformer if requested id doesn't exist
conf = Chem.Conformer(n)
conf.Set3D(True)
mol.AddConformer(conf, assignId=True)
conf = mol.GetConformer(mol.GetNumConformers() - 1)
if conf_id!=0:
print("Warning: Failed to pick conformer.")
for i in range(n):
x, y, z = map(float, positions[i])
conf.SetAtomPosition(i, Point3D(x, y, z))
# charge inference if not given
if charge is None:
charge = int(sum(a.GetFormalCharge() for a in mol.GetAtoms()))
return mol
# -------- case 2: build mol + determine bonds --------
rw = Chem.RWMol()
for sym in symbols:
rw.AddAtom(Chem.Atom(sym))
new_mol = rw.GetMol()
conf = Chem.Conformer(n)
conf.Set3D(True)
for i in range(n):
x, y, z = map(float, positions[i])
conf.SetAtomPosition(i, Point3D(x, y, z))
new_mol.AddConformer(conf, assignId=True)
# Determine bonds from geometry
if charge is None:
charge = 0
try:
from rdkit.Chem import rdDetermineBonds
rdDetermineBonds.DetermineBonds(new_mol, charge=charge)
except Exception as e:
raise RuntimeError(
"DetermineBonds failed. This can happen for metals/ions/odd geometries.\n"
"Consider providing a template mol, or implement custom distance-based bonding.\n"
f"Original error: {e}"
)
return new_mol

View File

@ -0,0 +1,14 @@
from rdkit import Chem
from rdkit.Chem import Draw
from IPython.display import SVG
from rdkit.Chem.Draw import IPythonConsole
def draw2D(mol:Chem.Mol,confId:int=-1):
d = Draw.MolDraw2DSVG(250, 200)
d.drawOptions().addAtomIndices = True
d.DrawMolecule(mol,confId=confId)
d.FinishDrawing()
return SVG(d.GetDrawingText())
draw3D = lambda m3d,confId=-1: IPythonConsole.drawMol3D(m3d,confId=confId)

10
calc_utils/pyproject.toml Normal file
View File

@ -0,0 +1,10 @@
[project]
name = "calc-utils"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"ase>=3.27.0",
"rdkit>=2025.9.3",
]

39
github_down/gen.py Normal file
View File

@ -0,0 +1,39 @@
from subprocess import run
proxies = [
"https://hk.gh-proxy.org/",
"https://cdn.gh-proxy.org/",
"https://gh-proxy.org/",
"https://edgeone.gh-proxy.org/",
"https://gh-proxy.top/",
"https://gh-proxy.net/",
"https://gh-proxy.com/",
"https://ghfast.top/",
"https://gh.ddlc.top/",
"https://gh.llkk.cc/",
"https://ghproxy.homeboyc.cn/",
"",
]
cmd = [
"aria2c",
"--auto-file-renaming=false",
"--retry-wait=2",
"--max-tries=0",
"--timeout=10",
"--connect-timeout=5",
"--lowest-speed-limit=100K",
"-x","4","-s","24","-k","1M",
]
try:
run("aria2c --version", check=True, capture_output=True)
except Exception:
print("请先安装aria2")
exit(1)
input_url = input("请输入GitHub文件链接: ").strip()
cmd.extend(
[f"{proxy}{input_url}" for proxy in proxies]
)
run(cmd)

250
github_down/gen_win.cpp Normal file
View File

@ -0,0 +1,250 @@
#define NOMINMAX
#include <windows.h>
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <filesystem>
#include <stdexcept>
#include <clocale>
// ---------------- UTF helpers ----------------
static std::wstring utf8_to_wstring(const std::string& s) {
if (s.empty()) return L"";
int wlen = MultiByteToWideChar(CP_UTF8, 0, s.c_str(), (int)s.size(), nullptr, 0);
std::wstring w(wlen, L'\0');
MultiByteToWideChar(CP_UTF8, 0, s.c_str(), (int)s.size(), w.data(), wlen);
return w;
}
static std::string trim_copy(std::string s) {
auto is_space = [](unsigned char c) { return c==' '||c=='\t'||c=='\n'||c=='\r'; };
while (!s.empty() && is_space((unsigned char)s.back())) s.pop_back();
size_t i = 0;
while (i < s.size() && is_space((unsigned char)s[i])) i++;
return s.substr(i);
}
// Quote a single arg for CreateProcess command line.
static std::wstring quote_arg_windows(const std::wstring& arg) {
bool need_quotes = arg.empty();
for (wchar_t c : arg) {
if (c == L' ' || c == L'\t' || c == L'\n' || c == L'\v' || c == L'"') {
need_quotes = true; break;
}
}
if (!need_quotes) return arg;
std::wstring out = L"\"";
size_t bs_count = 0;
for (wchar_t c : arg) {
if (c == L'\\') {
bs_count++;
} else if (c == L'"') {
out.append(bs_count * 2 + 1, L'\\');
out.push_back(L'"');
bs_count = 0;
} else {
if (bs_count) out.append(bs_count, L'\\');
bs_count = 0;
out.push_back(c);
}
}
if (bs_count) out.append(bs_count * 2, L'\\');
out.push_back(L'"');
return out;
}
// ---------------- RAII cleanup guard ----------------
struct TempBundleGuard {
std::filesystem::path dir;
std::filesystem::path aria2_path;
~TempBundleGuard() {
// Best-effort cleanup
try {
if (!aria2_path.empty() && std::filesystem::exists(aria2_path)) {
std::error_code ec;
std::filesystem::remove(aria2_path, ec);
}
if (!dir.empty() && std::filesystem::exists(dir)) {
std::error_code ec;
std::filesystem::remove_all(dir, ec);
}
} catch (...) {
// swallow
}
}
};
// ---------------- Resource extraction ----------------
static bool extract_resource_to_file(
const wchar_t* res_name,
const std::filesystem::path& out_path
) {
HRSRC hrsrc = FindResourceW(nullptr, res_name, RT_RCDATA);
if (!hrsrc) return false;
HGLOBAL hglob = LoadResource(nullptr, hrsrc);
if (!hglob) return false;
DWORD size = SizeofResource(nullptr, hrsrc);
void* data = LockResource(hglob);
if (!data || size == 0) return false;
std::ofstream ofs(out_path, std::ios::binary);
ofs.write(reinterpret_cast<const char*>(data), size);
return ofs.good();
}
// ---------------- Process runner ----------------
static int run_process(const std::wstring& exe_path, const std::vector<std::wstring>& args, bool show) {
std::wstring cmdline = quote_arg_windows(exe_path);
for (const auto& a : args) {
cmdline.push_back(L' ');
cmdline += quote_arg_windows(a);
}
STARTUPINFOW si{};
si.cb = sizeof(si);
si.dwFlags = STARTF_USESHOWWINDOW;
si.wShowWindow = SW_SHOW;
PROCESS_INFORMATION pi{};
// CreateProcessW may modify buffer
std::vector<wchar_t> buf(cmdline.begin(), cmdline.end());
buf.push_back(L'\0');
BOOL ok = CreateProcessW(
nullptr,
buf.data(),
nullptr, nullptr,
FALSE,
show ? 0 : CREATE_NO_WINDOW,
nullptr,
nullptr,
&si,
&pi
);
if (!ok) {
return -1;
}
WaitForSingleObject(pi.hProcess, INFINITE);
DWORD code = 0;
GetExitCodeProcess(pi.hProcess, &code);
CloseHandle(pi.hThread);
CloseHandle(pi.hProcess);
return (int)code;
}
// ---------------- Get temp directory ----------------
static std::filesystem::path get_temp_dir() {
wchar_t buf[MAX_PATH + 1];
DWORD n = GetTempPathW(MAX_PATH, buf);
if (n == 0 || n > MAX_PATH) throw std::runtime_error("GetTempPathW failed");
return std::filesystem::path(buf);
}
static std::filesystem::path make_unique_subdir(const std::filesystem::path& base) {
// Use PID + tick count for uniqueness
DWORD pid = GetCurrentProcessId();
ULONGLONG t = GetTickCount64();
std::filesystem::path dir = base / L"MyAria2Bundle";
std::filesystem::path uniq = dir / (L"run_" + std::to_wstring(pid) + L"_" + std::to_wstring(t));
std::error_code ec;
std::filesystem::create_directories(uniq, ec);
if (ec) throw std::runtime_error("create_directories failed");
return uniq;
}
int main() {
// Windows console UTF-8
SetConsoleCP(65001);
SetConsoleOutputCP(65001);
std::setlocale(LC_ALL, ".UTF-8");
// Proxies list (same as your python)
const std::vector<std::string> proxies = {
"https://hk.gh-proxy.org/",
"https://cdn.gh-proxy.org/",
"https://gh-proxy.org/",
"https://edgeone.gh-proxy.org/",
"https://gh-proxy.top/",
"https://gh-proxy.net/",
"https://gh-proxy.com/",
"https://ghfast.top/",
"https://gh.ddlc.top/",
"https://gh.llkk.cc/",
"https://ghproxy.homeboyc.cn/",
""
};
TempBundleGuard guard;
try {
// Extract aria2c.exe to %TEMP%\MyAria2Bundle\run_xxx\aria2c.exe
auto temp_base = get_temp_dir();
guard.dir = make_unique_subdir(temp_base);
guard.aria2_path = guard.dir / L"aria2c.exe";
if (!extract_resource_to_file(L"ARIA2_BIN", guard.aria2_path)) {
std::cerr << "释放 aria2c.exe 失败:资源 ARIA2_BIN 不存在或写入失败。\n";
return 1;
}
{
int rc = run_process(guard.aria2_path, {L"--version"},false);
if (rc != 0) {
std::cerr << "aria2c 自检失败exit=" << rc << ")。\n";
return 1;
}
}
std::cout << "请输入GitHub文件链接: ";
std::string input_url;
std::getline(std::cin, input_url);
input_url = trim_copy(input_url);
if (input_url.empty()) {
std::cerr << "输入为空,退出。\n";
return 1;
}
std::vector<std::wstring> args = {
L"--auto-file-renaming=false",
L"--retry-wait=2",
L"--max-tries=0",
L"--timeout=10",
L"--connect-timeout=5",
L"--lowest-speed-limit=100K",
L"-x", L"4",
L"-s", L"24",
L"-k", L"1M"
};
for (const auto& p : proxies) {
std::string full = p + input_url;
args.push_back(utf8_to_wstring(full));
}
int rc = run_process(guard.aria2_path, args,true);
if (rc != 0) {
std::cerr << "aria2c 执行失败exit=" << rc << ")。\n";
}
return rc;
} catch (const std::exception& e) {
std::cerr << "异常: " << e.what() << "\n";
return 1;
}
}

1
github_down/resource.rc Normal file
View File

@ -0,0 +1 @@
ARIA2_BIN RCDATA "aria2c.exe"

View File

@ -1,8 +1,11 @@
import PyPDF2 # PyMuPDF import PyPDF2 # PyMuPDF
from PyPDF2.generic import IndirectObject
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Optional
from itertools import repeat
def copy_pdf_pages(input_path: str, output_path: str) -> bool: def copy_pdf_pages(input_path: Path, output_path: Path) -> bool:
""" """
移除PDF文件的所有限制 移除PDF文件的所有限制
@ -17,12 +20,35 @@ def copy_pdf_pages(input_path: str, output_path: str) -> bool:
try: try:
with open(input_path, 'rb') as input_file: with open(input_path, 'rb') as input_file:
reader = PyPDF2.PdfReader(input_file) reader = PyPDF2.PdfReader(input_file)
writer = PyPDF2.PdfWriter() writer = PyPDF2.PdfWriter()
# 复制所有页面 # 复制所有页面
for page in reader.pages: for page in reader.pages:
writer.add_page(page) writer.add_page(page)
try:
que = list(zip(repeat(None),reader.outline))
last:Optional[IndirectObject] = None
for par, it in que:
if isinstance(it, list):
que.extend(zip(repeat(last),it))
continue
title = getattr(it, 'title', None)
if title is None:
try:
title = str(it)
except Exception:
print(f"警告:无法获取书签标题,跳过该书签.")
continue
page_num = reader.get_destination_page_number(it)
if page_num is None:
continue
last = writer.add_outline_item(title, page_num, parent=par)
except Exception as e:
print(f"警告:{input_path.name}书签处理失败.")
# 写入新文件(不设置任何加密或限制) # 写入新文件(不设置任何加密或限制)
with open(output_path, 'wb') as output_file: with open(output_path, 'wb') as output_file:
@ -34,50 +60,6 @@ def copy_pdf_pages(input_path: str, output_path: str) -> bool:
print(f"移除PDF限制时发生错误: {e}") print(f"移除PDF限制时发生错误: {e}")
return False return False
# def copy_pdf_pages(input_file, output_file):
# """
# 读取PDF文件并逐页复制到新的PDF文件
# Args:
# input_file (str): 输入PDF文件路径
# output_file (str): 输出PDF文件路径
# """
# try:
# # 检查输入文件是否存在
# if not os.path.exists(input_file):
# print(f"错误:输入文件 '{input_file}' 不存在")
# return False
# # 打开输入PDF文件
# pdf_document = fitz.open(input_file)
# # 创建新的PDF文档
# new_pdf = fitz.open()
# new_pdf.insert_pdf(pdf_document)
# # 保存输出文件
# new_pdf.save(output_file)
# # 关闭文档
# pdf_document.close()
# new_pdf.close()
# return True
# except FileNotFoundError:
# print(f"错误:找不到文件 '{input_file}'")
# return False
# except PermissionError:
# print(f"错误:权限不足,无法访问文件")
# return False
# except Exception as pdf_error:
# error_msg = str(pdf_error).lower()
# if "damaged" in error_msg or "corrupt" in error_msg:
# print(f"错误PDF文件 '{input_file}' 已损坏")
# else:
# print(f"发生错误:{str(pdf_error)}")
# return False
def main(): def main():
"""主函数""" """主函数"""
if len(sys.argv) < 2: if len(sys.argv) < 2:
@ -89,12 +71,12 @@ def main():
else: else:
input_path = Path(sys.argv[1]) input_path = Path(sys.argv[1])
if input_path.is_dir(): if input_path.is_dir():
files = list(input_path.glob("**/*.pdf")) files = list(input_path.rglob("*.pdf"))
else: else:
print("正在处理",input_path.name) print("正在处理",input_path.name)
output_file = input_path.with_name(f"{input_path.stem}_decrypt.pdf") output_file = input_path.with_name(f"{input_path.stem}_decrypt.pdf")
success = copy_pdf_pages(input_path, output_file) suc = copy_pdf_pages(input_path, output_file)
print("处理完成" if success else "处理失败") print("处理完成" if suc else "处理失败")
return return
total = len(files) total = len(files)
@ -102,15 +84,13 @@ def main():
for i, pdf_file in enumerate(files, start=1): for i, pdf_file in enumerate(files, start=1):
rate= round(i/total *100) rate= round(i/total *100)
print(f"进度: ", "-"* (rate//5)," "*(20-rate//5), f" {rate}%",sep="",end="\r") print(f"进度: ", "-"* (rate//5)," "*(20-rate//5), f" {rate}%",sep="",end="\r")
import time
# time.sleep(1) # 模拟处理时间
if not pdf_file.is_file(): if not pdf_file.is_file():
print(f"跳过非PDF文件{pdf_file}") print(f"跳过非PDF文件{pdf_file}")
continue continue
output_file = pdf_file.with_name(f"{pdf_file.stem}_decrypt.pdf") output_file = pdf_file.with_name(f"{pdf_file.stem}_decrypt.pdf")
success = copy_pdf_pages(pdf_file, output_file) suc = copy_pdf_pages(pdf_file, output_file)
if not success: if not suc:
print(f"{pdf_file.name} 处理失败") print(f"{pdf_file.name} 处理失败")
if __name__ == "__main__": if __name__ == "__main__":

153
pdf_unlock/ui.py Normal file
View File

@ -0,0 +1,153 @@
from __future__ import annotations
from pathlib import Path
import threading
import traceback
import tkinter as tk
from tkinter import filedialog, messagebox
import customtkinter as ctk
import sys
from tkinterdnd2 import TkinterDnD, DND_FILES # type: ignore
DND_AVAILABLE = True
from main import copy_pdf_pages # type: ignore
from main import main as dummy_main # to avoid linter error
APP_TITLE = "PDF 解锁(拖入即可)"
SUFFIX = "_decrypt"
def parse_dropped_paths(tcl_list: str, tk_root: tk.Misc) -> list[Path]:
"""将 DND 的 raw 字符串解析为 Path 列表(兼容空格/花括号)。"""
return [Path(p) for p in tk_root.tk.splitlist(tcl_list)]
def gather_pdfs(paths: list[Path]) -> list[Path]:
"""根据传入路径自动识别:单文件 / 多文件 / 目录(递归),提取所有 PDF。"""
out: list[Path] = []
for p in paths:
if p.is_dir():
out.extend([f for f in p.rglob("*.pdf") if f.is_file()])
elif p.is_file() and p.suffix.lower() == ".pdf":
out.append(p)
# 去重并按路径排序
uniq = sorted({f.resolve() for f in out})
return list(uniq)
def output_path_for(in_path: Path) -> Path:
return in_path.with_name(f"{in_path.stem}{SUFFIX}.pdf")
class App:
def __init__(self, root: tk.Tk):
self.root = root
self.root.title(APP_TITLE)
self.root.geometry("720x360")
ctk.set_appearance_mode("System")
ctk.set_default_color_theme("blue")
# ---- 单一可点击/可拖拽区域 ----
self.drop = ctk.CTkFrame(self.root, height=240, corner_radius=12)
self.drop.pack(fill="both", expand=True, padx=20, pady=20)
self.label = ctk.CTkLabel(
self.drop,
text=(
"将 PDF 文件或文件夹拖入此区域即可开始解锁\n"
"输出在原文件同目录,文件名加上 _decrypt 后缀"
+ ("(拖拽可用)")
),
font=("微软雅黑", 16),
justify="center",
)
self.label.place(relx=0.5, rely=0.5, anchor="center")
# 点击同样可选择(依然只有这一个控件)
self.drop.bind("<Button-1>", self._on_click_select)
self.label.bind("<Button-1>", self._on_click_select)
if DND_AVAILABLE:
self.drop.drop_target_register(DND_FILES) # type: ignore
self.drop.dnd_bind("<<Drop>>", self._on_drop) # type: ignore
# ---- 事件 ----
def _on_click_select(self, _evt=None):
# 仅一个简单文件选择器;若想选目录,可直接把目录拖进来
files = filedialog.askopenfilenames(
title="选择 PDF 文件(可多选)",
filetypes=[("PDF 文件", "*.pdf"), ("所有文件", "*.*")],
)
if files:
self._start_process([Path(f) for f in files])
def _on_drop(self, event):
try:
paths = parse_dropped_paths(event.data, self.root)
except Exception:
return
self._start_process(paths)
# ---- 核心处理 ----
def _start_process(self, raw_paths: list[Path]):
if copy_pdf_pages is None:
messagebox.showerror(
"错误",
"未能从 main.py 导入 copy_pdf_pages(input_path: Path, output_path: Path) -> bool",
)
return
pdfs = gather_pdfs(raw_paths)
if not pdfs:
messagebox.showwarning("提示", "未找到任何 PDF 文件。")
return
# 后台线程,避免 UI 卡死
self.label.configure(text=f"发现 {len(pdfs)} 个 PDF开始处理…")
t = threading.Thread(target=self._worker, args=(pdfs,), daemon=True)
t.start()
def _worker(self, pdfs: list[Path]):
ok, fail = 0, 0
errors: list[str] = []
for f in pdfs:
try:
out_path = output_path_for(f)
# 简化:若已存在,直接覆盖
out_path.parent.mkdir(parents=True, exist_ok=True)
success = bool(copy_pdf_pages(f, out_path)) # type: ignore
if success:
ok += 1
else:
fail += 1
except Exception as e:
fail += 1
errors.append(f"{f}: {e}{traceback.format_exc()}")
summary = f"完成:成功 {ok},失败 {fail}。输出文件位于各自原目录。"
self._set_status(summary)
if errors:
# 仅在有错误时弹出详情
messagebox.showerror("部分失败", summary + "\n" + "\n".join(errors[:3]))
else:
messagebox.showinfo("完成", summary)
def _set_status(self, text: str):
self.label.configure(text=text)
def main():
root: tk.Misc
if DND_AVAILABLE:
root = TkinterDnD.Tk() # type: ignore
else:
root = tk.Tk()
App(root)
root.mainloop()
if __name__ == "__main__":
if len(sys.argv)>=2:
dummy_main()
else:
main()