Compare commits

...

3 Commits

Author SHA1 Message Date
afc5c76fd4 up readme 2025-09-21 23:41:48 +08:00
5bc7284263 pdf reader 2025-09-21 23:40:50 +08:00
f12791864d clean train in videocomrepe 2025-09-12 15:09:11 +08:00
6 changed files with 218 additions and 212 deletions

7
VideoCompress/.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
logs
test
config.json
*.xml
tmp
build
dist

View File

@ -22,10 +22,9 @@ class VideoConfig:
crf: int = 18
codec: str = "h264"
ffmpeg: str = "ffmpeg"
video_ext: List[str] = None
extra: List[str] = None
video_ext: List[str] = [".mp4", ".mkv"]
extra: List[str] = []
manual: Optional[List[str]] = None
train: bool = False
bitrate: Optional[str] = None
def __post_init__(self):
@ -550,9 +549,6 @@ class ConfigUI(QMainWindow):
group.addLayout(custom_layout)
# 实验性功能
self.train_checkbox = QCheckBox("启用训练模式 (实验性)")
self.train_checkbox.setToolTip("实验性功能,可能不稳定")
group.addWidget(self.train_checkbox)
return group
@ -775,7 +771,6 @@ class ConfigUI(QMainWindow):
if self.config.manual:
self.custom_edit.setText(" ".join(self.config.manual))
self.train_checkbox.setChecked(self.config.train)
def _save_config(self):
"""保存配置"""
@ -841,7 +836,6 @@ class ConfigUI(QMainWindow):
if custom_text:
config.manual = custom_text.split()
config.train = self.train_checkbox.isChecked()
# 保存文件
config_path = self._get_config_path()

View File

