merge
This commit is contained in:
1
ImageCompress/.gitignore
vendored
Normal file
1
ImageCompress/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
ffmpeg.7z
|
BIN
ImageCompress/compress/out.png
Normal file
BIN
ImageCompress/compress/out.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 181 KiB |
BIN
ImageCompress/out.png
Normal file
BIN
ImageCompress/out.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 18 KiB |
24
ImageCompress/tmp.py
Normal file
24
ImageCompress/tmp.py
Normal file
@ -0,0 +1,24 @@
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
# 定义初始数组和目标大小
|
||||
arr = np.random.rand(10) * 10 # 随机生成10个在0到10之间的数
|
||||
TARGET_SIZE = 50
|
||||
|
||||
# 计算数组的初始大小和每步的变化量
|
||||
current_size = np.sum(arr)
|
||||
steps = 100 # 逐步逼近的步数
|
||||
scaling_factor = (TARGET_SIZE - current_size) / steps
|
||||
|
||||
# 绘图初始化
|
||||
fig, ax = plt.subplots()
|
||||
line, = ax.plot(arr, marker='o')
|
||||
ax.set_ylim(0, max(arr) + 10)
|
||||
|
||||
# 逐步逼近目标
|
||||
for i in range(steps):
|
||||
arr += scaling_factor * (arr / np.sum(arr)) # 按比例调整数组元素
|
||||
line.set_ydata(arr) # 更新线条数据
|
||||
plt.title(f'Step {i+1}: Size = {np.sum(arr):.2f}')
|
||||
plt.pause(0.1) # 动态显示每一步
|
||||
plt.show()
|
214
VideoCompress/main_min.py
Normal file
214
VideoCompress/main_min.py
Normal file
@ -0,0 +1,214 @@
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
import sys
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from time import time
|
||||
import atexit
|
||||
|
||||
root = None
|
||||
CODEC=None
|
||||
|
||||
|
||||
# 配置logging
|
||||
def setup_logging():
|
||||
log_dir = Path("logs")
|
||||
log_dir.mkdir(exist_ok=True)
|
||||
log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log"
|
||||
stream = logging.StreamHandler()
|
||||
stream.setLevel(logging.DEBUG)
|
||||
stream.setFormatter(logging.Formatter("%(message)s"))
|
||||
|
||||
file = logging.FileHandler(log_file, encoding='utf-8')
|
||||
file.setLevel(logging.INFO)
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format='%(asctime)s - %(levelname) 7s - %(message)s',
|
||||
handlers=[
|
||||
file,
|
||||
stream
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def process_video(video_path: Path):
|
||||
global esti_data
|
||||
use=None
|
||||
sz=video_path.stat().st_size//(1024*1024)
|
||||
logging.debug(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M")
|
||||
|
||||
|
||||
bgn=time()
|
||||
compress_dir = video_path.parent / "compress"
|
||||
compress_dir.mkdir(exist_ok=True)
|
||||
|
||||
# 输出文件路径:与原文件同名,保存在 compress 目录下
|
||||
output_file = compress_dir / (video_path.stem + video_path.suffix)
|
||||
if output_file.is_file():
|
||||
logging.warning(f"文件{output_file}存在,跳过")
|
||||
return use
|
||||
|
||||
# 4x
|
||||
# command = [
|
||||
# "ffmpeg.exe", # 可以修改为 ffmpeg 的完整路径,例如:C:/ffmpeg/bin/ffmpeg.exe
|
||||
# "-hide_banner", # 隐藏 ffmpeg 的横幅信息
|
||||
# "-i", str(video_path.absolute()),
|
||||
# "-filter:v", "setpts=0.25*PTS", # 设置视频高度为 1080,宽度按比例自动计算
|
||||
# "-filter:a", "atempo=4.0",
|
||||
# "-c:v", "h264_qsv", # 使用 Intel Quick Sync Video 编码
|
||||
# "-global_quality", "28", # 设置全局质量(数值越低质量越高)
|
||||
# "-r","30",
|
||||
# "-preset", "fast", # 设置压缩速度为慢(压缩效果较好)
|
||||
# "-y",
|
||||
# str(output_file.absolute())
|
||||
# ]
|
||||
|
||||
# 1x
|
||||
if CODEC=="h264_amf":
|
||||
command = [
|
||||
"ffmpeg.exe",
|
||||
"-hide_banner", # 隐藏 ffmpeg 的横幅信息
|
||||
"-i", str(video_path.absolute()),
|
||||
"-vf", "scale=-1:1080", # 设置视频高度为 1080,宽度按比例自动计算
|
||||
"-c:v", CODEC, # 使用 Intel Quick Sync Video 编码
|
||||
"-global_quality", "28", # 设置全局质量(数值越低质量越高)
|
||||
"-c:a", "copy", # 音频不做处理,直接拷贝
|
||||
"-r","30",
|
||||
"-y",
|
||||
str(output_file)
|
||||
]
|
||||
else:
|
||||
command = [
|
||||
"ffmpeg.exe",
|
||||
"-hide_banner", # 隐藏 ffmpeg 的横幅信息
|
||||
"-i", str(video_path.absolute()),
|
||||
"-vf", "scale=-1:1080", # 设置视频高度为 1080,宽度按比例自动计算
|
||||
"-c:v", CODEC, # 使用 Intel Quick Sync Video 编码
|
||||
"-global_quality", "28", # 设置全局质量(数值越低质量越高)
|
||||
"-c:a", "copy", # 音频不做处理,直接拷贝
|
||||
"-r","30",
|
||||
"-preset", "slow",
|
||||
"-y",
|
||||
str(output_file)
|
||||
]
|
||||
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
command,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
encoding="utf-8",
|
||||
text=True
|
||||
)
|
||||
|
||||
if result.stderr:
|
||||
for line in result.stderr.splitlines():
|
||||
if 'warning' in line.lower():
|
||||
logging.warning(f"[FFmpeg]({video_path}): {line}")
|
||||
elif 'error' in line.lower():
|
||||
logging.error(f"[FFmpeg]({video_path}): {line}")
|
||||
|
||||
if result.returncode != 0:
|
||||
logging.error(f"处理文件 {video_path} 失败,返回码: {result.returncode},cmd={' '.join(command)}")
|
||||
logging.error(result.stdout)
|
||||
logging.error(result.stderr)
|
||||
else:
|
||||
logging.debug(f"文件处理成功: {video_path} -> {output_file}")
|
||||
|
||||
end=time()
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{video_path},cmd={' '.join(command)}",exc_info=e)
|
||||
return use
|
||||
|
||||
def traverse_directory(root_dir: Path):
|
||||
video_extensions = {".mp4", ".mkv"}
|
||||
sm=None
|
||||
|
||||
logging.debug(f"开始遍历目录: {root_dir}")
|
||||
|
||||
for file in root_dir.rglob("*"):
|
||||
if file.parent.name == "compress":continue
|
||||
if file.is_file() and file.suffix.lower() in video_extensions:
|
||||
t = process_video(file)
|
||||
|
||||
@atexit.register
|
||||
def exit_handler():
|
||||
subprocess.run("pause", shell=True)
|
||||
|
||||
def _test():
|
||||
try:
|
||||
subprocess.run(f"ffmpeg -f lavfi -i testsrc=size=1280x720:rate=30 -t 1 -y -c:v {CODEC} -pix_fmt yuv420p test.mp4",stdout=-3,stderr=-3).check_returncode()
|
||||
subprocess.run(f"ffmpeg -i test.mp4 -c:v {CODEC} -pix_fmt yuv420p -y test2.mp4",stdout=-3,stderr=-3).check_returncode()
|
||||
return True
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
finally:
|
||||
Path("test.mp4").unlink(True)
|
||||
Path("test2.mp4").unlink(True)
|
||||
|
||||
if __name__ == "__main__":
|
||||
setup_logging()
|
||||
tot_bgn = time()
|
||||
logging.info("-------------------------------")
|
||||
logging.info(datetime.now().strftime('Video Compress started at %Y/%m/%d %H:%M'))
|
||||
|
||||
try:
|
||||
subprocess.run("ffmpeg.exe -version",stdout=-3,stderr=-3).check_returncode()
|
||||
except subprocess.CalledProcessError:
|
||||
logging.critical("FFmpeg 未安装或不在系统 PATH 中。")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print(f"推荐用法:python {__file__} <目标目录>")
|
||||
root = Path(input("请输入目标目录:"))
|
||||
while not root.is_dir():
|
||||
root = Path(input("请输入目标目录:"))
|
||||
else:
|
||||
root = Path(sys.argv[1])
|
||||
if len(sys.argv) == 3:
|
||||
CODEC=sys.argv[2]
|
||||
logging.info(f"使用编码器因为cmd:{CODEC}")
|
||||
if not root.is_dir():
|
||||
print("提供的路径不是一个有效目录。")
|
||||
logging.warning("Error termination via invalid input.")
|
||||
sys.exit(1)
|
||||
|
||||
if CODEC is None:
|
||||
logging.info("检测可用编码器")
|
||||
try:
|
||||
ret = subprocess.run(["ffmpeg","-codecs"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True)
|
||||
ret.check_returncode()
|
||||
avai = []
|
||||
if "cuda" in ret.stdout:avai.append("cuda")
|
||||
if "amf" in ret.stdout:avai.append("amf")
|
||||
if "qsv" in ret.stdout:avai.append("qsv")
|
||||
avai.append("h264")
|
||||
|
||||
for c in avai:
|
||||
CODEC = "h264_" + c
|
||||
if _test():
|
||||
break
|
||||
if not _test:
|
||||
logging.critical("没有可用的h264编码器。")
|
||||
exit(1)
|
||||
except Exception as e:
|
||||
logging.error("Error termination via unhandled error.",exc_info=e)
|
||||
finally:
|
||||
logging.info(f"使用编码器:{CODEC}")
|
||||
|
||||
try:
|
||||
traverse_directory(root)
|
||||
tot_end = time()
|
||||
logging.info(f"Elapsed time: {(tot_end-tot_bgn)}s")
|
||||
logging.info("Normal termination of Video Compress.")
|
||||
except KeyboardInterrupt:
|
||||
logging.warning("Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.")
|
||||
except Exception as e:
|
||||
logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e)
|
||||
|
||||
|
373
libseat/README.md
Normal file
373
libseat/README.md
Normal file
@ -0,0 +1,373 @@
|
||||
# 某校图书馆预约系统分析
|
||||
|
||||
登录界面如图
|
||||

|
||||
|
||||
输入任意账号、密码,正确填验证码,F12抓包。注意到访问`http://[URL]/rest/auth?answer=bmx8&captchaId=qxtd5hl5h8wz`,返回`{"status":"fail","code":"13","message":"登录失败: 用户名或密码不正确","data":null}`且无其他访问,确定为登录API。
|
||||
观察url参数,未发现账号密码;请求为GET不存在data。仔细观察请求体,注意到headers中有username和password,测试证明为账号密码。
|
||||

|
||||
显然账号密码加密,尝试直接重放请求失败,注意到headers中的`x-hmac-request-key`等,推测存在加密。
|
||||
|
||||
## headers中的`X-hmac-request-key`等参数
|
||||
|
||||
全局搜索关键字`x-hmac-request-key`,找到唯一代码
|
||||
```javascript
|
||||
{
|
||||
var t = "";
|
||||
t = "post" === e.method ? g("post") : g("get"),
|
||||
e.headers = c()({}, e.headers, {
|
||||
Authorization: sessionStorageProxy.getItem("token"),
|
||||
"X-request-id": t.id,
|
||||
"X-request-date": t.date,
|
||||
"X-hmac-request-key": t.requestKey
|
||||
})
|
||||
}
|
||||
```
|
||||
动态调试找到`g`的实现
|
||||
```
|
||||
function g(e) {
|
||||
var t = function() {
|
||||
for (var e = [], t = 0; t < 36; t++)
|
||||
e[t] = "0123456789abcdef".substr(Math.floor(16 * Math.random()), 1);
|
||||
return e[14] = "4",
|
||||
e[19] = "0123456789abcdef".substr(3 & e[19] | 8, 1),
|
||||
e[8] = e[13] = e[18] = e[23] = "-",
|
||||
e.join("")
|
||||
}()
|
||||
, n = (new Date).getTime()
|
||||
, r = "seat::" + t + "::" + n + "::" + e.toUpperCase()
|
||||
, o = p.a.decrypt(h.default.prototype.$NUMCODE);
|
||||
return {
|
||||
id: t,
|
||||
date: n,
|
||||
requestKey: m.HmacSHA256(r, o).toString()
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
注意到`o = p.a.decrypt(h.default.prototype.$NUMCODE);`未知,确定`h.default.prototype.$NUMCODE`是常量,且`p.a.decrypt`与时间无关,直接动态调试拿到解密结果`o=leos3cr3t`.
|
||||
|
||||
其他算法实现均未引用其他代码,转写为python
|
||||
```python
|
||||
def generate_uuid():
|
||||
hex_digits = '0123456789abcdef'
|
||||
e = [random.choice(hex_digits) for _ in range(36)]
|
||||
e[14] = '4'
|
||||
e[19] = hex_digits[(int(e[19], 16) & 0x3) | 0x8]
|
||||
for i in [8, 13, 18, 23]:
|
||||
e[i] = '-'
|
||||
return ''.join(e)
|
||||
|
||||
def g(e: str):
|
||||
uuid = generate_uuid()
|
||||
timestamp = int(time.time() * 1000)
|
||||
r = f"seat::{uuid}::{timestamp}::{e.upper()}"
|
||||
secret_key = b"leos3cr3t"
|
||||
hmac_obj = hmac.new(secret_key, r.encode('utf-8'), hashlib.sha256)
|
||||
request_key = hmac_obj.hexdigest()
|
||||
return {
|
||||
"id": uuid,
|
||||
"date": timestamp,
|
||||
"requestKey": request_key
|
||||
}
|
||||
```
|
||||
|
||||
至此,使用上述`g`动态生成hmac后,其他内容不变条件下成功重放请求,可以拿到登录token。
|
||||
|
||||
注意到账号密码均加密,尝试逆向。
|
||||
|
||||
## 账号密码加密
|
||||
|
||||
从`http://[URL]/rest/auth?answer=bmx8&captchaId=qxtd5hl5h8wz`请求的启动器中找到关键函数`handleLogin`
|
||||

|
||||
```javascript
|
||||
handleLogin: function() {
|
||||
var t = this;
|
||||
if ("" == this.form.username || "" == this.form.password)
|
||||
return this.$alert(this.$t("placeholder.accountInfo"), this.$t("common.tips"), {
|
||||
confirmButtonText: this.$t("common.confirm"),
|
||||
type: "warning",
|
||||
callback: function(t) {}
|
||||
});
|
||||
this.loginLoading = !0,
|
||||
Object(m.y)({
|
||||
username: d(this.form.username) + "_encrypt",
|
||||
password: d(this.form.password) + "_encrypt",
|
||||
answer: this.form.answer,
|
||||
captchaId: this.captchaId
|
||||
}).then(function(s) {
|
||||
"success" == s.data.status ? (sessionStorageProxy.setItem("loginType", "login"),
|
||||
sessionStorageProxy.setItem("token", s.data.data.token),
|
||||
t.loginLoading = !1,
|
||||
t.getUserInfoFunc()) : (t.$warning(s.data.message),
|
||||
t.refreshCode(),
|
||||
t.clearLoginForm(),
|
||||
t.loginLoading = !1)
|
||||
}).catch(function(s) {
|
||||
console.log("出错了:", s),
|
||||
t.$error(t.$t("common.networkError")),
|
||||
t.loginLoading = !1
|
||||
})
|
||||
},
|
||||
```
|
||||
注意到
|
||||
```javascript
|
||||
username: d(this.form.username) + "_encrypt",
|
||||
password: d(this.form.password) + "_encrypt",
|
||||
```
|
||||
动态调试找到`d`的实现
|
||||
```
|
||||
var ....
|
||||
, l = "server_date_time"
|
||||
, u = "client_date_time"
|
||||
, d = function(t) {
|
||||
var s = arguments.length > 1 && void 0 !== arguments[1] ? arguments[1] : l
|
||||
, e = arguments.length > 2 && void 0 !== arguments[2] ? arguments[2] : u
|
||||
, a = r.a.enc.Utf8.parse(e)
|
||||
, i = r.a.enc.Utf8.parse(s)
|
||||
, n = r.a.enc.Utf8.parse(t);
|
||||
return r.a.AES.encrypt(n, i, {
|
||||
iv: a,
|
||||
mode: r.a.mode.CBC,
|
||||
padding: r.a.pad.Pkcs7
|
||||
}).toString()
|
||||
}
|
||||
```
|
||||
显然`arguments.length`恒为1,故`s=l="server_date_time"`,`e=u="client_date_time"`,至此AES加密的参数均已知,python实现该AES加密后验证使用的AES为标准AES加密。
|
||||
故python实现账号密码的加密:
|
||||
```python
|
||||
def encrypt(t, s="server_date_time", e="client_date_time"):
|
||||
key = s.encode('utf-8')
|
||||
iv = e.encode('utf-8')
|
||||
data = t.encode('utf-8')
|
||||
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
ct_bytes = cipher.encrypt(pad(data, AES.block_size)) # Pkcs7 padding
|
||||
ct_base64 = b64encode(ct_bytes).decode('utf-8')
|
||||
return ct_base64+"_encrypt"
|
||||
```
|
||||
|
||||
关于验证码,`http://[URL]/auth/createCaptcha`返回验证码base64,提交时为URL参数,均未加密。
|
||||
至此,完成整个登录过程的逆向。
|
||||
|
||||
## 其他API
|
||||
|
||||
所有其他API请求headers均带有`X-hmac-request-key`,逻辑同上。URL参数中的token为登录API返回值。
|
||||
|
||||
## 完整demo
|
||||
|
||||
这是一个座位检查demo,查询某个区域是否有余座,如有则通过`serverchan_sdk`发生通知。
|
||||
需按照实际情况修改常量。
|
||||
为了识别验证码,调用了某验证码识别API,该平台为收费API,与本人无关,不对此负责,如有违规烦请告知。
|
||||
|
||||
Github账号出问题了只能直接贴代码了
|
||||
|
||||
```python
|
||||
import time
|
||||
import random
|
||||
import hmac
|
||||
import hashlib
|
||||
import requests
|
||||
from datetime import datetime
|
||||
import json
|
||||
from serverchan_sdk import sc_send;
|
||||
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad
|
||||
from base64 import b64encode
|
||||
|
||||
URL = "http://[your libseat url]"
|
||||
UID = "[uid]" # 图书馆账号
|
||||
PWD = "[password]" # 图书馆密码
|
||||
USERNAME = "[username]" # 验证码平台用户名
|
||||
PASSWORD = "[password]" # 验证码平台密码
|
||||
TOKEN = "[token]"
|
||||
|
||||
def encrypt(t, s="server_date_time", e="client_date_time"):
|
||||
key = s.encode('utf-8')
|
||||
iv = e.encode('utf-8')
|
||||
data = t.encode('utf-8')
|
||||
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
ct_bytes = cipher.encrypt(pad(data, AES.block_size)) # Pkcs7 padding
|
||||
ct_base64 = b64encode(ct_bytes).decode('utf-8')
|
||||
return ct_base64+"_encrypt"
|
||||
|
||||
def b64_api(username, password, b64, ID):
|
||||
data = {"username": username, "password": password, "ID": ID, "b64": b64, "version": "3.1.1"}
|
||||
data_json = json.dumps(data)
|
||||
result = json.loads(requests.post("http://www.fdyscloud.com.cn/tuling/predict", data=data_json).text)
|
||||
return result
|
||||
|
||||
def recapture():
|
||||
res = requests.get(URL+"/auth/createCaptcha")
|
||||
ret = res.json()
|
||||
im = ret["captchaImage"][21:]
|
||||
result = b64_api(username=USERNAME, password=PASSWORD, b64=im, ID="04897896")
|
||||
return ret["captchaId"],result["data"]["result"]
|
||||
|
||||
def login(username,password):
|
||||
captchaId, ans = recapture()
|
||||
url = URL+"/rest/auth"
|
||||
parm = {
|
||||
"answer": ans.lower(),
|
||||
"captchaId": captchaId,
|
||||
}
|
||||
headers = build_head("post", None)
|
||||
headers.update({
|
||||
"Username": encrypt(username),
|
||||
"Password": encrypt(password),
|
||||
"Logintype": "PC"
|
||||
})
|
||||
res = requests.post(url, headers=headers, params=parm)
|
||||
# print("Status:", res.status_code)
|
||||
ret = res.json()
|
||||
# print("Response:", ret)
|
||||
if ret["status"] == "fail":
|
||||
return None
|
||||
else:
|
||||
return ret["data"]["token"]
|
||||
|
||||
def generate_uuid():
|
||||
hex_digits = '0123456789abcdef'
|
||||
e = [random.choice(hex_digits) for _ in range(36)]
|
||||
e[14] = '4'
|
||||
e[19] = hex_digits[(int(e[19], 16) & 0x3) | 0x8]
|
||||
for i in [8, 13, 18, 23]:
|
||||
e[i] = '-'
|
||||
return ''.join(e)
|
||||
|
||||
def g(e: str):
|
||||
uuid = generate_uuid()
|
||||
timestamp = int(time.time() * 1000)
|
||||
r = f"seat::{uuid}::{timestamp}::{e.upper()}"
|
||||
secret_key = b"leos3cr3t"
|
||||
hmac_obj = hmac.new(secret_key, r.encode('utf-8'), hashlib.sha256)
|
||||
request_key = hmac_obj.hexdigest()
|
||||
return {
|
||||
"id": uuid,
|
||||
"date": timestamp,
|
||||
"requestKey": request_key
|
||||
}
|
||||
|
||||
def build_head(e: str,token:str):
|
||||
sig = g(e)
|
||||
headers = {
|
||||
"Authorization": token,
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
|
||||
"X-hmac-request-key": sig["requestKey"],
|
||||
"X-request-date": str(sig["date"]),
|
||||
"X-request-id": sig["id"]
|
||||
}
|
||||
if token is None:
|
||||
headers.pop("Authorization")
|
||||
return headers
|
||||
|
||||
def get_floor_data(token: str,buildingId= "1"):
|
||||
date = datetime.now().strftime("%Y-%m-%d") # 获取当前日期
|
||||
url = f"{URL}/rest/v2/room/stats2/{buildingId}/{date}"
|
||||
params = {
|
||||
"buildingId": buildingId,
|
||||
"date": date,
|
||||
"token": token
|
||||
}
|
||||
headers = build_head("get",token)
|
||||
|
||||
response = requests.get(url, headers=headers, params=params)
|
||||
|
||||
print("Status:", response.status_code)
|
||||
res = response.json()
|
||||
if res["status"] != "success":
|
||||
print("Error:", res)
|
||||
return -1
|
||||
else:
|
||||
ret = []
|
||||
for room in res["data"]:
|
||||
ret.append({"roomId": room["roomId"], "room": room["room"], "free": room["free"], "inUse": room["inUse"], "totalSeats": room["totalSeats"]})
|
||||
# print(f"Room ID: {room['roomId']}, Name: {room['room']}, free: {room['free']}, inUse: {room['inUse']}, total: {room['totalSeats']}")
|
||||
return ret
|
||||
|
||||
def get_room_data(token: str,id:str = "10"):
|
||||
date = datetime.now().strftime("%Y-%m-%d") # 获取当前日期
|
||||
# 替换为目标接口地址
|
||||
print(date)
|
||||
url = f"{URL}/rest/v2/room/layoutByDate/{id}/{date}"
|
||||
params = {
|
||||
"id": id,
|
||||
"date": date,
|
||||
"token": token
|
||||
}
|
||||
|
||||
sig = g("get")
|
||||
headers = {
|
||||
"Authorization": token,
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
|
||||
"X-hmac-request-key": sig["requestKey"],
|
||||
"X-request-date": str(sig["date"]),
|
||||
"X-request-id": sig["id"]
|
||||
}
|
||||
|
||||
response = requests.get(url, headers=headers, params=params)
|
||||
|
||||
print("Status:", response.status_code)
|
||||
ret = response.json()
|
||||
if ret["status"] != "success":
|
||||
print("Error:", ret)
|
||||
return -1
|
||||
else:
|
||||
print("Data:", ret["data"]["name"])
|
||||
print("Response:", )
|
||||
|
||||
def send_message(msg):
|
||||
print(sc_send(TOKEN, "图书馆座位", msg))
|
||||
|
||||
def main(dep=0):
|
||||
if dep>3:
|
||||
print("无法获取数据!")
|
||||
send_message("无法获取数据!")
|
||||
return
|
||||
years = [2021, 2022, 2023, 2024]
|
||||
house = []
|
||||
classes = [1,2,3]
|
||||
num = range(1,21)
|
||||
token = None
|
||||
try:
|
||||
print("正在尝试登录...")
|
||||
tried =0
|
||||
while token is None and tried <= 5:
|
||||
id = UID
|
||||
# id = str(random.choice(years)) + random.choice(house) + str(random.choice(classes)) + str(random.choice(num)).zfill(2)
|
||||
token = login(id,PWD)
|
||||
if token is None:
|
||||
print("登陆失败:",token)
|
||||
time.sleep(random.randint(10, 30))
|
||||
tried += 1
|
||||
if token is None:
|
||||
print("登录失败,请检查账号密码或网络连接。")
|
||||
main(dep+1)
|
||||
return
|
||||
print("登录成功,Token:", token)
|
||||
data = get_floor_data(token, "1") # 获取一楼数据
|
||||
if data == -1:
|
||||
print("获取数据失败,请检查网络连接或Token是否有效。")
|
||||
main(dep+1)
|
||||
else:
|
||||
ret = []
|
||||
for room in data:
|
||||
if room["free"] > 0:
|
||||
ret.append(room)
|
||||
if ret:
|
||||
msg = "|区域|空座|占用率|\n|-|-|-|\n"
|
||||
for room in ret:
|
||||
msg += f"|{room['room']}|{room['free']}|{1-room['free']/room['totalSeats']:0.2f}|\n"
|
||||
send_message(msg)
|
||||
except Exception as e:
|
||||
print("发生错误:", e)
|
||||
main(dep+1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
```
|
||||
|
BIN
libseat/image-1.png
Normal file
BIN
libseat/image-1.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 71 KiB |
BIN
libseat/image-2.png
Normal file
BIN
libseat/image-2.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 66 KiB |
BIN
libseat/image.png
Normal file
BIN
libseat/image.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 426 KiB |
116
libseat/inter.py
Normal file
116
libseat/inter.py
Normal file
@ -0,0 +1,116 @@
|
||||
# app.py ── 运行:python app.py
|
||||
import re, threading, datetime as dt
|
||||
from flask import Flask, render_template_string, request, redirect, url_for, flash
|
||||
|
||||
app = Flask(__name__)
|
||||
app.secret_key = "secret"
|
||||
|
||||
# ---------- ❶ 你要周期执行的业务函数 ----------
|
||||
def my_task():
|
||||
print(f"[{dt.datetime.now():%F %T}] 🎉 my_task 被调用")
|
||||
|
||||
# ---------- ❷ 将 “1h / 30min / 45s” 解析为秒 ----------
|
||||
def parse_interval(expr: str) -> int:
|
||||
m = re.match(r"^\s*(\d+)\s*(h|hr|hour|m|min|minute|s|sec)\s*$", expr, re.I)
|
||||
if not m:
|
||||
raise ValueError("格式不正确——示例: 1h、30min、45s")
|
||||
n, unit = int(m[1]), m[2].lower()
|
||||
return n * 3600 if unit.startswith("h") else n * 60 if unit.startswith("m") else n
|
||||
|
||||
# ---------- ❸ 简易调度器 (Thread + Event) ----------
|
||||
class LoopWorker(threading.Thread):
|
||||
def __init__(self, interval_s: int, fn):
|
||||
super().__init__(daemon=True)
|
||||
self.interval_s = interval_s
|
||||
self.fn = fn
|
||||
self._stop = threading.Event()
|
||||
|
||||
def cancel(self):
|
||||
self._stop.set()
|
||||
|
||||
def run(self):
|
||||
while not self._stop.wait(self.interval_s):
|
||||
try:
|
||||
self.fn()
|
||||
except Exception as e:
|
||||
print("任务执行出错:", e)
|
||||
|
||||
worker: LoopWorker | None = None # 全局保存当前线程
|
||||
current_expr: str = "" # 全局保存当前表达式
|
||||
|
||||
# ---------- ❹ Tailwind + Alpine 渲染 ----------
|
||||
PAGE = """
|
||||
<!doctype html>
|
||||
<html lang="zh">
|
||||
<head>
|
||||
<meta charset="UTF-8" />
|
||||
<title>周期设置</title>
|
||||
<script src="https://cdn.tailwindcss.com/3.4.0"></script>
|
||||
</head>
|
||||
<body class="bg-gray-100 min-h-screen flex items-center justify-center">
|
||||
<div class="w-full max-w-md p-8 bg-white rounded-2xl shadow-xl space-y-6"> <header class="flex justify-between items-center">
|
||||
<h1 class="text-2xl font-semibold text-gray-800">周期设置</h1>
|
||||
</header> {% with msgs = get_flashed_messages(with_categories=true) %}
|
||||
{% if msgs %}
|
||||
{% for cat, msg in msgs %}
|
||||
<div class="px-4 py-2 rounded-lg text-sm {{ 'bg-green-100 text-green-800' if cat=='success' else 'bg-red-100 text-red-800' }}">
|
||||
<span>{{ msg }}</span>
|
||||
</div>
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
{% endwith %}
|
||||
|
||||
<form method="post" class="space-y-4">
|
||||
<input name="interval" required placeholder="例:1h / 30min / 45s"
|
||||
class="w-full px-4 py-2 rounded-lg border border-gray-300 focus:ring-2
|
||||
focus:ring-indigo-400 focus:outline-none"/>
|
||||
<button class="w-full py-2 rounded-lg bg-indigo-600 hover:bg-indigo-500 text-white font-medium">
|
||||
设置周期
|
||||
</button>
|
||||
</form>
|
||||
|
||||
<p class="text-xs text-gray-500">
|
||||
当前任务:<br>
|
||||
{% if worker %}
|
||||
每 {{current_expr}} 触发一次(刷新页面查看最新状态)
|
||||
{% else %}
|
||||
未设置
|
||||
{% endif %}
|
||||
</p>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
@app.route("/", methods=["GET", "POST"])
|
||||
def index():
|
||||
global worker, current_expr
|
||||
if request.method == "POST":
|
||||
expr = request.form["interval"].strip()
|
||||
try:
|
||||
if expr == "0":
|
||||
# 如果输入为 "0",则取消当前任务
|
||||
if worker:
|
||||
worker.cancel()
|
||||
worker = None
|
||||
current_expr = ""
|
||||
flash("✅ 已取消定时任务", "success")
|
||||
return redirect(url_for("index"))
|
||||
sec = parse_interval(expr)
|
||||
# 取消旧线程
|
||||
if worker:
|
||||
worker.cancel()
|
||||
# 启动新线程
|
||||
worker = LoopWorker(sec, my_task)
|
||||
worker.start()
|
||||
current_expr = expr # 保存表达式
|
||||
print(expr)
|
||||
flash(f"✅ 已设置:每 {expr} 运行一次 my_task()", "success")
|
||||
except Exception as e:
|
||||
flash(f"❌ {e}", "error")
|
||||
return redirect(url_for("index"))
|
||||
|
||||
return render_template_string(PAGE, worker=worker, current_expr=current_expr)
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run(debug=True)
|
205
libseat/libseat.py
Normal file
205
libseat/libseat.py
Normal file
@ -0,0 +1,205 @@
|
||||
import time
|
||||
import random
|
||||
import hmac
|
||||
import hashlib
|
||||
import requests
|
||||
from datetime import datetime
|
||||
import json
|
||||
from serverchan_sdk import sc_send;
|
||||
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad
|
||||
from base64 import b64encode
|
||||
|
||||
URL = "http://[your libseat url]"
|
||||
UID = "[uid]" # 图书馆账号
|
||||
PWD = "[password]" # 图书馆密码
|
||||
USERNAME = "[username]" # 验证码平台用户名
|
||||
PASSWORD = "[password]" # 验证码平台密码
|
||||
TOKEN = "[token]"
|
||||
|
||||
def encrypt(t, s="server_date_time", e="client_date_time"):
|
||||
key = s.encode('utf-8')
|
||||
iv = e.encode('utf-8')
|
||||
data = t.encode('utf-8')
|
||||
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
ct_bytes = cipher.encrypt(pad(data, AES.block_size)) # Pkcs7 padding
|
||||
ct_base64 = b64encode(ct_bytes).decode('utf-8')
|
||||
return ct_base64+"_encrypt"
|
||||
|
||||
def b64_api(username, password, b64, ID):
|
||||
data = {"username": username, "password": password, "ID": ID, "b64": b64, "version": "3.1.1"}
|
||||
data_json = json.dumps(data)
|
||||
result = json.loads(requests.post("http://www.fdyscloud.com.cn/tuling/predict", data=data_json).text)
|
||||
return result
|
||||
|
||||
def recapture():
|
||||
res = requests.get(URL+"/auth/createCaptcha")
|
||||
ret = res.json()
|
||||
im = ret["captchaImage"][21:]
|
||||
result = b64_api(username=USERNAME, password=PASSWORD, b64=im, ID="04897896")
|
||||
return ret["captchaId"],result["data"]["result"]
|
||||
|
||||
def login(username,password):
|
||||
captchaId, ans = recapture()
|
||||
url = URL+"/rest/auth"
|
||||
parm = {
|
||||
"answer": ans.lower(),
|
||||
"captchaId": captchaId,
|
||||
}
|
||||
headers = build_head("post", None)
|
||||
headers.update({
|
||||
"Username": encrypt(username),
|
||||
"Password": encrypt(password),
|
||||
"Logintype": "PC"
|
||||
})
|
||||
res = requests.post(url, headers=headers, params=parm)
|
||||
# print("Status:", res.status_code)
|
||||
ret = res.json()
|
||||
# print("Response:", ret)
|
||||
if ret["status"] == "fail":
|
||||
return None
|
||||
else:
|
||||
return ret["data"]["token"]
|
||||
|
||||
def generate_uuid():
|
||||
hex_digits = '0123456789abcdef'
|
||||
e = [random.choice(hex_digits) for _ in range(36)]
|
||||
e[14] = '4'
|
||||
e[19] = hex_digits[(int(e[19], 16) & 0x3) | 0x8]
|
||||
for i in [8, 13, 18, 23]:
|
||||
e[i] = '-'
|
||||
return ''.join(e)
|
||||
|
||||
def g(e: str):
|
||||
uuid = generate_uuid()
|
||||
timestamp = int(time.time() * 1000)
|
||||
r = f"seat::{uuid}::{timestamp}::{e.upper()}"
|
||||
secret_key = b"leos3cr3t"
|
||||
hmac_obj = hmac.new(secret_key, r.encode('utf-8'), hashlib.sha256)
|
||||
request_key = hmac_obj.hexdigest()
|
||||
return {
|
||||
"id": uuid,
|
||||
"date": timestamp,
|
||||
"requestKey": request_key
|
||||
}
|
||||
|
||||
def build_head(e: str,token:str):
|
||||
sig = g(e)
|
||||
headers = {
|
||||
"Authorization": token,
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
|
||||
"X-hmac-request-key": sig["requestKey"],
|
||||
"X-request-date": str(sig["date"]),
|
||||
"X-request-id": sig["id"]
|
||||
}
|
||||
if token is None:
|
||||
headers.pop("Authorization")
|
||||
return headers
|
||||
|
||||
def get_floor_data(token: str,buildingId= "1"):
|
||||
date = datetime.now().strftime("%Y-%m-%d") # 获取当前日期
|
||||
url = f"{URL}/rest/v2/room/stats2/{buildingId}/{date}"
|
||||
params = {
|
||||
"buildingId": buildingId,
|
||||
"date": date,
|
||||
"token": token
|
||||
}
|
||||
headers = build_head("get",token)
|
||||
|
||||
response = requests.get(url, headers=headers, params=params)
|
||||
|
||||
print("Status:", response.status_code)
|
||||
res = response.json()
|
||||
if res["status"] != "success":
|
||||
print("Error:", res)
|
||||
return -1
|
||||
else:
|
||||
ret = []
|
||||
for room in res["data"]:
|
||||
ret.append({"roomId": room["roomId"], "room": room["room"], "free": room["free"], "inUse": room["inUse"], "totalSeats": room["totalSeats"]})
|
||||
# print(f"Room ID: {room['roomId']}, Name: {room['room']}, free: {room['free']}, inUse: {room['inUse']}, total: {room['totalSeats']}")
|
||||
return ret
|
||||
|
||||
def get_room_data(token: str,id:str = "10"):
|
||||
date = datetime.now().strftime("%Y-%m-%d") # 获取当前日期
|
||||
# 替换为目标接口地址
|
||||
print(date)
|
||||
url = f"{URL}/rest/v2/room/layoutByDate/{id}/{date}"
|
||||
params = {
|
||||
"id": id,
|
||||
"date": date,
|
||||
"token": token
|
||||
}
|
||||
|
||||
sig = g("get")
|
||||
headers = {
|
||||
"Authorization": token,
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
|
||||
"X-hmac-request-key": sig["requestKey"],
|
||||
"X-request-date": str(sig["date"]),
|
||||
"X-request-id": sig["id"]
|
||||
}
|
||||
|
||||
response = requests.get(url, headers=headers, params=params)
|
||||
|
||||
print("Status:", response.status_code)
|
||||
ret = response.json()
|
||||
if ret["status"] != "success":
|
||||
print("Error:", ret)
|
||||
return -1
|
||||
else:
|
||||
print("Data:", ret["data"]["name"])
|
||||
print("Response:", )
|
||||
|
||||
def send_message(msg):
|
||||
print(sc_send(TOKEN, "图书馆座位", msg))
|
||||
|
||||
def main(dep=0):
|
||||
if dep>3:
|
||||
print("无法获取数据!")
|
||||
send_message("无法获取数据!")
|
||||
return
|
||||
years = [2021, 2022, 2023, 2024]
|
||||
house = []
|
||||
classes = [1,2,3]
|
||||
num = range(1,21)
|
||||
token = None
|
||||
try:
|
||||
print("正在尝试登录...")
|
||||
tried =0
|
||||
while token is None and tried <= 5:
|
||||
id = UID
|
||||
# id = str(random.choice(years)) + random.choice(house) + str(random.choice(classes)) + str(random.choice(num)).zfill(2)
|
||||
token = login(id,PWD)
|
||||
if token is None:
|
||||
print("登陆失败:",token)
|
||||
time.sleep(random.randint(10, 30))
|
||||
tried += 1
|
||||
if token is None:
|
||||
print("登录失败,请检查账号密码或网络连接。")
|
||||
main(dep+1)
|
||||
return
|
||||
print("登录成功,Token:", token)
|
||||
data = get_floor_data(token, "1") # 获取一楼数据
|
||||
if data == -1:
|
||||
print("获取数据失败,请检查网络连接或Token是否有效。")
|
||||
main(dep+1)
|
||||
else:
|
||||
ret = []
|
||||
for room in data:
|
||||
if room["free"] > 0:
|
||||
ret.append(room)
|
||||
if ret:
|
||||
msg = "|区域|空座|占用率|\n|-|-|-|\n"
|
||||
for room in ret:
|
||||
msg += f"|{room['room']}|{room['free']}|{1-room['free']/room['totalSeats']:0.2f}|\n"
|
||||
send_message(msg)
|
||||
except Exception as e:
|
||||
print("发生错误:", e)
|
||||
main(dep+1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Reference in New Issue
Block a user