Former-commit-id: 3af1a19b5b0e0bb4dbc367468b36c62aa8762a91
This commit is contained in:
2025-06-20 22:38:46 +08:00
parent 492b37c912
commit 42460ad72f
4 changed files with 264 additions and 91 deletions

4
login/info.json Normal file
View File

@ -0,0 +1,4 @@
{
"username": "13504022184",
"password": "password"
}

69
main.py
View File

@ -5,8 +5,9 @@ import numpy as np
import ch340 import ch340
import atexit import atexit
import utils import utils
from utils import State, History from utils import State, History,login_to_platform,send_data_to_platform
from scipy.signal import find_peaks from scipy.signal import find_peaks
import tkinter as tk
class MAT: class MAT:
@ -121,14 +122,14 @@ class MAT:
self.control_logger.info(f"检测到about进入about模式当前体积: {val:.2f} ml") self.control_logger.info(f"检测到about进入about模式当前体积: {val:.2f} ml")
# 3. end返回colored置end_check标记开始检查 # 3. end返回colored置end_check标记开始检查
elif ret == "colored": # elif ret == "colored":
if not self.state.in_end_check: # if not self.state.in_end_check:
self.state.enter_end_check(now) # self.state.enter_end_check(now)
if self.colored_volume is None: # if self.colored_volume is None:
self.colored_volume = val # self.colored_volume = val
self.colored_time = now # self.colored_time = now
self.colored_im = im.copy() # self.colored_im = im.copy()
self.endpoint_logger.info(f"检测到colored开始end检查记录体积: {val:.2f} ml") # self.endpoint_logger.info(f"检测到colored开始end检查记录体积: {val:.2f} ml")
# === 状态检查逻辑 === # === 状态检查逻辑 ===
# middle检查: 进入middle的bounce_time后在最近bounce_time内middle比例<70%返回fast状态 # middle检查: 进入middle的bounce_time后在最近bounce_time内middle比例<70%返回fast状态
@ -172,14 +173,21 @@ class MAT:
return ret return ret
def end_check(self,dep=0): def end_check(self,dep=0):
if not self.running:
return "colored"
if dep == 1:
suc,im = self.cap.read()
name = f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
cv2.imwrite(name,im)
self.colored_im = name
if dep > 5: if dep > 5:
# self.endpoint_logger.info(f"colored比例{.2%}>=70%,确认滴定终点") # self.endpoint_logger.info(f"colored比例{.2%}>=70%,确认滴定终点")
self.colored_volume = self.total_volume self.colored_volume = self.total_volume
self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml") self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml")
self.running = False self.running = False
self.ch340.stop() self.ch340.stop()
if self.colored_im is not None: # if self.colored_im is not None:
cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im) # cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im)
return "colored" return "colored"
suc,im = self.cap.read() suc,im = self.cap.read()
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV) hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
@ -208,7 +216,8 @@ class MAT:
cv2.imshow("Frame", im) cv2.imshow("Frame", im)
return self.end_check(dep+1) return self.end_check(dep+1)
# time.sleep(2) # time.sleep(2)
else: else:
self.colored_im = None
return dep return dep
def _display_status(self, im, detection_result, rate, volume): def _display_status(self, im, detection_result, rate, volume):
@ -258,7 +267,9 @@ class MAT:
if hasattr(self, 'capFile') and self.capFile.isOpened(): if hasattr(self, 'capFile') and self.capFile.isOpened():
self.capFile.release() self.capFile.release()
cv2.destroyAllWindows() cv2.destroyAllWindows()
def format_date_time(self, timestamp):
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, about_time=1, cap_dir="Videos"): def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, about_time=1, cap_dir="Videos"):
self.running = True self.running = True
@ -266,6 +277,8 @@ class MAT:
self.speeds = [quick_speed, slow_speed, end_speed, 1.0] self.speeds = [quick_speed, slow_speed, end_speed, 1.0]
self.need_check = False self.need_check = False
self.edited = False self.edited = False
self.volume_list = []
self.color_list = []
if cap_dir is not None: if cap_dir is not None:
vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mkv" vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mkv"
@ -346,12 +359,23 @@ class MAT:
self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次") self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次")
self.ch340_push(speed) self.ch340_push(speed)
self.total_volume += speed self.total_volume += speed
self.volume_list.append(round(self.total_volume,2))
try:
self.color_list.append(self.state.mode.name)
except Exception as e:
self.control_logger.error(f"报表失败: {e}")
# if (self.state.is_about_mode() and time.time() - self.ch340.start > end_time) or not self.state.is_about_mode(): # if (self.state.is_about_mode() and time.time() - self.ch340.start > end_time) or not self.state.is_about_mode():
if self._pred() is None: if self._pred() is None:
self.control_logger.error("预测失败,跳过当前帧") self.control_logger.error("预测失败,跳过当前帧")
continue continue
# try:
# self.control_logger.info("推送安全体积")
# self.ch340_push(0.15)
# except Exception as e:
# self.control_logger.error(f"推送失败: {e}")
# 释放视频录制器 # 释放视频录制器
if hasattr(self, 'capFile') and self.capFile.isOpened(): if hasattr(self, 'capFile') and self.capFile.isOpened():
self.capFile.release() self.capFile.release()
@ -364,27 +388,42 @@ class MAT:
"理论体积": f"{self.colored_volume:.2f} ml", "理论体积": f"{self.colored_volume:.2f} ml",
"实验时长": f"{time.time() - self.start_time:.2f}" "实验时长": f"{time.time() - self.start_time:.2f}"
} }
upload_data = {
"start_time":self.format_date_time(self.start_time),
"end_time":self.format_date_time(time.time()),
"volume_record": f'{self.volume_list}',
'voltage_record': f'{[]}',
'color_record': f'{self.color_list}',
'final_volume': f'{self.colored_volume}',
}
self.system_logger.info("实验结果:") self.system_logger.info("实验结果:")
for key, value in experiment_results.items(): for key, value in experiment_results.items():
self.system_logger.info(f" {key}: {value}") self.system_logger.info(f" {key}: {value}")
return upload_data, self.colored_im
if __name__ == "__main__": if __name__ == "__main__":
token = login_to_platform("13504022184","password")
# 创建MAT类的实例并运行 # 创建MAT类的实例并运行
mat = MAT( mat = MAT(
videoSourceIndex = 1, videoSourceIndex = 1,
bounce_time=4, bounce_time=4,
end_bounce_time=0.01, end_bounce_time=0.01,
k = 34 k = 27
) )
mat.state.mode = State.Mode.FAST mat.state.mode = State.Mode.FAST
mat.run( final_data, finish_picture = mat.run(
slow_speed = 0.05, slow_speed = 0.05,
quick_speed = 0.8, quick_speed = 0.8,
about_time=3, about_time=3,
# cap_dir=None # cap_dir=None
) )
with open("log.json","w") as f:
import json
json.dump(final_data, f, indent=4)
send_data_to_platform(token, final_data, finish_picture)

