Compare commits

...

20 Commits

Author SHA1 Message Date
afc5c76fd4 up readme 2025-09-21 23:41:48 +08:00
5bc7284263 pdf reader 2025-09-21 23:40:50 +08:00
f12791864d clean train in videocomrepe 2025-09-12 15:09:11 +08:00
b4c04343e8 update README
1. update root README, pointer to all proj
2. create all README
Powered by claude code, maybe mistake, checked and commit manually.
2025-09-12 14:58:58 +08:00
f539792fd1 multi 2025-09-06 23:47:17 +08:00
70e46124a0 update mw 2025-09-04 16:26:52 +08:00
e70435e807 cord example 2025-08-28 22:57:07 +08:00
c2fd6857cc update cord 2025-08-28 22:23:42 +08:00
fc4fc9a5f0 Change default value of save_to cfg 2025-08-26 20:08:31 +08:00
0ea0e3c69b Merge branch 'master' of flt6.top:flt/tools 2025-08-26 20:02:27 +08:00
d6e29483ca Enhance progress when length is not avai 2025-08-26 20:00:33 +08:00
9d3c263683 fix melt and boil col 2025-08-20 10:47:17 +08:00
f1f71bcc4b fix melt and boil col 2025-08-20 10:45:32 +08:00
cddd5d7e3b mw_tool fix req 2025-08-20 10:31:41 +08:00
5313891771 use new ver 2025-08-20 10:27:58 +08:00
78b82192a1 update 2025-08-20 10:27:38 +08:00
9c92a3a449 fix 2025-08-20 09:50:45 +08:00
397084995e update 2025-08-20 09:39:00 +08:00
547f36a074 reaction table 2025-08-19 23:29:18 +08:00
08ba942ae6 new 2025-08-19 21:45:03 +08:00
38 changed files with 3013 additions and 924 deletions

4
.gitignore vendored
View File

@ -8,4 +8,6 @@ __pycache__
*.log *.log
test test
.venv .venv
uv.lock uv.lock
.claude
CLAUDE.md

View File

@ -0,0 +1,27 @@
# English_Listening_cut
英语听力音频文件自动分割工具。
## 功能
自动分割英语听力考试音频文件,通过检测静音段落将长音频分割为多个小段,并按照规定格式组织输出文件。
## 使用方法
1. 准备MP3格式的听力音频文件
2. 运行 `python main.py 音频文件.mp3`
3. 程序将在OUTPUT目录中创建结构化的分割文件
## 输出结构
分割后的文件将按以下结构组织:
- BASIC文件夹基础题目
- 1-5文件夹按题号分类
- Others文件夹其他片段
## 特性
- 自动静音检测
- 智能分割算法
- 规范化文件命名
- 结构化输出目录

32
ImageCompress/README.md Normal file
View File

@ -0,0 +1,32 @@
# ImageCompress
批量图片压缩工具。
## 功能
使用FFmpeg对图片进行批量压缩支持精确控制目标文件大小通过迭代优化实现精准的大小控制。
## 使用方法
1. 运行 `python b.py [目录/文件路径]`
2. 根据提示输入目标文件大小KB
3. 程序将自动压缩图片至指定大小
## 支持格式
- JPG
- PNG
- WebP
- GIF
## 特性
- 精确大小控制
- 迭代优化算法
- 批量处理支持
- 多进程并行处理
- 质量自动调整
## 依赖要求
需要安装FFmpeg.exe并确保在系统路径中可访问。

142
README.md
View File