@ -7,18 +7,14 @@ from datetime import datetime
from time import time
from rich.logging import RichHandler
from rich.progress import Progress
from pickle import dumps, loads
from typing import Optional
import atexit
import re
import threading
import queue
import psutil
from concurrent.futures import ThreadPoolExecutor, as_completed
root = None
TRAIN = False
ESTI_FILE = Path(sys.path[0])/"esti.out"
CFG_FILE = Path(sys.path[0])/"config.json"
CFG = {
"save_to": "single",
@ -29,11 +25,9 @@ CFG = {
"ffmpeg": "ffmpeg",
"manual": None,
"video_ext": [".mp4", ".mkv"],
"train": False,
"compress_dir_name": "compress",
"resolution": "-1:1080",
"fps": "30",
"esti_data_file": "estiminate_data.dat",
"test_video_resolution": "1920x1080",
"test_video_fps": "30",
"test_video_input": "compress_video_test.mp4",
@ -42,7 +36,6 @@ CFG = {
"cpu_monitor_interval": 3, # CPU监控间隔
"cpu_monitor_duration": 30, # 统计持续时间5分钟
}
esti=None # :tuple[list[int],list[float]]
# CPU监控相关全局变量
ffmpeg_processes = {} # 存储活动的ffmpeg进程
@ -98,18 +91,6 @@ def get_cmd(video_path,output_file):
command.append(output_file)
return command
def train_init():
global esti_data,TRAIN,data_file
data_file = Path(CFG["esti_data_file"])
if data_file.exists():
esti_data=loads(data_file.read_bytes())
if not isinstance(esti_data,tuple):
esti_data=([],[])
else:
esti_data=([],[])
TRAIN=True
atexit.register(save_esti)
# print(esti_data)
# 配置logging
@ -133,82 +114,6 @@ def setup_logging():
]
)
def polyfit_manual(x, y, degree=2):
"""手动实现二次多项式最小二乘拟合"""
n = len(x)
if n != len(y):
raise ValueError("输入的x和y长度必须相同")
# 对于二次多项式 y = ax^2 + bx + c
# 构建矩阵方程 A * [a, b, c]^T = B
# 其中 A = [[sum(x^4), sum(x^3), sum(x^2)],
# [sum(x^3), sum(x^2), sum(x)],
# [sum(x^2), sum(x), n]]
# B = [sum(x^2 * y), sum(x * y), sum(y)]
# 计算需要的和
sum_x = sum(x)
sum_x2 = sum(xi**2 for xi in x)
sum_x3 = sum(xi**3 for xi in x)
sum_x4 = sum(xi**4 for xi in x)
sum_y = sum(y)
sum_xy = sum(xi*yi for xi, yi in zip(x, y))
sum_x2y = sum(xi**2*yi for xi, yi in zip(x, y))
# 构建矩阵A和向量B
A = [
[sum_x4, sum_x3, sum_x2],
[sum_x3, sum_x2, sum_x],
[sum_x2, sum_x, n]
]
B = [sum_x2y, sum_xy, sum_y]
# 使用高斯消元法解线性方程组
# 将增广矩阵 [A|B] 转换为行阶梯形式
AB = [row + [b] for row, b in zip(A, B)]
n_rows = len(AB)
# 高斯消元
for i in range(n_rows):
# 寻找当前列中最大元素所在的行
max_row = i
for j in range(i + 1, n_rows):
if abs(AB[j][i]) > abs(AB[max_row][i]):
max_row = j
# 交换行
AB[i], AB[max_row] = AB[max_row], AB[i]
# 将当前行主元归一化
pivot = AB[i][i]
if pivot == 0:
raise ValueError("矩阵奇异,无法求解")
for j in range(i, n_rows + 1):
AB[i][j] /= pivot
# 消元
for j in range(n_rows):
if j != i:
factor = AB[j][i]
for k in range(i, n_rows + 1):
AB[j][k] -= factor * AB[i][k]
# 提取结果
coeffs = [AB[i][n_rows] for i in range(n_rows)]
return coeffs # [a, b, c] 对应 ax^2 + bx + c
def save_esti():
try:
if len(esti_data[0]) > 0:
coeffs = polyfit_manual(esti_data[0], esti_data[1])
# 保存为逗号分隔的文本格式
ESTI_FILE.write_text(','.join(map(str, coeffs)))
except Exception as e:
logging.warning("保存估算数据失败")
logging.debug("error at save_esti",exc_info=e)
def fmt_time(t:float|int) -> str:
if t>3600:
return f"{t//3600}h {t//60}min {t%60}s"
@ -253,11 +158,10 @@ def cpu_monitor():
if len(cpu_stats["ffmpeg"]) > max_samples:
cpu_stats["ffmpeg"] = cpu_stats["ffmpeg"][-max_samples:]
logging.debug(f"CPU监控: 系统={system_cpu:.1f}%, FFmpeg总计={ffmpeg_cpu_total:.1f}%, 活动进程={len(active_processes)}")
except KeyboardInterrupt as e:
raise e
except Exception as e:
logging.debug(f"CPU监控异常: {e}")
logging.error(f"CPU监控异常: {e}")
# 等待下一次监控
threading.Event().wait(CFG["cpu_monitor_interval"])
@ -310,37 +214,8 @@ def unregister_ffmpeg_process(proc_id):
if proc_id in ffmpeg_processes:
del ffmpeg_processes[proc_id]
def func(sz:int,src=False):
if TRAIN:
try:
data_file.write_bytes(dumps(esti_data))
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.warning("无法保存数据",exc_info=e)
try:
if TRAIN:
if len(esti_data[0])==0:
return -1 if src else "NaN"
coeffs = polyfit_manual(esti_data[0], esti_data[1])
t = coeffs[0]*sz**2 + coeffs[1]*sz + coeffs[2]
elif esti is not None:
t = esti[0]*sz**2 + esti[1]*sz + esti[2]
# print(t,sz)
else:
logging.warning(f"Unexpected condition at func->TRAIN")
return -1 if src else "NaN"
t = round(t)
if src:
return t
return fmt_time(t)
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.warning("无法计算预计时间")
logging.debug("esti time exception", exc_info=e)
return -1 if src else "NaN"
def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_func=None, proc_id=None):
global esti_data, current_instances
global current_instances
use=None
sz=video_path.stat().st_size//(1024*1024)
@ -351,6 +226,7 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun
else:
compress_dir /= video_path.parent.relative_to(root)
assert isinstance(compress_dir,Path)
compress_dir.mkdir(exist_ok=True,parents=True)
# 输出文件路径:与原文件同名,保存在 compress 目录下
@ -402,10 +278,6 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_fun
else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
end=time()
if TRAIN:
esti_data[0].append(sz)
esti_data[1].append(end-bgn)
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",exc_info=e)
@ -425,68 +297,52 @@ def traverse_directory(root_dir: Path):
global current_instances
video_extensions = set(CFG["video_ext"])
sm=None
if esti is not None:
raise DeprecationWarning("不再支持训练模式")
logging.info(f"正在估算时间(当存在大量小文件时,估算值将会很离谱)")
sm = 0
for file in root_dir.rglob("*"):
if file.parent.name.lower() == CFG["compress_dir_name"].lower():continue
# 获取视频文件列表和帧数信息
video_files = []
que = list(root_dir.glob("*"))
while que:
d = que.pop()
for file in d.glob("*"):
if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]:
continue
if file.is_file() and file.suffix.lower() in video_extensions:
sz=file.stat().st_size//(1024*1024)
tmp = func(sz,True)
if not isinstance(tmp,int):
logging.error("无法预估时间,因为预估函数返回非整数")
elif tmp == -1:
logging.error("无法预估时间,因为预估函数返回了异常")
sm += tmp
logging.info(f"预估用时:{fmt_time(sm)}")
else:
# 获取视频文件列表和帧数信息
video_files = []
que = list(root_dir.glob("*"))
while que:
d = que.pop()
for file in d.glob("*"):
if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]:
continue
if file.is_file() and file.suffix.lower() in video_extensions:
video_files.append(file)
elif file.is_dir():
que.append(file)
video_files.append(file)
elif file.is_dir():
que.append(file)
# exit()
if not video_files:
logging.warning("未找到需要处理的视频文件")
return
if not video_files:
logging.warning("未找到需要处理的视频文件")
return
# 获取视频信息
with Progress() as prog:
task = prog.add_task("正在获取视频信息", total=len(video_files))
frames: dict[Path, float] = {}
for file in video_files:
prog.advance(task)
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split()
cmd.append(str(file))
proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if proc.returncode != 0:
logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}")
# 获取视频信息
with Progress() as prog:
task = prog.add_task("正在获取视频信息", total=len(video_files))
frames: dict[Path, float] = {}
for file in video_files:
prog.advance(task)
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split()
cmd.append(str(file))
proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if proc.returncode != 0:
logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}")
frames[file] = 0
continue
if proc.stdout.strip():
try:
avg_frame_rate, duration = proc.stdout.strip().split('\n')
tmp = avg_frame_rate.split('/')
avg_frame_rate = float(tmp[0]) / float(tmp[1])
if duration == "N/A":
duration = 0
logging.debug(f"无法获取视频信息: {file}, 时长为N/A默认使用0s")
duration = float(duration)
frames[file] = duration * avg_frame_rate
except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0
continue
if proc.stdout.strip():
try:
avg_frame_rate, duration = proc.stdout.strip().split('\n')
tmp = avg_frame_rate.split('/')
avg_frame_rate = float(tmp[0]) / float(tmp[1])
if duration == "N/A":
duration = 0
logging.debug(f"无法获取视频信息: {file}, 时长为N/A默认使用0s")
duration = float(duration)
frames[file] = duration * avg_frame_rate
except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0
logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件")
@ -662,19 +518,6 @@ def test():
logging.warning("测试未通过,继续运行可能出现未定义行为。")
logging.debug("Test error",exc_info=e)
def init_train():
global esti
if CFG["train"]:
train_init()
else:
if ESTI_FILE.exists():
try:
# 从文件读取系数
coeffs_str = ESTI_FILE.read_text().strip().split(',')
esti = [float(coeff) for coeff in coeffs_str]
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.warning(f"预测输出文件{str(ESTI_FILE)}存在但无法读取", exc_info=e)
def exit_pause():
if os.name == 'nt':
@ -686,7 +529,7 @@ def main(_root = None):
atexit.register(exit_pause)
global root, esti
global root
setup_logging()
tot_bgn = time()
logging.info("-------------------------------")
@ -720,8 +563,6 @@ def main(_root = None):
logging.info("开始验证环境")
test()
init_train()
if not root.is_dir():
print("提供的路径不是一个有效目录。")
logging.warning("Error termination via invalid input.")

