@ -13,6 +13,8 @@ import atexit
import re
import re
import get_frame
import get_frame
import json
import json
import argparse
import shutil
try :
try :
from pydantic import BaseModel , Field , field_validator , model_validator
from pydantic import BaseModel , Field , field_validator , model_validator
@ -169,6 +171,7 @@ else:
except Exception as e :
except Exception as e :
logging . warning ( " Failed to create config file. " , exc_info = e )
logging . warning ( " Failed to create config file. " , exc_info = e )
current_running_file : Optional [ Path ] = None
def get_cmd ( video_path : str | Path , output_file : str | Path ) - > list [ str ] :
def get_cmd ( video_path : str | Path , output_file : str | Path ) - > list [ str ] :
if isinstance ( video_path , Path ) :
if isinstance ( video_path , Path ) :
@ -250,22 +253,27 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
# 配置logging
# 配置logging
def setup_logging ( ) :
def setup_logging ( verbose : bool = False ) :
log_dir = Path ( " logs " )
log_dir = Path ( " logs " )
log_dir . mkdir ( exist_ok = True )
log_dir . mkdir ( exist_ok = True )
log_file = log_dir / f " video_compress_ { datetime . now ( ) . strftime ( ' % Y % m %d ' ) } .log "
log_file = log_dir / f " video_compress_ { datetime . now ( ) . strftime ( ' % Y % m %d ' ) } .log "
stream = RichHandler ( rich_tracebacks = True , tracebacks_show_locals = True )
stream = RichHandler ( level = logging . DEBUG if verbose else logging . INFO ,
stream . setLevel ( logging . INFO )
rich_tracebacks = True , tracebacks_show_locals = True )
# stream.setLevel(logging.DEBUG if verbose else logging.INFO)
stream . setFormatter ( logging . Formatter ( " %(message)s " ) )
stream . setFormatter ( logging . Formatter ( " %(message)s " ) )
file = logging . FileHandler ( log_file , encoding = " utf-8 " )
file = logging . FileHandler ( log_file , encoding = " utf-8 " )
file . setLevel ( logging . DEBUG )
file . setLevel ( logging . DEBUG )
# 清除现有的handlers, 避免多次调用basicConfig无效
logging . getLogger ( ) . handlers . clear ( )
logging . basicConfig (
logging . basicConfig (
level = logging . DEBUG ,
level = logging . DEBUG ,
format = " %(asctime)s - %(levelname) 7s - %(message)s " ,
format = " %(asctime)s - %(levelname) 7s - %(message)s " ,
handlers = [ file , stream ] ,
handlers = [ stream , file ] ,
)
)
logging . debug ( " Logging is set up. " )
def fmt_time ( t : float | int ) - > str :
def fmt_time ( t : float | int ) - > str :
@ -282,6 +290,7 @@ def process_video(
compress_dir : Optional [ Path ] = None ,
compress_dir : Optional [ Path ] = None ,
update_func : Optional [ Callable [ [ Optional [ int ] , Optional [ str ] ] , None ] ] = None ,
update_func : Optional [ Callable [ [ Optional [ int ] , Optional [ str ] ] , None ] ] = None ,
) :
) :
global current_running_file
if compress_dir is None :
if compress_dir is None :
# 在视频文件所在目录下创建 compress 子目录(如果不存在)
# 在视频文件所在目录下创建 compress 子目录(如果不存在)
@ -301,6 +310,7 @@ def process_video(
video_path_str = str ( video_path . absolute ( ) )
video_path_str = str ( video_path . absolute ( ) )
command = get_cmd ( video_path_str , output_file )
command = get_cmd ( video_path_str , output_file )
current_running_file = output_file
try :
try :
result = subprocess . Popen (
result = subprocess . Popen (
@ -333,6 +343,8 @@ def process_video(
match = re . search ( r " [ \ d \ .]+x " , line )
match = re . search ( r " [ \ d \ .]+x " , line )
rate = match . group ( 0 ) if match else None
rate = match . group ( 0 ) if match else None
update_func ( frame_number , rate )
update_func ( frame_number , rate )
current_running_file = None
if result . returncode != 0 :
if result . returncode != 0 :
logging . error (
logging . error (
@ -385,7 +397,15 @@ def process_video(
logging . error ( " 重试仍然失败。 " )
logging . error ( " 重试仍然失败。 " )
return False
return False
else :
else :
logging . debug ( f " 文件处理成功: { video_path_str } -> { output_file } " )
if video_path . stat ( ) . st_size < = output_file . stat ( ) . st_size :
logging . info (
f " 压缩后文件比原文件大,直接复制原文件: { video_path_str } "
)
output_file . unlink ( missing_ok = True )
shutil . copy2 ( video_path , output_file )
return True
else :
logging . debug ( f " 文件处理成功: { video_path_str } -> { output_file } " )
except KeyboardInterrupt as e :
except KeyboardInterrupt as e :
raise e
raise e
@ -394,6 +414,9 @@ def process_video(
f " 执行 ffmpeg 命令时发生异常, 文件: { str ( video_path_str ) } , cmd={ ' ' . join ( map ( str , command ) ) } " ,
f " 执行 ffmpeg 命令时发生异常, 文件: { str ( video_path_str ) } , cmd={ ' ' . join ( map ( str , command ) ) } " ,
exc_info = e ,
exc_info = e ,
)
)
if current_running_file is not None :
current_running_file . unlink ( missing_ok = True )
current_running_file = None
return False
return False
return True
return True
@ -571,27 +594,46 @@ def exit_pause():
elif os . name == " posix " :
elif os . name == " posix " :
os . system ( " read -p ' Press Enter to continue... ' " )
os . system ( " read -p ' Press Enter to continue... ' " )
def finalize ( ) :
global current_running_file
if current_running_file is not None :
try :
current_running_file . unlink ( missing_ok = True )
except Exception as e :
try :
logging . error (
" Failed to delete incomplete output file after keyboard interrupt. CHECK IF LAST PROCSSING VIDEO IS COMPLETED " ,
exc_info = e ,
)
except Exception :
print ( " Failed to delete incomplete output file after keyboard interrupt. CHECK IF LAST PROCSSING VIDEO IS COMPLETED " )
current_running_file = None
def main ( _root = None ) :
def main ( _root = None ) :
atexit . register ( exit_pause )
atexit . register ( exit_pause )
atexit . register ( finalize )
global root
global root , current_running_file
setup_logging ( )
if _root is not None :
setup_logging ( )
root = Path ( _root )
else :
parser = argparse . ArgumentParser ( )
parser . add_argument ( " directory " , nargs = " ? " , help = " 目标目录路径 " )
parser . add_argument ( " --verbose " , " -v " , action = " store_true " , help = " 启用详细日志记录 " )
args = parser . parse_args ( )
if not args . directory :
print ( " Error termination via invalid input. " )
sys . exit ( 1 )
root = Path ( args . directory )
setup_logging ( args . verbose )
tot_bgn = time ( )
tot_bgn = time ( )
logging . info ( " ------------------------------- " )
logging . info ( " ------------------------------- " )
logging . info ( datetime . now ( ) . strftime ( " Video Compress started at % Y/ % m/ %d % H: % M " ) )
logging . info ( datetime . now ( ) . strftime ( " Video Compress started at % Y/ % m/ %d % H: % M " ) )
if _root is not None :
root = Path ( _root )
else :
# 通过命令行参数传入需要遍历的目录
if len ( sys . argv ) < 2 :
print ( f " 用法: python { __file__ } <目标目录> " )
logging . warning ( " Error termination via invalid input. " )
sys . exit ( 1 )
root = Path ( sys . argv [ 1 ] )
if root . name . lower ( ) == CFG . compress_dir_name . lower ( ) :
if root . name . lower ( ) == CFG . compress_dir_name . lower ( ) :
logging . critical ( " 请修改目标目录名为非compress。 " )
logging . critical ( " 请修改目标目录名为非compress。 " )
logging . error ( " Error termination via invalid input. " )
logging . error ( " Error termination via invalid input. " )
@ -601,7 +643,7 @@ def main(_root=None):
test ( )
test ( )
if not root . is_dir ( ) :
if not root . is_dir ( ) :
print ( " 提供的路径不是一个有效目录。 " )
logging . critical ( " 提供的路径不是一个有效目录。 " )
logging . warning ( " Error termination via invalid input. " )
logging . warning ( " Error termination via invalid input. " )
sys . exit ( 1 )
sys . exit ( 1 )
@ -612,11 +654,11 @@ def main(_root=None):
logging . info ( " Normal termination of Video Compress. " )
logging . info ( " Normal termination of Video Compress. " )
except KeyboardInterrupt :
except KeyboardInterrupt :
logging . warning (
logging . warning (
" Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED . "
" Error termination via keyboard interrupt. "
)
)
except Exception as e :
except Exception as e :
logging . error (
logging . error (
" Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED. " ,
" Error termination via unhandled error " ,
exc_info = e ,
exc_info = e ,
)
)