From 4e387287ebbbfa0ac0180531cb2fe1446bd54e8a Mon Sep 17 00:00:00 2001 From: flt6 <1404262047@qq.com> Date: Tue, 3 Jun 2025 23:26:51 +0800 Subject: [PATCH] history Former-commit-id: c078eab42975bcbc5fd98aae28228937b5694d19 --- ch340.py | 2 +- main.py | 84 ++++++++++++++++++++++++---------------------- utils.py | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 41 deletions(-) diff --git a/ch340.py b/ch340.py index efda3e1..ddbc949 100644 --- a/ch340.py +++ b/ch340.py @@ -79,7 +79,7 @@ if OFFLINE_DEBUG: def pull(self,speed=None,t=None,vol=None): self._prepare(speed,t,vol) self.start = time.time() - time.sleep(self._get_time()) + time.sleep(self._get_time()/4) def push_async(self,speed=None,t=None,vol=None): self._prepare(speed,t,vol) diff --git a/main.py b/main.py index 0fa6b52..5547c7a 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ import numpy as np import ch340 import atexit import utils -from utils import State +from utils import State, History class MAT: @@ -17,7 +17,7 @@ class MAT: self.endpoint_logger = utils.get_endpoint_logger() self.volume_logger = utils.get_volume_logger() - self.system_logger.info('正在初始化MAT系统...') + self.system_logger.info('正在初始化MAT系统...') self.videoSourceIndex = videoSourceIndex self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW) self.ch340 = ch340.CH340() @@ -26,7 +26,7 @@ class MAT: self.state = State(bounce_time, end_bounce_time) atexit.register(self.ch340.stop) - self.history = [] + self.history = History(max(bounce_time, end_bounce_time)) self.colored_volume = None self.colored_time = None self.colored_im = None @@ -53,7 +53,7 @@ class MAT: r = now - st ret = self.total_volume - (1-r) * self.speeds[self.state.mode.value] if value_only: - return ret + return ret else: self.total_volume = ret @@ -71,16 +71,18 @@ class MAT: ret, rate = self.predictor(im) if ret is None: return None - now = time.time() val = self.process_left(now, value_only=True) - # 更新滑动窗口历史记录 - 维护最近end_bounce_time内的状态 - self.history.append((now, ret, val, im)) - while self.history and self.history[0][0] < now - self.state.end_bounce_time: - self.history.pop(0) + # 确保val不为None + if val is None: + self.control_logger.warning("体积计算返回None,跳过本次记录") + return ret - if not self.history: + # 更新滑动窗口历史记录 - 维护最近end_bounce_time内的状态 + self.history.add_record(now, ret,rate, val, im) + + if self.history.is_empty(): self.control_logger.error("未预期的没有可用的历史记录") return ret @@ -111,49 +113,44 @@ class MAT: self.colored_time = now self.colored_im = im.copy() self.endpoint_logger.info(f"检测到colored,开始end检查,记录体积: {val:.2f} ml") - - # === 状态检查逻辑 === + # === 状态检查逻辑 === # middle检查: 进入middle的bounce_time后,在最近bounce_time内middle比例<70%返回fast状态 if self.state.should_check_middle_exit(now): # 计算最近bounce_time内的middle比例 - bounce_start_time = now - self.state.bounce_time - recent_history = [(t, state, v, i) for t, state, v, i in self.history if t >= bounce_start_time] + recent_records = self.history.get_recent_records(self.state.bounce_time, now) - if recent_history: - trans_count = sum(1 for _, state, _, _ in recent_history if state == "transport") - trans_ratio = trans_count / len(recent_history) + if recent_records: + trans_ratio = self.history.get_state_ratio("transport", recent_records) if trans_ratio > 0.3: self.process_left(now) self.state.exit_middle_check() # about状态随middle一起退出 - self.state.exit_about_with_middle() + self.state.exit_about_with_middle() self.control_logger.info(f"middle比例{trans_ratio:.2%}<70%,退出middle检查,返回fast模式") # end检查: 进入end之后的end_bounce_time,如果end比例<80%,则重置;否则终止实验 if self.state.should_check_end_result(now): - colored_count = sum(1 for _, state, _, _ in self.history if state == "colored") - colored_ratio = colored_count / len(self.history) + colored_ratio = self.history.get_state_ratio("colored", self.history.get_recent_records(self.state.end_bounce_time, now)) if colored_ratio < 0.8: # end比例<80%,从history中找到第二个end并继续check逻辑 self.endpoint_logger.warning(f"colored比例{colored_ratio:.2%}<80%,寻找下一个colored点") # 寻找历史中倒数第二个colored状态 - colored_times = [t for t, state, _, _ in self.history if state == "colored"] + colored_times = self.history.get_states_by_type("colored") if len(colored_times) >= 2: # 使用倒数第二个colored时间重新开始检查 second_last_colored_time = colored_times[1] self.state.end_detected_time = second_last_colored_time # 更新colored记录为对应的体积 - for t, state, vol, img in self.history: - if t == second_last_colored_time and state == "colored": - self.colored_volume = vol - self.colored_time = t - self.colored_im = img.copy() - break + record = self.history.find_record_by_timestamp(second_last_colored_time) + if record: + self.colored_volume = record.volume + self.colored_time = record.timestamp + self.colored_im = record.image.copy() self.endpoint_logger.info(f"重置到第二个colored点: {self.colored_volume:.2f} ml") else: @@ -170,7 +167,8 @@ class MAT: if self.colored_im is not None: cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im) return "colored" - # 显示状态信息 + + # 显示状态信息 self._display_status(im, ret, rate, val) return ret @@ -199,14 +197,20 @@ class MAT: tot = mask.shape[0]*mask.shape[1] val = np.sum(mask) rate = val/tot - if rate < 0.02: - return "transport",rate - elif rate <0.2: - return "middle",rate - elif rate < 0.35: - return "about",rate + if self.history.base is not None: + base = self.history.base + thr = (base*5, base*10, base*30) + if rate < thr[0]: + return "transport",rate + elif rate max_window_size: + max_window_size = base_time+0.2 + # raise ValueError("Base time must be less than or equal to max window size.") + self.records: List[HistoryRecord] = [] + self.max_window_size = max_window_size + self.fulled = False + self.base = None + self._base_time = base_time + + def add_record(self, timestamp: float, state: str, rate: float, volume: float, image: np.ndarray): + """添加新的历史记录""" + record = HistoryRecord(timestamp, state,rate, volume, image) + self.records.append(record) + self._cleanup_old_records(timestamp) + if self.base is None and (timestamp-self._base_time)>= self.records[0].timestamp: + base_records = self.get_recent_records(self._base_time, timestamp) + self.base = sum([rec.rate for rec in base_records]) / len(base_records) + get_endpoint_logger().info("Base rate calculated: %.2f", self.base) + + def _cleanup_old_records(self, current_time: float): + """清理过期的历史记录""" + cutoff_time = current_time - self.max_window_size + if not self.records and self.records[0].timestamp < cutoff_time: + return False + while self.records and self.records[0].timestamp < cutoff_time: + self.records.pop(0) + return True + + def get_records_in_timespan(self, start_time: float, end_time: Optional[float] = None) -> List[HistoryRecord]: + """获取指定时间段内的记录""" + if end_time is None: + return [record for record in self.records if record.timestamp >= start_time] + else: + return [record for record in self.records + if start_time <= record.timestamp <= end_time] + + def get_recent_records(self, duration: float, current_time: float) -> List[HistoryRecord]: + """获取最近指定时间长度内的记录""" + start_time = current_time - duration + return self.get_records_in_timespan(start_time, current_time) + + def get_state_ratio(self, target_state: str, records: Optional[List[HistoryRecord]] = None) -> float: + """计算指定状态在记录中的比例""" + if records is None: + records = self.records + + if not records: + return 0.0 + + target_count = sum(1 for record in records if record.state == target_state) + return target_count / len(records) + + def get_states_by_type(self, target_state: str) -> List[float]: + """获取所有指定状态的时间戳""" + return [record.timestamp for record in self.records if record.state == target_state] + + def find_record_by_timestamp(self, target_timestamp: float) -> Optional[HistoryRecord]: + """根据时间戳查找记录""" + for record in self.records: + if record.timestamp == target_timestamp: + return record + return None + + def is_empty(self) -> bool: + """检查历史记录是否为空""" + return len(self.records) == 0 + + def __len__(self) -> int: + """返回历史记录的数量""" + return len(self.records) + + def clear(self): + """清空所有历史记录""" + self.records.clear() + class State: """滴定状态管理类"""