40
pdf_index/README.md Normal file
View File

@ -0,0 +1,40 @@
# Streamlit PDF 目录查看器
一个最小可运行的 Streamlit 应用:
- 读取 `doc.pdf`(放在当前目录),或通过页面上传 PDF。
- 使用 PyMuPDF 提取 PDF 目录Table of Contents
- 在下拉框选择目录项后,显示该目录项对应的页面范围(到下一个目录项前一页)。
- 使用 `st.pdf` 组件内嵌查看选定页面范围的临时 PDF。
## 快速开始Windows / cmd
1) 建议创建虚拟环境(可选)
```cmd
python -m venv .venv
.venv\Scripts\activate
```
2) 安装依赖
```cmd
pip install -r requirements.txt
```
3) 将你的 PDF 放到本目录并命名为 `doc.pdf`(或在页面中上传)。
4) 运行应用
```cmd
streamlit run app.py
```
## 用法说明
- 左侧边栏可上传 PDF若本地存在 `doc.pdf`,也会自动被加载。
- 目录下拉框显示形如 `title (page)`
- 若 PDF 无目录,本应用会提示;可选择“全部页面”查看。
## 已知限制
- 目录页码通常为 PDF 内部页码(从 1 开始),个别 PDF 的 TOC 可能与实际页面偏移不一致。
- 页面范围切片依赖 TOC 顺序,若 TOC 不规范可能导致范围不准。
## 说明
使用streamlit_pdf_viewer而不是官方的streamlit_pdf是因为在手机上streamlit_pdf无法显示。