@ -1,94 +1,98 @@
# 个人使用小工具集 # 个人使用小工具集
## [修改连接数](changeConnectionLimit) ## 系统工具
win10修改wifi热点最大连接终端数 ### [修改连接数](changeConnectionLimit)
修改Windows 10 WiFi热点最大连接终端数
Usage`main.bat` ### [字体替换](change_font)
OCR文字检测与自定义字体替换工具
## [学*网预览下载](zxxk_dl) ## 文件处理
下载学*网资源通过预览的形式下载的文件以html形式保存。可以使用adobe Acrobat的打印转为pdf文字版文件以svg下载图片版原图。 ### [图片压缩](ImageCompress)
批量图片压缩工具,支持精确大小控制
1. 研究过程:[process.md](zxxk_dl/process.md) ### [图片整理](tidy_img)
按照手机照片视频命名规范自动分类整理文件
2. usage: `python main.py` ### [文件加密](encryptFiles)
3. requirements: python, requests 基于RSA的文件夹加密解密工具
## adb快速传手机文件 ### [PDF解锁](pdf_unlock)
移除PDF文件权限限制
源文件丢了TODO ### [编码转换](recode)
文本文件字符编码格式转换工具
2024.7.14:放弃,已有项目[双轨快传](https://github.com/weixiansen574/HybridFileXfer)实现 ## 音视频处理
## [并行auto-editor](mult) ### [听力文件拆分](English_Listening_cut)
英语听力考试音频自动分割工具
并行运行auto-editor, 递归转码所有视频。PS网课时用的 ### [批量视频处理](mult)
使用auto-editor批量处理视频去除静音片段
### files ### [视频剪切](video)
简单的视频时间段剪切工具
#### main.py ### [视频时长调整](process_video)
调整视频播放速度以达到目标时长
主程序 ## 学习教育
### main_up.py ### [化学反应坐标图](cord)
生成化学反应坐标图的Web工具
使用hevcCUDA硬解码 ### [化学计算工具](mw_tool)
化合物查询与分子量计算平台
### usage ### [座位表生成](seat_map)
课堂座位安排自动生成工具
`python main.py` ### [任务分配系统](task_assignment)
基于Web的任务分配管理系统
## 菁*网题目下载 ### [学科网下载](zxxk_dl)
学科网教育资源下载工具
### [问卷星抓取](wjx)
问卷星调查数据获取工具
### [教务系统登录](jw)
教务系统自动登录工具URP新版登录逆向API在更改没有持续更新。但是核心算法没变
## 网络下载
### [123pan下载](down)
123pan云盘文件批量下载工具
## 实用工具
### [运动监控](move_warn)
IP摄像头运动检测报警工具
### [随机数生成](rand)
交互式随机数生成器
### [拼音转换](pinyin)
中文姓名转拼音首字母工具
### [对话转换](conversation_trans)
对话数据转Markdown格式工具
---
## 已停止维护
### adb快速传手机文件
源文件丢了,已有项目[双轨快传](https://github.com/weixiansen574/HybridFileXfer)实现
### 菁优网题目下载
**ATTENTION目前制作时的账号疑似被封禁请谨慎使用** **ATTENTION目前制作时的账号疑似被封禁请谨慎使用**
如需打印请使用浏览器打印或转pdf ### 检测视频是否破损video_test
功能已整合到其他工具中
### Usage ### 转移libcef.dllmove_CEF
通过Everything API遍历所有libcef.dll核验md5后转移至指定文件夹并创建软连接
菁优网任意界面F12抓Cookie
运行main.py输入复制的内容根据提示输入网址即可。
### requements
python, requests,bs4
## [听力文件拆分](English_Listening_cut)
基于学英语报的听力做听力文件拆解,拆分到每段对话
### Usage
`python main.py`
## seat_map
文档TODO
## 简单运动报警(move_warn)
对ip摄像头画面进行简单运动捕捉核心代码来自网络。效果运动达到一定范围发出声音警告后自动关闭程序。
### usage
`python main.py`
### requirements
python, opencv
## 检测视频是否破损video_test
## 转移libcef.dllmove_CEF
通过Everything API遍历所有libcef.dll核验md5后转移至指定文件夹并创建软连接。
## [图片整理(tidy_img)](tidy_img)
按照手机(华为)照片视频命名规范(例如`IMG_20170218_164951.jpg``VID_20220116_154728.mp4`)将其按年份分类,递归遍历所有文件。如果重复,文件相同<sup>[1]</sup>由用户判断是否删除相同文件。如果文件冲突<sup>[2]</sup>放入conflict文件夹并在文件名后加sha256前5位。
[1]: 先判断文件大小相同比较sha256值如果均相同认为是同文件
[2]: 上述标准任一不满足

7
VideoCompress/.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
logs
test
config.json
*.xml
tmp
build
dist

174
VideoCompress/README.md Normal file
View File

@ -0,0 +1,174 @@
# VideoCompress 🎬
一个功能强大的视频批量压缩工具,基于 FFmpeg 构建,支持硬件加速和多种压缩配置选项。
## ✨ 主要特性
- **批量处理**: 递归扫描目录,批量压缩多个视频文件
- **硬件加速**: 支持 NVIDIA、AMD、Intel 显卡硬件加速
- **灵活配置**: 支持 CRF 和固定码率两种压缩模式
- **进度显示**: 实时显示压缩进度和预估时间
- **多种界面**: 提供 Tkinter 和 PySide6 两种 GUI 配置界面
- **智能跳过**: 自动跳过已存在的压缩文件
- **格式支持**: 支持 MP4、MKV 等多种视频格式
## 🚀 快速开始
### 环境要求
- Python 3.7+
- FFmpeg已内置 ffmpeg.exe
### 安装依赖
```bash
pip install rich PySide6
```
### 基本使用
1. **启动配置界面**
```bash
python config_ui.py
```
2. **直接压缩视频**
```bash
python main.py <目标目录>
```
3. **配置后压缩**
```bash
python config.py <目标目录>
```
## ⚙️ 配置选项
### 基础设置
- **输出方式**
- `single`: 所有压缩文件保存到统一的 Compress 文件夹
- `multi`: 在每个视频文件旁创建独立的 compress 文件夹
- **编码器**
- `h264`: H.264 编码(兼容性好)
- `hevc`: H.265 编码(文件更小)
### 质量设置
- **CRF 模式**(推荐):
- 数值范围0-51
- 数值越小质量越高,文件越大
- 推荐值18-23
- **固定码率模式**
- 格式:数字+单位(如 2M、500k
- 可以精确控制文件大小
### 硬件加速
- **NVIDIA 显卡**:使用 `h264_nvenc` 或 `hevc_nvenc`
- **AMD 显卡**:使用 `h264_amf` 或 `hevc_amf`
- **Intel 核显**:使用 `h264_qsv` 或 `hevc_qsv`
## 📁 项目结构
```
VideoCompress/
├── main.py # 核心压缩引擎
├── config.py # Tkinter 配置界面
├── config_ui.py # PySide6 现代化配置界面
├── config.json # 配置文件
├── ffmpeg.exe # FFmpeg 可执行文件
├── ffprobe.exe # FFprobe 可执行文件
├── test.py # 测试文件
├── pack.bat # 打包脚本
└── logs/ # 日志文件夹
```
## 🔧 高级用法
### 自定义 FFmpeg 参数
在配置界面的"高级设置"中,可以添加自定义 FFmpeg 参数:
```bash
# 示例:设置线程数
-threads 4
# 示例:自定义预设
-preset slow
# 示例:音频编码设置
-c:a aac -b:a 128k
```
### 手动参数模式
如果需要完全自定义压缩命令,可以在配置文件中设置 `manual` 参数:
```json
{
"manual": ["-vf", "scale=1280:720", "-c:v", "libx264", "-crf", "20"]
}
```
### 训练模式(实验性)
启用训练模式可以让程序学习压缩时间,提供更准确的时间预估:
```json
{
"train": true
}
```
## 📊 输出示例
```
2025-08-26 19:20:15 - INFO - Video Compress started at 2025/08/26 19:20
2025-08-26 19:20:15 - INFO - 开始验证环境
2025-08-26 19:20:16 - INFO - 正在获取视频信息 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100%
2025-08-26 19:20:17 - INFO - 总进度 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 45%
2025-08-26 19:20:17 - INFO - test.mp4 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 67%
```
## 🎯 使用场景
- **批量压缩**: 处理大量视频文件,减小存储空间
- **格式转换**: 将各种格式的视频统一转换为 MP4
- **质量优化**: 在保持视觉质量的前提下减小文件体积
- **硬件加速**: 利用显卡加速,提高压缩效率
## ⚠️ 注意事项
1. 确保有足够的磁盘空间存储压缩后的文件
2. 硬件加速需要相应的显卡驱动支持
3. 压缩过程中避免强制退出,可能导致文件损坏
4. 建议先用小批量文件测试配置效果
## 🐛 常见问题
**Q: 压缩后文件变大了?**
A: 降低 CRF 值或检查原文件是否已经是高压缩格式
**Q: 硬件加速不生效?**
A: 检查显卡驱动是否支持对应的编码器
**Q: 程序卡在某个文件?**
A: 检查该文件是否损坏或格式不被支持
## 📝 更新日志
- **v1.2.1**: 优化进度显示,修复时间预估
- **v1.2**: 添加 PySide6 现代化界面
- **v1.1**: 支持硬件加速和多种编码器
- **v1.0**: 基础批量压缩功能
## 📄 许可证
本项目仅供学习和个人使用。
---
💡 **提示**: 如有问题或建议,欢迎提出 Issue 或 Pull Request。

View File

@ -13,7 +13,7 @@ else:
CONFIG_NAME = Path(sys.path[0])/"config.json" CONFIG_NAME = Path(sys.path[0])/"config.json"
DEFAULT_CONFIG = { DEFAULT_CONFIG = {
"save_to": "multi", "save_to": "single",
"crf": 18, "crf": 18,
"codec": "h264", # could be h264, h264_qsv, h264_nvenc … etc. "codec": "h264", # could be h264, h264_qsv, h264_nvenc … etc.
"ffmpeg": "ffmpeg", "ffmpeg": "ffmpeg",

View File

@ -22,10 +22,9 @@ class VideoConfig:
crf: int = 18 crf: int = 18
codec: str = "h264" codec: str = "h264"
ffmpeg: str = "ffmpeg" ffmpeg: str = "ffmpeg"
video_ext: List[str] = None video_ext: List[str] = [".mp4", ".mkv"]
extra: List[str] = None extra: List[str] = []
manual: Optional[List[str]] = None manual: Optional[List[str]] = None
train: bool = False
bitrate: Optional[str] = None bitrate: Optional[str] = None
def __post_init__(self): def __post_init__(self):
@ -550,9 +549,6 @@ class ConfigUI(QMainWindow):
group.addLayout(custom_layout) group.addLayout(custom_layout)
# 实验性功能 # 实验性功能
self.train_checkbox = QCheckBox("启用训练模式 (实验性)")
self.train_checkbox.setToolTip("实验性功能,可能不稳定")
group.addWidget(self.train_checkbox)
return group return group
@ -775,7 +771,6 @@ class ConfigUI(QMainWindow):
if self.config.manual: if self.config.manual:
self.custom_edit.setText(" ".join(self.config.manual)) self.custom_edit.setText(" ".join(self.config.manual))
self.train_checkbox.setChecked(self.config.train)
def _save_config(self): def _save_config(self):
"""保存配置""" """保存配置"""
@ -841,7 +836,6 @@ class ConfigUI(QMainWindow):
if custom_text: if custom_text:
config.manual = custom_text.split() config.manual = custom_text.split()
config.train = self.train_checkbox.isChecked()
# 保存文件 # 保存文件
config_path = self._get_config_path() config_path = self._get_config_path()

View File

@ -7,14 +7,14 @@ from datetime import datetime
from time import time from time import time
from rich.logging import RichHandler from rich.logging import RichHandler
from rich.progress import Progress from rich.progress import Progress
from pickle import dumps, loads
from typing import Optional from typing import Optional
import atexit import atexit
import re import re
import threading
import queue
import psutil
root = None root = None
TRAIN = False
ESTI_FILE = Path(sys.path[0])/"esti.out"
CFG_FILE = Path(sys.path[0])/"config.json" CFG_FILE = Path(sys.path[0])/"config.json"
CFG = { CFG = {
"save_to": "single", "save_to": "single",
@ -25,10 +25,25 @@ CFG = {
"ffmpeg": "ffmpeg", "ffmpeg": "ffmpeg",
"manual": None, "manual": None,
"video_ext": [".mp4", ".mkv"], "video_ext": [".mp4", ".mkv"],
"train": False "compress_dir_name": "compress",
"resolution": "-1:1080",
"fps": "30",
"test_video_resolution": "1920x1080",
"test_video_fps": "30",
"test_video_input": "compress_video_test.mp4",
"test_video_output": "compressed_video_test.mp4",
"max_concurrent_instances": 2,
"cpu_monitor_interval": 3, # CPU监控间隔
"cpu_monitor_duration": 30, # 统计持续时间5分钟
} }
esti=None # :tuple[list[int],list[float]]
# CPU监控相关全局变量
ffmpeg_processes = {} # 存储活动的ffmpeg进程
cpu_stats = {"system": [], "ffmpeg": []} # CPU使用率统计
cpu_monitor_thread = None
cpu_monitor_lock = threading.Lock()
current_instances = 0
instance_lock = threading.Lock()
def get_cmd(video_path,output_file): def get_cmd(video_path,output_file):
if CFG["manual"] is not None: if CFG["manual"] is not None:
@ -46,40 +61,36 @@ def get_cmd(video_path,output_file):
CFG["ffmpeg"], CFG["ffmpeg"],
"-hide_banner", "-hide_banner",
"-i", video_path, "-i", video_path,
"-vf", "scale=-1:1080", ]
if CFG['resolution'] is not None:
command.extend([
"-vf", f"scale={CFG['resolution']}",])
command.extend([
"-c:v", CFG["codec"], "-c:v", CFG["codec"],
"-b:v", CFG["bitrate"], "-b:v", CFG["bitrate"],
"-r","30", "-r",CFG["fps"],
"-y", "-y",
] ])
else: else:
command = [ command = [
CFG["ffmpeg"], CFG["ffmpeg"],
"-hide_banner", "-hide_banner",
"-i", video_path, "-i", video_path,
"-vf", "scale=-1:1080", ]
if CFG['resolution'] is not None:
command.extend([
"-vf", f"scale={CFG['resolution']}",])
command.extend([
"-c:v", CFG["codec"], "-c:v", CFG["codec"],
"-global_quality", str(CFG["crf"]), "-global_quality", str(CFG["crf"]),
"-r","30", "-r",CFG["fps"],
"-y", "-y",
] ])
command.extend(CFG["extra"]) command.extend(CFG["extra"])
command.append(output_file) command.append(output_file)
return command return command
def train_init():
global esti_data,TRAIN,data_file
data_file = Path("estiminate_data.dat")
if data_file.exists():
esti_data=loads(data_file.read_bytes())
if not isinstance(esti_data,tuple):
esti_data=([],[])
else:
esti_data=([],[])
TRAIN=True
atexit.register(save_esti)
# print(esti_data)
# 配置logging # 配置logging
@ -103,82 +114,6 @@ def setup_logging():
] ]
) )
def polyfit_manual(x, y, degree=2):
"""手动实现二次多项式最小二乘拟合"""
n = len(x)
if n != len(y):
raise ValueError("输入的x和y长度必须相同")
# 对于二次多项式 y = ax^2 + bx + c
# 构建矩阵方程 A * [a, b, c]^T = B
# 其中 A = [[sum(x^4), sum(x^3), sum(x^2)],
# [sum(x^3), sum(x^2), sum(x)],
# [sum(x^2), sum(x), n]]
# B = [sum(x^2 * y), sum(x * y), sum(y)]
# 计算需要的和
sum_x = sum(x)
sum_x2 = sum(xi**2 for xi in x)
sum_x3 = sum(xi**3 for xi in x)
sum_x4 = sum(xi**4 for xi in x)
sum_y = sum(y)
sum_xy = sum(xi*yi for xi, yi in zip(x, y))
sum_x2y = sum(xi**2*yi for xi, yi in zip(x, y))
# 构建矩阵A和向量B
A = [
[sum_x4, sum_x3, sum_x2],
[sum_x3, sum_x2, sum_x],
[sum_x2, sum_x, n]
]
B = [sum_x2y, sum_xy, sum_y]
# 使用高斯消元法解线性方程组
# 将增广矩阵 [A|B] 转换为行阶梯形式
AB = [row + [b] for row, b in zip(A, B)]
n_rows = len(AB)
# 高斯消元
for i in range(n_rows):
# 寻找当前列中最大元素所在的行
max_row = i
for j in range(i + 1, n_rows):
if abs(AB[j][i]) > abs(AB[max_row][i]):
max_row = j
# 交换行
AB[i], AB[max_row] = AB[max_row], AB[i]
# 将当前行主元归一化
pivot = AB[i][i]
if pivot == 0:
raise ValueError("矩阵奇异,无法求解")
for j in range(i, n_rows + 1):
AB[i][j] /= pivot
# 消元
for j in range(n_rows):
if j != i:
factor = AB[j][i]
for k in range(i, n_rows + 1):
AB[j][k] -= factor * AB[i][k]
# 提取结果
coeffs = [AB[i][n_rows] for i in range(n_rows)]
return coeffs # [a, b, c] 对应 ax^2 + bx + c
def save_esti():
try:
if len(esti_data[0]) > 0:
coeffs = polyfit_manual(esti_data[0], esti_data[1])
# 保存为逗号分隔的文本格式
ESTI_FILE.write_text(','.join(map(str, coeffs)))
except Exception as e:
logging.warning("保存估算数据失败")
logging.debug("error at save_esti",exc_info=e)
def fmt_time(t:float|int) -> str: def fmt_time(t:float|int) -> str:
if t>3600: if t>3600:
return f"{t//3600}h {t//60}min {t%60}s" return f"{t//3600}h {t//60}min {t%60}s"
@ -187,53 +122,111 @@ def fmt_time(t:float|int) -> str:
else: else:
return f"{round(t)}s" return f"{round(t)}s"
def func(sz:int,src=False): def cpu_monitor():
if TRAIN: """CPU监控线程函数"""
try: global cpu_stats
data_file.write_bytes(dumps(esti_data))
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.warning("无法保存数据",exc_info=e)
try:
if TRAIN:
if len(esti_data[0])==0:
return -1 if src else "NaN"
coeffs = polyfit_manual(esti_data[0], esti_data[1])
t = coeffs[0]*sz**2 + coeffs[1]*sz + coeffs[2]
elif esti is not None:
t = esti[0]*sz**2 + esti[1]*sz + esti[2]
# print(t,sz)
else:
logging.warning(f"Unexpected condition at func->TRAIN")
return -1 if src else "NaN"
t = round(t)
if src:
return t
return fmt_time(t)
except KeyboardInterrupt as e:raise e
except Exception as e:
logging.warning("无法计算预计时间")
logging.debug("esti time exception", exc_info=e)
return -1 if src else "NaN"
def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_func=None): while True:
global esti_data try:
# 获取系统CPU使用率
system_cpu = psutil.cpu_percent(interval=1)
# 获取所有ffmpeg进程的CPU使用率
ffmpeg_cpu_total = 0
active_processes = []
with cpu_monitor_lock:
for proc_info in ffmpeg_processes.values():
try:
proc = proc_info['process']
if proc.is_running():
# print(proc,proc.cpu_percent() / psutil.cpu_count())
ffmpeg_cpu_total += proc.cpu_percent() / psutil.cpu_count()
active_processes.append(proc_info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# 更新统计数据
with cpu_monitor_lock:
cpu_stats["system"].append(system_cpu)
cpu_stats["ffmpeg"].append(ffmpeg_cpu_total)
# 保持最近5分钟的数据
max_samples = CFG["cpu_monitor_duration"] // CFG["cpu_monitor_interval"]
if len(cpu_stats["system"]) > max_samples:
cpu_stats["system"] = cpu_stats["system"][-max_samples:]
if len(cpu_stats["ffmpeg"]) > max_samples:
cpu_stats["ffmpeg"] = cpu_stats["ffmpeg"][-max_samples:]
except KeyboardInterrupt as e:
raise e
except Exception as e:
logging.error(f"CPU监控异常: {e}")
# 等待下一次监控
threading.Event().wait(CFG["cpu_monitor_interval"])
def start_cpu_monitor():
"""启动CPU监控线程"""
global cpu_monitor_thread
if cpu_monitor_thread is None or not cpu_monitor_thread.is_alive():
cpu_monitor_thread = threading.Thread(target=cpu_monitor, daemon=True)
cpu_monitor_thread.start()
logging.info("CPU监控线程已启动")
def get_cpu_usage_stats():
"""获取CPU使用率统计"""
with cpu_monitor_lock:
if not cpu_stats["system"] or not cpu_stats["ffmpeg"]:
return None, None
system_avg = sum(cpu_stats["system"]) / len(cpu_stats["system"])
ffmpeg_avg = sum(cpu_stats["ffmpeg"]) / len(cpu_stats["ffmpeg"])
return system_avg, ffmpeg_avg
def should_increase_instances():
"""判断是否应该增加实例数"""
system_avg, ffmpeg_avg = get_cpu_usage_stats()
if system_avg is None or ffmpeg_avg is None:
return False
# 条件: 系统CPU - FFmpeg CPU > FFmpeg CPU * 2 + 0.1
available_cpu = 100 - system_avg
threshold = ffmpeg_avg # 10% = 0.1 * 100
logging.debug(f"CPU统计: 系统平均={system_avg:.1f}%, FFmpeg平均={ffmpeg_avg:.1f}%, 可用={available_cpu:.1f}%, 阈值={threshold:.1f}%")
return available_cpu > threshold
def register_ffmpeg_process(proc_id, process):
"""注册ffmpeg进程用于监控"""
with cpu_monitor_lock:
ffmpeg_processes[proc_id] = {
'process': psutil.Process(process.pid),
'start_time': time()
}
def unregister_ffmpeg_process(proc_id):
"""注销ffmpeg进程"""
with cpu_monitor_lock:
if proc_id in ffmpeg_processes:
del ffmpeg_processes[proc_id]
def process_video(video_path: Path, compress_dir:Optional[Path]=None, update_func=None, proc_id=None):
global current_instances
use=None use=None
sz=video_path.stat().st_size//(1024*1024) sz=video_path.stat().st_size//(1024*1024)
if esti is not None or TRAIN:
use = func(sz,True)
logging.info(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M预计{fmt_time(use)}")
else:
logging.info(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M")
bgn=time() bgn=time()
if compress_dir is None: if compress_dir is None:
# 在视频文件所在目录下创建 compress 子目录(如果不存在) # 在视频文件所在目录下创建 compress 子目录(如果不存在)
compress_dir = video_path.parent / "compress" compress_dir = video_path.parent / CFG["compress_dir_name"]
else: else:
compress_dir /= video_path.parent.relative_to(root) compress_dir /= video_path.parent.relative_to(root)
assert isinstance(compress_dir,Path)
compress_dir.mkdir(exist_ok=True,parents=True) compress_dir.mkdir(exist_ok=True,parents=True)
# 输出文件路径:与原文件同名,保存在 compress 目录下 # 输出文件路径:与原文件同名,保存在 compress 目录下
@ -246,6 +239,11 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_fun
command = get_cmd(video_path_str,output_file) command = get_cmd(video_path_str,output_file)
try: try:
with instance_lock:
current_instances += 1
logging.debug(f"启动FFmpeg进程 {proc_id}: {video_path.name}")
result = subprocess.Popen( result = subprocess.Popen(
command, command,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@ -254,17 +252,19 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_fun
text=True text=True
) )
# 注册进程用于CPU监控
if proc_id:
register_ffmpeg_process(proc_id, result)
while result.poll() is None: while result.poll() is None:
line = " " line = " "
while result.poll() is None and line[-1:] not in "\r\n": while result.poll() is None and line[-1:] not in "\r\n":
line+=result.stderr.read(1) line+=result.stderr.read(1)
# print(line[-1])
if 'warning' in line.lower(): if 'warning' in line.lower():
logging.warning(f"[FFmpeg]({video_path_str}): {line}") logging.warning(f"[FFmpeg {proc_id}]({video_path_str}): {line}")
elif 'error' in line.lower(): elif 'error' in line.lower():
logging.error(f"[FFmpeg]({video_path_str}): {line}") logging.error(f"[FFmpeg {proc_id}]({video_path_str}): {line}")
elif "frame=" in line: elif "frame=" in line:
# print(line,end="")
match = re.search(r"frame=\s*(\d+)",line) match = re.search(r"frame=\s*(\d+)",line)
if match: if match:
frame_number = int(match.group(1)) frame_number = int(match.group(1))
@ -272,107 +272,221 @@ def process_video(video_path: Path, compress_dir:Optional[Path]=None ,update_fun
update_func(frame_number) update_func(frame_number)
if result.returncode != 0: if result.returncode != 0:
logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode}cmd={' '.join(command)}") logging.error(f"处理文件 {video_path_str} 失败,返回码: {result.returncode}cmd={' '.join(map(str,command))}")
logging.error(result.stdout) logging.error(result.stdout.read())
logging.error(result.stderr) logging.error(result.stderr.read())
else: else:
logging.debug(f"文件处理成功: {video_path_str} -> {output_file}") logging.debug(f"文件处理成功: {video_path_str} -> {output_file}")
end=time() except KeyboardInterrupt as e:raise e
if TRAIN:
esti_data[0].append(sz)
esti_data[1].append(end-bgn)
except Exception as e: except Exception as e:
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(command)}",exc_info=e) logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{str(video_path_str)}cmd={' '.join(map(str,command))}",exc_info=e)
finally:
# 注销进程监控
if proc_id:
unregister_ffmpeg_process(proc_id)
with instance_lock:
current_instances -= 1
logging.debug(f"FFmpeg进程 {proc_id} 已结束")
return use return use
def traverse_directory(root_dir: Path): def traverse_directory(root_dir: Path):
global current_instances
video_extensions = set(CFG["video_ext"]) video_extensions = set(CFG["video_ext"])
sm=None sm=None
if esti is not None: # 获取视频文件列表和帧数信息
logging.info(f"正在估算时间(当存在大量小文件时,估算值将会很离谱)") video_files = []
sm = 0 que = list(root_dir.glob("*"))
for file in root_dir.rglob("*"): while que:
if file.parent.name == "compress":continue d = que.pop()
for file in d.glob("*"):
if file.parent.name == CFG["compress_dir_name"] or file.name == CFG["compress_dir_name"]:
continue
if file.is_file() and file.suffix.lower() in video_extensions: if file.is_file() and file.suffix.lower() in video_extensions:
sz=file.stat().st_size//(1024*1024) video_files.append(file)
tmp = func(sz,True) elif file.is_dir():
if not isinstance(tmp,int): que.append(file)
logging.error("无法预估时间,因为预估函数返回非整数")
elif tmp == -1:
logging.error("无法预估时间,因为预估函数返回了异常") # exit()
sm += tmp
logging.info(f"预估用时:{fmt_time(sm)}")
else:
# logging.info("正在估算视频帧数,用于显示进度。")
with Progress() as prog:
task = prog.add_task("正在获取视频信息",total=len(list(root_dir.rglob("*"))))
frames = {}
for file in root_dir.rglob("*"):
prog.advance(task)
if file.parent.name == "compress":continue
if file.is_file() and file.suffix.lower() in video_extensions:
cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1 "{str(file)}'
proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
if proc.returncode != 0:
logging.error(f"无法获取视频信息: {file}, 返回码: {proc.returncode}")
frames[file] = 60
continue
if proc.stdout.strip():
avg_frame_rate, duration = proc.stdout.strip().split('\n')
tmp = avg_frame_rate.split('/')
avg_frame_rate = float(tmp[0]) / float(tmp[1])
if duration == "N/A":
duration = 1000
logging.error(f"无法获取视频信息: {file}, 时长为N/A默认使用1000s。运行时进度条将出现异常。")
duration = float(duration)
frames[file] = duration * avg_frame_rate
if not video_files:
logging.warning("未找到需要处理的视频文件")
return
logging.debug(f"开始遍历目录: {root_dir}") # 获取视频信息
# 定义需要处理的视频后缀(忽略大小写)
with Progress() as prog: with Progress() as prog:
task = prog.add_task("总进度",total=sm if sm is not None else sum(frames.values())) task = prog.add_task("正在获取视频信息", total=len(video_files))
for file in root_dir.rglob("*"): frames: dict[Path, float] = {}
if file.parent.name == "compress":continue for file in video_files:
if file.is_file() and file.suffix.lower() in video_extensions: prog.advance(task)
cur = prog.add_task(f"{file.relative_to(root_dir)}",total=frames[file]) cmd = f'ffprobe -v error -select_streams v:0 -show_entries stream=avg_frame_rate,duration -of default=nokey=1:noprint_wrappers=1'.split()
with prog._lock: cmd.append(str(file))
tmp = prog._tasks[task] proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
completed_start = tmp.completed if proc.returncode != 0:
logging.debug(f"无法获取视频信息: {file}, 返回码: {proc.returncode}")
def update_progress(x): frames[file] = 0
prog.update(cur,completed=x) continue
prog.update(task, completed=completed_start+x) if proc.stdout.strip():
try:
avg_frame_rate, duration = proc.stdout.strip().split('\n')
tmp = avg_frame_rate.split('/')
avg_frame_rate = float(tmp[0]) / float(tmp[1])
if duration == "N/A":
duration = 0
logging.debug(f"无法获取视频信息: {file}, 时长为N/A默认使用0s")
duration = float(duration)
frames[file] = duration * avg_frame_rate
except (ValueError, IndexError) as e:
logging.debug(f"解析视频信息失败: {file}, 错误: {e}")
frames[file] = 0
logging.debug(f"开始遍历目录: {root_dir}, 共{len(frames)}个视频文件")
# 启动CPU监控
start_cpu_monitor()
# 创建进度条
with Progress() as prog:
total_frames = sum(frames.values())
main_task = prog.add_task("总进度", total=total_frames if total_frames > 0 else len(frames))
# 创建文件队列
file_queue = queue.Queue()
for file in frames.keys():
file_queue.put(file)
# 进度跟踪
progress_trackers = {}
completed_files = 0
total_completed_frames = 0
def create_progress_updater(file_path, task_id):
def update_progress(frame_count):
nonlocal total_completed_frames
if file_path in progress_trackers:
old_frames = progress_trackers[file_path]
diff = frame_count - old_frames
total_completed_frames += diff
else:
total_completed_frames += frame_count
progress_trackers[file_path] = frame_count
if frames[file_path] > 0:
prog.update(task_id, completed=frame_count)
else:
prog.update(task_id, description=f"{file_path.relative_to(root_dir)} 已处理{frame_count}")
# 更新总进度
if total_frames > 0:
prog.update(main_task, completed=total_completed_frames)
return update_progress
def process_file_worker():
nonlocal completed_files
while True:
try:
file = file_queue.get(timeout=1)
except queue.Empty:
break
filename = file.relative_to(root_dir)
# 创建文件级进度条
if frames[file] == 0:
file_task = prog.add_task(f"{filename}")
else:
file_task = prog.add_task(f"{filename}", total=frames[file])
progress_updater = create_progress_updater(file, file_task)
# 处理视频
proc_id = f"worker_{threading.current_thread().ident}_{completed_files}"
if CFG["save_to"] == "single": if CFG["save_to"] == "single":
t = process_video(file, root_dir/"Compress", update_progress) process_video(file, root_dir/"Compress", progress_updater, proc_id)
else: else:
t = process_video(file, update_progress) process_video(file, None, progress_updater, proc_id)
prog.stop_task(cur) # 更新完成计数
prog.remove_task(cur) with instance_lock:
if t is None: completed_files += 1
prog.update(task,completed=completed_start+frames[file]) if total_frames == 0: # 如果没有总帧数,按文件数计算
else: prog.update(main_task, completed=completed_files)
prog.advance(task,t)
# 移除文件级进度条
prog.remove_task(file_task)
file_queue.task_done()
# 动态管理线程数
active_threads = []
max_workers = CFG["max_concurrent_instances"]
def manage_workers():
nonlocal active_threads
while completed_files < len(frames) or any(t.is_alive() for t in active_threads):
# 清理已完成的线程
active_threads = [t for t in active_threads if t.is_alive()]
# 检查是否需要增加实例
current_worker_count = len(active_threads)
if current_worker_count < max_workers and not file_queue.empty():
# 检查CPU使用率运行5分钟后开始检查
should_add_worker = False
if len(cpu_stats["system"]) >= 10: # 至少有5分钟的数据
if current_worker_count >= 1: # 已有实例运行
should_add_worker = should_increase_instances()
if should_add_worker:
logging.info("CPU资源充足启动第二个压缩实例")
else:
should_add_worker = False
if should_add_worker:
worker_thread = threading.Thread(target=process_file_worker, daemon=True)
worker_thread.start()
active_threads.append(worker_thread)
logging.debug(f"启动新的工作线程,当前活动线程数: {len(active_threads)}")
threading.Event().wait(5) # 每5秒检查一次
# 等待所有线程完成
for thread in active_threads:
thread.join()
# 启动第一个工作线程
if not file_queue.empty():
first_worker = threading.Thread(target=process_file_worker, daemon=True)
first_worker.start()
active_threads.append(first_worker)
logging.info("启动第一个压缩实例")
# 启动线程管理器
manager_thread = threading.Thread(target=manage_workers, daemon=True)
manager_thread.start()
# 等待管理线程完成
manager_thread.join()
logging.info(f"所有视频处理完成,共处理了 {completed_files} 个文件")
def test(): def test():
os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"] os.environ["PATH"] = Path(__file__).parent.as_posix() + os.pathsep + os.environ["PATH"]
try: try:
subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode() subprocess.run([CFG["ffmpeg"],"-version"],stdout=-3,stderr=-3).check_returncode()
except KeyboardInterrupt as e:raise e
except Exception as e: except Exception as e:
print(__file__) print(__file__)
logging.critical("无法运行ffmpeg") logging.critical("无法运行ffmpeg")
exit(-1) exit(-1)
try: try:
ret = subprocess.run( ret = subprocess.run(
"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size=1920x1080:rate=30 -c:v libx264 -y -pix_fmt yuv420p compress_video_test.mp4", f"ffmpeg -hide_banner -f lavfi -i testsrc=duration=1:size={CFG['test_video_resolution']}:rate={CFG['test_video_fps']} -c:v libx264 -y -pix_fmt yuv420p {CFG['test_video_input']}".split(),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
text=True text=True
@ -382,7 +496,7 @@ def test():
logging.debug(ret.stdout) logging.debug(ret.stdout)
logging.debug(ret.stderr) logging.debug(ret.stderr)
ret.check_returncode() ret.check_returncode()
cmd = get_cmd("compress_video_test.mp4","compressed_video_test.mp4",) cmd = get_cmd(CFG["test_video_input"],CFG["test_video_output"],)
ret = subprocess.run( ret = subprocess.run(
cmd, cmd,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@ -397,24 +511,13 @@ def test():
exit(-1) exit(-1)
os.remove("compress_video_test.mp4") os.remove("compress_video_test.mp4")
os.remove("compressed_video_test.mp4") os.remove("compressed_video_test.mp4")
except KeyboardInterrupt as e:raise e
except Exception as e: except Exception as e:
if os.path.exists("compress_video_test.mp4"): if os.path.exists("compress_video_test.mp4"):
os.remove("compress_video_test.mp4") os.remove("compress_video_test.mp4")
logging.warning("测试未通过,继续运行可能出现未定义行为。") logging.warning("测试未通过,继续运行可能出现未定义行为。")
logging.debug("Test error",exc_info=e) logging.debug("Test error",exc_info=e)
def init_train():
global esti
if CFG["train"]:
train_init()
else:
if ESTI_FILE.exists():
try:
# 从文件读取系数
coeffs_str = ESTI_FILE.read_text().strip().split(',')
esti = [float(coeff) for coeff in coeffs_str]
except Exception as e:
logging.warning(f"预测输出文件{str(ESTI_FILE)}存在但无法读取", exc_info=e)
def exit_pause(): def exit_pause():
if os.name == 'nt': if os.name == 'nt':
@ -426,7 +529,7 @@ def main(_root = None):
atexit.register(exit_pause) atexit.register(exit_pause)
global root, esti global root
setup_logging() setup_logging()
tot_bgn = time() tot_bgn = time()
logging.info("-------------------------------") logging.info("-------------------------------")
@ -437,6 +540,7 @@ def main(_root = None):
import json import json
cfg:dict = json.loads(CFG_FILE.read_text()) cfg:dict = json.loads(CFG_FILE.read_text())
CFG.update(cfg) CFG.update(cfg)
except KeyboardInterrupt as e:raise e
except Exception as e: except Exception as e:
logging.warning("Invalid config file, ignored.") logging.warning("Invalid config file, ignored.")
logging.debug(e) logging.debug(e)
@ -451,7 +555,7 @@ def main(_root = None):
sys.exit(1) sys.exit(1)
root = Path(sys.argv[1]) root = Path(sys.argv[1])
if root.name == "compress": if root.name.lower() == CFG["compress_dir_name"].lower():
logging.critical("请修改目标目录名为非compress。") logging.critical("请修改目标目录名为非compress。")
logging.error("Error termination via invalid input.") logging.error("Error termination via invalid input.")
sys.exit(1) sys.exit(1)
@ -459,8 +563,6 @@ def main(_root = None):
logging.info("开始验证环境") logging.info("开始验证环境")
test() test()
init_train()
if not root.is_dir(): if not root.is_dir():
print("提供的路径不是一个有效目录。") print("提供的路径不是一个有效目录。")
logging.warning("Error termination via invalid input.") logging.warning("Error termination via invalid input.")

View File

@ -0,0 +1,19 @@
# changeConnectionLimit
修改Windows 10 WiFi热点最大连接终端数的工具。
## 功能
通过修改注册表设置和重启相关服务来调整WiFi热点允许连接的设备数量上限。
## 使用方法
1. 以管理员身份运行 `main.bat`
2. 根据提示输入期望的最大连接数
3. 等待脚本完成注册表修改和服务重启
## 注意事项
- 需要管理员权限
- 仅适用于Windows系统
- 修改后需要重启WiFi热点服务才能生效

33
change_font/README.md Normal file
View File

@ -0,0 +1,33 @@
# change_font
An OCR-powered tool that detects text in images and replaces it with custom fonts.
说明:用于将已有文档所有手写文字转变为自定义字体,尽量保持文字格式位置不变。
注意:**只将手写替换,保持印刷体不变。**
使用好未来https://ai.100tal.com/的API可能需要付费。
## Description
This tool uses optical character recognition (OCR) to identify text within images and replaces the detected text with your choice of custom fonts, maintaining the original layout and positioning.
## Usage
1. Configure your API credentials in the script
2. Place your source images in the designated folder
3. Run `python main.py`
4. The tool will process images and generate new versions with replaced fonts
## Requirements
- Python libraries: opencv, numpy, matplotlib, Pillow
- Valid API credentials for the OCR service
- Source images to process
## Setup
Before running, make sure to:
- Install required Python packages
- Configure your API credentials in the configuration section
- Prepare your input images in the correct format

View File

@ -0,0 +1,20 @@
# conversation_trans
将对话数据转换为Markdown格式的工具。
## 功能
将Claude AI聊天记录或其他对话JSON数据转换为格式化的Markdown文档提取思考时间、内容和回复等信息。
## 使用方法
1. 将对话JSON数据放入脚本中
2. 运行 `python main.py`
3. 程序将生成带有对话标题的Markdown文件
## 特性
- 解析消息树结构
- 提取思考过程和回复内容
- 生成格式化的Markdown输出
- 自动生成文件名

27
cord/README.md Normal file
View File

@ -0,0 +1,27 @@
# cord
化学反应坐标图生成工具。
## 功能
创建化学反应的反应坐标图,使用贝塞尔曲线展示反应过程中的能量变化,支持交互式图表显示。
## 使用方法
1. 运行 `streamlit run main.py`
2. 在网页界面中上传包含反应数据的Excel或CSV文件
3. 文件需包含"Name"和"Energy"两列
4. 程序将自动生成反应坐标图
## 数据格式
- 支持Excel (.xlsx) 和CSV (.csv) 格式
- 能量单位Hartree程序自动转换为kcal/mol
- 列名Name反应物/过渡态/产物名称Energy能量值
## 特性
- 交互式Web界面
- 自动单位转换
- 美观的曲线图表
- 支持多步反应

View File

@ -60,7 +60,6 @@ def plot_reaction_coordinate(changed=None, _lines=None):
last=(-1,-1) last=(-1,-1)
maxy = data["Energy"].max() maxy = data["Energy"].max()
miny = data["Energy"].min() miny = data["Energy"].min()
varyy = maxy - miny varyy = maxy - miny
@ -87,6 +86,11 @@ def plot_reaction_coordinate(changed=None, _lines=None):
ax1.xaxis.set_ticks([]) ax1.xaxis.set_ticks([])
ax1.set_ylabel("Energy (kcal/mol)") ax1.set_ylabel("Energy (kcal/mol)")
ax1.set_ylim(miny-varyy*0.1, maxy+varyy*0.1) ax1.set_ylim(miny-varyy*0.1, maxy+varyy*0.1)
if st.session_state.get("xylim", None) is not None:
ax1.set_xlim(st.session_state["xmin"], st.session_state["xmax"])
ax1.set_ylim(st.session_state["ymin"], st.session_state["ymax"])
return fig,lines return fig,lines
# 创建图形和坐标轴 # 创建图形和坐标轴
@ -105,6 +109,7 @@ def callback_gen(x,typ=0):
return callback return callback
def on_save(): def on_save():
global out_file global out_file
# for slider in slides: # for slider in slides:
@ -119,13 +124,16 @@ def on_save():
@st.cache_resource @st.cache_resource
def load_data(file): def load_data(file):
# 读取数据文件 # 读取数据文件
if file is not None: try:
try: if st.session_state.get("use_example", False):
data = create_example()[0]
else:
data = pd.read_excel(file) if file.name.endswith((".xlsx", ".xls")) else pd.read_csv(file) data = pd.read_excel(file) if file.name.endswith((".xlsx", ".xls")) else pd.read_csv(file)
except Exception as e: if data.columns.tolist() != ["Name", "Energy"]:
st.error(f"Error reading file: {e}") st.warning("Format should be Name, Energy. Modified automatically.")
exit() data.columns = ["Name", "Energy"]
else: except Exception as e:
st.error(f"Error reading file: {e}")
exit() exit()
INFLU_FACTORS = [0.5] * data.shape[0] * 2 # 动态创建数组 INFLU_FACTORS = [0.5] * data.shape[0] * 2 # 动态创建数组
@ -133,67 +141,119 @@ def load_data(file):
ene = data["Energy"].to_numpy() ene = data["Energy"].to_numpy()
K_POS = np.where(ene[1:]>ene[:1],0.03,-0.05) K_POS = np.where(ene[1:]>ene[:1],0.03,-0.05)
K_POS = [-0.05] + K_POS.tolist() K_POS = [-0.05] + K_POS.tolist()
st.info(K_POS)
data["Energy"] -= data["Energy"][0] data["Energy"] -= data["Energy"][0]
data["Energy"]*=627.509 data["Energy"]*=627.509
return data, INFLU_FACTORS,K_POS return data, INFLU_FACTORS,K_POS
@st.cache_data
def create_example():
tmp_file = io.BytesIO()
example = pd.DataFrame({"Name":["reactant","TS","result"], "Energy":[-400.310327,-400.210017,-400.341576,]})
example.to_excel(tmp_file, index=False)
return example,tmp_file
out_file = io.BytesIO() out_file = io.BytesIO()
st.set_page_config( st.set_page_config(
page_title="反应坐标绘制", page_title="反应坐标绘制",
page_icon=":chart_with_upwards_trend:", page_icon=":chart_with_upwards_trend:",
layout="wide", initial_sidebar_state="expanded",layout="wide"
initial_sidebar_state="expanded"
) )
st.title("反应坐标绘制") st.title("反应坐标绘制")
st.write("---") st.write("---")
col1,col2,col3 = st.columns([0.4,0.25,0.25],gap="medium")
with col1: file = st.file_uploader("上传能量文件", type=["xlsx", "xls", "csv"],key="file")
file = st.file_uploader("上传能量文件", type=["xlsx", "xls", "csv"])
data, INFLU_FACTORS,K_POS = load_data(file) if not file and not st.session_state.get("use_example", False) and "datas" not in st.session_state:
# st.set_page_config(layout="centered")
st.write("按照下列格式上传表格。请保证列名和范例一致,或直接下载。")
st.warning("注意Energy单位为Hatree程序将自动转换为kcal/mol的相对能量")
example,tmp_file = create_example()
st.dataframe(example,hide_index=True)
st.download_button("下载模板",data=tmp_file,file_name="reaction_coordinate_example.xlsx")
def use_tmp():
global file
st.session_state["use_example"] = True
file = tmp_file
st.button("使用样例使用",on_click=use_tmp)
st.stop()
else:
pass
# st.set_page_config(layout="wide")
col1,col2 = st.columns([0.4,0.6],gap="medium")
with col2:
if "datas" not in st.session_state:
data, INFLU_FACTORS,K_POS = load_data(file)
st.session_state["datas"] = (data, INFLU_FACTORS,K_POS)
else:
data, INFLU_FACTORS, K_POS = st.session_state["datas"]
fig,lines = plot_reaction_coordinate() fig,lines = plot_reaction_coordinate()
stfig = st.pyplot(fig,False) stfig = st.pyplot(fig,False)
with col1:
st.title("作图参数设置")
with st.expander("调整曲线形状(贝塞尔参数)"):
for i in range(data.shape[0]):
if i!=0:
st.slider(
f'{data.loc[i,"Name"]}',
0.0, 1.0, value=INFLU_FACTORS[i*2-1],
key=f'slider_{i*2}',
on_change=callback_gen(i*2)
)
if i!= data.shape[0] - 1:
st.slider(
f'{data.loc[i,"Name"]}',
0.0, 1.0, value=INFLU_FACTORS[i*2],
key=f'slider_{i*2+1}',
on_change=callback_gen(i*2+1)
)
with st.expander("调整文字位置"):
for i in range(data.shape[0]):
st.slider(
f'{data.loc[i,"Name"]}',
-0.1, 0.1, value=K_POS[i],
key=f'text_slider_{i}',
on_change=callback_gen(i,1)
)
with st.expander("调整坐标系极限"):
if st.session_state.get("xylim", None) is None:
xmin,xmax = plt.xlim()
ymin,ymax = plt.ylim()
st.session_state["xylim"] = (xmin,xmax,ymin,ymax)
xmin,xmax,ymin,ymax = st.session_state["xylim"]
dxmin,dxmax,dymin,dymax = abs(xmin)*0.5,abs(xmax)*0.5,abs(ymin)*0.5,abs(ymax)*0.5
st.slider("x min", xmin-dxmin, xmin+dxmin,value=xmin, key="xmin")
st.slider("x max", xmax-dxmax, xmax+dxmax,value=xmax, key="xmax")
st.slider("y min", ymin-dymin, ymin+dymin,value=ymin, key="ymin")
st.slider("y max", ymax-dymax, ymax+dymax,value=ymax, key="ymax")
with st.expander("导出"):
st.selectbox("导出文件拓展名",[".tiff",".pdf",".png",".pgf"],key="export_format")
btn = st.button("生成文件")
if btn:
st.download_button(
label="下载",
data=on_save(),
file_name="reaction_coordinate"+st.session_state.get("export_format", ".tiff"),
# mime="image/tiff"
)
st.slider("字体大小",8,20, value=12, key="font_size", st.slider("字体大小",8,20, value=12, key="font_size",
on_change=lambda: plt.rcParams.update({'font.size': st.session_state.get("font_size", 12)})) on_change=lambda: plt.rcParams.update({'font.size': st.session_state.get("font_size", 12)}))
st.selectbox("导出文件拓展名",[".tiff",".pdf",".png",".pgf"],key="export_format") if st.button("重置",type="primary"):
st.download_button( st.session_state.clear()
label="Download Plot", st.rerun()
data=on_save(),
file_name="reaction_coordinate"+st.session_state.get("export_format", ".tiff"),
# mime="image/tiff"
)
with col2:
st.write("调整滑块以改变反应坐标图曲线形状。")
for i in range(data.shape[0]):
if i!=0:
st.slider(
f'{data.loc[i,"Name"]}',
0.0, 1.0, value=INFLU_FACTORS[i*2-1],
key=f'slider_{i*2}',
on_change=callback_gen(i*2)
)
if i!= data.shape[0] - 1:
st.slider(
f'{data.loc[i,"Name"]}',
0.0, 1.0, value=INFLU_FACTORS[i*2],
key=f'slider_{i*2+1}',
on_change=callback_gen(i*2+1)
)
with col3:
st.write("调整参数以改变文字位置。")
for i in range(data.shape[0]):
st.slider(
f'{data.loc[i,"Name"]}',
-0.1, 0.1, value=K_POS[i],
key=f'text_slider_{i}',
on_change=callback_gen(i,1)
)
st.write("---") st.write("---")
st.dataframe(data) st.dataframe(data)

26
down/README.md Normal file
View File

@ -0,0 +1,26 @@
# down
123pan云盘文件下载工具。
## 功能
从123pan.com云存储服务下载文件支持通过分享密钥批量下载使用多线程下载提高速度。
## 使用方法
1. 在脚本中配置分享密钥
2. 运行 `python down.py`
3. 程序将自动提取下载链接并使用aria2c进行多线程下载
## 依赖要求
- 需要安装aria2c.exe
- 需要网络连接
- Python requests库
## 特性
- 自动解析分享链接
- 多线程下载
- 支持批量处理
- 断点续传支持通过aria2c

27
encryptFiles/README.md Normal file
View File

@ -0,0 +1,27 @@
# encryptFiles
基于RSA的文件夹加密工具。
## 功能
使用RSA算法对整个文件夹进行加密和解密支持密码保护的私钥提供并行处理以提高性能。
## 使用方法
1. 首次运行:`python main.py [文件夹路径]` - 将生成RSA密钥对并加密文件夹
2. 再次运行:`python main.py [文件夹路径]` - 将解密已加密的文件夹
3. 程序会自动在加密和解密模式之间切换
## 特性
- RSA加密算法确保安全性
- 密码保护的私钥
- 并行处理提高速度
- 进度条显示处理状态
- 自动模式切换(加密/解密)
## 注意事项
- 首次使用会在当前目录生成密钥文件
- 请妥善保管密钥文件和密码
- 加密过程不可逆,请确保有备份

26
jw/README.md Normal file
View File

@ -0,0 +1,26 @@
# jw
教务系统自动登录工具。
## 功能
自动登录学生教务系统,支持验证码自动识别和密码哈希验证,简化教务系统访问流程。
## 使用方法
1. 在代码中配置用户名和密码
2. 运行 `python main.py`
3. 程序将自动处理登录过程
## 特性
- 自动验证码识别
- 密码安全哈希处理
- 会话管理
- 错误重试机制
## 注意事项
- 需要配置正确的用户凭据
- 仅用于合法的教务系统访问
- 请遵守学校相关规定

71
jw/main.py Normal file
View File

@ -0,0 +1,71 @@
import hashlib
import requests
import re
import json
import base64
import tenacity as retry
def recapture(username, password, b64, ID):
data = {"username": username, "password": password, "ID": ID, "b64": b64, "version": "3.1.1"}
data_json = json.dumps(data)
result = json.loads(requests.post("http://www.fdyscloud.com.cn/tuling/predict", data=data_json).text)
return result
def pwd_md5(string: str) -> str:
md5_part1 = hashlib.md5((string + "{Urp602019}").encode()).hexdigest().lower()
md5_part2 = hashlib.md5(string.encode()).hexdigest().lower()
final_result = md5_part1 + '*' + md5_part2
return final_result
@retry.retry(stop=retry.stop_after_attempt(3), wait=retry.wait_random(3,5),reraise=True)
def login(username: str, password: str) -> requests.Session:
print("正在登录...")
session = requests.Session()
req = session.get("http://jwstudent.lnu.edu.cn/login")
req.raise_for_status()
html = req.text
match = re.search(r'name="tokenValue" value="(.+?)">', html)
if match:
token_value = match.group(1)
else:
raise ValueError("未找到 tokenValue")
req = session.get("http://jwstudent.lnu.edu.cn/img/captcha.jpg")
req.raise_for_status()
im = req.content
b64 = base64.b64encode(im).decode('utf-8')
captcha_code = recapture(username="fbcfbc6", password="b0qDHNSSg5LxBRzO3hfpbTE5", b64=b64, ID="04897896")["data"]["result"]
with open("captcha.jpg", "wb") as f:
f.write(im)
print(captcha_code)
hashed_password = pwd_md5(password)
# 模拟请求的 payload
payload = {
"j_username": username,
"j_password": hashed_password,
"j_captcha": captcha_code,
"tokenValue": token_value
}
# 发送 POST 请求
url = "http://jwstudent.lnu.edu.cn/j_spring_security_check" # 替换为实际登录地址
headers = {
"User-Agent": "Mozilla/5.0",
"Content-Type": "application/x-www-form-urlencoded"
}
response = session.post(url, data=payload, headers=headers)
if "发生错误" in response.text:
err = re.search(r'<strong>发生错误!</strong>(.+)', response.text)
if err:
error_message = err.group(1).strip()
raise ValueError(f"登录失败: {error_message}")
raise ValueError("登录失败")
return session
print(login("20211299305", "123"))

View File

@ -13,8 +13,8 @@ from Crypto.Util.Padding import pad
from base64 import b64encode from base64 import b64encode
URL = "http://[your libseat url]" URL = "http://[your libseat url]"
UID = "[uid]" # 图书馆账号 # UID = "[uid]" # 图书馆账号
PWD = "[password]" # 图书馆密码 PWD = "000000" # 图书馆密码
USERNAME = "[username]" # 验证码平台用户名 USERNAME = "[username]" # 验证码平台用户名
PASSWORD = "[password]" # 验证码平台密码 PASSWORD = "[password]" # 验证码平台密码
TOKEN = "[token]" TOKEN = "[token]"
@ -171,8 +171,8 @@ def main(dep=0):
print("正在尝试登录...") print("正在尝试登录...")
tried =0 tried =0
while token is None and tried <= 5: while token is None and tried <= 5:
id = UID # id = UID
# id = str(random.choice(years)) + random.choice(house) + str(random.choice(classes)) + str(random.choice(num)).zfill(2) id = str(random.choice(years)) + random.choice(house) + str(random.choice(classes)) + str(random.choice(num)).zfill(2)
token = login(id,PWD) token = login(id,PWD)
if token is None: if token is None:
print("登陆失败:",token) print("登陆失败:",token)

27
move_warn/README.md Normal file
View File

@ -0,0 +1,27 @@
# move_warn
运动检测监控工具。
## 功能
监控IP摄像头画面当检测到显著运动时触发声音警报适用于简单的安防监控需求。
## 使用方法
1. 确保IP摄像头正常工作并可访问
2. 在代码中配置摄像头IP地址
3. 运行 `python main.py`
4. 程序将开始监控,检测到运动时发出警报并自动关闭
## 特性
- 实时运动检测
- 声音警报提示
- 自动程序终止
- 基于OpenCV的计算机视觉处理
## 注意事项
- 需要配置正确的摄像头地址
- 确保摄像头网络连接稳定
- 检测灵敏度可根据需要调整

32
mult/README.md Normal file
View File

@ -0,0 +1,32 @@
# mult
批量视频处理工具。
## 功能
使用auto-editor和FFmpeg对多个MP4/FLV视频文件进行批量处理自动去除静音片段并转码视频支持并行处理提高效率。
## 使用方法
1. 将要处理的视频文件放在程序目录中
2. 运行 `python main.py`CPU处理`python main_up.py`GPU加速
3. 程序将自动处理所有视频文件
## 支持格式
- MP4
- FLV
## 特性
- 批量视频处理
- 自动静音片段移除
- 并行处理支持
- GPU硬件加速支持main_up.py
- 进程池管理
## 依赖要求
- FFmpeg
- auto-editor
- 足够的磁盘空间用于输出文件

30
mw_tool/README.md Normal file
View File

@ -0,0 +1,30 @@
# mw_tool
化学计算与化合物查询工具。
## 功能
基于Web界面的化学工具提供PubChem数据库集成的化合物搜索、分子量计算和反应化学计量计算功能。
## 使用方法
1. 运行 `streamlit run main.py`
2. 在浏览器中访问显示的本地网址
3. 使用各项功能:
- 化合物搜索和信息查询
- 分子量计算
- 反应化学计量计算
## 主要功能
- **化合物查询**通过PubChem数据库搜索化合物信息
- **分子量计算**:计算化学式的分子量
- **化学计量学**:反应平衡计算和摩尔比计算
- **交互式表格**:数据以表格形式展示,便于查看
## 特性
- 友好的Web界面
- 实时数据库查询
- 多功能集成平台
- 数据表格化展示

File diff suppressed because it is too large Load Diff

581
mw_tool/main_old.py Normal file
View File

@ -0,0 +1,581 @@
import streamlit as st
import pubchempy as pcp
from rdkit import Chem
from rdkit.Chem import rdMolDescriptors
import requests
from io import BytesIO
st.set_page_config(
page_title="质量及密度查询",
layout="wide"
)
# 初始化 session state
if 'compound_data' not in st.session_state:
st.session_state.compound_data = None
def search_compound(query):
"""搜索化合物信息"""
try:
compounds = None
try:
comp = Chem.MolFromSmiles(query)
if comp:
compounds = pcp.get_compounds(query, 'smiles', listkey_count=3)
except Exception:
st.error("使用smiles精确查询失败")
# 尝试通过化学式搜索
if not (isinstance(compounds, list) and len(compounds) != 0):
# 尝试通过名称搜索
try:
compounds = pcp.get_compounds(query, 'name', listkey_count=3)
except Exception:
st.error("使用名称查询失败")
if not (isinstance(compounds, list) and len(compounds) != 0):
try:
compounds = pcp.get_compounds(query, 'formula', listkey_count=3)
except Exception:
st.error("使用化学式精确查询失败")
if isinstance(compounds, list) and len(compounds) > 0:
st.info("成功查询物质基本信息,正在获取更多数据。")
return compounds[0]
else:
return None
except Exception as e:
st.error(f"搜索时发生错误: {str(e)}")
# raise e
return None
def calculate_molecular_weight_from_smiles(smiles):
"""从SMILES计算分子量"""
try:
mol = Chem.MolFromSmiles(smiles)
if mol:
return rdMolDescriptors.CalcExactMolWt(mol)
else:
return None
except Exception as e:
st.error(f"SMILES分子量计算错误: {str(e)}")
return None
def get_pubchem_properties(compound):
"""从PubChem获取密度、熔点、沸点信息"""
try:
# 初始化返回数据
properties = {
'density': None,
'melting_point': None,
'boiling_point': None
}
cid = compound.cid
# 尝试获取物理化学性质相关的记录
try:
url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug_view/data/compound/{cid}/JSON?heading=Experimental+Properties"
data = requests.get(url, timeout=3).json()
for section in data["Record"]["Section"]:
if section["TOCHeading"] == "Chemical and Physical Properties":
for sub in section["Section"]:
if sub["TOCHeading"] == "Experimental Properties":
for prop in sub["Section"]:
prop_heading = prop["TOCHeading"]
if prop_heading == "Density" and not properties['density']:
# 可能有多条不同温度/浓度的记录,逐条返回
properties['density'] = [
info["Value"]["StringWithMarkup"][0]["String"]
for info in prop["Information"]
if "Value" in info and "StringWithMarkup" in info["Value"]
]
elif prop_heading == "Melting Point" and not properties['melting_point']:
properties['melting_point'] = [
info["Value"]["StringWithMarkup"][0]["String"]
for info in prop["Information"]
if "Value" in info and "StringWithMarkup" in info["Value"]
]
elif prop_heading == "Boiling Point" and not properties['boiling_point']:
properties['boiling_point'] = [
info["Value"]["StringWithMarkup"][0]["String"]
for info in prop["Information"]
if "Value" in info and "StringWithMarkup" in info["Value"]
]
return properties
except Exception:
return properties
except Exception as e:
# 静默处理异常返回空的properties字典
return {
'density': None,
'melting_point': None,
'boiling_point': None
}
def sync_calculations(compound_data, mmol=None, mass=None, volume=None, changed_field=None):
"""同步计算mmol、质量、体积"""
if not compound_data:
return mmol, mass, volume
# 确保数值类型转换
try:
molecular_weight = float(compound_data.get('molecular_weight', 0))
density_select = compound_data.get('density_select', None)
density = float(density_select) if density_select is not None else None
except (ValueError, TypeError):
st.error("分子量或密度数据格式错误,无法进行计算")
return mmol, mass, volume
if molecular_weight == 0:
return mmol, mass, volume
try:
if changed_field == 'mmol' and mmol is not None:
# 根据mmol计算质量
mass = (mmol / 1000) * molecular_weight # mmol转mol再乘分子量
# 如果有密度,计算体积
if density and density > 0:
volume = mass / density
elif changed_field == 'mass' and mass is not None:
# 根据质量计算mmol
mmol = (mass / molecular_weight) * 1000 # 质量除分子量得mol再转mmol
# 如果有密度,计算体积
if density and density > 0:
volume = mass / density
elif changed_field == 'volume' and volume is not None and density and density > 0:
# 根据体积计算质量
mass = volume * density
# 根据质量计算mmol
mmol = (mass / molecular_weight) * 1000
except Exception as e:
st.error(f"计算错误: {str(e)}")
return mmol, mass, volume
# 主界面
col1, col2 = st.columns([1, 2])
with col1:
st.subheader("物质查询")
query = st.text_input("输入化学式、名称或SMILES:", placeholder="例如: H2O, water, CCO")
# 添加直接计算分子量选项
calc_mw_only = st.checkbox("仅计算分子量(不查询数据库)", help="勾选此项将跳过数据库查询仅从SMILES计算分子量")
if st.button("查询" if not calc_mw_only else "计算", type="primary"):
if query:
with st.spinner("正在处理..."):
# 如果选择仅计算分子量直接从SMILES计算
if calc_mw_only:
mol_weight = calculate_molecular_weight_from_smiles(query)
if mol_weight:
compound_data = {
'name': "用户输入化合物",
'formula': "从SMILES计算",
'molecular_weight': mol_weight,
'melting_point': None,
'density_src': None,
'melting_point_src': None,
'boiling_point_src': None,
'smiles': query,
"inchi": None,
'found': False
}
st.session_state.compound_data = compound_data
st.success("分子量计算完成!")
else:
st.error("输入的SMILES格式无效")
st.session_state.compound_data = None
else:
# 原有的查询逻辑
compound = search_compound(query)
if compound is not None:
# 查询到化合物
# 获取PubChem的物理化学性质信息
pubchem_properties = get_pubchem_properties(compound)
compound_data = {
'name': compound.iupac_name,
'cid': compound.cid,
'formula': compound.molecular_formula,
'molecular_weight': compound.molecular_weight,
"density_src": pubchem_properties['density'],
'melting_point_src': pubchem_properties['melting_point'],
'boiling_point_src': pubchem_properties['boiling_point'],
'smiles': compound.canonical_smiles,
"inchi": compound.inchi if hasattr(compound, 'inchi') else None,
'found': True,
}
st.session_state.compound_data = compound_data
# 显示查询结果信息
if compound_data['density_src'] or compound_data['melting_point_src'] or compound_data['boiling_point_src']:
properties_found = []
if compound_data['density_src']:
properties_found.append("密度")
if compound_data['melting_point_src']:
properties_found.append("熔点")
if compound_data['boiling_point_src']:
properties_found.append("沸点")
st.success(f"查询成功!(找到{', '.join(properties_found)}信息)")
else:
st.success("查询成功!(未找到物理性质信息)")
else:
# 未查询到检查是否为SMILES
if query:
mol_weight = calculate_molecular_weight_from_smiles(query)
if mol_weight:
compound_data = {
'name': "未知化合物",
'formula': "从SMILES计算",
'molecular_weight': mol_weight,
'melting_point': None,
'density_src': None,
'melting_point_src': None,
'boiling_point_src': None,
'smiles': query,
"inchi": None,
'found': False
}
st.session_state.compound_data = compound_data
st.warning("⚠️ 未在数据库中找到但已从SMILES计算分子量")
else:
st.error("❌ 未找到该化合物且SMILES格式无效")
st.session_state.compound_data = None
with col2:
st.subheader("化合物信息")
if st.session_state.compound_data:
data = st.session_state.compound_data
# 显示基本信息
info_col1, info_col2 = st.columns(2)
with info_col1:
st.metric("物质名称", data['name'])
try:
molecular_weight_value = float(data['molecular_weight'])
st.metric("分子量 (g/mol)", f"{molecular_weight_value:.3f}")
except (ValueError, TypeError):
st.metric("分子量 (g/mol)", "数据格式错误")
with info_col2:
st.metric("化学式", data['formula'])
if data.get("cid"):
st.markdown("### 其他数据")
st.page_link(f"https://pubchem.ncbi.nlm.nih.gov/compound/{data['cid']}",label="**访问PubChem**")
st.image(f"https://pubchem.ncbi.nlm.nih.gov/image/imgsrv.fcgi?cid={data['cid']}&t=s","结构式")
# 添加熔沸点信息的展开区域
if data.get('melting_point_src') or data.get('boiling_point_src'):
with st.expander("熔沸点信息", expanded=False):
col1, col2 = st.columns(2)
with col1:
if data.get('melting_point_src'):
st.markdown("### 熔点数据")
melting_data = data['melting_point_src']
if isinstance(melting_data, list):
for i, mp in enumerate(melting_data, 1):
st.write(f"{i}. {mp}")
else:
st.write(melting_data)
with col2:
if data.get('boiling_point_src'):
st.markdown("### 沸点数据")
boiling_data = data['boiling_point_src']
if isinstance(boiling_data, list):
for i, bp in enumerate(boiling_data, 1):
st.write(f"{i}. {bp}")
else:
st.write(boiling_data)
# 判断是否为液体
melting_data = data['melting_point_src']
if isinstance(melting_data, list) and len(melting_data) > 0:
import re
melting_point = re.search(r'\d*\.\d+', melting_data[0])
if melting_point:
melting_point = float(melting_point.group())
# 检测值变化并执行计算
def handle_change(field_name, new_value, current_value):
try:
# 确保都转换为浮点数
new_value = float(new_value) if new_value is not None else 0.0
current_value = float(current_value) if current_value is not None else 0.0
if abs(new_value - current_value) > 1e-6: # 避免浮点数比较问题
# 同步计算 - 确保数据类型正确
try:
calc_data = {
'molecular_weight': float(data['molecular_weight']),
'density_select': float(st.session_state.get('density_select', 0)) if (show_density and st.session_state.get('density_select')) else None
}
except (ValueError, TypeError):
st.error("化合物数据格式错误,无法进行计算")
return
mmol_calc = mass_calc = volume_calc = 0.0
if field_name == 'mmol':
mmol_calc, mass_calc, volume_calc = sync_calculations(
calc_data, new_value, None, None, 'mmol'
)
elif field_name == 'mass':
mmol_calc, mass_calc, volume_calc = sync_calculations(
calc_data, None, new_value, None, 'mass'
)
elif field_name == 'volume':
mmol_calc, mass_calc, volume_calc = sync_calculations(
calc_data, None, None, new_value, 'volume'
)
elif field_name == 'density':
# 密度变化时,如果已有质量,重新计算体积;如果已有体积,重新计算质量
current_mass = st.session_state.mass_val
current_volume = st.session_state.volume_val
if current_mass > 0:
# 根据质量重新计算体积
mmol_calc, mass_calc, volume_calc = sync_calculations(
calc_data, None, current_mass, None, 'mass'
)
elif current_volume > 0:
# 根据体积重新计算质量
mmol_calc, mass_calc, volume_calc = sync_calculations(
calc_data, None, None, current_volume, 'volume'
)
else:
return # 没有质量或体积数据,无需重新计算
# 更新session state
st.session_state.mmol_val = float(mmol_calc) if mmol_calc is not None else 0.0
st.session_state.mass_val = float(mass_calc) if mass_calc is not None else 0.0
st.session_state.volume_val = float(volume_calc) if volume_calc is not None else 0.0
st.session_state.last_changed = field_name
# 强制刷新页面以更新输入框的值
if field_name != 'density': # 密度变化时不需要rerun因为已经在密度输入处理中rerun了
st.rerun()
except (ValueError, TypeError) as e:
st.error(f"数值转换错误: {str(e)}")
return
# 密度显示选项
show_density = False
if data['density_src']:
show_density = st.checkbox("显示密度信息", value=False)
if show_density:
import re
# 初始化密度值在session state中
if 'density_select' not in st.session_state:
st.session_state.density_select = None
if 'density_input_value' not in st.session_state:
st.session_state.density_input_value = 0.0
density_data = data['density_src']
# print(density_data)
# 如果密度是列表且长度>1让用户选择
if isinstance(density_data, list) and len(density_data) > 1:
st.markdown("**选择密度数据:**")
# 为每个密度选项提取数值并显示
density_options = []
density_values = []
for i, density_str in enumerate(density_data):
# 使用正则表达式提取密度数值
match = re.search(r'\d*\.\d+', str(density_str))
if match:
extracted_value = float(match.group())
density_options.append(f"{extracted_value:.3f}: {density_str}")
density_values.append(extracted_value)
else:
density_options.append(f"0.000: {density_str} (无法提取数值)")
density_values.append(None)
# 用户选择密度
selected_index = st.selectbox(
"选择要使用的密度数据:",
range(len(density_options)),
format_func=lambda x: density_options[x],
key="density_selector"
)
# 获取选中的密度值
if density_values[selected_index] is not None:
selected_density_value = density_values[selected_index]
st.session_state.density_select = selected_density_value
# 显示并允许用户修改密度值
st.markdown("**密度值 (可修改)**")
new_density = st.number_input(
"密度 (g/mL)",
min_value=0.0,
value=float(st.session_state.density_select),
step=0.001,
format="%.3f",
key="density_input",
help="选择的密度值,可以手动修改"
)
# 检测密度值变化
if abs(new_density - st.session_state.density_input_value) > 1e-6:
st.session_state.density_select = new_density
st.session_state.density_input_value = new_density
# 更新compound_data中的密度值用于计算
st.session_state.compound_data['density_select'] = new_density
handle_change('density', 1, 0)
st.rerun()
else:
st.error("所选密度数据无法提取有效数值")
# 如果密度是单个值或列表长度为1
else:
try:
if isinstance(density_data, list):
density_str = str(density_data[0])
else:
density_str = str(density_data)
# 提取密度数值
match = re.search(r'\d*\.\d+', density_str)
if match:
density_value = float(match.group())
st.session_state.density_select = density_value
# 显示并允许用户修改密度值
st.markdown("**密度值 (可修改)**")
new_density = st.number_input(
"密度 (g/mL)",
min_value=0.0,
value=float(st.session_state.density_select),
step=0.001,
format="%.3f",
key="density_input_single",
help="提取的密度值,可以手动修改"
)
# 检测密度值变化
if abs(new_density - st.session_state.density_input_value) > 1e-6:
st.session_state.density_select = new_density
st.session_state.density_input_value = new_density
# 更新compound_data中的密度值用于计算
st.session_state.compound_data['density_select'] = new_density
handle_change('density', 1, 0)
st.rerun()
else:
st.error("无法从密度数据中提取有效数值")
except (ValueError, TypeError):
st.error("密度数据格式错误")
st.markdown("---")
# 计算器部分
st.subheader("用量计算器")
# 初始化值
if 'mmol_val' not in st.session_state:
st.session_state.mmol_val = 0.0
if 'mass_val' not in st.session_state:
st.session_state.mass_val = 0.0
if 'volume_val' not in st.session_state:
st.session_state.volume_val = 0.0
if 'last_changed' not in st.session_state:
st.session_state.last_changed = None
# 创建响应式列布局
density_select = st.session_state.get('density_select')
if show_density and density_select is not None:
calc_col1, calc_col2, calc_col3 = st.columns([1, 1, 1])
else:
calc_col1, calc_col2 = st.columns([1, 1])
calc_col3 = None
with calc_col1:
st.markdown("**物质的量**")
new_mmol = st.number_input(
"用量 (mmol)",
min_value=0.0,
value=float(st.session_state.mmol_val),
step=0.1,
format="%.3f",
key="mmol_input",
help="输入或计算得到的物质的量,单位:毫摩尔"
)
# 检测mmol变化
if st.session_state.last_changed != 'mmol':
handle_change('mmol', new_mmol, st.session_state.mmol_val)
with calc_col2:
st.markdown("**质量**")
new_mass = st.number_input(
"质量 (g)",
min_value=0.0,
value=float(st.session_state.mass_val),
step=0.001,
format="%.3f",
key="mass_input",
help="输入或计算得到的质量,单位:克"
)
# 检测mass变化
if st.session_state.last_changed != 'mass':
handle_change('mass', new_mass, st.session_state.mass_val)
if calc_col3 is not None:
with calc_col3:
st.markdown("**体积**")
new_volume = st.number_input(
"体积 (mL)",
min_value=0.0,
value=float(st.session_state.volume_val),
step=0.01,
format="%.3f",
key="volume_input",
help="输入或计算得到的体积,单位:毫升"
)
# 检测volume变化
if st.session_state.last_changed != 'volume':
handle_change('volume', new_volume, st.session_state.volume_val)
# 重置last_changed状态
st.session_state.last_changed = None
# 清零按钮
if st.button("清零所有数值", type="secondary"):
st.session_state.mmol_val = 0.0
st.session_state.mass_val = 0.0
st.session_state.volume_val = 0.0
st.session_state.last_changed = None
st.rerun()
st.session_state.mmol_val = 0.0
st.session_state.mass_val = 0.0
st.session_state.volume_val = 0.0
st.rerun()
else:
st.info("请在左侧输入要查询的化学物质")

View File

@ -1,3 +1,5 @@
streamlit>=1.28.0 streamlit>=1.28.0
pubchempy>=1.0.4 pubchempy>=1.0.4
rdkit>=2022.9.5 requests>=2.25.0
Pillow>=8.0.0
molmass

40
pdf_index/README.md Normal file
View File

@ -0,0 +1,40 @@
# Streamlit PDF 目录查看器
一个最小可运行的 Streamlit 应用:
- 读取 `doc.pdf`(放在当前目录),或通过页面上传 PDF。
- 使用 PyMuPDF 提取 PDF 目录Table of Contents
- 在下拉框选择目录项后,显示该目录项对应的页面范围(到下一个目录项前一页)。
- 使用 `st.pdf` 组件内嵌查看选定页面范围的临时 PDF。
## 快速开始Windows / cmd
1) 建议创建虚拟环境(可选)
```cmd
python -m venv .venv
.venv\Scripts\activate
```
2) 安装依赖
```cmd
pip install -r requirements.txt
```
3) 将你的 PDF 放到本目录并命名为 `doc.pdf`(或在页面中上传)。
4) 运行应用
```cmd
streamlit run app.py
```
## 用法说明
- 左侧边栏可上传 PDF若本地存在 `doc.pdf`,也会自动被加载。
- 目录下拉框显示形如 `title (page)`
- 若 PDF 无目录,本应用会提示;可选择“全部页面”查看。
## 已知限制
- 目录页码通常为 PDF 内部页码(从 1 开始),个别 PDF 的 TOC 可能与实际页面偏移不一致。
- 页面范围切片依赖 TOC 顺序,若 TOC 不规范可能导致范围不准。
## 说明
使用streamlit_pdf_viewer而不是官方的streamlit_pdf是因为在手机上streamlit_pdf无法显示。

121
pdf_index/app.py Normal file
View File

@ -0,0 +1,121 @@
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple
from pathlib import Path
import fitz # PyMuPDF
import streamlit as st
from streamlit_pdf_viewer import pdf_viewer
@dataclass
class TocItem:
level: int
title: str
page_from: int # 1-based
page_to: Optional[int] # 1-based inclusive; None means until end
def read_toc(doc: fitz.Document) -> List[Tuple[int, str, int]]:
# Returns list of (level, title, page) where page is 1-based per PyMuPDF
toc: List[Tuple[int, str, int]] = []
try:
get_toc: Any = getattr(doc, "get_toc", None)
if callable(get_toc):
toc = get_toc(simple=True) # type: ignore[no-any-return]
except Exception:
toc = []
return [(lvl, title, max(1, pg)) for (lvl, title, pg) in toc]
def normalize_ranges(toc: List[Tuple[int, str, int]], page_count: int) -> List[TocItem]:
if not toc:
return []
items: List[TocItem] = []
for i, (lvl, title, page) in enumerate(toc):
start = min(max(1, page), page_count)
if i + 1 < len(toc):
next_page = toc[i + 1][2]
end = max(1, min(page_count, next_page - 1))
if end < start:
end = start
else:
end = page_count
items.append(TocItem(level=lvl, title=title, page_from=start, page_to=end))
return items
def _hash_doc(doc:fitz.Document):
"This is a fake hash, ENSURE GLOBAL DOCUMENT SAME"
return "12"
@st.cache_resource(hash_funcs={fitz.Document:_hash_doc})
def slice_pdf_pages(src_doc: fitz.Document, page_from: int, page_to: int) -> bytes:
# Create a new PDF with selected 1-based inclusive page range
new_pdf = fitz.open()
try:
start_i = max(1, page_from) - 1
end_i = max(start_i, page_to - 1)
for p in range(start_i, min(end_i, src_doc.page_count - 1) + 1):
new_pdf.insert_pdf(src_doc, from_page=p, to_page=p)
out = new_pdf.tobytes()
return out
finally:
new_pdf.close()
src_doc.close()
def format_label(item: TocItem) -> str:
return f"{item.title} ({item.page_from:03d} - {item.page_to:03d})"
@st.cache_resource
def read_pdf():
pdf_path = Path("doc.pdf")
if not pdf_path.exists():
st.error("找不到doc.pdf")
st.stop()
doc = fitz.open(pdf_path, filetype="pdf")
def _close_doc():
"Never close doc due to cache in global streamlit app."
pass
doc.close = _close_doc
page_count = doc.page_count
# 读取目录
raw_toc = read_toc(doc)
items = normalize_ranges(raw_toc, page_count)
return doc,page_count,items
def main():
st.set_page_config(page_title="PDF 目录查看器", layout="wide")
st.title("PDF 目录查看器")
doc,page_count,items = read_pdf()
st.subheader("目录")
labels = ["请选择"] + [format_label(it) for it in items]
selection = st.selectbox("选择章节", labels, index=0)
if selection == "请选择":
st.stop()
idx = labels.index(selection)
chosen = items[idx]
selected_range = (chosen.page_from, chosen.page_to or page_count)
rng_from, rng_to = selected_range
st.subheader("预览")
try:
sliced_bytes = slice_pdf_pages(doc, rng_from, rng_to)
st.download_button("下载",sliced_bytes,file_name=f"{chosen.title}.pdf")
pdf_viewer(sliced_bytes,rendering=st.session_state.get("render","unwrap"))
# st.pdf(io.BytesIO(sliced_bytes), height=height, key="pdf_preview")
except Exception as e:
st.error(f"渲染失败:{e}")
st.selectbox("使用其他渲染方式",["unwrap","legacy_embed","legacy_iframe"],key="render")
if __name__ == "__main__":
main()

View File

@ -0,0 +1,3 @@
streamlit>=1.37.0
pymupdf>=1.24.0
streamlit_pdf_viewer

31
pdf_unlock/README.md Normal file
View File

@ -0,0 +1,31 @@
# pdf_unlock
PDF限制移除工具。
## 功能
移除PDF文件的访问限制通过重新创建不带加密和访问限制的PDF副本来解除文件保护。
## 使用方法
### 命令行使用
```bash
python main.py <输入文件或目录>
```
### 拖拽使用
将PDF文件或包含PDF文件的文件夹直接拖拽到程序上。
## 特性
- 支持单个文件和批量处理
- 移除密码保护
- 移除复制/打印限制
- 保持原始文件质量
- 自动输出到指定目录
## 注意事项
- 仅用于合法拥有的PDF文件
- 请遵守版权法规
- 原始文件保持不变,生成新的无限制副本

26
pinyin/README.md Normal file
View File

@ -0,0 +1,26 @@
# pinyin
中文姓名拼音首字母转换器。
## 功能
将中文姓名转换为拼音首字母,处理预设的中文姓名列表并输出每个字符拼音的首字母。
## 使用方法
1. 运行 `python gen.py`
2. 程序将处理内置的中文姓名列表
3. 输出每个姓名对应的拼音首字母
## 特性
- 中文转拼音处理
- 首字母提取
- 批量姓名处理
- 简洁输出格式
## 应用场景
- 姓名索引生成
- 拼音首字母排序
- 中文姓名处理工具

34
process_video/README.md Normal file
View File

@ -0,0 +1,34 @@
# process_video
视频时长调整工具。
## 功能
使用FFmpeg调整视频播放速度以达到目标时长支持加速或减速处理同时保持视频质量。
## 使用方法
1. 编译C++程序(如果尚未编译)
2. 运行可执行文件
3. 按提示输入:
- 源视频文件路径
- 目标时长
- 输出文件名
## 特性
- 精确时长控制
- 自动速度计算
- 质量保持优化
- 交互式操作界面
## 依赖要求
- FFmpeg和FFprobe需要安装并在系统PATH中
- C++编译器(用于编译源代码)
## 应用场景
- 视频时长标准化
- 网课视频调速
- 媒体内容适配

30
rand/README.md Normal file
View File

@ -0,0 +1,30 @@
# rand
交互式随机数生成器。
## 功能
生成指定范围内的随机整数列表,支持设置是否允许重复数字,提供交互式操作界面。
## 使用方法
1. 运行 `python main.py`
2. 按照提示输入:
- 随机数的数量
- 数字范围(最小值和最大值)
- 是否允许重复数字
3. 程序将生成并显示随机数列表
## 特性
- 交互式用户界面
- 可配置数字范围
- 重复数字控制选项
- 即时结果显示
## 应用场景
- 抽奖号码生成
- 随机抽样
- 测试数据生成
- 游戏数字生成

36
recode/README.md Normal file
View File

@ -0,0 +1,36 @@
# recode
文件编码转换器。
## 功能
检测并转换文本文件的字符编码格式支持UTF-8、GBK、GB2312等80多种编码格式之间的相互转换。
## 使用方法
```bash
python recode.py [选项] 文件名()
```
## 参数选项
- `-i` : 指定输入编码格式
- `-o` : 指定输出编码格式默认utf-8
- `-c` : 测试特定编码格式
- `-r` : 输出目录(默认:"out"
- `-s` : 显示所有支持的编码格式
## 支持的编码
支持80多种字符编码包括
- UTF-8, UTF-16, UTF-32
- GBK, GB2312, GB18030
- Big5, ASCII
- 各种ISO标准编码等
## 特性
- 自动编码检测
- 批量文件处理
- 广泛的编码支持
- 灵活的输出配置

33
seat_map/README.md Normal file
View File

@ -0,0 +1,33 @@
# seat_map
课堂座位表生成器。
## 功能
从二进制数据文件读取座位信息生成8x6的教室座位安排表并导出为CSV格式支持座位轮换逻辑。
## 使用方法
1. 准备座位数据文件 `seat.dat`
2. 运行 `python main.py`
3. 程序将生成 `seat.csv` 座位表文件
## 输出格式
- 8行6列的座位表
- CSV格式便于查看和打印
- 包含学生姓名信息
- 支持座位轮换算法
## 特性
- 自动座位分配
- 轮换逻辑支持
- CSV格式输出
- 预设学生名单48人
## 应用场景
- 课堂座位安排
- 考试座位分配
- 活动座位管理

31
task_assignment/README.md Normal file
View File

@ -0,0 +1,31 @@
# task_assignment
任务分配管理系统。
## 功能
基于Web的任务分配管理系统用户可以表达对任务的偏好系统自动根据可用性进行任务分配并处理冲突。
## 使用方法
1. 运行 `python app.py`
2. 在浏览器中访问显示的本地地址
3. 使用Web界面进行任务管理
- 配置任务列表
- 收集用户意向
- 查看自动分配结果
## 主要功能
- **任务配置**:设置可分配的任务列表
- **意向收集**:用户表达任务偏好
- **自动分配**:基于可用性和偏好的智能分配
- **冲突处理**:处理任务分配冲突
- **结果展示**:清晰显示分配结果
## 特性
- 友好的Web界面
- 自动化分配算法
- 冲突解决机制
- 实时结果显示

31
video/README.md Normal file
View File

@ -0,0 +1,31 @@
# video
视频剪切工具。
## 功能
使用FFmpeg进行简单的视频剪切根据用户指定的开始和结束时间提取视频片段支持无损剪切。
## 使用方法
1. 运行 `python cut.py <视频文件>`
2. 根据提示输入开始时间格式HH:MM:SS
3. 输入结束时间格式HH:MM:SS
4. 程序将生成剪切后的视频文件
## 特性
- 无损视频剪切
- 简单时间区间指定
- 快速处理速度
- 保持原始视频质量
## 依赖要求
需要在指定路径安装FFmpeg
`E:\green\ffmpeg\bin\ffmpeg.exe`
## 时间格式
输入时间使用标准格式:`小时:分钟:秒`
例如:`00:01:30`1分30秒

27
wjx/README.md Normal file
View File

@ -0,0 +1,27 @@
# wjx
问卷星数据抓取工具。
## 功能
自动登录问卷星(WJX.cn)平台获取调查问卷的回复数据支持会话管理和CSRF令牌处理。
## 使用方法
1.`main.py` 中配置登录凭据
2. 运行 `python main.py`
3. 程序将自动登录并提取问卷数据
## 特性
- 自动登录处理
- 会话管理
- CSRF令牌自动处理
- 调查数据提取
## 注意事项
- 需要有效的问卷星账户
- 请遵守平台使用条款
- 建议将凭据信息外部化配置
- 仅用于合法的数据获取

34
zxxk_dl/README.md Normal file
View File

@ -0,0 +1,34 @@
# zxxk_dl
学科网资源下载工具。
## 功能
下载学科网(ZXXK.com)的教育资源将内容转换为本地HTML文件支持处理单个文件和RAR压缩包。
## 使用方法
1. 运行 `python main.py`
2. 根据提示输入软件ID
3. 程序将自动下载并转换资源为HTML格式
4. 支持连续处理多个资源
## 特性
- JSON API交互
- 图片URL提取
- HTML格式化处理
- 自动文件命名(基于哈希值确保唯一性)
- 支持RAR压缩包处理
- 连续批量处理
## 输出格式
- 本地HTML文件
- 保持原始内容结构
- 便于离线查看和打印
## 注意事项
- 请遵守网站使用条款
- 仅用于合法的教育资源获取