import logging import os from datetime import datetime from enum import Enum from dataclasses import dataclass from typing import List, Tuple, Optional import numpy as np import time @dataclass class HistoryRecord: """历史记录的单个条目""" timestamp: float state: str # 'transport', 'middle', 'about', 'colored' rate: float volume: float image: np.ndarray class History: """滑动窗口历史记录管理类""" def __init__(self, max_window_size: float = 5.0, base_time = 5.0): """ 初始化历史记录管理器 Args: max_window_size: 最大窗口时间长度(秒) """ if base_time > max_window_size: max_window_size = base_time+0.2 # raise ValueError("Base time must be less than or equal to max window size.") self.records: List[HistoryRecord] = [] self.max_window_size = max_window_size self.fulled = False self.base = None self._base_time = base_time def add_record(self, timestamp: float, state: str, rate: float, volume: float, image: np.ndarray): """添加新的历史记录""" record = HistoryRecord(timestamp, state,rate, volume, image) self.records.append(record) self._cleanup_old_records(timestamp) if self.base is None and (timestamp-self._base_time)>= self.records[0].timestamp: base_records = self.get_recent_records(self._base_time, timestamp) self.base = sum([rec.rate for rec in base_records]) / len(base_records) get_endpoint_logger().info("Base rate calculated: %.2f", self.base) def _cleanup_old_records(self, current_time: float): """清理过期的历史记录""" cutoff_time = current_time - self.max_window_size if not self.records and self.records[0].timestamp < cutoff_time: return False while self.records and self.records[0].timestamp < cutoff_time: self.records.pop(0) return True def get_records_in_timespan(self, start_time: float, end_time: Optional[float] = None) -> List[HistoryRecord]: """获取指定时间段内的记录""" if end_time is None: return [record for record in self.records if record.timestamp >= start_time] else: return [record for record in self.records if start_time <= record.timestamp <= end_time] def get_recent_records(self, duration: float, current_time: float) -> List[HistoryRecord]: """获取最近指定时间长度内的记录""" start_time = current_time - duration return self.get_records_in_timespan(start_time, current_time) def get_state_ratio(self, target_state: str, records: Optional[List[HistoryRecord]] = None) -> float: """计算指定状态在记录中的比例""" if records is None: records = self.records if not records: return 0.0 target_count = sum(1 for record in records if record.state == target_state) return target_count / len(records) def get_states_by_type(self, target_state: str) -> List[float]: """获取所有指定状态的时间戳""" return [record.timestamp for record in self.records if record.state == target_state] def find_record_by_timestamp(self, target_timestamp: float) -> Optional[HistoryRecord]: """根据时间戳查找记录""" for record in self.records: if record.timestamp == target_timestamp: return record return None def is_empty(self) -> bool: """检查历史记录是否为空""" return len(self.records) == 0 def __len__(self) -> int: """返回历史记录的数量""" return len(self.records) def clear(self): """清空所有历史记录""" self.records.clear() class State: """滴定状态管理类""" class Mode(Enum): FAST = 0 # 快速模式 SLOW = 1 # 慢速模式 (middle) ABOUT = 2 # 接近终点模式 # END = 3 # 终点模式 def __init__(self, bounce_time=1, end_bounce_time=5): self.mode = self.Mode.FAST self.bounce_time = bounce_time self.end_bounce_time = end_bounce_time # 状态检查标志 self.in_middle_check = False self.in_end_check = False # 时间记录 self.middle_detected_time = None self.end_detected_time = None def is_fast_mode(self): return self.mode == self.Mode.FAST def is_slow_mode(self): return self.mode == self.Mode.SLOW def is_about_mode(self): return self.mode == self.Mode.ABOUT # def is_end_mode(self): # return self.mode == self.Mode.END def enter_middle_state(self, current_time): """进入middle状态 - 立即切换到slow模式并开始检查""" self.mode = self.Mode.SLOW self.in_middle_check = True self.middle_detected_time = current_time def enter_about_state(self, current_time): """从middle状态进入about状态""" if self.mode == self.Mode.SLOW: self.mode = self.Mode.ABOUT def enter_end_check(self, current_time): """进入end检查状态""" self.in_end_check = True self.end_detected_time = current_time self.mode = self.Mode.ABOUT def exit_middle_check(self): """退出middle检查状态,返回fast模式""" self.in_middle_check = False self.middle_detected_time = None self.mode = self.Mode.FAST def exit_about_with_middle(self): """about状态随middle一起退出""" if self.mode == self.Mode.ABOUT: self.mode = self.Mode.FAST def should_check_middle_exit(self, current_time): """检查是否应该进行middle退出检查""" return (self.in_middle_check and self.middle_detected_time is not None and current_time - self.middle_detected_time > self.bounce_time) def should_check_end_result(self, current_time): """检查是否应该进行end结果检查""" return (self.in_end_check and self.end_detected_time is not None and current_time - self.end_detected_time > self.end_bounce_time) def reset_end_check(self): """重置end检查状态""" self.in_end_check = False self.end_detected_time = None def get_status_text(self): """获取状态显示文本""" status = [] if self.in_middle_check: status.append("MIDCHK") if self.in_end_check: status.append("ENDCHK") return ", " + ", ".join(status) if status else "" def setup_logging(log_level=logging.INFO, log_dir="logs"): """ 设置logging配置,创建不同模块的logger Args: log_level: 日志级别,默认INFO log_dir: 日志文件存储目录,默认"logs" """ # 创建日志目录 if not os.path.exists(log_dir): os.makedirs(log_dir) # 获取当前时间作为日志文件名 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') log_file = os.path.join(log_dir, f"titration_{timestamp}.log") # 配置根logger logging.basicConfig( level=log_level, format='%(asctime)s - %(name)8s - %(levelname)7s - %(message)s', handlers=[ logging.FileHandler(log_file, encoding='utf-8'), logging.StreamHandler() # 同时输出到控制台 ] ) return log_file def get_system_logger(): """系统初始化和控制相关的logger""" return logging.getLogger("System") def get_hardware_logger(): """硬件控制相关的logger(CH340等)""" return logging.getLogger("Hardware") def get_vision_logger(): """图像处理和预测相关的logger""" return logging.getLogger("Vision") def get_control_logger(): """控制逻辑相关的logger""" return logging.getLogger("Control") def get_endpoint_logger(): """终点检测相关的logger""" return logging.getLogger("Endpoint") def get_volume_logger(): """体积计算相关的logger""" return logging.getLogger("Volume")