121
pdf_index/app.py Normal file
View File

@ -0,0 +1,121 @@
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple
from pathlib import Path
import fitz # PyMuPDF
import streamlit as st
from streamlit_pdf_viewer import pdf_viewer
@dataclass
class TocItem:
level: int
title: str
page_from: int # 1-based
page_to: Optional[int] # 1-based inclusive; None means until end
def read_toc(doc: fitz.Document) -> List[Tuple[int, str, int]]:
# Returns list of (level, title, page) where page is 1-based per PyMuPDF
toc: List[Tuple[int, str, int]] = []
try:
get_toc: Any = getattr(doc, "get_toc", None)
if callable(get_toc):
toc = get_toc(simple=True) # type: ignore[no-any-return]
except Exception:
toc = []
return [(lvl, title, max(1, pg)) for (lvl, title, pg) in toc]
def normalize_ranges(toc: List[Tuple[int, str, int]], page_count: int) -> List[TocItem]:
if not toc:
return []
items: List[TocItem] = []
for i, (lvl, title, page) in enumerate(toc):
start = min(max(1, page), page_count)
if i + 1 < len(toc):
next_page = toc[i + 1][2]
end = max(1, min(page_count, next_page - 1))
if end < start:
end = start
else:
end = page_count
items.append(TocItem(level=lvl, title=title, page_from=start, page_to=end))
return items
def _hash_doc(doc:fitz.Document):
"This is a fake hash, ENSURE GLOBAL DOCUMENT SAME"
return "12"
@st.cache_resource(hash_funcs={fitz.Document:_hash_doc})
def slice_pdf_pages(src_doc: fitz.Document, page_from: int, page_to: int) -> bytes:
# Create a new PDF with selected 1-based inclusive page range
new_pdf = fitz.open()
try:
start_i = max(1, page_from) - 1
end_i = max(start_i, page_to - 1)
for p in range(start_i, min(end_i, src_doc.page_count - 1) + 1):
new_pdf.insert_pdf(src_doc, from_page=p, to_page=p)
out = new_pdf.tobytes()
return out
finally:
new_pdf.close()
src_doc.close()
def format_label(item: TocItem) -> str:
return f"{item.title} ({item.page_from:03d} - {item.page_to:03d})"
@st.cache_resource
def read_pdf():
pdf_path = Path("doc.pdf")
if not pdf_path.exists():
st.error("找不到doc.pdf")
st.stop()
doc = fitz.open(pdf_path, filetype="pdf")
def _close_doc():
"Never close doc due to cache in global streamlit app."
pass
doc.close = _close_doc
page_count = doc.page_count
# 读取目录
raw_toc = read_toc(doc)
items = normalize_ranges(raw_toc, page_count)
return doc,page_count,items
def main():
st.set_page_config(page_title="PDF 目录查看器", layout="wide")
st.title("PDF 目录查看器")
doc,page_count,items = read_pdf()
st.subheader("目录")
labels = ["请选择"] + [format_label(it) for it in items]
selection = st.selectbox("选择章节", labels, index=0)
if selection == "请选择":
st.stop()
idx = labels.index(selection)
chosen = items[idx]
selected_range = (chosen.page_from, chosen.page_to or page_count)
rng_from, rng_to = selected_range
st.subheader("预览")
try:
sliced_bytes = slice_pdf_pages(doc, rng_from, rng_to)
st.download_button("下载",sliced_bytes,file_name=f"{chosen.title}.pdf")
pdf_viewer(sliced_bytes,rendering=st.session_state.get("render","unwrap"))
# st.pdf(io.BytesIO(sliced_bytes), height=height, key="pdf_preview")
except Exception as e:
st.error(f"渲染失败:{e}")
st.selectbox("使用其他渲染方式",["unwrap","legacy_embed","legacy_iframe"],key="render")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,3 @@
streamlit>=1.37.0
pymupdf>=1.24.0
streamlit_pdf_viewer