import atexit
import json
import logging
import os
import time
from datetime import datetime
from typing import Optional

import cv2
import numpy as np
from scipy.signal import find_peaks

import ch340
import utils
from utils import History, State, login_to_platform, send_data_to_platform


class MAT:
    """Camera-supervised titration controller driving a CH340 pump.

    Frames from an OpenCV capture device are classified by how much of the
    image is saturated (``predictor``); the resulting label drives a
    fast / slow / about state machine (``State``) that selects how much liquid
    is pushed per step.  A hue-histogram test (``end_check``) decides when the
    solution has permanently changed colour and stops the run.
    """

    def __init__(self, videoSourceIndex=0, bounce_time=2, sensity=30):
        """Open the camera and the pump and reset all bookkeeping.

        Args:
            videoSourceIndex: Index passed to ``cv2.VideoCapture``.
            bounce_time: Debounce window handed to ``State`` and ``History``.
            sensity: Saturation threshold (0-255) above which a pixel counts
                as coloured in ``predictor``.
        """
        # Configure logging before the first message is emitted.
        utils.setup_logging()
        self.system_logger = logging.getLogger("System")
        self.control_logger = logging.getLogger("Control")
        self.endpoint_logger = logging.getLogger("Endpoint")
        self.volume_logger = logging.getLogger("Volume")
        self.system_logger.info("正在初始化MAT系统...")

        self.videoSourceIndex = videoSourceIndex
        self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)
        self.ch340 = ch340.CH340()
        self.total_volume = 0
        self.sensity = sensity
        self.state = State(bounce_time)
        # Make sure the pump is stopped even if the interpreter exits early.
        atexit.register(self.ch340.stop)
        self.history = History(bounce_time)
        self.colored_volume = None
        self.colored_time = None
        self.colored_im = None

    def ch340_pull(self):
        """Draw 12 ml of liquid at maximum speed."""
        self.control_logger.info("开始抽取12ml")
        self.ch340.max_speed()
        self.ch340.pull(vol=12)
        self.control_logger.info("完成抽取")

    def ch340_init(self):
        """Pre-position the motor so that later draws are not thrown off."""
        self.ch340.pull(speed=1.2, vol=1.8)
        self.control_logger.info("电极位置已初始化")

    def ch340_push(self, speed: int | float = 0.1):
        """Start an asynchronous 1 s push, so the pushed volume equals ``speed``."""
        self.ch340.push_async(speed=speed, t=1)

    def process_left(
        self, now: float, value_only: Optional[bool] = False
    ) -> Optional[float]:
        """Correct ``total_volume`` for the part of the current push not yet delivered.

        ``run`` adds the full push volume to ``total_volume`` as soon as the
        push starts; this method subtracts the share that is still pending at
        ``now``.

        Args:
            now: Current timestamp (``time.time()``).
            value_only: When true, only compute and return the corrected
                volume.  When false, the pump is stopped and
                ``total_volume`` is overwritten with the corrected value.

        Returns:
            The corrected volume when ``value_only`` is true.  In ABOUT mode,
            or when the pump reports a push time other than 1 s, the unchanged
            ``total_volume`` is returned.  Otherwise ``None``.
        """
        if self.state.mode == State.Mode.ABOUT:
            return self.total_volume

        # Read the start time before stop() gets a chance to touch it.
        push_started = self.ch340.start
        if not value_only:
            self.ch340.stop()

        # The correction below assumes a 1 s push; bail out otherwise.
        if not abs(self.ch340._get_time() - 1) < 1e-3:
            self.control_logger.warning(
                f"CH340 timing issue detected: {self.ch340._get_time()}"
            )
            return self.total_volume

        elapsed = now - push_started
        # Once the push has finished nothing is pending; without the clamp a
        # negative fraction would *add* volume that was never dispensed.
        pending_fraction = max(0.0, 1 - elapsed)
        corrected = (
            self.total_volume
            - pending_fraction * self.speeds[self.state.mode.value]
        )
        if value_only:
            return corrected
        self.total_volume = corrected
        return None

    def _recorder_ready(self):
        """Return True when a video writer exists and is open."""
        return hasattr(self, "capFile") and self.capFile.isOpened()

    def _record_frame(self, im):
        """Append ``im`` to the recording if a recorder is active."""
        if self._recorder_ready():
            self.capFile.write(im)

    def _pred(self):
        """Grab one frame, classify it and advance the state machine.

        Returns:
            ``None`` when no frame could be captured, ``"colored"`` when
            ``end_check`` reports the end point, otherwise the label produced
            by ``predictor`` (``'transport'``, ``'middle'``, ``'about'`` or
            ``'colored'``).
        """
        suc, im = self.cap.read()
        if not suc:
            self.control_logger.error("无法从摄像头捕获帧")
            return None

        self._record_frame(im)

        ret, rate = self.predictor(im)
        if ret is None:
            return None

        now = time.time()
        val = self.process_left(now, value_only=True)
        if val is None:
            self.control_logger.warning("体积计算返回None,跳过本次记录")
            return ret

        # Feed the sliding-window history.
        self.history.add_record(now, ret, rate, val, im)
        if self.history.is_empty():
            self.control_logger.error("未预期的没有可用的历史记录")
            return ret

        # During the first 10 s only display; no state transitions.
        if now - self._start_time < 10:
            self._display_status(im, ret, rate, val)
            return ret

        # One-shot: halve the fast speed once colour exceeds twice the baseline.
        if (
            not self.edited
            and self.history.base is not None
            and rate > self.history.base * 2
        ):
            self.edited = True
            self.speeds[0] /= 2

        # 1. "middle" while in fast mode: switch to slow immediately.
        if ret == "middle":
            if self.state.is_fast_mode():
                self.process_left(now)
                self.state.enter_middle_state(now)
                self.control_logger.info(
                    f"检测到middle,立即进入slow模式,当前体积: {val:.2f} ml"
                )
        # 2. "about" while in slow mode: switch to about.
        elif ret == "about":
            if self.state.is_slow_mode():
                self.process_left(now)
                self.state.enter_about_state(now)
                self.control_logger.info(
                    f"检测到about,进入about模式,当前体积: {val:.2f} ml"
                )

        # Leave slow mode again when more than 40% of the recent records are
        # "transport" and no end-point detection has been latched.
        if self.state.should_check_middle_exit(now) and not self.history.last_end:
            recent_records = self.history.get_recent_records(
                self.state.bounce_time, now
            )
            if recent_records:
                trans_ratio = self.history.get_state_ratio("transport", recent_records)
                if trans_ratio > 0.4:
                    self.process_left(now)
                    self.state.exit_middle_check()
                    # NOTE(review): the value logged is the transport ratio,
                    # although the message labels it as the middle ratio.
                    self.control_logger.info(
                        f"middle比例{trans_ratio:.2%}<60%,退出middle检查,返回fast模式"
                    )

        if self.state.mode == State.Mode.ABOUT and self.state.about_check:
            window = self.history.get_recent_records(self.about_time / 3, now)
            ratio = self.history.get_state_ratio("transport", window)
            # Keep only the five most recent "colour fully faded" verdicts.
            self.history.about_history.append(ratio == 1)
            while len(self.history.about_history) > 5:
                self.history.about_history.pop(0)

            if len(self.history.about_history) == 5 and self.history.last_end < 2:
                # Separate name: the measured colour rate is still needed for
                # the status overlay below.
                faded_share = sum(self.history.about_history) / len(
                    self.history.about_history
                )
                if faded_share > 0.8:
                    self.state.exit_about()
                    self.control_logger.info(
                        f"about比例{ratio:.2%}<80%,退出about检查,返回middle模式"
                    )

            # Separate name: keep the predictor label intact for display/return.
            end_ret = self.end_check()
            if end_ret == "colored":
                return end_ret
            if not self.history.last_end:
                self.history.last_end = max(self.history.last_end, end_ret)
            self.state.about_check = False

        self._display_status(im, ret, rate, val)
        return ret

    def end_check(self, dep=0):
        """Detect the end point from peaks in the hue histogram.

        A detection at depth ``dep`` is followed by roughly 2 s of frame
        grabbing and a recursive re-check at ``dep + 1``; once ``dep`` exceeds
        3 the end point is accepted, ``colored_volume`` is recorded and the
        run is stopped.

        Args:
            dep: Current confirmation depth (0 for the initial call).

        Returns:
            ``"colored"`` when the end point is confirmed (or the run has
            already stopped), otherwise the depth at which confirmation
            failed.
        """
        if not self.running:
            return "colored"

        if dep == 1:
            # Save a slightly desaturated snapshot of the first confirmation.
            suc, im = self.cap.read()
            if suc:
                hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
                hsv[:, :, 1] = hsv[:, :, 1] * 0.8
                result = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
                name = f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
                cv2.imwrite(name, result)
                self.colored_im = name
            else:
                self.control_logger.error("无法从摄像头捕获帧")

        # Confirmed on four consecutive re-checks: accept as end point.
        if dep > 3:
            self.colored_volume = self.total_volume
            self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml")
            self.running = False
            self.ch340.stop()
            return "colored"

        suc, im = self.cap.read()
        if not suc:
            # Without a frame nothing can be confirmed at this depth.
            self.control_logger.error("无法从摄像头捕获帧")
            return dep

        hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
        hue = hsv[:, :, 0]
        hue = hue[hue > 0]
        if hue.size == 0:
            # Nothing to histogram; same outcome as "no qualifying peak".
            self.colored_im = None
            return dep

        hist = cv2.calcHist([hue], [0], None, [256], [0, 256]).flatten()
        tallest = np.max(hist)
        peaks, _ = find_peaks(
            hist,
            height=tallest * 0.2,  # at least 20% of the tallest bin
            distance=10,  # minimum spacing between peaks
            prominence=tallest * 0.01,
        )

        if not np.any(peaks > 130):
            self.colored_im = None
            return dep

        if dep == 0:
            self.process_left(time.time())
        self.history.end_history.append(True)
        self.control_logger.info(f"检测到colored状态,end_check at {dep}")

        # Keep grabbing (and recording) frames for 2 s before re-checking.
        bgn = time.time()
        while time.time() - bgn < 2:
            suc, im = self.cap.read()
            if not suc:
                continue
            self._record_frame(im)
            cv2.putText(
                im,
                "ENDCHK",
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (0, 255, 0),
                2,
            )
            cv2.imshow("Frame", im)
            # imshow only repaints when the GUI event loop is pumped.
            cv2.waitKey(1)
        return self.end_check(dep + 1)

    def _display_status(self, im, detection_result, rate, volume):
        """Draw the status line on ``im`` and show it in the "Frame" window."""
        mode_color = {
            State.Mode.FAST: (255, 255, 255),
            State.Mode.SLOW: (10, 215, 255),
            State.Mode.ABOUT: (228, 116, 167),
        }
        status_text = (
            f"Stat: {detection_result}, rate: {round(rate, 2)}, Vol: {volume:.2f} ml"
        )
        status_text += self.state.get_status_text()
        cv2.putText(
            im,
            status_text,
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            mode_color[self.state.mode],
            2,
        )
        cv2.imshow("Frame", im)
        cv2.waitKey(1)

    def predictor(self, im):
        """Classify a frame by the share of pixels above the saturation threshold.

        The end point itself is decided by ``end_check``, not by this method.

        Args:
            im: BGR frame as returned by ``cv2.VideoCapture.read``.

        Returns:
            ``(label, rate)`` where ``rate`` is the fraction of pixels whose
            saturation exceeds ``self.sensity``.  While ``history.base`` is
            still ``None`` the label is always ``"transport"``.
        """
        hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
        mask = hsv[:, :, 1] > self.sensity
        cv2.imshow("mask", im * mask[:, :, np.newaxis])

        rate = np.sum(mask) / (mask.shape[0] * mask.shape[1])

        base = self.history.base
        if base is None:
            return "transport", rate

        # Thresholds scale with the baseline but are capped.
        thr = (min(0.05, base * 5), min(0.2, base * 13), 0.5)
        if rate < thr[0]:
            return "transport", rate
        if rate < thr[1]:
            return "middle", rate
        if rate < thr[2]:
            return "about", rate
        return "colored", rate

    def __del__(self):
        """Release the camera, the recorder and all OpenCV windows."""
        # __init__ may have failed before the capture was created.
        cap = getattr(self, "cap", None)
        if cap is not None:
            cap.release()
        if self._recorder_ready():
            self.capFile.release()
        cv2.destroyAllWindows()

    def format_date_time(self, timestamp):
        """Format a POSIX timestamp as ``YYYY-MM-DD HH:MM:SS`` in local time."""
        return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")

    def run(
        self,
        quick_speed=0.2,
        slow_speed=0.05,
        end_speed=0.02,
        mid_time=0.5,
        about_time=1,
        cap_dir="Videos",
    ):
        """Run one experiment until ``end_check`` confirms the end point.

        Args:
            quick_speed: Volume per push in fast mode.
            slow_speed: Volume per push in slow mode.
            end_speed: Volume per push in about mode.
            mid_time: Minimum seconds between push starts in slow mode.
            about_time: Minimum seconds between push starts in about mode;
                also sizes the history window used by the about check.
            cap_dir: Directory for the recorded video, or ``None`` to disable
                recording.

        Returns:
            ``(upload_data, colored_im)``: a dict of stringified results and
            the file name of the end-point snapshot (or ``None``).
        """
        self.running = True
        self.start_time = time.time()
        # Indexed by ``self.state.mode.value``.
        self.speeds: list[float] = [quick_speed, slow_speed, end_speed]
        self.need_check = False
        self.edited = False
        self.volume_list = []
        self.color_list = []

        if cap_dir is not None:
            # The writer cannot open its file if the directory is missing.
            try:
                os.makedirs(cap_dir, exist_ok=True)
            except OSError as e:
                self.system_logger.error(f"无法打开视频录制器: {cap_dir} ({e})")
            vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mkv"
        else:
            vid_name = "Disabled"

        # Log the experiment parameters.
        experiment_params = {
            "视频源索引 ": self.videoSourceIndex,
            "防抖时间 ": self.state.bounce_time,
            "快速模式速度": f"{quick_speed} ml/次",
            "慢速模式速度": f"{slow_speed} ml/次",
            "录制视频 ": vid_name,
        }
        self.system_logger.info("实验参数:")
        for key, value in experiment_params.items():
            self.system_logger.info(f" {key}: {value}")

        fps = int(self.cap.get(cv2.CAP_PROP_FPS)) or 30
        width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Set up the video recorder.
        if vid_name != "Disabled":
            fourcc = cv2.VideoWriter.fourcc(*"x264")
            self.capFile = cv2.VideoWriter(vid_name, fourcc, fps, (width, height))
            if not self.capFile.isOpened():
                self.system_logger.error(f"无法打开视频录制器: {vid_name}")
                del self.capFile
            else:
                self.system_logger.info("视频录制已初始化")

        self.ch340_init()
        cnt = 0  # number of 12 ml draws so far
        self._start_time = time.time()
        self.about_time = about_time

        while self.running:
            if not self.ch340.running:
                # Refill when less than 0.5 ml of the drawn liquid is left.
                if 12 * cnt - self.total_volume < 0.5:
                    self.ch340_pull()
                    cnt += 1

                should_push = False
                if self.state.is_fast_mode():
                    should_push = True
                elif (
                    self.state.is_slow_mode()
                    and time.time() - self.ch340.start > mid_time
                ):
                    should_push = True
                elif (
                    self.state.is_about_mode()
                    and time.time() - self.ch340.start > about_time
                ):
                    # The first about push only clears the flag; later ones
                    # request an about/end check in _pred.
                    if not self.state.about_first_flag:
                        self.state.about_check = True
                    else:
                        self.state.about_first_flag = False
                    should_push = True

                if should_push:
                    speed = self.speeds[self.state.mode.value]
                    self.volume_logger.info(
                        f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次"
                    )
                    self.ch340_push(speed)
                    # Booked in full up front; process_left corrects for the
                    # undelivered part when needed.
                    self.total_volume += speed
                    self.volume_list.append(round(self.total_volume, 2))
                    try:
                        self.color_list.append(self.state.mode.name)
                    except Exception as e:
                        self.control_logger.error(f"报表失败: {e}")

            if self._pred() is None:
                self.control_logger.error("预测失败,跳过当前帧")
                continue

        # Close the video recorder.
        if self._recorder_ready():
            self.capFile.release()
            self.system_logger.info(f"视频录制完成: {vid_name}")

        # Experiment finished: report the results.
        finished = time.time()
        elapsed = finished - self.start_time
        experiment_results = {
            "总体积 ": f"{self.total_volume:.2f} ml",
            "终点体积": f"{self.colored_volume:.2f} ml",
            "理论体积": f"{self.colored_volume:.2f} ml",
            "实验时长": f"{elapsed:.2f} 秒",
            "滴定速率": f"{self.total_volume / elapsed:.2f} mL/s",
        }
        upload_data = {
            "start_time": self.format_date_time(self.start_time),
            "end_time": self.format_date_time(finished),
            "volume_record": f"{self.volume_list}",
            "voltage_record": f"{[]}",
            "color_record": f"{self.color_list}",
            "final_volume": f"{self.colored_volume}",
        }
        self.system_logger.info("实验结果:")
        for key, value in experiment_results.items():
            self.system_logger.info(f" {key}: {value}")
        return upload_data, self.colored_im


if __name__ == "__main__":
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration or the environment instead.
    token = login_to_platform("13504022184", "password")

    # Create the MAT instance and run one experiment.
    mat = MAT(videoSourceIndex=1, bounce_time=2, sensity=32)
    mat.state.mode = State.Mode.FAST
    final_data, finish_picture = mat.run(
        slow_speed=0.05,
        quick_speed=1.0,
        about_time=3,
    )

    with open("log.json", "w", encoding="utf-8") as f:
        json.dump(final_data, f, indent=4)
    send_data_to_platform(token, final_data, finish_picture)