From 5795b7c2dacfc4a74ebd67c4472f812c946c3ff8 Mon Sep 17 00:00:00 2001 From: flt6 <1404262047@qq.com> Date: Thu, 29 May 2025 19:24:57 +0800 Subject: [PATCH] 0529-2 Former-commit-id: 09404bbedb1ff5b9ff9ad0753867012826cf84dc --- main.py | 286 +++++++++++++++++++++++++++++-------------------------- utils.py | 88 +++++++++++++++++ 2 files changed, 238 insertions(+), 136 deletions(-) diff --git a/main.py b/main.py index 47ddc88..4069a99 100644 --- a/main.py +++ b/main.py @@ -5,17 +5,11 @@ import numpy as np import ch340 import atexit import utils -from enum import Enum +from utils import State -class Flags(Enum): - ENDCHK = 0b01 - MIDCHK = 0b10 - @staticmethod - def un(flag): - return 0b11 ^ flag.value class MAT: - def __init__(self, videoSourceIndex=0, bounce_time=1,end_bounce_time=5): + def __init__(self, videoSourceIndex=0, bounce_time=1, end_bounce_time=5): # 初始化logging utils.setup_logging() self.system_logger = utils.get_system_logger() @@ -27,17 +21,15 @@ class MAT: self.videoSourceIndex = videoSourceIndex self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW) self.ch340 = ch340.CH340() - self.bounce_time = bounce_time - self.end_bounce_time = end_bounce_time self.total_volume = 0 - self.middle_time = 0 - self.flags = 0 + + self.state = State(bounce_time, end_bounce_time) atexit.register(self.ch340.stop) - self.history = [] # 滑动窗口历史记录 - self.colored_volume = None # 首次colored体积 - self.colored_time = None # 首次colored时间 - self.colored_im = None # 首次colored图像 + self.history = [] + self.colored_volume = None + self.colored_time = None + self.colored_im = None def ch340_pull(self): self.control_logger.info("开始抽取12ml") @@ -53,124 +45,155 @@ class MAT: def ch340_push(self, speed=0.1): self.ch340.push_async(speed=speed, t=1) - def process_left(self,now:float, velue_only=False): + def process_left(self, now: float, value_only=False): st = self.ch340.start - if not velue_only: self.ch340.stop() - assert abs(self.ch340._get_time()-1) < 1e3 # run time should be 1s - r = now-st - ret = self.total_volume - (1-r)*self.speeds[self.typ] # 减去未完成的体积 - if velue_only:return ret - else:self.total_volume = ret + if not value_only: + self.ch340.stop() + assert abs(self.ch340._get_time()-1) < 1e3 + r = now - st + ret = self.total_volume - (1-r) * self.speeds[self.state.mode.value] + if value_only: + return ret + else: + self.total_volume = ret def _pred(self): - """预测当前图像状态,返回'transport'、'middle'或'colored'""" + """预测当前图像状态,返回'transport'、'middle'、'about'或'colored'""" suc, im = self.cap.read() if not suc: self.control_logger.error("无法从摄像头捕获帧") return None + # 录制当前帧到视频文件 if hasattr(self, 'capFile') and self.capFile.isOpened(): self.capFile.write(im) - ret,rate = self.predictor(im) + ret, rate = self.predictor(im) if ret is None: return None now = time.time() + val = self.process_left(now, value_only=True) - val = self.process_left(now,velue_only=True) - - # 记录历史状态,保留bounce_time时间内的记录 + # 更新滑动窗口历史记录 - 维护最近end_bounce_time内的状态 self.history.append((now, ret, val, im)) - while self.history and self.history[0][0] < now - self.end_bounce_time: + while self.history and self.history[0][0] < now - self.state.end_bounce_time: self.history.pop(0) if not self.history: self.control_logger.error("未预期的没有可用的历史记录") return ret - # 处理检测结果 + # === 状态进入逻辑 === + if now - self._start_time<17: + self._display_status(im, ret, rate, val) + return ret + # 1. middle: predictor返回middle立即进入slow状态 if ret == "middle": - # 检测到middle立即切换到慢速模式 - if self.typ == 0: + if self.state.is_fast_mode(): self.process_left(now) - self.typ = 1 - self.middle_time = now - self.flags |= Flags.MIDCHK.value - self.control_logger.info(f"检测到middle,切换到慢速模式,当前体积: {val:.2f} ml") + self.state.enter_middle_state(now) + self.control_logger.info(f"检测到middle,立即进入slow模式,当前体积: {val:.2f} ml") + # 2. about: 返回about,且处于middle则进入about状态 + elif ret == "about": + if self.state.is_slow_mode(): + self.process_left(now) + self.state.enter_about_state(now) + self.control_logger.info(f"检测到about,进入about模式,当前体积: {val:.2f} ml") + + # 3. end:返回colored,置end_check标记开始检查 elif ret == "colored": - # 检测到colored时记录当前体积 - if self.colored_volume is None: - self.colored_volume = val - self.colored_time = now - self.colored_im = self.history[-1][3] # 保存当前图像 - self.flags |= Flags.ENDCHK.value - self.endpoint_logger.info(f"检测到colored,记录体积: {self.colored_volume:.2f} ml") - # 检查是否要停止 - if self.colored_time is not None and now - self.colored_time > self.end_bounce_time: - colored_count = sum(1 for _, state, _,_ in self.history if state == "colored") - if colored_count / len(self.history) > 0.9: - self.endpoint_logger.info(f"确认终点,最终体积: {self.colored_volume:.2f} ml") - self.running = False - self.ch340.stop() - cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im) - return "colored" - else: - self.flags &= Flags.un(Flags.ENDCHK) - self.endpoint_logger.warning(f"colored比例小于90%,当前体积: {val:.2f} ml, 比例: {colored_count / len(self.history):.2%}") + if not self.state.in_end_check: + self.state.enter_end_check(now) + if self.colored_volume is None: self.colored_volume = val self.colored_time = now - else: # ret == "transport" - pass + self.colored_im = im.copy() + self.endpoint_logger.info(f"检测到colored,开始end检查,记录体积: {val:.2f} ml") - # 在慢速模式下检查是否要切回快速模式 - if self.typ == 1 and now - self.middle_time > self.bounce_time and not self.flags & Flags.ENDCHK.value: - non_middle_count = sum(1 for _, state, _,_ in self.history if state == "transport") - if non_middle_count / len(self.history) > 0.8: - self.typ = 0 - self.process_left(now) - # TODO: 滑动到第一个middle状态 - self.middle_time = 0 - self.control_logger.info(f"非middle比例超过80%,切回快速模式,当前体积: {val:.2f} ml") - self.flags &= Flags.un(Flags.MIDCHK) + # === 状态检查逻辑 === - # 如果已记录colored但在bounce_time内colored比例小于90%,重置 - if self.colored_volume is not None and self.colored_time is not None and now - self.colored_time > self.end_bounce_time: - colored_count = sum(1 for _, state, _,_ in self.history if state == "colored") - if colored_count / len(self.history) < 0.9: - self.endpoint_logger.warning(f"colored比例小于90%,重置colored记录") - self.colored_volume = None - flag = False - for t, state,vol,_ in self.history: - if state == "colored": - if not flag: - flag = True - continue - self.endpoint_logger.debug(f"滑动窗口到: {vol:.2f} ml at {t}") - self.colored_volume = vol - self.colored_time = t - break - if self.colored_volume is None: - self.flags &= Flags.un(Flags.ENDCHK) - self.typ = 0 - else: - self.endpoint_logger.warning("异常情况,colored超过90%但最后状态是transport") - self.endpoint_logger.info(f"疑似滴定终点: {self.colored_volume:.2f} ml") + # middle检查: 进入middle的bounce_time后,在最近bounce_time内middle比例<70%返回fast状态 + if self.state.should_check_middle_exit(now): + # 计算最近bounce_time内的middle比例 + bounce_start_time = now - self.state.bounce_time + recent_history = [(t, state, v, i) for t, state, v, i in self.history if t >= bounce_start_time] + + if recent_history: + middle_count = sum(1 for _, state, _, _ in recent_history if state == "transport") + middle_ratio = middle_count / len(recent_history) + + if middle_ratio > 0.3: + self.process_left(now) + self.state.exit_middle_check() + # about状态随middle一起退出 + self.state.exit_about_with_middle() + self.control_logger.info(f"middle比例{middle_ratio:.2%}<70%,退出middle检查,返回fast模式") - t = f"Stat: {ret}, rate: {round(rate,2)}, Vol: {val:.2f} ml" - if self.middle_time > 0 and (now - self.middle_time)= 2: + # 使用倒数第二个colored时间重新开始检查 + second_last_colored_time = colored_times[1] + self.state.end_detected_time = second_last_colored_time + + # 更新colored记录为对应的体积 + for t, state, vol, img in self.history: + if t == second_last_colored_time and state == "colored": + self.colored_volume = vol + self.colored_time = t + self.colored_im = img.copy() + break + + self.endpoint_logger.info(f"重置到第二个colored点: {self.colored_volume:.2f} ml") + else: + # 没有足够的colored点,重置end检查 + self.state.reset_end_check() + self.colored_volume = None + self.colored_time = None + self.endpoint_logger.info("没有足够的colored点,重置end检查") + else: # end比例>=80%,确认终点,终止实验 + self.endpoint_logger.info(f"colored比例{colored_ratio:.2%}>=80%,确认滴定终点") + self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml") + self.running = False + self.ch340.stop() + if self.colored_im is not None: + cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im) + return "colored" + # 显示状态信息 + self._display_status(im, ret, rate, val) + return ret + + def _reset_colored_detection(self, current_time, colored_ratio, current_volume): + """重置colored检测状态 - 已被新逻辑替代,保留以避免错误""" + pass + + def _display_status(self, im, detection_result, rate, volume): + """显示状态信息到图像上""" + mode_color = { + State.Mode.FAST: (255, 255, 255), + State.Mode.SLOW: (10, 215, 255), + State.Mode.ABOUT: (228,116,167), + # State.Mode.END: (0, 0, 255) + } + + status_text = f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml" + status_text += self.state.get_status_text() + + cv2.putText(im, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, + mode_color[self.state.mode], 2) cv2.imshow("Frame", im) cv2.waitKey(1) - return ret def predictor(self,im): hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV) @@ -184,7 +207,7 @@ class MAT: return "transport",rate elif rate <0.2: return "middle",rate - elif rate < 0.4: + elif rate < 0.35: return "about",rate else: return "colored",rate @@ -196,11 +219,11 @@ class MAT: cv2.destroyAllWindows() - def run(self,quick_speed = 0.2,slow_speed = 0.05,end_speed = 0.02,mid_time = 0.5,end_time=1, cap_dir="Videos"): + def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, end_time=1, cap_dir="Videos"): self.running = True - self.typ = 0 self.start_time = time.time() - self.speeds = (quick_speed, slow_speed,end_speed) + self.speeds = (quick_speed, slow_speed, end_speed,end_speed) + if cap_dir is not None: vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4" else: @@ -209,7 +232,7 @@ class MAT: # 记录实验开始 experiment_params = { "视频源索引 ": self.videoSourceIndex, - "防抖时间 ": self.bounce_time, + "防抖时间 ": self.state.bounce_time, "快速模式速度": f"{quick_speed} ml/次", "慢速模式速度": f"{slow_speed} ml/次", "录制视频 ": vid_name @@ -218,16 +241,12 @@ class MAT: for key, value in experiment_params.items(): self.system_logger.info(f" {key}: {value}") - speed = self.speeds[self.typ] - - # 获取摄像头的帧率和分辨率 fps = int(self.cap.get(cv2.CAP_PROP_FPS)) or 30 width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - - # 初始化视频录制器 + # 初始化视频录制器 if vid_name != "Disabled": - fourcc = cv2.VideoWriter_fourcc(*'avc1') + fourcc = cv2.VideoWriter.fourcc(*'x264') # 使用更兼容的编码器 self.capFile = cv2.VideoWriter(vid_name, fourcc, fps, (width, height)) if not self.capFile.isOpened(): self.system_logger.error(f"无法打开视频录制器: {vid_name}") @@ -236,35 +255,30 @@ class MAT: self.system_logger.info(f"视频录制已初始化") self.ch340_init() - cnt=0 + cnt = 0 + self._start_time = time.time() while self.running: - match (self.flags): - case 0: - self.typ = 0 - case Flags.MIDCHK.value: - self.typ = 1 - case Flags.ENDCHK.value: - self.typ = 2 - if self.flags&Flags.ENDCHK.value and self.typ != 2: - # self.system_logger.warning("Colored状态但未进入慢速") - self.typ = 2 - # if self.flags == Flags.MIDCHK.value and self.typ != 1: - # self.system_logger.warning("Middle状态但未进入慢速") - # self.typ = 1 - if not self.ch340.running: - if 12*cnt - self.total_volume<0.5: - self.ch340_pull() # 抽取12ml - cnt+=1 + if 12 * cnt - self.total_volume < 0.5: + self.ch340_pull() + cnt += 1 time.sleep(0.01) - flag = False - flag |= self.typ == 0 - flag |= self.typ == 1 and time.time()-self.ch340.start > mid_time - flag |= self.typ == 2 and time.time()-self.ch340.start > end_time - if flag: + + # 简化的推送逻辑 + should_push = False + if self.state.is_fast_mode(): + should_push = True + elif self.state.is_slow_mode() and time.time() - self.ch340.start > mid_time: + should_push = True + elif self.state.is_about_mode() and time.time() - self.ch340.start > end_time: + should_push = True + elif self.state.is_about_mode() and time.time() - self.ch340.start > end_time: + should_push = True + + if should_push: + speed = self.speeds[self.state.mode.value] self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次") - speed = self.speeds[self.typ] self.ch340_push(speed) self.total_volume += speed @@ -294,7 +308,7 @@ if __name__ == "__main__": mat = MAT( videoSourceIndex = 1, bounce_time=3, - end_bounce_time=20 + end_bounce_time=15 ) mat.run( diff --git a/utils.py b/utils.py index dc04cb5..de99403 100644 --- a/utils.py +++ b/utils.py @@ -1,6 +1,94 @@ import logging import os from datetime import datetime +from enum import Enum + +class State: + """滴定状态管理类""" + class Mode(Enum): + FAST = 0 # 快速模式 + SLOW = 1 # 慢速模式 (middle) + ABOUT = 2 # 接近终点模式 + # END = 3 # 终点模式 + + def __init__(self, bounce_time=1, end_bounce_time=5): + self.mode = self.Mode.FAST + self.bounce_time = bounce_time + self.end_bounce_time = end_bounce_time + + # 状态检查标志 + self.in_middle_check = False + self.in_end_check = False + + # 时间记录 + self.middle_detected_time = None + self.end_detected_time = None + + def is_fast_mode(self): + return self.mode == self.Mode.FAST + + def is_slow_mode(self): + return self.mode == self.Mode.SLOW + + def is_about_mode(self): + return self.mode == self.Mode.ABOUT + + # def is_end_mode(self): + # return self.mode == self.Mode.END + + def enter_middle_state(self, current_time): + """进入middle状态 - 立即切换到slow模式并开始检查""" + self.mode = self.Mode.SLOW + self.in_middle_check = True + self.middle_detected_time = current_time + + def enter_about_state(self, current_time): + """从middle状态进入about状态""" + if self.mode == self.Mode.SLOW: + self.mode = self.Mode.ABOUT + + def enter_end_check(self, current_time): + """进入end检查状态""" + self.in_end_check = True + self.end_detected_time = current_time + self.mode = self.Mode.ABOUT + + def exit_middle_check(self): + """退出middle检查状态,返回fast模式""" + self.in_middle_check = False + self.middle_detected_time = None + self.mode = self.Mode.FAST + + def exit_about_with_middle(self): + """about状态随middle一起退出""" + if self.mode == self.Mode.ABOUT: + self.mode = self.Mode.FAST + + def should_check_middle_exit(self, current_time): + """检查是否应该进行middle退出检查""" + return (self.in_middle_check and + self.middle_detected_time is not None and + current_time - self.middle_detected_time > self.bounce_time) + + def should_check_end_result(self, current_time): + """检查是否应该进行end结果检查""" + return (self.in_end_check and + self.end_detected_time is not None and + current_time - self.end_detected_time > self.end_bounce_time) + + def reset_end_check(self): + """重置end检查状态""" + self.in_end_check = False + self.end_detected_time = None + + def get_status_text(self): + """获取状态显示文本""" + status = [] + if self.in_middle_check: + status.append("MIDCHK") + if self.in_end_check: + status.append("ENDCHK") + return ", " + ", ".join(status) if status else "" def setup_logging(log_level=logging.INFO, log_dir="logs"):