update VideoCompress
This commit is contained in:
@ -8,100 +8,166 @@ from time import time
|
||||
from rich.logging import RichHandler
|
||||
from rich.progress import Progress
|
||||
from pickle import dumps, loads
|
||||
from typing import Optional, Callable,Literal
|
||||
from typing import Optional, Callable, Literal, List, Any, TYPE_CHECKING
|
||||
import atexit
|
||||
import re
|
||||
import get_frame
|
||||
from pydantic import BaseModel,Field,field_validator,model_validator
|
||||
import json
|
||||
|
||||
class Config(BaseModel):
|
||||
save_to:Literal["single","multi"] = Field("single",description="保存到单文件夹,或者每个子文件夹创建compress_dir")
|
||||
crf: Optional[int] = Field(None, ge=0, le=51, description="CRF值,范围0-51")
|
||||
bitrate: Optional[str] = Field(None, description="比特率,格式如: 1000k, 2.5M, 1500B")
|
||||
codec: str = Field("h264",description="ffmpeg的codec,如果使用GPU需要对应设置")
|
||||
hwaccel:Optional[Literal["amf","qsv","cuda"]] = Field(None,description="使用GPU加速")
|
||||
extra:Optional[list[str]] = Field(None,description="插入到ffmpeg输出前的自定义参数")
|
||||
ffmpeg:str = "ffmpeg"
|
||||
ffprobe:str = "ffprobe"
|
||||
manual:Optional[list[str]] = Field(None,description=r"手动设置ffmpeg,命令ffmpeg -i {input} {manual} {output}")
|
||||
video_ext:list[str] = Field([".mp4", ".mkv"],description="视频文件后缀,含.")
|
||||
compress_dir_name:str = Field("compress",description="压缩文件夹名称")
|
||||
resolution: Optional[str] = Field(None,description="统一到特定尺寸,None为不使用缩放")
|
||||
fps:int = Field(30,description="fps",ge=0)
|
||||
test_video_resolution:str = "1920x1080"
|
||||
test_video_fps:int = Field(30,ge=0)
|
||||
test_video_input:str = "compress_video_test.mp4"
|
||||
test_video_output:str = "compressed_video_test.mp4"
|
||||
disable_hwaccel_when_fail:bool = Field(True,description="当运行失败时,禁用硬件加速")
|
||||
|
||||
|
||||
try:
|
||||
from pydantic import BaseModel, Field, field_validator, model_validator
|
||||
HAS_PYDANTIC = True
|
||||
|
||||
@field_validator('bitrate')
|
||||
@classmethod
|
||||
def validate_bitrate(cls, v: Optional[str]) -> Optional[str]:
|
||||
if v is None:
|
||||
class Config(BaseModel):
|
||||
save_to: Literal["single", "multi"] = Field("single", description="保存到单文件夹,或者每个子文件夹创建compress_dir")
|
||||
crf: Optional[int] = Field(18, ge=0, le=51, description="CRF值,范围0-51")
|
||||
bitrate: Optional[str] = Field(None, description="比特率,格式如: 1000k, 2.5M, 1500B")
|
||||
codec: str = Field("h264", description="ffmpeg的codec,如果使用GPU需要对应设置")
|
||||
hwaccel: Optional[Literal["amf", "qsv", "cuda"]] = Field(None, description="使用GPU加速")
|
||||
extra: List[str] = Field([], description="插入到ffmpeg输出前的自定义参数")
|
||||
ffmpeg: str = "ffmpeg"
|
||||
ffprobe: str = "ffprobe"
|
||||
manual: Optional[List[str]] = Field(None, description=r"手动设置ffmpeg,命令ffmpeg -i {input} {manual} {output}")
|
||||
video_ext: List[str] = Field([".mp4", ".mkv"], description="视频文件后缀,含.")
|
||||
compress_dir_name: str = Field("compress", description="压缩文件夹名称")
|
||||
resolution: Optional[str] = Field(None, description="统一到特定尺寸,None为不使用缩放")
|
||||
fps: int = Field(30, description="fps", ge=0)
|
||||
test_video_resolution: str = "1920x1080"
|
||||
test_video_fps: int = Field(30, ge=0)
|
||||
test_video_input: str = "compress_video_test.mp4"
|
||||
test_video_output: str = "compressed_video_test.mp4"
|
||||
disable_hwaccel_when_fail: bool = Field(True, description="当运行失败时,禁用硬件加速")
|
||||
|
||||
@field_validator('bitrate')
|
||||
@classmethod
|
||||
def validate_bitrate(cls, v: Optional[str]) -> Optional[str]:
|
||||
if v is None:
|
||||
return v
|
||||
pattern = r'^[\d\.]+[MkB]*$'
|
||||
if not re.match(pattern, v):
|
||||
raise ValueError('bitrate格式不正确,应为数字+单位(M/k/B),如: 1000k, 2.5M')
|
||||
return v
|
||||
pattern = r'^[\d\.]+[MkB]*$'
|
||||
if not re.match(pattern, v):
|
||||
raise ValueError('bitrate格式不正确,应为数字+单位(M/k/B),如: 1000k, 2.5M')
|
||||
return v
|
||||
|
||||
@field_validator('resolution')
|
||||
@classmethod
|
||||
def validate_resolution(cls, v: Optional[str]) -> Optional[str]:
|
||||
if v is None:
|
||||
|
||||
@field_validator('resolution')
|
||||
@classmethod
|
||||
def validate_resolution(cls, v: Optional[str]) -> Optional[str]:
|
||||
if v is None:
|
||||
return v
|
||||
pattern = r'^((-1)|\d+):((-1)|\d+)$'
|
||||
if not re.match(pattern, v):
|
||||
raise ValueError('resolution格式不正确,应为{数字/-1}:{数字/-1}')
|
||||
return v
|
||||
pattern = r'^((-1)|\d+):((-1)|\d+)$'
|
||||
if not re.match(pattern, v):
|
||||
raise ValueError('resolution格式不正确,应为{数字/-1}:{数字/-1}')
|
||||
return v
|
||||
|
||||
@field_validator("compress_dir_name")
|
||||
# @field_validator("test_video_input")
|
||||
# @field_validator("test_video_output")
|
||||
@classmethod
|
||||
def valid_path(cls, v:str) -> str:
|
||||
if re.search(r'[\\/:*?"<>|\x00-\x1F]',v):
|
||||
raise ValueError("某配置不符合目录名语法")
|
||||
return v
|
||||
|
||||
|
||||
@model_validator(mode='after')
|
||||
def validate_mutual_exclusive(self):
|
||||
crf_none = self.crf is None
|
||||
bitrate_none = self.bitrate is None
|
||||
|
||||
@field_validator("compress_dir_name")
|
||||
@classmethod
|
||||
def valid_path(cls, v: str) -> str:
|
||||
if re.search(r'[\\/:*?"<>|\x00-\x1F]', v):
|
||||
raise ValueError("某配置不符合目录名语法")
|
||||
return v
|
||||
|
||||
@model_validator(mode='after')
|
||||
def validate_mutual_exclusive(self):
|
||||
crf_none = self.crf is None
|
||||
bitrate_none = self.bitrate is None
|
||||
|
||||
# 有且只有一者为None
|
||||
if crf_none == bitrate_none:
|
||||
raise ValueError('crf和bitrate必须互斥:有且只有一个为None')
|
||||
|
||||
return self
|
||||
|
||||
# 有且只有一者为None
|
||||
if crf_none == bitrate_none:
|
||||
raise ValueError('crf和bitrate必须互斥:有且只有一个为None')
|
||||
|
||||
return self
|
||||
|
||||
def dump(self):
|
||||
return self.model_dump()
|
||||
|
||||
except ImportError:
|
||||
HAS_PYDANTIC = False
|
||||
from dataclasses import dataclass, asdict
|
||||
import copy
|
||||
@dataclass
|
||||
class Config:
|
||||
save_to: str = "single"
|
||||
crf: Optional[int] = 18
|
||||
bitrate: Optional[str] = None
|
||||
codec: str = "h264"
|
||||
hwaccel: Optional[str] = None
|
||||
extra: List[str] = []
|
||||
ffmpeg: str = "ffmpeg"
|
||||
ffprobe: str = "ffprobe"
|
||||
manual: Optional[List[str]] = None
|
||||
video_ext: List[str] = [".mp4", ".mkv"]
|
||||
compress_dir_name: str = "compress"
|
||||
resolution: Optional[str] = None
|
||||
fps: int = 30
|
||||
test_video_resolution: str = "1920x1080"
|
||||
test_video_fps: int = 30
|
||||
test_video_input: str = "compress_video_test.mp4"
|
||||
test_video_output: str = "compressed_video_test.mp4"
|
||||
disable_hwaccel_when_fail: bool = True
|
||||
|
||||
def update(self, other):
|
||||
if isinstance(other, dict):
|
||||
d = other
|
||||
elif isinstance(other, Config):
|
||||
d = asdict(other)
|
||||
else:
|
||||
return
|
||||
|
||||
for k, v in d.items():
|
||||
if hasattr(self, k):
|
||||
setattr(self, k, v)
|
||||
|
||||
def copy(self):
|
||||
return copy.deepcopy(self)
|
||||
|
||||
def dump(self):
|
||||
return asdict(self)
|
||||
|
||||
|
||||
root = None
|
||||
if os.environ.get("INSTALL", "0") == "1":
|
||||
CFG_FILE= Path(os.getenv("APPDATA", "C:/")) / "VideoCompress" / "config.json"
|
||||
CFG_FILE = Path(os.getenv("APPDATA", "C:/")) / "VideoCompress" / "config.json"
|
||||
else:
|
||||
CFG_FILE= Path(sys.path[0]) / "config.json"
|
||||
CFG = {
|
||||
"save_to": "single",
|
||||
"crf": "18",
|
||||
"bitrate": None,
|
||||
"codec": "h264",
|
||||
"hwaccel": None,
|
||||
"extra": [],
|
||||
"ffmpeg": "ffmpeg",
|
||||
"manual": None,
|
||||
"video_ext": [".mp4", ".mkv"],
|
||||
"compress_dir_name": "compress",
|
||||
"resolution": None,
|
||||
"fps": "30",
|
||||
"test_video_resolution": "1920x1080",
|
||||
"test_video_fps": "30",
|
||||
"test_video_input": "compress_video_test.mp4",
|
||||
"test_video_output": "compressed_video_test.mp4",
|
||||
"disable_hwaccel_when_fail": True,
|
||||
}
|
||||
CFG_FILE = Path(sys.path[0]) / "config.json"
|
||||
|
||||
if CFG_FILE.exists():
|
||||
try:
|
||||
import json
|
||||
|
||||
if HAS_PYDANTIC:
|
||||
assert BaseModel # type: ignore
|
||||
assert issubclass(Config, BaseModel)
|
||||
CFG = Config.model_validate_json(CFG_FILE.read_text())
|
||||
else:
|
||||
assert Config
|
||||
cfg:dict[str, Any] = json.loads(CFG_FILE.read_text())
|
||||
CFG = Config(**cfg)
|
||||
|
||||
get_frame.ffprobe = CFG.ffprobe
|
||||
logging.debug(CFG)
|
||||
except KeyboardInterrupt as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
logging.warning("Invalid config file, ignored.")
|
||||
logging.debug(e)
|
||||
else:
|
||||
try:
|
||||
if HAS_PYDANTIC:
|
||||
if TYPE_CHECKING:
|
||||
assert BaseModel # type: ignore
|
||||
assert issubclass(Config, BaseModel)
|
||||
CFG = Config() # type: ignore
|
||||
CFG_FILE.write_text(CFG.model_dump_json(indent=4))
|
||||
else:
|
||||
import json
|
||||
if TYPE_CHECKING:
|
||||
assert Config
|
||||
assert asdict # type: ignore
|
||||
CFG = Config() # type: ignore
|
||||
CFG_FILE.write_text(json.dumps(asdict(CFG), indent=4))
|
||||
|
||||
logging.info("Config file created.")
|
||||
except Exception as e:
|
||||
logging.warning("Failed to create config file.", exc_info=e)
|
||||
|
||||
|
||||
def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
|
||||
@ -110,23 +176,23 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
|
||||
if isinstance(output_file, Path):
|
||||
output_file = str(output_file.resolve())
|
||||
|
||||
if CFG["manual"] is not None:
|
||||
command = [CFG["ffmpeg"], "-hide_banner", "-i", video_path]
|
||||
command.extend(CFG["manual"])
|
||||
if CFG.manual is not None:
|
||||
command = [CFG.ffmpeg, "-hide_banner", "-i", video_path]
|
||||
command.extend(CFG.manual)
|
||||
command.append(output_file)
|
||||
return command
|
||||
|
||||
command = [
|
||||
CFG["ffmpeg"],
|
||||
CFG.ffmpeg,
|
||||
"-hide_banner",
|
||||
]
|
||||
if CFG["hwaccel"] is not None:
|
||||
if CFG.hwaccel is not None:
|
||||
command.extend(
|
||||
[
|
||||
"-hwaccel",
|
||||
CFG["hwaccel"],
|
||||
CFG.hwaccel,
|
||||
"-hwaccel_output_format",
|
||||
CFG["hwaccel"],
|
||||
CFG.hwaccel,
|
||||
|
||||
]
|
||||
)
|
||||
@ -137,47 +203,47 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
|
||||
]
|
||||
)
|
||||
|
||||
if CFG["bitrate"] is not None:
|
||||
if CFG.bitrate is not None:
|
||||
|
||||
if CFG["resolution"] is not None:
|
||||
if CFG.resolution is not None:
|
||||
command.extend(
|
||||
[
|
||||
"-vf",
|
||||
f"scale={CFG['resolution']}",
|
||||
f"scale={CFG.resolution}",
|
||||
]
|
||||
)
|
||||
command.extend(
|
||||
[
|
||||
"-c:v",
|
||||
CFG["codec"],
|
||||
CFG.codec,
|
||||
"-b:v",
|
||||
CFG["bitrate"],
|
||||
CFG.bitrate,
|
||||
"-r",
|
||||
str(CFG["fps"]),
|
||||
str(CFG.fps),
|
||||
"-y",
|
||||
]
|
||||
)
|
||||
else:
|
||||
if CFG["resolution"] is not None:
|
||||
if CFG.resolution is not None:
|
||||
command.extend(
|
||||
[
|
||||
"-vf",
|
||||
f"scale={CFG['resolution']}",
|
||||
f"scale={CFG.resolution}",
|
||||
]
|
||||
)
|
||||
command.extend(
|
||||
[
|
||||
"-c:v",
|
||||
CFG["codec"],
|
||||
CFG.codec,
|
||||
"-global_quality",
|
||||
str(CFG["crf"]),
|
||||
str(CFG.crf),
|
||||
"-r",
|
||||
str(CFG["fps"]),
|
||||
str(CFG.fps),
|
||||
"-y",
|
||||
]
|
||||
)
|
||||
|
||||
command.extend(CFG["extra"])
|
||||
command.extend(CFG.extra)
|
||||
command.append(output_file)
|
||||
logging.debug(f"Create CMD: {command}")
|
||||
return command
|
||||
@ -219,7 +285,7 @@ def process_video(
|
||||
|
||||
if compress_dir is None:
|
||||
# 在视频文件所在目录下创建 compress 子目录(如果不存在)
|
||||
compress_dir = video_path.parent / CFG["compress_dir_name"]
|
||||
compress_dir = video_path.parent / CFG.compress_dir_name
|
||||
else:
|
||||
assert root
|
||||
compress_dir /= video_path.parent.relative_to(root)
|
||||
@ -277,7 +343,7 @@ def process_video(
|
||||
assert result.stdout is not None
|
||||
logging.debug(result.stdout.read())
|
||||
logging.debug(total)
|
||||
if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in [
|
||||
if CFG.hwaccel == "mediacodec" and CFG.codec in [
|
||||
"h264_mediacodec",
|
||||
"hevc_mediacodec",
|
||||
]:
|
||||
@ -285,32 +351,36 @@ def process_video(
|
||||
"mediacodec硬件加速器已知在较短片段上存在异常,将禁用加速重试。"
|
||||
)
|
||||
output_file.unlink(missing_ok=True)
|
||||
bak = CFG.copy()
|
||||
CFG["hwaccel"] = None
|
||||
CFG["codec"] = "h264" if CFG["codec"] == "h264_mediacodec" else "hevc"
|
||||
bak = CFG.codec, CFG.hwaccel
|
||||
CFG.hwaccel = None
|
||||
CFG.codec = "h264" if CFG.codec == "h264_mediacodec" else "hevc"
|
||||
assert not output_file.exists()
|
||||
ret = process_video(video_path, compress_dir, update_func)
|
||||
CFG.update(bak)
|
||||
CFG.codec, CFG.hwaccel = bak
|
||||
if not ret:
|
||||
logging.error("重试仍然失败。")
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
elif CFG["disable_hwaccel_when_fail"] and CFG["hwaccel"] is not None:
|
||||
elif CFG.disable_hwaccel_when_fail and CFG.hwaccel is not None:
|
||||
logging.info("正在禁用硬件加速器重试,进度条可能发生混乱。")
|
||||
output_file.unlink(missing_ok=True)
|
||||
bak = CFG.copy()
|
||||
CFG["hwaccel"] = None
|
||||
if TYPE_CHECKING:
|
||||
assert BaseModel # type: ignore
|
||||
assert isinstance(CFG, BaseModel)
|
||||
|
||||
bak = CFG.codec, CFG.hwaccel
|
||||
CFG.hwaccel = None
|
||||
if (
|
||||
CFG["codec"].endswith("_mediacodec")
|
||||
or CFG["codec"].endswith("_qsv")
|
||||
or CFG["codec"].endswith("_nvenc")
|
||||
or CFG["codec"].endswith("_amf")
|
||||
CFG.codec.endswith("_mediacodec")
|
||||
or CFG.codec.endswith("_qsv")
|
||||
or CFG.codec.endswith("_nvenc")
|
||||
or CFG.codec.endswith("_amf")
|
||||
):
|
||||
CFG["codec"] = CFG["codec"].split("_")[0]
|
||||
CFG.codec = CFG.codec.split("_")[0]
|
||||
assert not output_file.exists()
|
||||
ret = process_video(video_path, compress_dir, update_func)
|
||||
CFG.update(bak)
|
||||
CFG.codec, CFG.hwaccel = bak
|
||||
if not ret:
|
||||
logging.error("重试仍然失败。")
|
||||
return False
|
||||
@ -329,7 +399,7 @@ def process_video(
|
||||
|
||||
|
||||
def traverse_directory(root_dir: Path):
|
||||
video_extensions = set(CFG["video_ext"])
|
||||
video_extensions = set(CFG.video_ext)
|
||||
sm = None
|
||||
# 获取视频文件列表和帧数信息
|
||||
video_files:list[Path] = []
|
||||
@ -338,8 +408,8 @@ def traverse_directory(root_dir: Path):
|
||||
d = que.pop()
|
||||
for file in d.glob("*") if d.is_dir() else [d]:
|
||||
if (
|
||||
file.parent.name == CFG["compress_dir_name"]
|
||||
or file.name == CFG["compress_dir_name"]
|
||||
file.parent.name == CFG.compress_dir_name
|
||||
or file.name == CFG.compress_dir_name
|
||||
):
|
||||
continue
|
||||
if file.is_file() and file.suffix.lower() in video_extensions:
|
||||
@ -426,9 +496,9 @@ def traverse_directory(root_dir: Path):
|
||||
)
|
||||
prog.update(main_task, completed=completed_start + x)
|
||||
|
||||
if CFG["save_to"] == "single":
|
||||
if CFG.save_to == "single":
|
||||
process_video(
|
||||
file, root_dir / CFG["compress_dir_name"], update_progress
|
||||
file, root_dir / CFG.compress_dir_name, update_progress
|
||||
)
|
||||
else:
|
||||
process_video(file, None, update_progress)
|
||||
@ -450,7 +520,7 @@ def test():
|
||||
|
||||
try:
|
||||
subprocess.run(
|
||||
[CFG["ffmpeg"], "-version"], stdout=-3, stderr=-3
|
||||
[CFG.ffmpeg, "-version"], stdout=-3, stderr=-3
|
||||
).check_returncode()
|
||||
except Exception as e:
|
||||
print(__file__)
|
||||
@ -458,7 +528,7 @@ def test():
|
||||
exit(-1)
|
||||
try:
|
||||
ret = subprocess.run(
|
||||
f"{CFG['ffmpeg']} -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(),
|
||||
f"{CFG.ffmpeg} -hide_banner -f lavfi -i testsrc=duration=1:size={CFG.test_video_resolution}:rate={CFG.test_video_fps} -c:v libx264 -y -pix_fmt yuv420p {CFG.test_video_input}".split(),
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True,
|
||||
@ -469,8 +539,8 @@ def test():
|
||||
logging.debug(ret.stderr)
|
||||
ret.check_returncode()
|
||||
cmd = get_cmd(
|
||||
CFG["test_video_input"],
|
||||
CFG["test_video_output"],
|
||||
CFG.test_video_input,
|
||||
CFG.test_video_output,
|
||||
)
|
||||
ret = subprocess.run(
|
||||
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
|
||||
@ -512,30 +582,6 @@ def main(_root=None):
|
||||
logging.info("-------------------------------")
|
||||
logging.info(datetime.now().strftime("Video Compress started at %Y/%m/%d %H:%M"))
|
||||
|
||||
if CFG_FILE.exists():
|
||||
try:
|
||||
import json
|
||||
|
||||
cfg: dict = json.loads(CFG_FILE.read_text())
|
||||
cfg_model = Config(**cfg)
|
||||
CFG.update(cfg_model.model_dump())
|
||||
get_frame.ffprobe = CFG["ffprobe"]
|
||||
logging.debug(cfg_model)
|
||||
logging.debug(CFG)
|
||||
except KeyboardInterrupt as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
logging.warning("Invalid config file, ignored.")
|
||||
logging.debug(e)
|
||||
else:
|
||||
try:
|
||||
import json
|
||||
|
||||
CFG_FILE.write_text(json.dumps(CFG, indent=4))
|
||||
logging.info("Config file created.")
|
||||
except Exception as e:
|
||||
logging.warning("Failed to create config file.", exc_info=e)
|
||||
|
||||
if _root is not None:
|
||||
root = Path(_root)
|
||||
else:
|
||||
@ -546,7 +592,7 @@ def main(_root=None):
|
||||
sys.exit(1)
|
||||
root = Path(sys.argv[1])
|
||||
|
||||
if root.name.lower() == CFG["compress_dir_name"].lower():
|
||||
if root.name.lower() == CFG.compress_dir_name.lower():
|
||||
logging.critical("请修改目标目录名为非compress。")
|
||||
logging.error("Error termination via invalid input.")
|
||||
sys.exit(1)
|
||||
@ -576,5 +622,5 @@ def main(_root=None):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.argv.append(r'C:\Users\flt\Documents\WeChat Files\wxid_m8h0igh8p52p22\FileStorage\Video')
|
||||
# sys.argv.append(r'C:\Users\flt\Documents\WeChat Files\wxid_m8h0igh8p52p22\FileStorage\Video')
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user