Compare commits

..

2 Commits

Author SHA1 Message Date
db56f1da62 update VideoCompress 2026-01-11 13:19:56 +08:00
9ba34f8d2e optimize CLI and config 2026-01-11 13:05:35 +08:00
3 changed files with 76 additions and 25 deletions

View File

@ -1,9 +1,9 @@
{ {
"save_to": "single", "save_to": "single",
"crf": 18, "crf": null,
"bitrate": null, "bitrate": "15M",
"codec": "h264", "codec": "h264_qsv",
"hwaccel": null, "hwaccel": "qsv",
"extra": [], "extra": [],
"ffmpeg": "ffmpeg", "ffmpeg": "ffmpeg",
"ffprobe": "ffprobe", "ffprobe": "ffprobe",

View File

@ -8,5 +8,14 @@
".mp4", ".mp4",
".mkv" ".mkv"
], ],
"resolution": null "resolution": "1920x1080",
"extra": [],
"manual": null,
"compress_dir_name": "compress",
"fps": 30,
"test_video_resolution": "1920x1080",
"test_video_fps": 30,
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"disable_hwaccel_when_fail": true
} }

View File

@ -13,6 +13,8 @@ import atexit
import re import re
import get_frame import get_frame
import json import json
import argparse
import shutil
try: try:
from pydantic import BaseModel, Field, field_validator, model_validator from pydantic import BaseModel, Field, field_validator, model_validator
@ -169,6 +171,7 @@ else:
except Exception as e: except Exception as e:
logging.warning("Failed to create config file.", exc_info=e) logging.warning("Failed to create config file.", exc_info=e)
current_running_file:Optional[Path] = None
def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]: def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
if isinstance(video_path, Path): if isinstance(video_path, Path):
@ -250,22 +253,27 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
# 配置logging # 配置logging
def setup_logging(): def setup_logging(verbose: bool = False):
log_dir = Path("logs") log_dir = Path("logs")
log_dir.mkdir(exist_ok=True) log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log" log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log"
stream = RichHandler(rich_tracebacks=True, tracebacks_show_locals=True) stream = RichHandler(level=logging.DEBUG if verbose else logging.INFO,
stream.setLevel(logging.INFO) rich_tracebacks=True, tracebacks_show_locals=True)
# stream.setLevel(logging.DEBUG if verbose else logging.INFO)
stream.setFormatter(logging.Formatter("%(message)s")) stream.setFormatter(logging.Formatter("%(message)s"))
file = logging.FileHandler(log_file, encoding="utf-8") file = logging.FileHandler(log_file, encoding="utf-8")
file.setLevel(logging.DEBUG) file.setLevel(logging.DEBUG)
# 清除现有的handlers避免多次调用basicConfig无效
logging.getLogger().handlers.clear()
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG, level=logging.DEBUG,
format="%(asctime)s - %(levelname) 7s - %(message)s", format="%(asctime)s - %(levelname) 7s - %(message)s",
handlers=[file, stream], handlers=[stream, file],
) )
logging.debug("Logging is set up.")
def fmt_time(t: float | int) -> str: def fmt_time(t: float | int) -> str:
@ -282,6 +290,7 @@ def process_video(
compress_dir: Optional[Path] = None, compress_dir: Optional[Path] = None,
update_func: Optional[Callable[[Optional[int], Optional[str]], None]] = None, update_func: Optional[Callable[[Optional[int], Optional[str]], None]] = None,
): ):
global current_running_file
if compress_dir is None: if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在) # 在视频文件所在目录下创建 compress 子目录(如果不存在)
@ -301,6 +310,7 @@ def process_video(
video_path_str = str(video_path.absolute()) video_path_str = str(video_path.absolute())
command = get_cmd(video_path_str, output_file) command = get_cmd(video_path_str, output_file)
current_running_file = output_file
try: try:
result = subprocess.Popen( result = subprocess.Popen(
@ -334,6 +344,8 @@ def process_video(
rate = match.group(0) if match else None rate = match.group(0) if match else None
update_func(frame_number, rate) update_func(frame_number, rate)
current_running_file = None
if result.returncode != 0: if result.returncode != 0:
logging.error( logging.error(
f"处理文件 {video_path_str} 失败" f"处理文件 {video_path_str} 失败"
@ -385,7 +397,15 @@ def process_video(
logging.error("重试仍然失败。") logging.error("重试仍然失败。")
return False return False
else: else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}") if video_path.stat().st_size <= output_file.stat().st_size:
logging.info(
f"压缩后文件比原文件大,直接复制原文件: {video_path_str}"
)
output_file.unlink(missing_ok=True)
shutil.copy2(video_path, output_file)
return True
else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
except KeyboardInterrupt as e: except KeyboardInterrupt as e:
raise e raise e
@ -394,6 +414,9 @@ def process_video(
f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}", f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",
exc_info=e, exc_info=e,
) )
if current_running_file is not None:
current_running_file.unlink(missing_ok=True)
current_running_file = None
return False return False
return True return True
@ -571,27 +594,46 @@ def exit_pause():
elif os.name == "posix": elif os.name == "posix":
os.system("read -p 'Press Enter to continue...'") os.system("read -p 'Press Enter to continue...'")
def finalize():
global current_running_file
if current_running_file is not None:
try:
current_running_file.unlink(missing_ok=True)
except Exception as e:
try:
logging.error(
"Failed to delete incomplete output file after keyboard interrupt. CHECK IF LAST PROCSSING VIDEO IS COMPLETED",
exc_info=e,
)
except Exception:
print("Failed to delete incomplete output file after keyboard interrupt. CHECK IF LAST PROCSSING VIDEO IS COMPLETED")
current_running_file = None
def main(_root=None): def main(_root=None):
atexit.register(exit_pause) atexit.register(exit_pause)
atexit.register(finalize)
global root, current_running_file
if _root is not None:
setup_logging()
root = Path(_root)
else:
parser = argparse.ArgumentParser()
parser.add_argument("directory", nargs="?", help="目标目录路径")
parser.add_argument("--verbose", "-v", action="store_true", help="启用详细日志记录")
args = parser.parse_args()
if not args.directory:
print("Error termination via invalid input.")
sys.exit(1)
root = Path(args.directory)
setup_logging(args.verbose)
global root
setup_logging()
tot_bgn = time() tot_bgn = time()
logging.info("-------------------------------") logging.info("-------------------------------")
logging.info(datetime.now().strftime("Video Compress started at %Y/%m/%d %H:%M")) logging.info(datetime.now().strftime("Video Compress started at %Y/%m/%d %H:%M"))
if _root is not None:
root = Path(_root)
else:
# 通过命令行参数传入需要遍历的目录
if len(sys.argv) < 2:
print(f"用法python {__file__} <目标目录>")
logging.warning("Error termination via invalid input.")
sys.exit(1)
root = Path(sys.argv[1])
if root.name.lower() == CFG.compress_dir_name.lower(): if root.name.lower() == CFG.compress_dir_name.lower():
logging.critical("请修改目标目录名为非compress。") logging.critical("请修改目标目录名为非compress。")
logging.error("Error termination via invalid input.") logging.error("Error termination via invalid input.")
@ -601,7 +643,7 @@ def main(_root=None):
test() test()
if not root.is_dir(): if not root.is_dir():
print("提供的路径不是一个有效目录。") logging.critical("提供的路径不是一个有效目录。")
logging.warning("Error termination via invalid input.") logging.warning("Error termination via invalid input.")
sys.exit(1) sys.exit(1)
@ -612,11 +654,11 @@ def main(_root=None):
logging.info("Normal termination of Video Compress.") logging.info("Normal termination of Video Compress.")
except KeyboardInterrupt: except KeyboardInterrupt:
logging.warning( logging.warning(
"Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED." "Error termination via keyboard interrupt."
) )
except Exception as e: except Exception as e:
logging.error( logging.error(
"Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.", "Error termination via unhandled error",
exc_info=e, exc_info=e,
) )