95
tmp.py
View File

@ -1,80 +1,23 @@
import numpy as np from datetime import datetime
import json
class DistributionChangeDetector: import requests
def __init__(self, baseline_windows: list[np.ndarray]): import time
""" from utils import login_to_platform,send_data_to_platform
参数 baseline_windows: List of arrays代表初始稳定期的多个窗口
"""
self.baseline = self._compute_baseline(baseline_windows)
def _compute_stats(self, window: np.ndarray) -> tuple[float, float, float]:
"""返回 (P_under30, std, mode)"""
p_under30 = np.mean(window < 30)
std = np.std(window, ddof=1)
# 快速估计众数:最大 bin 的中心
hist, bin_edges = np.histogram(window, bins=50)
max_bin_index = np.argmax(hist)
mode_est = (bin_edges[max_bin_index] + bin_edges[max_bin_index + 1]) / 2
return p_under30, std, mode_est
def _compute_baseline(self, windows: list[np.ndarray]) -> tuple[np.ndarray, np.ndarray]:
"""
返回 baseline 向量 (P0, σ0, mode0) 和对应标准差(用于归一化)
"""
stats = np.array([self._compute_stats(w) for w in windows])
mean = stats.mean(axis=0)
std = stats.std(axis=0) + 1e-6 # 防止除0
return mean, std
def update(self, window: np.ndarray) -> float:
"""
输入:当前窗口数据(长度 = 窗口大小)
输出:变化分数(越大表示分布越偏离基准)
"""
x = np.array(self._compute_stats(window))
mean, std = self.baseline
norm_diff = (x - mean) / std
change_score = np.linalg.norm(norm_diff)
return float(change_score)
import cv2 token = login_to_platform("13504022184","password")
def gen_data():
cap = cv2.VideoCapture()
cap.open(1)
while True:
ret, frame = cap.read()
cv2.imshow("Camera Feed", frame)
if not ret:
break
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
s = hsv[:, :, 1] # 直接提取饱和度通道
s = s[s > 0] # 只保留非零饱和度值,减少噪声
yield s
if cv2.waitKey(1) & 0xFF == ord('a'):
break
def format_date_time(timestamp):
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
gen = gen_data() print(token)
baseline_data = [gen.__next__() for _ in range(5)] # 获取10个窗口作为基线 final_data ={
'start_time': '2025-06-17 14:59:33',
det = DistributionChangeDetector(baseline_data) 'end_time': '2025-06-17 15:01:18',
'volume_record': '[0.8, 1.6, 2.4000000000000004, 3.2, 4.0, 4.8, 5.6, 6.3999999999999995, 7.199999999999999, 7.999999999999999, 8.799999999999999, 9.6, 10.4, 11.200000000000001, 12.000000000000002, 12.800000000000002, 13.200000000000003, 13.600000000000003, 14.000000000000004, 14.400000000000004, 14.800000000000004, 15.200000000000005, 15.600000000000005, 16.000000000000004, 16.400000000000002, 16.8, 17.2, 17.599999999999998, 17.999999999999996, 18.399999999999995, 18.799999999999994, 19.199999999999992, 19.59999999999999, 19.99999999999999, 20.399999999999988, 20.799999999999986, 21.199999999999985, 21.599999999999984, 21.999999999999982, 22.39999999999998, 22.79999999999998, 23.199999999999978, 23.599999999999977, 23.999999999999975, 24.399999999999974, 24.799999999999972, 25.19999999999997, 25.59999999999997, 25.999999999999968, 26.399999999999967, 26.799999999999965, 27.199999999999964, 27.599999999999962, 27.99999999999996, 28.39999999999996, 28.447450351715048, 28.49745035171505, 28.54745035171505, 28.59745035171505, 28.98371173143383, 28.781944286823233, 28.761131591796836]',
'voltage_record': "[]",
'color_record': "['transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'transport', 'middle', 'transport', 'transport', 'transport', 'transport', 'middle', 'colored']",
results = [] 'final_volume': "28.761131591796836"
for x in gen: }
out = det.update(x) # print(json.dumps(final_data))
if out is not None: send_data_to_platform(token, final_data, "im.jpg")
results.append(out)
# 作图查看
import matplotlib.pyplot as plt
plt.plot(results, label="ChangeScore")
plt.xlabel("Window index")
plt.ylabel("Score")
plt.title("Streaming Change Detection")
plt.legend()
plt.show()

