"""Camera-guided titration loop.

Frames from a video source are classified by colour saturation while liquid is
dispensed through a ``ch340`` device (presumably a syringe pump -- confirm
against the ``ch340`` module).  Dispensing slows down when colour starts to
appear and stops once the frame has stayed coloured for ``bounce_time``.
"""
import atexit
import time
from collections import deque
from datetime import datetime
from threading import Thread

import cv2
import numpy as np

import ch340


class MAT:
    """Drive the pump and watch the camera until the coloured endpoint is seen.

    Attributes:
        total_volume: Sum of all doses requested so far (printed as ml).
        history: Sliding window of ``(timestamp, state, total_volume)`` tuples
            covering the last ``bounce_time`` seconds.
        colored_volume / colored_time: Volume and time at which the current
            candidate endpoint was first seen, or ``None`` when there is none.
        typ: ``0`` for fast dosing, ``1`` for slow dosing.
        running: ``True`` while :meth:`run` should keep looping.
    """

    # Amount pulled into the syringe on a full refill (the original code
    # hard-coded 12 in both the refill test and the pull command).
    SYRINGE_VOLUME_ML = 12
    # Tolerance for comparing accumulated floating-point volumes.
    _VOLUME_EPS = 1e-9

    def __init__(self, videoSourceIndex=0, bounce_time=1):
        """Open the camera and the pump.

        Args:
            videoSourceIndex: Index passed to ``cv2.VideoCapture``.
            bounce_time: Length in seconds of the sliding window used to
                debounce the state classification.
        """
        print('Initializing MAT...')
        self.videoSourceIndex = videoSourceIndex
        self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)
        self.ch340 = ch340.CH340()
        self.bounce_time = bounce_time
        self.total_volume = 0
        self.start_time = time.time()
        # Start time as YYYYmmdd_HHMMSS; intended as the base name for any
        # files produced later.
        self.formatted_time = datetime.fromtimestamp(self.start_time).strftime('%Y%m%d_%H%M%S')
        # Make sure the pump is stopped when the interpreter exits.
        atexit.register(self.ch340.stop)
        print("实验开始于", self.formatted_time)
        self.history = deque()       # sliding-window history
        self.colored_volume = None   # volume at first "colored" detection
        self.colored_time = None     # time of first "colored" detection
        # Defined here (not only in run()) so _pred() is usable on its own.
        self.typ = 0
        self.running = False
        # Liquid believed to be left in the syringe; 0 forces a refill before
        # the first dose, matching the original "pull at volume 0" behaviour.
        self._syringe_remaining = 0.0

    def ch340_pull(self, vol=12):
        """Pull ``vol`` (default 12, printed as ml) into the syringe at max speed."""
        print(f"开始抽取{vol:g}ml")
        self.ch340.max_speed()
        self.ch340.pull(vol=vol)
        print('完成抽取')

    def ch340_init(self):
        """Run a short push followed by a 3-unit pull on the pump."""
        self.ch340.push(speed=1, t=1)
        self.ch340.pull(speed=1.2, vol=3)
        print('CH340 INITED')

    def ch340_push(self, speed=0.1):
        """Start an asynchronous push at ``speed`` for ``t=1``.

        ``run`` adds ``speed`` to ``total_volume`` for every call, i.e. it
        treats ``speed * 1`` as the dispensed volume -- confirm units against
        the ``ch340`` module.
        """
        self.ch340.push_async(speed=speed, t=1)

    def _state_ratio(self, state):
        """Return the fraction of entries in ``history`` whose state is ``state``."""
        hits = sum(1 for _, seen, _ in self.history if seen == state)
        return hits / len(self.history)

    def _refill(self):
        """Top the syringe up to ``SYRINGE_VOLUME_ML``."""
        leftover = self._syringe_remaining
        if leftover > self._VOLUME_EPS:
            # Only pull what is missing so the plunger is not driven past a
            # full syringe.
            self.ch340_pull(self.SYRINGE_VOLUME_ML - leftover)
        else:
            self.ch340_pull()
        self._syringe_remaining = float(self.SYRINGE_VOLUME_ML)

    def _pred(self):
        """Classify the current frame and update the endpoint state machine.

        Returns:
            ``'transport'``, ``'middle'`` or ``'colored'``, or ``None`` when no
            frame could be read or the predictor gave no result.  When the
            endpoint is confirmed, ``running`` is cleared, the pump is stopped
            and ``'colored'`` is returned.
        """
        ok, frame = self.cap.read()
        if not ok:
            print("Failed to capture frame from camera.")
            return None

        ret, rate = self.predictor(frame)
        if ret is None:
            print("Fallback")
            return None

        now = time.time()

        # Record the observation and drop everything older than bounce_time.
        self.history.append((now, ret, self.total_volume))
        cutoff = now - self.bounce_time
        while self.history and self.history[0][0] < cutoff:
            self.history.popleft()

        if not self.history:
            print("Warning: No history available.")
            return ret

        if ret == "middle":
            # First sign of colour: switch to slow dosing immediately.
            if self.typ == 0:
                print(f"检测到middle,切换到慢速模式,当前体积: {self.total_volume}")
                self.typ = 1
        elif ret == "colored":
            # Remember the volume at which colour was first seen.
            if self.colored_volume is None:
                self.colored_volume = self.total_volume
                self.colored_time = now
                print(f"检测到colored,记录体积: {self.colored_volume}")

            # Colour has been pending for a full window: confirm or restart.
            if now - self.colored_time > self.bounce_time:
                ratio = self._state_ratio("colored")
                if ratio > 0.9:
                    print(f"确认终点,最终体积: {self.colored_volume:0.2f} ml")
                    self.running = False
                    self.ch340.stop()
                    return "colored"
                print(f"colored比例小于90%,当前体积: {self.total_volume}, {ratio}")
                self.colored_volume = self.total_volume
                self.colored_time = now
        # ret == "transport": nothing to do here.

        # In slow mode, fall back to fast dosing once the window is almost
        # entirely "transport" again.
        if self.typ == 1:
            if len(self.history) > 3 and self._state_ratio("transport") > 0.9:
                print(f"非middle比例超过90%,切回快速模式,当前体积: {self.total_volume}")
                self.typ = 0

        # A candidate endpoint older than the window that is no longer backed
        # by >= 90% "colored" frames is moved to the first "colored" frame
        # still inside the window (or dropped if there is none).
        if self.colored_volume is not None and now - self.colored_time > self.bounce_time:
            if self._state_ratio("colored") < 0.9:
                print(f"colored比例小于90%,重置colored记录")
                self.colored_volume = None
                for stamp, state, vol in self.history:
                    if state == "colored":
                        print(f"滑动窗口到: {vol} at {stamp}")
                        self.colored_volume = vol
                        self.colored_time = stamp
                        break
            else:
                # NOTE(review): the source formatting was lost; this `else` is
                # read as belonging to the `if ... < 0.9` test above (not the
                # `for`), and the following print as part of it -- confirm.
                print("Warning: Unusual condition, colored for over 90% but last is transport")
                print(f"疑似滴定终点: {self.colored_volume}")

        overlay = (
            f"State: {ret}, rate: {round(rate,2)}, Vol: {self.total_volume:.2f} ml, "
            f"typ: {'slow' if self.typ else 'fast'}"
        )
        colour = (10, 215, 255) if self.typ else (255, 255, 255)
        cv2.putText(frame, overlay, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, colour, 2)
        cv2.imshow("Frame", frame)
        cv2.waitKey(1)
        return ret

    def predictor(self, im):
        """Classify a BGR frame by the share of saturated pixels.

        Returns:
            ``(state, rate)`` where ``rate`` is the fraction of pixels whose
            HSV saturation exceeds 60 and ``state`` is ``'transport'``
            (< 1%), ``'middle'`` (< 20%) or ``'colored'`` (otherwise).
        """
        hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
        saturated = hsv[:, :, 1] > 60
        rate = np.count_nonzero(saturated) / saturated.size
        if rate < 0.01:
            return "transport", rate
        if rate < 0.2:
            return "middle", rate
        return "colored", rate

    def __del__(self):
        """Release the camera and close the preview windows."""
        # __init__ may have failed before the capture object was created.
        cap = getattr(self, "cap", None)
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()
        print("Experiment finished.")

    def run(self, quick_speed=0.2, slow_speed=0.05):
        """Dose and classify frames until the endpoint is confirmed.

        Args:
            quick_speed: Dose added per push while in fast mode.
            slow_speed: Dose added per push while in slow mode.
        """
        self.running = True
        self.typ = 0
        self.last = 0
        while self.running:
            time.sleep(0.01)
            if not self.ch340.running:
                dose = quick_speed if self.typ == 0 else slow_speed
                # The original test `total_volume % 12 == 0` compared an
                # accumulated float for exact equality and so practically never
                # fired after the first refill.  Track what is left in the
                # syringe instead, and only refill while the pump is idle so an
                # asynchronous push is never interrupted.
                if self._syringe_remaining + self._VOLUME_EPS < dose:
                    self._refill()
                print(f"Current Total Volume: {self.total_volume:0.2f} ml")
                self.ch340_push(dose)
                self.total_volume += dose
                self._syringe_remaining -= dose
            # NOTE(review): dosing continues while frames fail to classify;
            # consider aborting after repeated failures.
            if self._pred() is None:
                print("Prediction failed, skipping frame.")
                continue
        print('----->>Visual Endpoint<<-----')
        print(f"Total Volume: {self.total_volume} ml")


if __name__ == "__main__":
    # Create a MAT instance and run the experiment.
    mat = MAT(
        videoSourceIndex=1,
        bounce_time=0.2,
    )
    mat.run(
        slow_speed=0.05,
        quick_speed=0.15,
    )