diff --git a/.gitignore b/.gitignore index 096bba5..c901de6 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ logs Videos *.build *.dist +HCHO +upx.exe \ No newline at end of file diff --git a/README.md b/README.md index e69de29..f3111bf 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,14 @@ +状态管理逻辑: +- 滑动窗口 +维护一个滑动窗口history,记录最近end_bounce_time内的状态。 +状态包括:时间,浓度,状态,当前截图 + +- 进入 +1. middle: predictor返回middle立即进入slow状态。 +2. about: 返回about,且处于middle则进入about状态 +3. end:返回end,置end_check标记开始检查 + +- check +1. middle: 进入middle的bounce_time后,在最近bounce_time内middle比例<70%返回fast状态。 +2. about:随着middle退出一起退出,不单独处理 +3. end:进入end之后的end_bounce_time,如果end_bounce_time内end比例<80%,则实验终止逻辑。否则从history中找到第二个end并继续check逻辑。 \ No newline at end of file diff --git a/analyze_rate_volume.py b/analyze_rate_volume.py new file mode 100644 index 0000000..ba550e4 --- /dev/null +++ b/analyze_rate_volume.py @@ -0,0 +1,223 @@ +import os +import cv2 +import numpy as np +import matplotlib.pyplot as plt +import re +from datetime import datetime +import main as _main +import argparse + +class RateVolumeAnalyzer: + def __init__(self): + self.mat = _main.MAT() # 创建MAT实例以使用predictor方法 + + def parse_log_file(self, log_path): + """解析日志文件,提取体积和时间信息""" + volumes = [] + timestamps = [] + + with open(log_path, 'r', encoding='utf-8') as f: + for line in f: + # 匹配体积信息的行 + if '当前体积:' in line: + # 提取时间戳 + time_match = re.search(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d+', line) + # 提取体积 + volume_match = re.search(r'当前体积: ([\d.]+) ml', line) + + if time_match and volume_match: + timestamp = datetime.strptime(time_match.group(1), '%Y-%m-%d %H:%M:%S') + volume = float(volume_match.group(1)) + timestamps.append(timestamp) + volumes.append(volume) + + return timestamps, volumes + + def extract_frames_from_video(self, video_path, timestamps, start_time): + """从视频中提取对应时间点的帧""" + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + print(f"无法打开视频文件: {video_path}") + return [] + + fps = cap.get(cv2.CAP_PROP_FPS) + frames = [] + + for timestamp in timestamps: + # 计算相对于实验开始的秒数 + seconds_from_start = (timestamp - start_time).total_seconds() + frame_number = int(seconds_from_start * fps) + + # 设置视频位置到指定帧 + cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) + ret, frame = cap.read() + + if ret: + frames.append(frame) + else: + frames.append(None) # 如果无法读取帧,添加None + + cap.release() + return frames + + def calculate_rates(self, frames): + """使用predictor方法计算每帧的rate""" + rates = [] + states = [] + + for frame in frames: + if frame is not None: + try: + state, rate = self.mat.predictor(frame) + rates.append(rate) + states.append(state) + except Exception as e: + print(f"计算rate时发生错误: {e}") + rates.append(None) + states.append(None) + else: + rates.append(None) + states.append(None) + + return rates, states + + def plot_rate_volume_curve(self, volumes, rates, states, log_filename): + """绘制rate-体积曲线""" + # 过滤掉None值 + valid_data = [(v, r, s) for v, r, s in zip(volumes, rates, states) + if r is not None and s is not None] + + if not valid_data: + print("没有有效的数据点可以绘制") + return + + volumes_valid, rates_valid, states_valid = zip(*valid_data) + + plt.figure(figsize=(12, 8)) + + # 根据状态用不同颜色绘制点 + colors = {'transport': 'blue', 'middle': 'orange', 'about': 'purple', 'colored': 'red'} + + for state in colors: + state_volumes = [v for v, s in zip(volumes_valid, states_valid) if s == state] + state_rates = [r for r, s in zip(rates_valid, states_valid) if s == state] + + if state_volumes: + plt.scatter(state_volumes, state_rates, + c=colors[state], label=state, alpha=0.7, s=50) + + # 绘制连接线 + plt.plot(volumes_valid, rates_valid, 'k-', alpha=0.3, linewidth=1) + + plt.xlabel('体积 (ml)', fontsize=12) + plt.ylabel('Rate', fontsize=12) + plt.title(f'Rate-体积曲线 ({log_filename})', fontsize=14) + plt.legend() + plt.grid(True, alpha=0.3) + + # 保存图片 + output_filename = f"rate_volume_curve_{log_filename.replace('.log', '.png')}" + plt.savefig(output_filename, dpi=300, bbox_inches='tight') + plt.show() + + print(f"图片已保存为: {output_filename}") + + # 打印统计信息 + print(f"\n统计信息:") + print(f"总数据点: {len(valid_data)}") + for state in colors: + count = sum(1 for s in states_valid if s == state) + if count > 0: + print(f"{state}: {count} 个点") + + def analyze_experiment(self, timestamp_str=None): + """分析指定时间戳的实验,如果不指定则分析最新的实验""" + logs_dir = "logs" + videos_dir = "Videos" + + if timestamp_str: + log_file = f"titration_{timestamp_str}.log" + video_file = f"{timestamp_str}.mp4" + else: + # 找到最新的日志文件 + log_files = [f for f in os.listdir(logs_dir) if f.endswith('.log')] + if not log_files: + print("没有找到日志文件") + return + + log_files.sort() + log_file = log_files[-1] + + # 提取时间戳来找对应的视频文件 + timestamp_match = re.search(r'titration_(\d{8}_\d{6})\.log', log_file) + if timestamp_match: + video_file = f"{timestamp_match.group(1)}.mp4" + else: + print("无法从日志文件名提取时间戳") + return + + log_path = os.path.join(logs_dir, "titration_20250529_191634.log") + video_path = os.path.join(videos_dir, "tmp.mp4") + + if not os.path.exists(log_path): + print(f"日志文件不存在: {log_path}") + return + + if not os.path.exists(video_path): + print(f"视频文件不存在: {video_path}") + return + + print(f"分析实验: {log_file}") + print(f"对应视频: {video_file}") + + # 解析日志文件 + timestamps, volumes = self.parse_log_file(log_path) + + if not timestamps: + print("日志文件中没有找到体积数据") + return + + print(f"找到 {len(timestamps)} 个数据点") + + # 获取实验开始时间 + start_time = timestamps[0] + + # 从视频中提取帧 + print("正在从视频中提取帧...") + frames = self.extract_frames_from_video(video_path, timestamps, start_time) + + # 计算rate + print("正在计算rate值...") + rates, states = self.calculate_rates(frames) + + # 绘制曲线 + print("正在绘制rate-体积曲线...") + self.plot_rate_volume_curve(volumes, rates, states, log_file) + +def main(): + parser = argparse.ArgumentParser(description='分析滴定实验的rate-体积曲线') + parser.add_argument('--timestamp', type=str, help='指定实验时间戳 (格式: YYYYMMDD_HHMMSS)') + parser.add_argument('--list', action='store_true', help='列出所有可用的实验') + + args = parser.parse_args() + + analyzer = RateVolumeAnalyzer() + + if args.list: + # 列出所有可用的实验 + logs_dir = "logs" + if os.path.exists(logs_dir): + log_files = [f for f in os.listdir(logs_dir) if f.endswith('.log')] + log_files.sort() + print("可用的实验:") + for log_file in log_files: + timestamp_match = re.search(r'titration_(\d{8}_\d{6})\.log', log_file) + if timestamp_match: + timestamp = timestamp_match.group(1) + print(f" {timestamp}") + return + + analyzer.analyze_experiment(args.timestamp) + +if __name__ == "__main__": + main() diff --git a/ch340_gui.exe b/ch340_gui.exe new file mode 100644 index 0000000..67cdd98 --- /dev/null +++ b/ch340_gui.exe @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:fc7d1088d4f1edb2c939f12a2d5cd136142744c6ddd23835f91710a0383d295f +size 21382656 diff --git a/main.py b/main.py index 4069a99..d6f4361 100644 --- a/main.py +++ b/main.py @@ -121,15 +121,15 @@ class MAT: recent_history = [(t, state, v, i) for t, state, v, i in self.history if t >= bounce_start_time] if recent_history: - middle_count = sum(1 for _, state, _, _ in recent_history if state == "transport") - middle_ratio = middle_count / len(recent_history) + trans_count = sum(1 for _, state, _, _ in recent_history if state == "transport") + trans_ratio = trans_count / len(recent_history) - if middle_ratio > 0.3: + if trans_ratio > 0.3: self.process_left(now) self.state.exit_middle_check() # about状态随middle一起退出 self.state.exit_about_with_middle() - self.control_logger.info(f"middle比例{middle_ratio:.2%}<70%,退出middle检查,返回fast模式") + self.control_logger.info(f"middle比例{trans_ratio:.2%}<70%,退出middle检查,返回fast模式") # end检查: 进入end之后的end_bounce_time,如果end比例<80%,则重置;否则终止实验 if self.state.should_check_end_result(now): @@ -174,10 +174,6 @@ class MAT: self._display_status(im, ret, rate, val) return ret - def _reset_colored_detection(self, current_time, colored_ratio, current_volume): - """重置colored检测状态 - 已被新逻辑替代,保留以避免错误""" - pass - def _display_status(self, im, detection_result, rate, volume): """显示状态信息到图像上""" mode_color = { diff --git a/requirements.txt b/requirements.txt index 59a25f0..7d0c445 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ numpy opencv-python pyserial matplotlib +PySide6 diff --git a/test_logging.py b/test_logging.py deleted file mode 100644 index 209a826..0000000 --- a/test_logging.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -""" -测试logging系统的功能 -""" - -import utils - -def test_logging(): - """测试logging系统""" - # 初始化logging - log_file = utils.setup_logging() - print(f"日志文件已创建: {log_file}") - - # 获取各个模块的logger - system_logger = utils.get_system_logger() - hardware_logger = utils.get_hardware_logger() - vision_logger = utils.get_vision_logger() - control_logger = utils.get_control_logger() - endpoint_logger = utils.get_endpoint_logger() - volume_logger = utils.get_volume_logger() - - # 测试各个模块的日志输出 - system_logger.info("系统初始化完成") - hardware_logger.info("硬件连接正常") - vision_logger.debug("图像处理开始") - control_logger.warning("控制逻辑警告") - endpoint_logger.error("终点检测错误") - volume_logger.info("体积计算: 10.5 ml") - - print("日志测试完成!请检查logs目录下的日志文件。") - -if __name__ == "__main__": - test_logging()