Files
ai-titration/main.py
flt6 27c1b08365 final
Former-commit-id: 251b57f490efdc292b0c4c019431219b75ee890c
2025-07-03 22:29:31 +08:00

440 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import time
from datetime import datetime
import numpy as np
import ch340
import atexit
import utils
from utils import State, History,login_to_platform,send_data_to_platform
from scipy.signal import find_peaks
import tkinter as tk
class MAT:
def __init__(self, videoSourceIndex=0, bounce_time=2, end_bounce_time=5,k=30):
# 初始化logging
utils.setup_logging()
self.system_logger = utils.get_system_logger()
self.control_logger = utils.get_control_logger()
self.endpoint_logger = utils.get_endpoint_logger()
self.volume_logger = utils.get_volume_logger()
self.system_logger.info('正在初始化MAT系统...')
self.videoSourceIndex = videoSourceIndex
self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)
self.ch340 = ch340.CH340()
self.total_volume = 0
self.k = k
self.state = State(bounce_time, end_bounce_time)
atexit.register(self.ch340.stop)
self.history = History(max(bounce_time, end_bounce_time))
self.first_about = 0
self.colored_volume = None
self.colored_time = None
self.colored_im = None
def ch340_pull(self):
self.control_logger.info("开始抽取12ml")
self.ch340.max_speed()
self.ch340.pull(vol=12)
self.control_logger.info('完成抽取')
def ch340_init(self):
# self.ch340.push(speed=1,t=1)
self.ch340.pull(speed=1.2,vol=1.8)
self.control_logger.info('电极位置已初始化')
def ch340_push(self, speed=0.1):
self.ch340.push_async(speed=speed, t=1)
def process_left(self, now: float, value_only=False):
if self.state.mode == State.Mode.CRAZY:
value_only = True
if self.state.mode == State.Mode.ABOUT:
return self.total_volume
st = self.ch340.start
if not value_only:
self.ch340.stop()
if not abs(self.ch340._get_time()-1) < 1e-3:
self.control_logger.warning(f"CH340 timing issue detected: {self.ch340._get_time()}")
return self.total_volume
r = now - st
ret = self.total_volume - (1-r) * self.speeds[self.state.mode.value]
if value_only:
return ret
else:
self.total_volume = ret
def _pred(self):
"""预测当前图像状态,返回'transport''middle''about''colored'"""
suc, im = self.cap.read()
if not suc:
self.control_logger.error("无法从摄像头捕获帧")
return None
# 录制当前帧到视频文件
if hasattr(self, 'capFile') and self.capFile.isOpened():
self.capFile.write(im)
ret, rate = self.predictor(im)
if ret is None:
return None
now = time.time()
val = self.process_left(now, value_only=True)
# 确保val不为None
if val is None:
self.control_logger.warning("体积计算返回None跳过本次记录")
return ret
# 更新滑动窗口历史记录 - 维护最近end_bounce_time内的状态
self.history.add_record(now, ret, rate, val, im)
if self.history.is_empty():
self.control_logger.error("未预期的没有可用的历史记录")
return ret
if self.state.mode == State.Mode.CRAZY:
self._display_status(im, ret, rate, val)
return ret
# === 状态进入逻辑 ===
if now - self._start_time<10:
self._display_status(im, ret, rate, val)
return ret
if not self.edited and self.history.base is not None and rate>self.history.base * 2:
self.edited = True
self.speeds[0] /= 2
# 1. middle: predictor返回middle立即进入slow状态
if ret == "middle":
if self.state.is_fast_mode():
self.process_left(now)
self.state.enter_middle_state(now)
self.control_logger.info(f"检测到middle立即进入slow模式当前体积: {val:.2f} ml")
# 2. about: 返回about且处于middle则进入about状态
elif ret == "about":
if self.state.is_slow_mode():
self.process_left(now)
self.state.enter_about_state(now)
self.control_logger.info(f"检测到about进入about模式当前体积: {val:.2f} ml")
# 3. end返回colored置end_check标记开始检查
# elif ret == "colored":
# if not self.state.in_end_check:
# self.state.enter_end_check(now)
# if self.colored_volume is None:
# self.colored_volume = val
# self.colored_time = now
# self.colored_im = im.copy()
# self.endpoint_logger.info(f"检测到colored开始end检查记录体积: {val:.2f} ml")
# === 状态检查逻辑 ===
# middle检查: 进入middle的bounce_time后在最近bounce_time内middle比例<70%返回fast状态
if self.state.should_check_middle_exit(now) and not self.history.last_end:
# 计算最近bounce_time内的middle比例
recent_records = self.history.get_recent_records(self.state.bounce_time, now)
if recent_records:
trans_ratio = self.history.get_state_ratio("transport", recent_records)
if trans_ratio > 0.4:
self.process_left(now)
self.state.exit_middle_check()
# about状态随middle一起退出
# self.state.exit_about_with_middle()
self.control_logger.info(f"middle比例{trans_ratio:.2%}<60%退出middle检查返回fast模式")
if self.state.mode == State.Mode.ABOUT and self.state.about_check:
h = self.history.get_recent_records(self.about_time/3,now)
ratio = self.history.get_state_ratio("transport", h)
self.history.about_history.append(ratio == 1)
while len(self.history.about_history) > 5:
self.history.about_history.pop(0)
if len(self.history.about_history) == 5 and self.history.last_end < 2:
rate = sum(self.history.about_history) / len(self.history.about_history)
if rate > 0.8:
self.state.exit_about()
self.control_logger.info(f"about比例{ratio:.2%}<80%退出about检查返回middle模式")
ret = self.end_check()
if ret == "colored":
return ret
if not self.history.last_end:
self.history.last_end = max(self.history.last_end,ret)
self.state.about_check = False
# 显示状态信息
self._display_status(im, ret, rate, val)
return ret
def end_check(self,dep=0):
if not self.running:
return "colored"
if dep == 1:
suc,im = self.cap.read()
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
# 降低饱和度通道 (S通道是索引1)
hsv[:, :, 1] = hsv[:, :, 1] * 0.8
# 转换回BGR颜色空间
result = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
name = f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
cv2.imwrite(name,result)
self.colored_im = name
if dep > 3:
# self.endpoint_logger.info(f"colored比例{.2%}>=70%,确认滴定终点")
self.colored_volume = self.total_volume
self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml")
self.running = False
self.ch340.stop()
# if self.colored_im is not None:
# cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im)
return "colored"
suc,im = self.cap.read()
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
s = hsv[:, :, 0]
s = s[s > 0]
hist = cv2.calcHist([s], [0], None, [256], [0, 256])
hist = hist.flatten() # 转换为一维数组
# 峰值检测 - 找到直方图中的峰值
peaks, properties = find_peaks(hist,
height=np.max(hist) * 0.2, # 峰值高度至少是最大值的10%
distance=10, # 峰值之间的最小距离
prominence=np.max(hist) * 0.01) # 峰值的突出度
if np.any(peaks>130):
if dep == 0:
self.process_left(time.time())
self.history.end_history.append(True)
self.control_logger.info(f"检测到colored状态end_check at {dep}")
bgn = time.time()
while time.time()-bgn < 2:
suc,im = self.cap.read()
if hasattr(self, 'capFile') and self.capFile.isOpened():
self.capFile.write(im)
cv2.putText(im, "ENDCHK", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.imshow("Frame", im)
return self.end_check(dep+1)
# time.sleep(2)
else:
self.colored_im = None
return dep
def _display_status(self, im, detection_result, rate, volume):
"""显示状态信息到图像上"""
mode_color = {
State.Mode.FAST: (255, 255, 255),
State.Mode.SLOW: (10, 215, 255),
State.Mode.ABOUT: (228,116,167),
State.Mode.CRAZY: (0, 255, 0),
# State.Mode.END: (0, 0, 255)
}
status_text = f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml"
status_text += self.state.get_status_text()
cv2.putText(im, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7,
mode_color[self.state.mode], 2)
cv2.imshow("Frame", im)
cv2.waitKey(1)
def predictor(self,im):
hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV)
s = hsv[:,:,1]
mask = s>self.k
cv2.imshow('mask',im*mask[:,:,np.newaxis])
tot = mask.shape[0]*mask.shape[1]
val = np.sum(mask)
rate = val/tot
if self.history.base is not None:
base = self.history.base
thr = (min(0.05,base*5), min(0.2,base*13), 0.5)
# thr = (base*5, base*13, 0.5)
if rate < thr[0]:
return "transport",rate
elif rate <thr[1]:
return "middle",rate
elif rate < thr[2]:
return "about",rate
else:
return "colored",rate
else:
return "transport",rate
def __del__(self):
self.cap.release()
if hasattr(self, 'capFile') and self.capFile.isOpened():
self.capFile.release()
cv2.destroyAllWindows()
def format_date_time(self, timestamp):
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, about_time=1, cap_dir="Videos"):
self.running = True
self.start_time = time.time()
self.speeds = [quick_speed, slow_speed, end_speed, 1.0]
self.need_check = False
self.edited = False
self.volume_list = []
self.color_list = []
if cap_dir is not None:
vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mkv"
else:
vid_name = "Disabled"
# 记录实验开始
experiment_params = {
"视频源索引 ": self.videoSourceIndex,
"防抖时间 ": self.state.bounce_time,
"快速模式速度": f"{quick_speed} ml/次",
"慢速模式速度": f"{slow_speed} ml/次",
"录制视频 ": vid_name
}
self.system_logger.info("实验参数:")
for key, value in experiment_params.items():
self.system_logger.info(f" {key}: {value}")
fps = int(self.cap.get(cv2.CAP_PROP_FPS)) or 30
width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 初始化视频录制器
if vid_name != "Disabled":
fourcc = cv2.VideoWriter.fourcc(*'x264') # 使用更兼容的编码器
self.capFile = cv2.VideoWriter(vid_name, fourcc, fps, (width, height))
if not self.capFile.isOpened():
self.system_logger.error(f"无法打开视频录制器: {vid_name}")
del self.capFile
else:
self.system_logger.info(f"视频录制已初始化")
self.ch340_init()
cnt = 0
self._start_time = time.time()
self.about_time=about_time
self.crazy = 0
while self.running:
if not self.ch340.running:
if self.state.mode == State.Mode.CRAZY:
match self.crazy:
case 0:
self.control_logger.info("crazy 0")
self.ch340_pull()
self.ch340.speed = 1.00
self.ch340.push_async(vol=12)
self.crazy = 1
case 1:
self.control_logger.info("crazy 1")
self.ch340_pull()
self.ch340.push_async(vol=8)
self.total_volume = 20
cnt = 2
self.crazy = 2
self.state.mode = State.Mode.FAST
else:
if 12 * cnt - self.total_volume < 0.5:
self.ch340_pull()
cnt += 1
time.sleep(0.01)
# 简化的推送逻辑
should_push = False
if self.state.is_fast_mode():
should_push = True
elif self.state.is_slow_mode() and time.time() - self.ch340.start > mid_time:
should_push = True
elif self.state.is_about_mode() and time.time() - self.ch340.start > about_time:
if not self.state.about_first_flag:
self.state.about_check = True
else:
self.state.about_first_flag = False
should_push = True
if should_push:
speed = self.speeds[self.state.mode.value]
self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次")
self.ch340_push(speed)
self.total_volume += speed
self.volume_list.append(round(self.total_volume,2))
try:
self.color_list.append(self.state.mode.name)
except Exception as e:
self.control_logger.error(f"报表失败: {e}")
# if (self.state.is_about_mode() and time.time() - self.ch340.start > end_time) or not self.state.is_about_mode():
if self._pred() is None:
self.control_logger.error("预测失败,跳过当前帧")
continue
# try:
# self.control_logger.info("推送安全体积")
# self.ch340_push(0.15)
# except Exception as e:
# self.control_logger.error(f"推送失败: {e}")
# 释放视频录制器
if hasattr(self, 'capFile') and self.capFile.isOpened():
self.capFile.release()
self.system_logger.info(f"视频录制完成: {vid_name}")
# 实验结束,记录结果
experiment_results = {
"总体积 ": f"{self.total_volume:.2f} ml",
"终点体积": f"{self.colored_volume:.2f} ml",
"理论体积": f"{self.colored_volume:.2f} ml",
"实验时长": f"{time.time() - self.start_time:.2f}",
"滴定速率": f"{self.total_volume / (time.time() - self.start_time):.2f} mL/s",
}
upload_data = {
"start_time":self.format_date_time(self.start_time),
"end_time":self.format_date_time(time.time()),
"volume_record": f'{self.volume_list}',
'voltage_record': f'{[]}',
'color_record': f'{self.color_list}',
'final_volume': f'{self.colored_volume}',
}
self.system_logger.info("实验结果:")
for key, value in experiment_results.items():
self.system_logger.info(f" {key}: {value}")
return upload_data, self.colored_im
if __name__ == "__main__":
token = login_to_platform("13504022184","password")
# 创建MAT类的实例并运行
mat = MAT(
videoSourceIndex = 1,
bounce_time=2,
end_bounce_time=0.01,
k = 32
)
mat.state.mode = State.Mode.FAST
final_data, finish_picture = mat.run(
slow_speed = 0.05,
quick_speed = 1.0,
about_time=3,
# cap_dir=None
)
with open("log.json","w") as f:
import json
json.dump(final_data, f, indent=4)
send_data_to_platform(token, final_data, finish_picture)