diff --git a/ImageCompress/.gitignore b/ImageCompress/.gitignore new file mode 100644 index 0000000..409ab9c --- /dev/null +++ b/ImageCompress/.gitignore @@ -0,0 +1 @@ +ffmpeg.7z \ No newline at end of file diff --git a/ImageCompress/compress/out.png b/ImageCompress/compress/out.png new file mode 100644 index 0000000..9a5a18f Binary files /dev/null and b/ImageCompress/compress/out.png differ diff --git a/ImageCompress/out.png b/ImageCompress/out.png new file mode 100644 index 0000000..1d39686 Binary files /dev/null and b/ImageCompress/out.png differ diff --git a/ImageCompress/tmp.py b/ImageCompress/tmp.py new file mode 100644 index 0000000..31bd35c --- /dev/null +++ b/ImageCompress/tmp.py @@ -0,0 +1,24 @@ +import numpy as np +import matplotlib.pyplot as plt + +# 定义初始数组和目标大小 +arr = np.random.rand(10) * 10 # 随机生成10个在0到10之间的数 +TARGET_SIZE = 50 + +# 计算数组的初始大小和每步的变化量 +current_size = np.sum(arr) +steps = 100 # 逐步逼近的步数 +scaling_factor = (TARGET_SIZE - current_size) / steps + +# 绘图初始化 +fig, ax = plt.subplots() +line, = ax.plot(arr, marker='o') +ax.set_ylim(0, max(arr) + 10) + +# 逐步逼近目标 +for i in range(steps): + arr += scaling_factor * (arr / np.sum(arr)) # 按比例调整数组元素 + line.set_ydata(arr) # 更新线条数据 + plt.title(f'Step {i+1}: Size = {np.sum(arr):.2f}') + plt.pause(0.1) # 动态显示每一步 +plt.show() diff --git a/VideoCompress/main_min.py b/VideoCompress/main_min.py new file mode 100644 index 0000000..2d7fd0b --- /dev/null +++ b/VideoCompress/main_min.py @@ -0,0 +1,214 @@ +import subprocess +from pathlib import Path +import sys +import logging +from datetime import datetime +from time import time +import atexit + +root = None +CODEC=None + + +# 配置logging +def setup_logging(): + log_dir = Path("logs") + log_dir.mkdir(exist_ok=True) + log_file = log_dir / f"video_compress_{datetime.now().strftime('%Y%m%d')}.log" + stream = logging.StreamHandler() + stream.setLevel(logging.DEBUG) + stream.setFormatter(logging.Formatter("%(message)s")) + + file = logging.FileHandler(log_file, encoding='utf-8') + file.setLevel(logging.INFO) + + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(levelname) 7s - %(message)s', + handlers=[ + file, + stream + ] + ) + + +def process_video(video_path: Path): + global esti_data + use=None + sz=video_path.stat().st_size//(1024*1024) + logging.debug(f"开始处理文件: {video_path.relative_to(root)},大小{sz}M") + + + bgn=time() + compress_dir = video_path.parent / "compress" + compress_dir.mkdir(exist_ok=True) + + # 输出文件路径:与原文件同名,保存在 compress 目录下 + output_file = compress_dir / (video_path.stem + video_path.suffix) + if output_file.is_file(): + logging.warning(f"文件{output_file}存在,跳过") + return use + + # 4x + # command = [ + # "ffmpeg.exe", # 可以修改为 ffmpeg 的完整路径,例如:C:/ffmpeg/bin/ffmpeg.exe + # "-hide_banner", # 隐藏 ffmpeg 的横幅信息 + # "-i", str(video_path.absolute()), + # "-filter:v", "setpts=0.25*PTS", # 设置视频高度为 1080,宽度按比例自动计算 + # "-filter:a", "atempo=4.0", + # "-c:v", "h264_qsv", # 使用 Intel Quick Sync Video 编码 + # "-global_quality", "28", # 设置全局质量(数值越低质量越高) + # "-r","30", + # "-preset", "fast", # 设置压缩速度为慢(压缩效果较好) + # "-y", + # str(output_file.absolute()) + # ] + + # 1x + if CODEC=="h264_amf": + command = [ + "ffmpeg.exe", + "-hide_banner", # 隐藏 ffmpeg 的横幅信息 + "-i", str(video_path.absolute()), + "-vf", "scale=-1:1080", # 设置视频高度为 1080,宽度按比例自动计算 + "-c:v", CODEC, # 使用 Intel Quick Sync Video 编码 + "-global_quality", "28", # 设置全局质量(数值越低质量越高) + "-c:a", "copy", # 音频不做处理,直接拷贝 + "-r","30", + "-y", + str(output_file) + ] + else: + command = [ + "ffmpeg.exe", + "-hide_banner", # 隐藏 ffmpeg 的横幅信息 + "-i", str(video_path.absolute()), + "-vf", "scale=-1:1080", # 设置视频高度为 1080,宽度按比例自动计算 + "-c:v", CODEC, # 使用 Intel Quick Sync Video 编码 + "-global_quality", "28", # 设置全局质量(数值越低质量越高) + "-c:a", "copy", # 音频不做处理,直接拷贝 + "-r","30", + "-preset", "slow", + "-y", + str(output_file) + ] + + + try: + result = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + encoding="utf-8", + text=True + ) + + if result.stderr: + for line in result.stderr.splitlines(): + if 'warning' in line.lower(): + logging.warning(f"[FFmpeg]({video_path}): {line}") + elif 'error' in line.lower(): + logging.error(f"[FFmpeg]({video_path}): {line}") + + if result.returncode != 0: + logging.error(f"处理文件 {video_path} 失败,返回码: {result.returncode},cmd={' '.join(command)}") + logging.error(result.stdout) + logging.error(result.stderr) + else: + logging.debug(f"文件处理成功: {video_path} -> {output_file}") + + end=time() + + + except Exception as e: + logging.error(f"执行 ffmpeg 命令时发生异常, 文件:{video_path},cmd={' '.join(command)}",exc_info=e) + return use + +def traverse_directory(root_dir: Path): + video_extensions = {".mp4", ".mkv"} + sm=None + + logging.debug(f"开始遍历目录: {root_dir}") + + for file in root_dir.rglob("*"): + if file.parent.name == "compress":continue + if file.is_file() and file.suffix.lower() in video_extensions: + t = process_video(file) + +@atexit.register +def exit_handler(): + subprocess.run("pause", shell=True) + +def _test(): + try: + subprocess.run(f"ffmpeg -f lavfi -i testsrc=size=1280x720:rate=30 -t 1 -y -c:v {CODEC} -pix_fmt yuv420p test.mp4",stdout=-3,stderr=-3).check_returncode() + subprocess.run(f"ffmpeg -i test.mp4 -c:v {CODEC} -pix_fmt yuv420p -y test2.mp4",stdout=-3,stderr=-3).check_returncode() + return True + except subprocess.CalledProcessError: + return False + finally: + Path("test.mp4").unlink(True) + Path("test2.mp4").unlink(True) + +if __name__ == "__main__": + setup_logging() + tot_bgn = time() + logging.info("-------------------------------") + logging.info(datetime.now().strftime('Video Compress started at %Y/%m/%d %H:%M')) + + try: + subprocess.run("ffmpeg.exe -version",stdout=-3,stderr=-3).check_returncode() + except subprocess.CalledProcessError: + logging.critical("FFmpeg 未安装或不在系统 PATH 中。") + sys.exit(1) + + + if len(sys.argv) < 2: + print(f"推荐用法:python {__file__} <目标目录>") + root = Path(input("请输入目标目录:")) + while not root.is_dir(): + root = Path(input("请输入目标目录:")) + else: + root = Path(sys.argv[1]) + if len(sys.argv) == 3: + CODEC=sys.argv[2] + logging.info(f"使用编码器因为cmd:{CODEC}") + if not root.is_dir(): + print("提供的路径不是一个有效目录。") + logging.warning("Error termination via invalid input.") + sys.exit(1) + + if CODEC is None: + logging.info("检测可用编码器") + try: + ret = subprocess.run(["ffmpeg","-codecs"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True) + ret.check_returncode() + avai = [] + if "cuda" in ret.stdout:avai.append("cuda") + if "amf" in ret.stdout:avai.append("amf") + if "qsv" in ret.stdout:avai.append("qsv") + avai.append("h264") + + for c in avai: + CODEC = "h264_" + c + if _test(): + break + if not _test: + logging.critical("没有可用的h264编码器。") + exit(1) + except Exception as e: + logging.error("Error termination via unhandled error.",exc_info=e) + finally: + logging.info(f"使用编码器:{CODEC}") + + try: + traverse_directory(root) + tot_end = time() + logging.info(f"Elapsed time: {(tot_end-tot_bgn)}s") + logging.info("Normal termination of Video Compress.") + except KeyboardInterrupt: + logging.warning("Error termination via keyboard interrupt, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.") + except Exception as e: + logging.error("Error termination via unhandled error, CHECK IF LAST PROCSSING VIDEO IS COMPLETED.",exc_info=e) + + diff --git a/libseat/README.md b/libseat/README.md new file mode 100644 index 0000000..647d07f --- /dev/null +++ b/libseat/README.md @@ -0,0 +1,373 @@ +# 某校图书馆预约系统分析 + +登录界面如图 + + +输入任意账号、密码,正确填验证码,F12抓包。注意到访问`http://[URL]/rest/auth?answer=bmx8&captchaId=qxtd5hl5h8wz`,返回`{"status":"fail","code":"13","message":"登录失败: 用户名或密码不正确","data":null}`且无其他访问,确定为登录API。 +观察url参数,未发现账号密码;请求为GET不存在data。仔细观察请求体,注意到headers中有username和password,测试证明为账号密码。 + +显然账号密码加密,尝试直接重放请求失败,注意到headers中的`x-hmac-request-key`等,推测存在加密。 + +## headers中的`X-hmac-request-key`等参数 + +全局搜索关键字`x-hmac-request-key`,找到唯一代码 +```javascript +{ + var t = ""; + t = "post" === e.method ? g("post") : g("get"), + e.headers = c()({}, e.headers, { + Authorization: sessionStorageProxy.getItem("token"), + "X-request-id": t.id, + "X-request-date": t.date, + "X-hmac-request-key": t.requestKey + }) +} +``` +动态调试找到`g`的实现 +``` +function g(e) { + var t = function() { + for (var e = [], t = 0; t < 36; t++) + e[t] = "0123456789abcdef".substr(Math.floor(16 * Math.random()), 1); + return e[14] = "4", + e[19] = "0123456789abcdef".substr(3 & e[19] | 8, 1), + e[8] = e[13] = e[18] = e[23] = "-", + e.join("") + }() + , n = (new Date).getTime() + , r = "seat::" + t + "::" + n + "::" + e.toUpperCase() + , o = p.a.decrypt(h.default.prototype.$NUMCODE); + return { + id: t, + date: n, + requestKey: m.HmacSHA256(r, o).toString() + } +} +``` + +注意到`o = p.a.decrypt(h.default.prototype.$NUMCODE);`未知,确定`h.default.prototype.$NUMCODE`是常量,且`p.a.decrypt`与时间无关,直接动态调试拿到解密结果`o=leos3cr3t`. + +其他算法实现均未引用其他代码,转写为python +```python +def generate_uuid(): + hex_digits = '0123456789abcdef' + e = [random.choice(hex_digits) for _ in range(36)] + e[14] = '4' + e[19] = hex_digits[(int(e[19], 16) & 0x3) | 0x8] + for i in [8, 13, 18, 23]: + e[i] = '-' + return ''.join(e) + +def g(e: str): + uuid = generate_uuid() + timestamp = int(time.time() * 1000) + r = f"seat::{uuid}::{timestamp}::{e.upper()}" + secret_key = b"leos3cr3t" + hmac_obj = hmac.new(secret_key, r.encode('utf-8'), hashlib.sha256) + request_key = hmac_obj.hexdigest() + return { + "id": uuid, + "date": timestamp, + "requestKey": request_key + } +``` + +至此,使用上述`g`动态生成hmac后,其他内容不变条件下成功重放请求,可以拿到登录token。 + +注意到账号密码均加密,尝试逆向。 + +## 账号密码加密 + +从`http://[URL]/rest/auth?answer=bmx8&captchaId=qxtd5hl5h8wz`请求的启动器中找到关键函数`handleLogin` + +```javascript +handleLogin: function() { + var t = this; + if ("" == this.form.username || "" == this.form.password) + return this.$alert(this.$t("placeholder.accountInfo"), this.$t("common.tips"), { + confirmButtonText: this.$t("common.confirm"), + type: "warning", + callback: function(t) {} + }); + this.loginLoading = !0, + Object(m.y)({ + username: d(this.form.username) + "_encrypt", + password: d(this.form.password) + "_encrypt", + answer: this.form.answer, + captchaId: this.captchaId + }).then(function(s) { + "success" == s.data.status ? (sessionStorageProxy.setItem("loginType", "login"), + sessionStorageProxy.setItem("token", s.data.data.token), + t.loginLoading = !1, + t.getUserInfoFunc()) : (t.$warning(s.data.message), + t.refreshCode(), + t.clearLoginForm(), + t.loginLoading = !1) + }).catch(function(s) { + console.log("出错了:", s), + t.$error(t.$t("common.networkError")), + t.loginLoading = !1 + }) +}, +``` +注意到 +```javascript +username: d(this.form.username) + "_encrypt", +password: d(this.form.password) + "_encrypt", +``` +动态调试找到`d`的实现 +``` +var .... + , l = "server_date_time" + , u = "client_date_time" + , d = function(t) { + var s = arguments.length > 1 && void 0 !== arguments[1] ? arguments[1] : l + , e = arguments.length > 2 && void 0 !== arguments[2] ? arguments[2] : u + , a = r.a.enc.Utf8.parse(e) + , i = r.a.enc.Utf8.parse(s) + , n = r.a.enc.Utf8.parse(t); + return r.a.AES.encrypt(n, i, { + iv: a, + mode: r.a.mode.CBC, + padding: r.a.pad.Pkcs7 + }).toString() +} +``` +显然`arguments.length`恒为1,故`s=l="server_date_time"`,`e=u="client_date_time"`,至此AES加密的参数均已知,python实现该AES加密后验证使用的AES为标准AES加密。 +故python实现账号密码的加密: +```python +def encrypt(t, s="server_date_time", e="client_date_time"): + key = s.encode('utf-8') + iv = e.encode('utf-8') + data = t.encode('utf-8') + + cipher = AES.new(key, AES.MODE_CBC, iv) + ct_bytes = cipher.encrypt(pad(data, AES.block_size)) # Pkcs7 padding + ct_base64 = b64encode(ct_bytes).decode('utf-8') + return ct_base64+"_encrypt" +``` + +关于验证码,`http://[URL]/auth/createCaptcha`返回验证码base64,提交时为URL参数,均未加密。 +至此,完成整个登录过程的逆向。 + +## 其他API + +所有其他API请求headers均带有`X-hmac-request-key`,逻辑同上。URL参数中的token为登录API返回值。 + +## 完整demo + +这是一个座位检查demo,查询某个区域是否有余座,如有则通过`serverchan_sdk`发生通知。 +需按照实际情况修改常量。 +为了识别验证码,调用了某验证码识别API,该平台为收费API,与本人无关,不对此负责,如有违规烦请告知。 + +Github账号出问题了只能直接贴代码了 + +```python +import time +import random +import hmac +import hashlib +import requests +from datetime import datetime +import json +from serverchan_sdk import sc_send; + + +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad +from base64 import b64encode + +URL = "http://[your libseat url]" +UID = "[uid]" # 图书馆账号 +PWD = "[password]" # 图书馆密码 +USERNAME = "[username]" # 验证码平台用户名 +PASSWORD = "[password]" # 验证码平台密码 +TOKEN = "[token]" + +def encrypt(t, s="server_date_time", e="client_date_time"): + key = s.encode('utf-8') + iv = e.encode('utf-8') + data = t.encode('utf-8') + + cipher = AES.new(key, AES.MODE_CBC, iv) + ct_bytes = cipher.encrypt(pad(data, AES.block_size)) # Pkcs7 padding + ct_base64 = b64encode(ct_bytes).decode('utf-8') + return ct_base64+"_encrypt" + +def b64_api(username, password, b64, ID): + data = {"username": username, "password": password, "ID": ID, "b64": b64, "version": "3.1.1"} + data_json = json.dumps(data) + result = json.loads(requests.post("http://www.fdyscloud.com.cn/tuling/predict", data=data_json).text) + return result + +def recapture(): + res = requests.get(URL+"/auth/createCaptcha") + ret = res.json() + im = ret["captchaImage"][21:] + result = b64_api(username=USERNAME, password=PASSWORD, b64=im, ID="04897896") + return ret["captchaId"],result["data"]["result"] + +def login(username,password): + captchaId, ans = recapture() + url = URL+"/rest/auth" + parm = { + "answer": ans.lower(), + "captchaId": captchaId, + } + headers = build_head("post", None) + headers.update({ + "Username": encrypt(username), + "Password": encrypt(password), + "Logintype": "PC" + }) + res = requests.post(url, headers=headers, params=parm) + # print("Status:", res.status_code) + ret = res.json() + # print("Response:", ret) + if ret["status"] == "fail": + return None + else: + return ret["data"]["token"] + +def generate_uuid(): + hex_digits = '0123456789abcdef' + e = [random.choice(hex_digits) for _ in range(36)] + e[14] = '4' + e[19] = hex_digits[(int(e[19], 16) & 0x3) | 0x8] + for i in [8, 13, 18, 23]: + e[i] = '-' + return ''.join(e) + +def g(e: str): + uuid = generate_uuid() + timestamp = int(time.time() * 1000) + r = f"seat::{uuid}::{timestamp}::{e.upper()}" + secret_key = b"leos3cr3t" + hmac_obj = hmac.new(secret_key, r.encode('utf-8'), hashlib.sha256) + request_key = hmac_obj.hexdigest() + return { + "id": uuid, + "date": timestamp, + "requestKey": request_key + } + +def build_head(e: str,token:str): + sig = g(e) + headers = { + "Authorization": token, + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", + "X-hmac-request-key": sig["requestKey"], + "X-request-date": str(sig["date"]), + "X-request-id": sig["id"] + } + if token is None: + headers.pop("Authorization") + return headers + +def get_floor_data(token: str,buildingId= "1"): + date = datetime.now().strftime("%Y-%m-%d") # 获取当前日期 + url = f"{URL}/rest/v2/room/stats2/{buildingId}/{date}" + params = { + "buildingId": buildingId, + "date": date, + "token": token + } + headers = build_head("get",token) + + response = requests.get(url, headers=headers, params=params) + + print("Status:", response.status_code) + res = response.json() + if res["status"] != "success": + print("Error:", res) + return -1 + else: + ret = [] + for room in res["data"]: + ret.append({"roomId": room["roomId"], "room": room["room"], "free": room["free"], "inUse": room["inUse"], "totalSeats": room["totalSeats"]}) + # print(f"Room ID: {room['roomId']}, Name: {room['room']}, free: {room['free']}, inUse: {room['inUse']}, total: {room['totalSeats']}") + return ret + +def get_room_data(token: str,id:str = "10"): + date = datetime.now().strftime("%Y-%m-%d") # 获取当前日期 + # 替换为目标接口地址 + print(date) + url = f"{URL}/rest/v2/room/layoutByDate/{id}/{date}" + params = { + "id": id, + "date": date, + "token": token + } + + sig = g("get") + headers = { + "Authorization": token, + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", + "X-hmac-request-key": sig["requestKey"], + "X-request-date": str(sig["date"]), + "X-request-id": sig["id"] + } + + response = requests.get(url, headers=headers, params=params) + + print("Status:", response.status_code) + ret = response.json() + if ret["status"] != "success": + print("Error:", ret) + return -1 + else: + print("Data:", ret["data"]["name"]) + print("Response:", ) + +def send_message(msg): + print(sc_send(TOKEN, "图书馆座位", msg)) + +def main(dep=0): + if dep>3: + print("无法获取数据!") + send_message("无法获取数据!") + return + years = [2021, 2022, 2023, 2024] + house = [] + classes = [1,2,3] + num = range(1,21) + token = None + try: + print("正在尝试登录...") + tried =0 + while token is None and tried <= 5: + id = UID + # id = str(random.choice(years)) + random.choice(house) + str(random.choice(classes)) + str(random.choice(num)).zfill(2) + token = login(id,PWD) + if token is None: + print("登陆失败:",token) + time.sleep(random.randint(10, 30)) + tried += 1 + if token is None: + print("登录失败,请检查账号密码或网络连接。") + main(dep+1) + return + print("登录成功,Token:", token) + data = get_floor_data(token, "1") # 获取一楼数据 + if data == -1: + print("获取数据失败,请检查网络连接或Token是否有效。") + main(dep+1) + else: + ret = [] + for room in data: + if room["free"] > 0: + ret.append(room) + if ret: + msg = "|区域|空座|占用率|\n|-|-|-|\n" + for room in ret: + msg += f"|{room['room']}|{room['free']}|{1-room['free']/room['totalSeats']:0.2f}|\n" + send_message(msg) + except Exception as e: + print("发生错误:", e) + main(dep+1) + +if __name__ == "__main__": + main() + +``` + diff --git a/libseat/image-1.png b/libseat/image-1.png new file mode 100644 index 0000000..333d42a Binary files /dev/null and b/libseat/image-1.png differ diff --git a/libseat/image-2.png b/libseat/image-2.png new file mode 100644 index 0000000..2f60df4 Binary files /dev/null and b/libseat/image-2.png differ diff --git a/libseat/image.png b/libseat/image.png new file mode 100644 index 0000000..79156cd Binary files /dev/null and b/libseat/image.png differ diff --git a/libseat/inter.py b/libseat/inter.py new file mode 100644 index 0000000..deb0499 --- /dev/null +++ b/libseat/inter.py @@ -0,0 +1,116 @@ +# app.py ── 运行:python app.py +import re, threading, datetime as dt +from flask import Flask, render_template_string, request, redirect, url_for, flash + +app = Flask(__name__) +app.secret_key = "secret" + +# ---------- ❶ 你要周期执行的业务函数 ---------- +def my_task(): + print(f"[{dt.datetime.now():%F %T}] 🎉 my_task 被调用") + +# ---------- ❷ 将 “1h / 30min / 45s” 解析为秒 ---------- +def parse_interval(expr: str) -> int: + m = re.match(r"^\s*(\d+)\s*(h|hr|hour|m|min|minute|s|sec)\s*$", expr, re.I) + if not m: + raise ValueError("格式不正确——示例: 1h、30min、45s") + n, unit = int(m[1]), m[2].lower() + return n * 3600 if unit.startswith("h") else n * 60 if unit.startswith("m") else n + +# ---------- ❸ 简易调度器 (Thread + Event) ---------- +class LoopWorker(threading.Thread): + def __init__(self, interval_s: int, fn): + super().__init__(daemon=True) + self.interval_s = interval_s + self.fn = fn + self._stop = threading.Event() + + def cancel(self): + self._stop.set() + + def run(self): + while not self._stop.wait(self.interval_s): + try: + self.fn() + except Exception as e: + print("任务执行出错:", e) + +worker: LoopWorker | None = None # 全局保存当前线程 +current_expr: str = "" # 全局保存当前表达式 + +# ---------- ❹ Tailwind + Alpine 渲染 ---------- +PAGE = """ + + +
+ +
+ 当前任务:
+ {% if worker %}
+ 每 {{current_expr}} 触发一次(刷新页面查看最新状态)
+ {% else %}
+ 未设置
+ {% endif %}
+