Compare commits

...

4 Commits

Author SHA1 Message Date
db56f1da62 update VideoCompress 2026-01-11 13:19:56 +08:00
9ba34f8d2e optimize CLI and config 2026-01-11 13:05:35 +08:00
cae41d9bb0 update VideoCompress 2026-01-11 12:40:07 +08:00
6f304a634c add README in calc utils 2026-01-11 11:43:35 +08:00
8 changed files with 648 additions and 5948 deletions

View File

@ -1,22 +1,22 @@
{
"save_to": "single",
"bitrate": null,
"crf": 26,
"crf": null,
"bitrate": "15M",
"codec": "h264_qsv",
"hwaccel": "qsv",
"extra": [],
"ffmpeg": "C:/tools/ffmpeg/bin/ffmpeg.exe",
"ffprobe": "C:/tools/ffmpeg/bin/ffprobe",
"ffmpeg": "ffmpeg",
"ffprobe": "ffprobe",
"manual": null,
"video_ext": [
".mp4",
".mkv"
],
"compress_dir_name": "compress_qsv",
"compress_dir_name": "compress",
"resolution": null,
"fps": "30",
"fps": 30,
"test_video_resolution": "1920x1080",
"test_video_fps": "30",
"test_video_fps": 30,
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"disable_hwaccel_when_fail": true

View File

@ -8,5 +8,14 @@
".mp4",
".mkv"
],
"resolution": null
"resolution": "1920x1080",
"extra": [],
"manual": null,
"compress_dir_name": "compress",
"fps": 30,
"test_video_resolution": "1920x1080",
"test_video_fps": 30,
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"disable_hwaccel_when_fail": true
}

View File

@ -122,11 +122,15 @@ def get_video_frame_count(
}
for key in fallback_order:
try:
try:
func = methods.get(key)
if not func:
continue
n = func(path, stream_index)
except Exception:
logging.debug(f"Errored to get frame with {key}.",exc_info=True)
continue
if isinstance(n, int) and n >= 0:
return n
else:

View File