187
utils.py
View File

@ -10,6 +10,11 @@ import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation from matplotlib.animation import FuncAnimation
import threading import threading
from collections import deque from collections import deque
import requests
import json
import tkinter as tk
from tkinter import messagebox
import base64
@dataclass @dataclass
class HistoryRecord: class HistoryRecord:
@ -377,6 +382,188 @@ class State:
return ", " + ", ".join(status) if status else "" return ", " + ", ".join(status) if status else ""
def login_to_platform(username, password):
"""登录到平台获取token"""
try:
wwwF = {'userName': username, 'password': password}
url = 'https://jingsai.mools.net/api/login'
response = requests.post(url, wwwF,timeout=2)
if response is None:
print("错误",'网络连接失败 ')
return None
request = json.loads(response.text)
if request['code'] == 1:
# 登陆成功
# print("登陆成功",'登陆成功')
# 从服务器获取到的数据中找到token
token = request['token']
print("成功", "登录成功!")
return token
elif request['code'] == 2:
print("错误",'用户名或密码错误!')
return None
else:
print("错误",'登陆失败 ')
return None
except Exception as e:
print(e)
print("错误", f"发送数据时出错:{e}")
return None
def send_data_to_platform(token, data, picture):
"""将数据发送到平台"""
try:
# if 1:
# 打开图片文件并转换为 Base64 编码
with open(picture, "rb") as picture_file:
picture_data = picture_file.read()
base64_encoded_picture = base64.b64encode(picture_data).decode("utf-8")
print(base64_encoded_picture)
# 更新数据字典,添加 Base64 编码的图片
data["final_image"] = base64_encoded_picture
# print(data)
# 设置请求头
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
url = "https://jingsai.mools.net/api/upload-record"
# 合并 headers 和 data 为一个字典
datas = {**headers, **data}
# 准备 JSON 数据
json_data = json.dumps(data)
# 发送 POST 请求
response = requests.post(url, headers=headers, data=json_data)
request = json.loads(response.text)
# print(request['code'])
# 检查响应
if request['code'] == 1:
print("提交成功", "提交成功")
else:
print("错误", f"网络请求失败,状态码:{response.status_code}\n错误信息:{response.text}")
except Exception as e:
raise e
print("错误", f"发送数据时出错:{e}")
class LoginApp:
def __init__(self, root):
self.root = root
self.root.title("Mlabs AI Titration 1.0")
# 设置窗口图标
self.set_window_icon()
# 定义变量
self.username = tk.StringVar()
self.password = tk.StringVar()
self.token = ''
self.un = ''
self.pw = ''
# 创建登录界面
self.create_login_interface()
# 设置窗口居中
self.center_window()
def center_window(self, width=300, height=120):
"""将窗口居中"""
# 获取屏幕宽度和高度
screen_width = self.root.winfo_screenwidth()
screen_height = self.root.winfo_screenheight()
# 计算窗口位置
x = (screen_width - width) // 2
y = (screen_height - height) // 2
# 设置窗口大小和位置
self.root.geometry(f"{width}x{height}+{x}+{y}")
def set_window_icon(self):
"""设置窗口图标"""
try:
# 替换为你的图标文件路径
icon_path = "logo.ico" # Windows系统使用.ico文件
self.root.iconbitmap(icon_path)
except Exception as e:
print(f"设置窗口图标时出错:{e}")
def create_login_interface(self):
# 用户名
tk.Label(self.root, text="用户名:").grid(row=0, column=0, padx=10, pady=5)
self.username_entry = tk.Entry(self.root, textvariable=self.username, width=30)
self.username_entry.grid(row=0, column=1, padx=10, pady=5)
# 密码
tk.Label(self.root, text="密码:").grid(row=1, column=0, padx=10, pady=5)
self.password_entry = tk.Entry(self.root, textvariable=self.password, show="*", width=30)
self.password_entry.grid(row=1, column=1, padx=10, pady=5)
# 登录按钮
login_button = tk.Button(self.root, text="登录", command=self.login)
login_button.grid(row=3, column=0, columnspan=2, pady=10)
# 检查info.json文件是否存在
self.check_info_file()
self.username_entry.focus_set()
def check_info_file(self):
login_folder = "login"
info_file = os.path.join(login_folder, "info.json")
if os.path.exists(info_file):
with open(info_file, "r", encoding="utf-8") as file:
info = json.load(file)
self.username.set(info.get("username", ""))
self.password.set(info.get("password", ""))
else:
print("提示", "未找到本地登录信息,请手动输入登录信息。")
def login(self):
self.un = self.username.get()
self.pw = self.password.get()
if not self.un or not self.pw:
print("错误", "用户名、密码不能为空!")
return
# 这里可以添加登录逻辑,例如发送请求到服务器验证登录信息
self.token = login_to_platform(self.un, self.pw)
# print(self.token)
# 这里可以添加登录逻辑,例如发送请求到服务器验证登录信息
self.save_info_file()
self.root.destroy()
# return username, password
def save_info_file(self):
login_folder = "login"
#如果没有login文件夹则创建一个
if not os.path.exists(login_folder):
os.makedirs(login_folder)
info_file = os.path.join(login_folder, "info.json")
if not os.path.exists(login_folder):
os.makedirs(login_folder)
info = {
"username": self.username.get(),
"password": self.password.get(),
}
with open(info_file, "w", encoding="utf-8") as file:
json.dump(info, file, ensure_ascii=False, indent=4)
def setup_logging(log_level=logging.INFO, log_dir="logs"): def setup_logging(log_level=logging.INFO, log_dir="logs"):
""" """
设置logging配置创建不同模块的logger 设置logging配置创建不同模块的logger