import cv2 import time from datetime import datetime import numpy as np import ch340 import atexit import utils from utils import State class MAT: def __init__(self, videoSourceIndex=0, bounce_time=1, end_bounce_time=5): # 初始化logging utils.setup_logging() self.system_logger = utils.get_system_logger() self.control_logger = utils.get_control_logger() self.endpoint_logger = utils.get_endpoint_logger() self.volume_logger = utils.get_volume_logger() self.system_logger.info('正在初始化MAT系统...') self.videoSourceIndex = videoSourceIndex self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW) self.ch340 = ch340.CH340() self.total_volume = 0 self.state = State(bounce_time, end_bounce_time) atexit.register(self.ch340.stop) self.history = [] self.colored_volume = None self.colored_time = None self.colored_im = None def ch340_pull(self): self.control_logger.info("开始抽取12ml") self.ch340.max_speed() self.ch340.pull(vol=12) self.control_logger.info('完成抽取') def ch340_init(self): self.ch340.push(speed=1,t=1) self.ch340.pull(speed=1.2,vol=1) self.control_logger.info('电极位置已初始化') def ch340_push(self, speed=0.1): self.ch340.push_async(speed=speed, t=1) def process_left(self, now: float, value_only=False): st = self.ch340.start if not value_only: self.ch340.stop() assert abs(self.ch340._get_time()-1) < 1e3 r = now - st ret = self.total_volume - (1-r) * self.speeds[self.state.mode.value] if value_only: return ret else: self.total_volume = ret def _pred(self): """预测当前图像状态,返回'transport'、'middle'、'about'或'colored'""" suc, im = self.cap.read() if not suc: self.control_logger.error("无法从摄像头捕获帧") return None # 录制当前帧到视频文件 if hasattr(self, 'capFile') and self.capFile.isOpened(): self.capFile.write(im) ret, rate = self.predictor(im) if ret is None: return None now = time.time() val = self.process_left(now, value_only=True) # 更新滑动窗口历史记录 - 维护最近end_bounce_time内的状态 self.history.append((now, ret, val, im)) while self.history and self.history[0][0] < now - self.state.end_bounce_time: self.history.pop(0) if not self.history: self.control_logger.error("未预期的没有可用的历史记录") return ret # === 状态进入逻辑 === if now - self._start_time<17: self._display_status(im, ret, rate, val) return ret # 1. middle: predictor返回middle立即进入slow状态 if ret == "middle": if self.state.is_fast_mode(): self.process_left(now) self.state.enter_middle_state(now) self.control_logger.info(f"检测到middle,立即进入slow模式,当前体积: {val:.2f} ml") # 2. about: 返回about,且处于middle则进入about状态 elif ret == "about": if self.state.is_slow_mode(): self.process_left(now) self.state.enter_about_state(now) self.control_logger.info(f"检测到about,进入about模式,当前体积: {val:.2f} ml") # 3. end:返回colored,置end_check标记开始检查 elif ret == "colored": if not self.state.in_end_check: self.state.enter_end_check(now) if self.colored_volume is None: self.colored_volume = val self.colored_time = now self.colored_im = im.copy() self.endpoint_logger.info(f"检测到colored,开始end检查,记录体积: {val:.2f} ml") # === 状态检查逻辑 === # middle检查: 进入middle的bounce_time后,在最近bounce_time内middle比例<70%返回fast状态 if self.state.should_check_middle_exit(now): # 计算最近bounce_time内的middle比例 bounce_start_time = now - self.state.bounce_time recent_history = [(t, state, v, i) for t, state, v, i in self.history if t >= bounce_start_time] if recent_history: middle_count = sum(1 for _, state, _, _ in recent_history if state == "transport") middle_ratio = middle_count / len(recent_history) if middle_ratio > 0.3: self.process_left(now) self.state.exit_middle_check() # about状态随middle一起退出 self.state.exit_about_with_middle() self.control_logger.info(f"middle比例{middle_ratio:.2%}<70%,退出middle检查,返回fast模式") # end检查: 进入end之后的end_bounce_time,如果end比例<80%,则重置;否则终止实验 if self.state.should_check_end_result(now): colored_count = sum(1 for _, state, _, _ in self.history if state == "colored") colored_ratio = colored_count / len(self.history) if colored_ratio < 0.8: # end比例<80%,从history中找到第二个end并继续check逻辑 self.endpoint_logger.warning(f"colored比例{colored_ratio:.2%}<80%,寻找下一个colored点") # 寻找历史中倒数第二个colored状态 colored_times = [t for t, state, _, _ in self.history if state == "colored"] if len(colored_times) >= 2: # 使用倒数第二个colored时间重新开始检查 second_last_colored_time = colored_times[1] self.state.end_detected_time = second_last_colored_time # 更新colored记录为对应的体积 for t, state, vol, img in self.history: if t == second_last_colored_time and state == "colored": self.colored_volume = vol self.colored_time = t self.colored_im = img.copy() break self.endpoint_logger.info(f"重置到第二个colored点: {self.colored_volume:.2f} ml") else: # 没有足够的colored点,重置end检查 self.state.reset_end_check() self.colored_volume = None self.colored_time = None self.endpoint_logger.info("没有足够的colored点,重置end检查") else: # end比例>=80%,确认终点,终止实验 self.endpoint_logger.info(f"colored比例{colored_ratio:.2%}>=80%,确认滴定终点") self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml") self.running = False self.ch340.stop() if self.colored_im is not None: cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im) return "colored" # 显示状态信息 self._display_status(im, ret, rate, val) return ret def _reset_colored_detection(self, current_time, colored_ratio, current_volume): """重置colored检测状态 - 已被新逻辑替代,保留以避免错误""" pass def _display_status(self, im, detection_result, rate, volume): """显示状态信息到图像上""" mode_color = { State.Mode.FAST: (255, 255, 255), State.Mode.SLOW: (10, 215, 255), State.Mode.ABOUT: (228,116,167), # State.Mode.END: (0, 0, 255) } status_text = f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml" status_text += self.state.get_status_text() cv2.putText(im, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, mode_color[self.state.mode], 2) cv2.imshow("Frame", im) cv2.waitKey(1) def predictor(self,im): hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV) s = hsv[:,:,1] mask = s>30 cv2.imshow('mask',im*mask[:,:,np.newaxis]) tot = mask.shape[0]*mask.shape[1] val = np.sum(mask) rate = val/tot if rate < 0.02: return "transport",rate elif rate <0.2: return "middle",rate elif rate < 0.35: return "about",rate else: return "colored",rate def __del__(self): self.cap.release() if hasattr(self, 'capFile') and self.capFile.isOpened(): self.capFile.release() cv2.destroyAllWindows() def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, end_time=1, cap_dir="Videos"): self.running = True self.start_time = time.time() self.speeds = (quick_speed, slow_speed, end_speed,end_speed) if cap_dir is not None: vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4" else: vid_name = "Disabled" # 记录实验开始 experiment_params = { "视频源索引 ": self.videoSourceIndex, "防抖时间 ": self.state.bounce_time, "快速模式速度": f"{quick_speed} ml/次", "慢速模式速度": f"{slow_speed} ml/次", "录制视频 ": vid_name } self.system_logger.info("实验参数:") for key, value in experiment_params.items(): self.system_logger.info(f" {key}: {value}") fps = int(self.cap.get(cv2.CAP_PROP_FPS)) or 30 width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 初始化视频录制器 if vid_name != "Disabled": fourcc = cv2.VideoWriter.fourcc(*'x264') # 使用更兼容的编码器 self.capFile = cv2.VideoWriter(vid_name, fourcc, fps, (width, height)) if not self.capFile.isOpened(): self.system_logger.error(f"无法打开视频录制器: {vid_name}") del self.capFile else: self.system_logger.info(f"视频录制已初始化") self.ch340_init() cnt = 0 self._start_time = time.time() while self.running: if not self.ch340.running: if 12 * cnt - self.total_volume < 0.5: self.ch340_pull() cnt += 1 time.sleep(0.01) # 简化的推送逻辑 should_push = False if self.state.is_fast_mode(): should_push = True elif self.state.is_slow_mode() and time.time() - self.ch340.start > mid_time: should_push = True elif self.state.is_about_mode() and time.time() - self.ch340.start > end_time: should_push = True elif self.state.is_about_mode() and time.time() - self.ch340.start > end_time: should_push = True if should_push: speed = self.speeds[self.state.mode.value] self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次") self.ch340_push(speed) self.total_volume += speed if self._pred() is None: self.control_logger.error("预测失败,跳过当前帧") continue # 释放视频录制器 if hasattr(self, 'capFile') and self.capFile.isOpened(): self.capFile.release() self.system_logger.info(f"视频录制完成: {vid_name}") # 实验结束,记录结果 experiment_results = { "总体积 ": f"{self.total_volume:.2f} ml", "终点体积": f"{self.colored_volume:.2f} ml", "理论体积": f"{self.colored_volume:.2f} ml", "实验时长": f"{time.time() - self.start_time:.2f} 秒" } self.system_logger.info("实验结果:") for key, value in experiment_results.items(): self.system_logger.info(f" {key}: {value}") if __name__ == "__main__": # 创建MAT类的实例并运行 mat = MAT( videoSourceIndex = 1, bounce_time=3, end_bounce_time=15 ) mat.run( slow_speed = 0.05, quick_speed = 0.15, end_time= 2 # cap_dir=None )