@ -8,33 +8,37 @@ from time import time
from rich.logging import RichHandler
from rich.progress import Progress
from pickle import dumps, loads
from typing import Optional, Callable,Literal
from typing import Optional, Callable, Literal, List, Any, TYPE_CHECKING
import atexit
import re
import get_frame
from pydantic import BaseModel,Field,field_validator,model_validator
import json
import argparse
import shutil
class Config(BaseModel):
save_to:Literal["single","multi"] = Field("single",description="保存到单文件夹或者每个子文件夹创建compress_dir")
crf: Optional[int] = Field(None, ge=0, le=51, description="CRF值范围0-51")
try:
from pydantic import BaseModel, Field, field_validator, model_validator
HAS_PYDANTIC = True
class Config(BaseModel):
save_to: Literal["single", "multi"] = Field("single", description="保存到单文件夹或者每个子文件夹创建compress_dir")
crf: Optional[int] = Field(18, ge=0, le=51, description="CRF值范围0-51")
bitrate: Optional[str] = Field(None, description="比特率,格式如: 1000k, 2.5M, 1500B")
codec: str = Field("h264",description="ffmpeg的codec如果使用GPU需要对应设置")
hwaccel:Optional[Literal["amf","qsv","cuda"]] = Field(None,description="使用GPU加速")
extra:Optional[list[str]] = Field(None,description="插入到ffmpeg输出前的自定义参数")
ffmpeg:str = "ffmpeg"
ffprobe:str = "ffprobe"
manual:Optional[list[str]] = Field(None,description=r"手动设置ffmpeg命令ffmpeg -i {input} {manual} {output}")
video_ext:list[str] = Field([".mp4", ".mkv"],description="视频文件后缀,含.")
compress_dir_name:str = Field("compress",description="压缩文件夹名称")
resolution: Optional[str] = Field(None,description="统一到特定尺寸None为不使用缩放")
fps:int = Field(30,description="fps",ge=0)
test_video_resolution:str = "1920x1080"
test_video_fps:int = Field(30,ge=0)
test_video_input:str = "compress_video_test.mp4"
test_video_output:str = "compressed_video_test.mp4"
disable_hwaccel_when_fail:bool = Field(True,description="当运行失败时,禁用硬件加速")
codec: str = Field("h264", description="ffmpeg的codec如果使用GPU需要对应设置")
hwaccel: Optional[Literal["amf", "qsv", "cuda"]] = Field(None, description="使用GPU加速")
extra: List[str] = Field([], description="插入到ffmpeg输出前的自定义参数")
ffmpeg: str = "ffmpeg"
ffprobe: str = "ffprobe"
manual: Optional[List[str]] = Field(None, description=r"手动设置ffmpeg命令ffmpeg -i {input} {manual} {output}")
video_ext: List[str] = Field([".mp4", ".mkv"], description="视频文件后缀,含.")
compress_dir_name: str = Field("compress", description="压缩文件夹名称")
resolution: Optional[str] = Field(None, description="统一到特定尺寸None为不使用缩放")
fps: int = Field(30, description="fps", ge=0)
test_video_resolution: str = "1920x1080"
test_video_fps: int = Field(30, ge=0)
test_video_input: str = "compress_video_test.mp4"
test_video_output: str = "compressed_video_test.mp4"
disable_hwaccel_when_fail: bool = Field(True, description="当运行失败时,禁用硬件加速")
@field_validator('bitrate')
@classmethod
@ -57,15 +61,12 @@ class Config(BaseModel):
return v
@field_validator("compress_dir_name")
# @field_validator("test_video_input")
# @field_validator("test_video_output")
@classmethod
def valid_path(cls, v:str) -> str:
if re.search(r'[\\/:*?"<>|\x00-\x1F]',v):
def valid_path(cls, v: str) -> str:
if re.search(r'[\\/:*?"<>|\x00-\x1F]', v):
raise ValueError("某配置不符合目录名语法")
return v
@model_validator(mode='after')
def validate_mutual_exclusive(self):
crf_none = self.crf is None
@ -77,32 +78,100 @@ class Config(BaseModel):
return self
def dump(self):
return self.model_dump()
except ImportError:
HAS_PYDANTIC = False
from dataclasses import dataclass, asdict
import copy
@dataclass
class Config:
save_to: str = "single"
crf: Optional[int] = 18
bitrate: Optional[str] = None
codec: str = "h264"
hwaccel: Optional[str] = None
extra: List[str] = []
ffmpeg: str = "ffmpeg"
ffprobe: str = "ffprobe"
manual: Optional[List[str]] = None
video_ext: List[str] = [".mp4", ".mkv"]
compress_dir_name: str = "compress"
resolution: Optional[str] = None
fps: int = 30
test_video_resolution: str = "1920x1080"
test_video_fps: int = 30
test_video_input: str = "compress_video_test.mp4"
test_video_output: str = "compressed_video_test.mp4"
disable_hwaccel_when_fail: bool = True
def update(self, other):
if isinstance(other, dict):
d = other
elif isinstance(other, Config):
d = asdict(other)
else:
return
for k, v in d.items():
if hasattr(self, k):
setattr(self, k, v)
def copy(self):
return copy.deepcopy(self)
def dump(self):
return asdict(self)
root = None
if os.environ.get("INSTALL", "0") == "1":
CFG_FILE= Path(os.getenv("APPDATA", "C:/")) / "VideoCompress" / "config.json"
CFG_FILE = Path(os.getenv("APPDATA", "C:/")) / "VideoCompress" / "config.json"
else:
CFG_FILE= Path(sys.path[0]) / "config.json"
CFG = {
"save_to": "single",
"crf": "18",
"bitrate": None,
"codec": "h264",
"hwaccel": None,
"extra": [],
"ffmpeg": "ffmpeg",
"manual": None,
"video_ext": [".mp4", ".mkv"],
"compress_dir_name": "compress",
"resolution": None,
"fps": "30",
"test_video_resolution": "1920x1080",
"test_video_fps": "30",
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"disable_hwaccel_when_fail": True,
}
CFG_FILE = Path(sys.path[0]) / "config.json"
if CFG_FILE.exists():
try:
import json
if HAS_PYDANTIC:
assert BaseModel # type: ignore
assert issubclass(Config, BaseModel)
CFG = Config.model_validate_json(CFG_FILE.read_text())
else:
assert Config
cfg:dict[str, Any] = json.loads(CFG_FILE.read_text())
CFG = Config(**cfg)
get_frame.ffprobe = CFG.ffprobe
logging.debug(CFG)
except KeyboardInterrupt as e:
raise e
except Exception as e:
logging.warning("Invalid config file, ignored.")
logging.debug(e)
else:
try:
if HAS_PYDANTIC:
if TYPE_CHECKING:
assert BaseModel # type: ignore
assert issubclass(Config, BaseModel)
CFG = Config() # type: ignore
CFG_FILE.write_text(CFG.model_dump_json(indent=4))
else:
import json
if TYPE_CHECKING:
assert Config
assert asdict # type: ignore
CFG = Config() # type: ignore
CFG_FILE.write_text(json.dumps(asdict(CFG), indent=4))
logging.info("Config file created.")
except Exception as e:
logging.warning("Failed to create config file.", exc_info=e)
current_running_file:Optional[Path] = None
def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
if isinstance(video_path, Path):
@ -110,23 +179,23 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
if isinstance(output_file, Path):
output_file = str(output_file.resolve())
if CFG["manual"] is not None:
command = [CFG["ffmpeg"], "-hide_banner", "-i", video_path]
command.extend(CFG["manual"])
if CFG.manual is not None:
command = [CFG.ffmpeg, "-hide_banner", "-i", video_path]
command.extend(CFG.manual)
command.append(output_file)
return command
command = [
CFG["ffmpeg"],
CFG.ffmpeg,
"-hide_banner",
]
if CFG["hwaccel"] is not None:
if CFG.hwaccel is not None:
command.extend(
[
"-hwaccel",
CFG["hwaccel"],
CFG.hwaccel,
"-hwaccel_output_format",
CFG["hwaccel"],
CFG.hwaccel,
]
)
@ -137,69 +206,74 @@ def get_cmd(video_path: str | Path, output_file: str | Path) -> list[str]:
]
)
if CFG["bitrate"] is not None:
if CFG.bitrate is not None:
if CFG["resolution"] is not None:
if CFG.resolution is not None:
command.extend(
[
"-vf",
f"scale={CFG['resolution']}",
f"scale={CFG.resolution}",
]
)
command.extend(
[
"-c:v",
CFG["codec"],
CFG.codec,
"-b:v",
CFG["bitrate"],
CFG.bitrate,
"-r",
str(CFG["fps"]),
str(CFG.fps),
"-y",
]
)
else:
if CFG["resolution"] is not None:
if CFG.resolution is not None:
command.extend(
[
"-vf",
f"scale={CFG['resolution']}",
f"scale={CFG.resolution}",
]
)
command.extend(
[
"-c:v",
CFG["codec"],
CFG.codec,
"-global_quality",
str(CFG["crf"]),
str(CFG.crf),
"-r",
str(CFG["fps"]),
str(CFG.fps),
"-y",
]
)
command.extend(CFG["extra"])
command.extend(CFG.extra)
command.append(output_file)
logging.debug(f"Create CMD: {command}")
return command
# 配置logging
def setup_logging():
def setup_logging(verbose: bool = False):
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log"
stream = RichHandler(rich_tracebacks=True, tracebacks_show_locals=True)
stream.setLevel(logging.INFO)
stream = RichHandler(level=logging.DEBUG if verbose else logging.INFO,
rich_tracebacks=True, tracebacks_show_locals=True)
# stream.setLevel(logging.DEBUG if verbose else logging.INFO)
stream.setFormatter(logging.Formatter("%(message)s"))
file = logging.FileHandler(log_file, encoding="utf-8")
file.setLevel(logging.DEBUG)
# 清除现有的handlers避免多次调用basicConfig无效
logging.getLogger().handlers.clear()
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname) 7s - %(message)s",
handlers=[file, stream],
handlers=[stream, file],
)
logging.debug("Logging is set up.")
def fmt_time(t: float | int) -> str:
@ -216,10 +290,11 @@ def process_video(
compress_dir: Optional[Path] = None,
update_func: Optional[Callable[[Optional[int], Optional[str]], None]] = None,
):
global current_running_file
if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在)
compress_dir = video_path.parent / CFG["compress_dir_name"]
compress_dir = video_path.parent / CFG.compress_dir_name
else:
assert root
compress_dir /= video_path.parent.relative_to(root)
@ -235,6 +310,7 @@ def process_video(
video_path_str = str(video_path.absolute())
command = get_cmd(video_path_str, output_file)
current_running_file = output_file
try:
result = subprocess.Popen(
@ -268,6 +344,8 @@ def process_video(
rate = match.group(0) if match else None
update_func(frame_number, rate)
current_running_file = None
if result.returncode != 0:
logging.error(
f"处理文件 {video_path_str} 失败"
@ -277,7 +355,7 @@ def process_video(
assert result.stdout is not None
logging.debug(result.stdout.read())
logging.debug(total)
if CFG["hwaccel"] == "mediacodec" and CFG["codec"] in [
if CFG.hwaccel == "mediacodec" and CFG.codec in [
"h264_mediacodec",
"hevc_mediacodec",
]:
@ -285,35 +363,47 @@ def process_video(
"mediacodec硬件加速器已知在较短片段上存在异常将禁用加速重试。"
)
output_file.unlink(missing_ok=True)
bak = CFG.copy()
CFG["hwaccel"] = None
CFG["codec"] = "h264" if CFG["codec"] == "h264_mediacodec" else "hevc"
bak = CFG.codec, CFG.hwaccel
CFG.hwaccel = None
CFG.codec = "h264" if CFG.codec == "h264_mediacodec" else "hevc"
assert not output_file.exists()
ret = process_video(video_path, compress_dir, update_func)
CFG.update(bak)
CFG.codec, CFG.hwaccel = bak
if not ret:
logging.error("重试仍然失败。")
return False
else:
return True
elif CFG["disable_hwaccel_when_fail"] and CFG["hwaccel"] is not None:
elif CFG.disable_hwaccel_when_fail and CFG.hwaccel is not None:
logging.info("正在禁用硬件加速器重试,进度条可能发生混乱。")
output_file.unlink(missing_ok=True)
bak = CFG.copy()
CFG["hwaccel"] = None
if TYPE_CHECKING:
assert BaseModel # type: ignore
assert isinstance(CFG, BaseModel)
bak = CFG.codec, CFG.hwaccel
CFG.hwaccel = None
if (
CFG["codec"].endswith("_mediacodec")
or CFG["codec"].endswith("_qsv")
or CFG["codec"].endswith("_nvenc")
or CFG["codec"].endswith("_amf")
CFG.codec.endswith("_mediacodec")
or CFG.codec.endswith("_qsv")
or CFG.codec.endswith("_nvenc")
or CFG.codec.endswith("_amf")
):
CFG["codec"] = CFG["codec"].split("_")[0]
CFG.codec = CFG.codec.split("_")[0]
assert not output_file.exists()
ret = process_video(video_path, compress_dir, update_func)
CFG.update(bak)
CFG.codec, CFG.hwaccel = bak
if not ret:
logging.error("重试仍然失败。")
return False
else:
if video_path.stat().st_size <= output_file.stat().st_size:
logging.info(
f"压缩后文件比原文件大,直接复制原文件: {video_path_str}"
)
output_file.unlink(missing_ok=True)
shutil.copy2(video_path, output_file)
return True
else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
@ -324,12 +414,15 @@ def process_video(
f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",
exc_info=e,
)
if current_running_file is not None:
current_running_file.unlink(missing_ok=True)
current_running_file = None
return False
return True
def traverse_directory(root_dir: Path):
video_extensions = set(CFG["video_ext"])
video_extensions = set(CFG.video_ext)
sm = None
# 获取视频文件列表和帧数信息
video_files:list[Path] = []
@ -338,8 +431,8 @@ def traverse_directory(root_dir: Path):
d = que.pop()
for file in d.glob("*") if d.is_dir() else [d]:
if (
file.parent.name == CFG["compress_dir_name"]
or file.name == CFG["compress_dir_name"]
file.parent.name == CFG.compress_dir_name
or file.name == CFG.compress_dir_name
):
continue
if file.is_file() and file.suffix.lower() in video_extensions:
@ -426,9 +519,9 @@ def traverse_directory(root_dir: Path):
)
prog.update(main_task, completed=completed_start + x)
if CFG["save_to"] == "single":
if CFG.save_to == "single":
process_video(
file, root_dir / CFG["compress_dir_name"], update_progress
file, root_dir / CFG.compress_dir_name, update_progress
)
else:
process_video(file, None, update_progress)
@ -450,7 +543,7 @@ def test():
try:
subprocess.run(
[CFG["ffmpeg"], "-version"], stdout=-3, stderr=-3
[CFG.ffmpeg, "-version"], stdout=-3, stderr=-3
).check_returncode()
except Exception as e:
print(__file__)
@ -458,7 +551,7 @@ def test():
exit(-1)
try:
ret = subprocess.run(
f"{CFG['ffmpeg']} -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(),
f"{CFG.ffmpeg} -hide_banner -f lavfi -i testsrc=duration=1:size={CFG.test_video_resolution}:rate={CFG.test_video_fps} -c:v libx264 -y -pix_fmt yuv420p {CFG.test_video_input}".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
@ -469,8 +562,8 @@ def test():
logging.debug(ret.stderr)
ret.check_returncode()
cmd = get_cmd(
CFG["test_video_input"],
CFG["test_video_output"],
CFG.test_video_input,
CFG.test_video_output,
)
ret = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
@ -501,52 +594,47 @@ def exit_pause():
elif os.name == "posix":
os.system("read -p 'Press Enter to continue...'")
def finalize():
global current_running_file
if current_running_file is not None:
try:
current_running_file.unlink(missing_ok=True)
except Exception as e:
try:
logging.error(
"Failed to delete incomplete output file after keyboard interrupt. CHECK IF LAST PROCSSING VIDEO IS COMPLETED",
exc_info=e,
)
except Exception:
print("Failed to delete incomplete output file after keyboard interrupt. CHECK IF LAST PROCSSING VIDEO IS COMPLETED")
current_running_file = None
def main(_root=None):
atexit.register(exit_pause)
atexit.register(finalize)
global root
global root, current_running_file
if _root is not None:
setup_logging()
root = Path(_root)
else:
parser = argparse.ArgumentParser()
parser.add_argument("directory", nargs="?", help="目标目录路径")
parser.add_argument("--verbose", "-v", action="store_true", help="启用详细日志记录")
args = parser.parse_args()
if not args.directory:
print("Error termination via invalid input.")
sys.exit(1)
root = Path(args.directory)
setup_logging(args.verbose)
tot_bgn = time()
logging.info("-------------------------------")
logging.info(datetime.now().strftime("Video Compress started at %Y/%m/%d %H:%M"))
if CFG_FILE.exists():
try:
import json
cfg: dict = json.loads(CFG_FILE.read_text())
cfg_model = Config(**cfg)
CFG.update(cfg_model.model_dump())
get_frame.ffprobe = CFG["ffprobe"]
logging.debug(cfg_model)
logging.debug(CFG)
except KeyboardInterrupt as e:
raise e
except Exception as e:
logging.warning("Invalid config file, ignored.")
logging.debug(e)
else:
try:
import json
CFG_FILE.write_text(json.dumps(CFG, indent=4))
logging.info("Config file created.")
except Exception as e:
logging.warning("Failed to create config file.", exc_info=e)
if _root is not None:
root = Path(_root)
else:
# 通过命令行参数传入需要遍历的目录
if len(sys.argv) < 2:
print(f"用法python {__file__} <目标目录>")
logging.warning("Error termination via invalid input.")
sys.exit(1)
root = Path(sys.argv[1])
if root.name.lower() == CFG["compress_dir_name"].lower():
if root.name.lower() == CFG.compress_dir_name.lower():
logging.critical("请修改目标目录名为非compress。")
logging.error("Error termination via invalid input.")
sys.exit(1)
@ -555,7 +643,7 @@ def main(_root=None):
test()
if not root.is_dir():
print("提供的路径不是一个有效目录。")
logging.critical("提供的路径不是一个有效目录。")
logging.warning("Error termination via invalid input.")
sys.exit(1)
@ -566,15 +654,15 @@ def main(_root=None):
logging.info("Normal termination of Video Compress.")
except KeyboardInterrupt:
logging.warning(
"Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED."
"Error termination via keyboard interrupt."
)
except Exception as e:
logging.error(
"Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",
"Error termination via unhandled error",
exc_info=e,
)
if __name__ == "__main__":
sys.argv.append(r'C:\Users\flt\Documents\WeChat Files\wxid_m8h0igh8p52p22\FileStorage\Video')
# sys.argv.append(r'C:\Users\flt\Documents\WeChat Files\wxid_m8h0igh8p52p22\FileStorage\Video')
main()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,73 @@
# calc-utils
个人使用的计算化学工具集,主要基于 [ASE (Atomic Simulation Environment)](https://wiki.fysik.dtu.dk/ase/) 和 [RDKit](https://www.rdkit.org/)。
包含了一些方便的转换工具以及针对特定服务器环境PBS/Slurm/Custom定制的 `ase.calculators.gaussian` 补丁。
注意:这是专用软件。`futils.gaussian` 依赖特定的服务器环境,在其他环境中无法直接运行,使用前必须按需修改。
## 安装
需要 Python 3.12+。
```bash
git clone https://github.com/your-repo/calc-utils.git
cd calc-utils
pip install .
```
## 功能模块
### 1. `futils.gaussian` (**Breaking Change**)
这是一个对 `ase.calculators.gaussian` 的深度定制和 Monkey Patch。
**注意:导入此模块会直接修改 `ase.calculators.gaussian` 中的类定义。**
主要修改内容:
- **强制任务提交脚本**:计算器的 `command` 默认被设置为调用 `gsub_wait` 脚本。
- 默认路径硬编码为 `/home/fanhj/calcs/lele/tools/gsub_wait`(需要在 `futils/gaussian.py` 中按需修改 `GSUB` 变量)。
- **文件后缀变更**:输入文件使用 `.gin` 而非 `.gjf`,输出文件默认读取 `.out`。
- **参数增强**`__init__` 方法提供了更详细的 Type Hinting 和默认参数(如 `mem="30GB"`, `proc=32`)。
- **辅助方法**:增加了 `mod()` 方法用于快速复制并修改计算器参数。
```python
from futils.gaussian import Gaussian
from ase import Atoms
# 使用定制后的 Gaussian 计算器
# 注意:这会尝试调用 gsub_wait 提交任务
calc = Gaussian(label='test_calc', method='B3LYP', basis='6-31G(d)')
```
### 2. `futils.rdkit2ase`
提供 RDKit 分子对象 (`rdkit.Chem.Mol`) 与 ASE 原子对象 (`ase.Atoms`) 之间的无缝转换,**保留 3D 坐标**。
```python
from futils.rdkit2ase import MolToAtoms, AtomsToMol
from rdkit import Chem
# RDKit -> ASE
mol = Chem.MolFromMolFile("molecule.mol")
atoms = MolToAtoms(mol)
# ASE -> RDKit
new_mol = AtomsToMol(atoms)
```
### 3. `futils.rdkit_utils`
一些 RDKit 绘图辅助函数。
- `draw2D(mol)`: 生成 SVG 格式的 2D 分子图。
- `draw3D(mol)`: 使用 IPythonConsole 绘制 3D 分子图。
## 脚本工具 (`bin/`)
本项目包含了一些用于任务提交管理的 Shell 脚本,适用于特定的集群环境。
- **`gsub`**: 任务提交脚本。支持本地或通过 SSH 远程提交到名为 `cluster` 的主机。
- **`gsub_wait`**: 提交任务并阻塞等待完成,用于 ASE Calculator 的 `command` 调用,以便实现 Python 脚本的同步执行。
**配置说明**
使用前请检查 `bin/` 下的脚本以及 `futils/gaussian.py` 中的 `GSUB` 路径,根据您的服务器环境进行调整。

176
calc_utils/bin/gsub Normal file
View File

@ -0,0 +1,176 @@
#!/bin/bash
# gsub — build and submit a Gaussian 16 PBS job for <jobname>.gin.
# If invoked off the cluster head node, it re-runs itself there via SSH.
set -u
# Usage: gsub <jobname>
job=${1:-}
if [[ -z "$job" ]]; then
echo "Usage: $0 <jobname-without-extension>"
exit 1
fi
# ==========================================
# 0. Safety check helper
# ==========================================
# Refuse obviously dangerous deletion targets: empty, "/", whitespace-only.
check_dangerous_path() {
local path="${1:-}"
# 1. Empty check
if [[ -z "$path" ]]; then
echo "Error: Empty path is dangerous for deletion." >&2
return 1
fi
# 2. Root check
if [[ "$path" == "/" ]]; then
echo "Error: Root path '/' is dangerous for deletion." >&2
return 1
fi
# 3. Space check (optional, but good for safety)
if [[ "$path" =~ ^[[:space:]]+$ ]]; then
echo "Error: Whitespace path is dangerous." >&2
return 1
fi
return 0
}
# ==========================================
# 1. Check the running host
# ==========================================
# If we are not on "cluster", re-invoke this script there through SSH.
host_short=$(hostname -s 2>/dev/null || hostname)
if [[ "$host_short" != "cluster" ]]; then
# Assume the local mount /mnt/home corresponds to remote /home
# (adjust if your mount layout differs — inherited from the original script).
cur_dir=$(pwd)
remote_dir="${cur_dir//\/mnt\/home/\/home}"
# Locate this script and translate its path to the remote equivalent.
# Absolute path of the directory containing this script:
script_dir=$(cd "$(dirname "$0")" && pwd)
script_name=$(basename "$0")
local_script="$script_dir/$script_name"
# Apply the same mount-point substitution to the script path
remote_script="${local_script//\/mnt\/home/\/home}"
# Re-execute this very script remotely with the same job argument
echo "Running remotely on cluster: $remote_script" >&2
ssh cluster "cd '$remote_dir' && '$remote_script' '$job'"
exit $?
fi
# ==========================================
# 2. Prepare the job
# ==========================================
gin_file="$job.gin"
if [[ ! -f "$gin_file" ]]; then
echo "Error: $gin_file not found in $(pwd)"
exit 2
fi
# Determine the requested core count from the input file:
# look for a %NProcShared=XX line (case-insensitive).
proc=$(sed -n 's/^%NProcShared=\([0-9]\+\).*$/\1/pI' "$gin_file" | head -n 1)
queue=""
ppn=""
if [[ "$proc" == "32" ]]; then
queue="n32"
ppn="32"
elif [[ "$proc" == "20" ]]; then
queue="n20"
ppn="20"
else
echo "Error: Unsupported NProcShared=$proc in $gin_file. Only 20 or 32 allowed."
exit 1
fi
# Remove stale output left over from a previous run, if any
if [[ -f "$job.out" ]]; then
# (The legacy script slept before deleting; here we delete right away.)
# echo "Warning: $job.out exists. Deleting..." >&2
# Route the deletion through the safety check above
if check_dangerous_path "$job.out"; then
rm "$job.out"
else
echo "Skipping deletion of unsafe path: $job.out" >&2
exit 1
fi
fi
# ==========================================
# 3. Generate the PBS job script (.job)
# ==========================================
job_file="$job.job"
# Generate the PBS script dynamically via a heredoc; this merges the
# original g16_32.pbs template with the extra lines gsub32 used to append.
cat > "$job_file" <<EOF
#!/bin/sh
#PBS -l nodes=1:ppn=$ppn
#PBS -q $queue
#PBS -j oe
#PBS -N $job
cd \$PBS_O_WORKDIR
# Define Safety Check Function in Job Script
check_rm_path() {
p="\$1"
# Empty check
if [ -z "\$p" ]; then
echo "Refusing to delete empty path"
return 1
fi
# Root check
if [ "\$p" = "/" ]; then
echo "Refusing to delete root path"
return 1
fi
return 0
}
export g16root=/share/apps/soft
source \$g16root/g16/bsd/g16.profile
# Create Scratch Directory
if [ -n "\$USER" ] && [ -n "\$PBS_JOBID" ]; then
mkdir -p /scr/\$USER/\$PBS_JOBID
export GAUSS_SCRDIR=/scr/\$USER/\$PBS_JOBID
else
echo "Error: USER or PBS_JOBID not set. Cannot setup scratch."
exit 1
fi
NODES=\`cat \$PBS_NODEFILE | uniq\`
echo "--------------------------------------------------------"
echo " JOBID: \$PBS_JOBID"
echo " The job was started at \`date\`"
echo " The job was running at \$NODES."
echo "--------------------------------------------------------"
# Run G16 Job
echo "Executing: g16 < $gin_file > $job.out"
g16 < $gin_file > $job.out
echo "--------------------------------------------------------"
echo " The job was finished at \`date\`"
echo "--------------------------------------------------------"
# Delete the tmp File (Cleanup Scratch)
echo "Cleaning up \$GAUSS_SCRDIR"
if check_rm_path "\$GAUSS_SCRDIR"; then
rm -rf "\$GAUSS_SCRDIR"
fi
EOF
# ==========================================
# 4. Submit the job
# ==========================================
# qsub prints the job ID on stdout, e.g. 12345.cluster
qsub "$job_file"

116
calc_utils/bin/gsub_wait Normal file
View File

@ -0,0 +1,116 @@
#!/bin/bash
# gsub_wait — submit a Gaussian job via gsub, then block while tailing the
# .out file to report SCF/termination progress until the PBS job finishes.
# Intended as the synchronous `command` hook for the ASE Gaussian calculator.
set -u
# Usage: gsub_wait <jobname>
job=${1:-}
if [[ -z "$job" ]]; then
echo "Usage: $0 <jobname-without-extension>"
exit 1
fi
# ==========================================
# 1. Submit the job
# ==========================================
# Locate the gsub command:
# prefer ./gsub in the current directory, otherwise rely on PATH.
if [[ -x "./gsub" ]]; then
GSUB_CMD="./gsub"
else
GSUB_CMD="gsub"
fi
# Invoke gsub and capture its output.
# Note: gsub may itself run remotely over SSH; either way it relays qsub's output.
output=$($GSUB_CMD "$job")
echo "$output"
# ==========================================
# 2. Check silent mode
# ==========================================
# With GSUB_SILENT=1 we skip monitoring and return right after submission.
if [[ "${GSUB_SILENT:-0}" == "1" ]]; then
exit 0
fi
# ==========================================
# 3. Monitor job progress
# ==========================================
# Extract the numeric job ID (e.g. 67147.cluster -> 67147)
jobid_full=$(echo "$output" | grep -oE '[0-9]+\.cluster|[0-9]+' | head -n 1 || true)
if [[ -n "$jobid_full" ]]; then
jobid=${jobid_full%%.*}
# Files involved in monitoring
out_file="$job.out"
gin_file="$job.gin"
end_file="$job.job.o$jobid"
if [[ ! -f "$gin_file" ]]; then
# Input file not visible here (remote path mismatch?) — skip monitoring.
echo "Warning: $gin_file not found nearby. Skipping monitor."
exit 0
fi
# Total steps = (number of --link1-- separators) + 1
link_count=$(grep -c -- "--link1--" "$gin_file" || true)
total=$((link_count + 1))
cntDone=0
cntSCF=0
last_lines=0
echo "Monitoring Job $jobid..."
while true; do
# A. The PBS epilogue file appearing means the job has finished
if [[ -f "$end_file" ]]; then
echo "Job finished (found $end_file)."
break
fi
# B. Read any new lines appended to the .out file
if [[ -f "$out_file" ]]; then
curr_lines=$(wc -l < "$out_file" 2>/dev/null || echo 0)
# File shrank (truncated or recreated): reset the read position
if (( curr_lines < last_lines )); then last_lines=0; fi
if (( curr_lines > last_lines )); then
# Process the newly appended lines one at a time.
# Process substitution < <(...) avoids the pipeline subshell that
# would otherwise lose the counter variables.
while IFS= read -r line; do
# Look for SCF convergence lines
# Pattern: SCF Done: ... E ... = (value) A.U.
if [[ "$line" =~ SCF[[:space:]]Done:.*E.*=[[:space:]]*([-0-9.]+)[[:space:]]*A\.U\. ]]; then
energy="${BASH_REMATCH[1]}"
cntSCF=$((cntSCF + 1))
echo "$job: SCF Done: $energy [$cntSCF] ($cntDone/$total)"
fi
# Count Gaussian termination markers (one per link step)
if [[ "$line" == *"termination of Gaussian"* ]]; then
cntDone=$((cntDone + 1))
echo "$job: task done ($cntDone/$total)"
fi
done < <(tail -n "+$((last_lines + 1))" "$out_file")
last_lines=$curr_lines
fi
fi
sleep 2
done
# C. Final sanity check: every link step should have terminated
if (( cntDone != total )); then
echo "Warning: cntDone ($cntDone) != total ($total)"
fi
else
echo "Could not parse Job ID from output. Monitor skipped."
fi