#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 滴定分析Demo程序 从Videos目录获取最新视频,解析logs或直接处理视频,生成rate随时间变化的可视化图像 """ import os import re import cv2 import time import numpy as np import matplotlib.pyplot as plt import matplotlib.dates as mdates from datetime import datetime, timedelta from pathlib import Path from typing import List, Tuple, Dict, Optional import logging from dataclasses import dataclass # 设置中文字体 plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei'] plt.rcParams['axes.unicode_minus'] = False @dataclass class DataPoint: """数据点结构""" timestamp: float relative_time: float # 相对于开始时间的秒数 state: str rate: float volume: float mode: str = "FAST" # FAST, SLOW, ABOUT, CRAZY class TitrationDemo: def __init__(self, base_dir: str = "c:/expiriment/ai-titration-main"): """初始化Demo类""" self.base_dir = Path(base_dir) self.videos_dir = self.base_dir / "Videos" self.logs_dir = self.base_dir / "logs" # 模式对应的速度映射 (ml/次) self.mode_speeds = { 'FAST': 0.45, # 快速模式速度 'SLOW': 0.05, # 慢速模式速度 'ABOUT': 0.02, # 精密模式速度 'CRAZY': 1.0, # 疯狂模式速度 'END': 0.0 # 结束模式无速度 } # 模式颜色映射 self.mode_colors = { 'FAST': '#FF6B6B', # 红色 'SLOW': '#4ECDC4', # 青色 'ABOUT': '#45B7D1', # 蓝色 'CRAZY': '#96CEB4', # 绿色 'END': '#FFEAA7' # 黄色 } # 状态颜色映射 self.state_colors = { 'transport': '#95A5A6', # 灰色 'middle': '#F39C12', # 橙色 'about': '#E74C3C', # 深红色 'colored': '#8E44AD' # 紫色 } # 设置日志 logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) def get_latest_video(self) -> Optional[Path]: """获取最新的视频文件""" video_files = [] for ext in ['*.mp4', '*.mkv', '*.avi']: video_files.extend(self.videos_dir.glob(ext)) if not video_files: self.logger.error("未找到任何视频文件") return None # 按文件名中的时间戳排序,获取最新的 def extract_timestamp(filename: str) -> datetime: # 提取形如 20250606_200940 的时间戳 match = re.search(r'(\d{8}_\d{6})', filename) if match: return datetime.strptime(match.group(1), '%Y%m%d_%H%M%S') return datetime.min latest_video = max(video_files, key=lambda x: extract_timestamp(x.name)) self.logger.info(f"选择最新视频: {latest_video.name}") return latest_video def get_corresponding_log(self, video_path: Path) -> Optional[Path]: """获取对应的日志文件""" # 从视频文件名提取时间戳 timestamp_match = re.search(r'(\d{8}_\d{6})', video_path.name) if not timestamp_match: return None timestamp = timestamp_match.group(1) log_file = self.logs_dir / f"titration_{timestamp}.log" if log_file.exists(): self.logger.info(f"找到对应日志文件: {log_file.name}") return log_file else: self.logger.warning(f"未找到对应日志文件: {log_file.name}") return None def parse_log_data(self, log_path: Path) -> List[DataPoint]: """解析日志文件,提取数据点""" data_points = [] start_time = None current_mode = "FAST" with open(log_path, 'r', encoding='utf-8') as f: for line in f: # 解析时间戳 timestamp_match = re.match(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})', line) if not timestamp_match: continue timestamp_str = timestamp_match.group(1) timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S,%f') timestamp_float = timestamp.timestamp() if start_time is None: start_time = timestamp_float relative_time = timestamp_float - start_time # 解析模式变化 if "进入slow模式" in line or "检测到middle" in line: current_mode = "SLOW" elif "进入about模式" in line or "检测到about" in line: current_mode = "ABOUT" elif "返回fast模式" in line or "退出middle检查" in line: current_mode = "FAST" elif "检测到colored" in line: current_mode = "END" # 解析体积信息 volume_match = re.search(r'当前体积:\s*([\d.]+)\s*ml', line) if volume_match: volume = float(volume_match.group(1)) # 这里rate需要通过视频处理获得,暂时设置为0 data_points.append(DataPoint( timestamp=timestamp_float, relative_time=relative_time, state="transport", # 默认状态 rate=0.0, # 需要通过视频处理获得 volume=volume, mode=current_mode )) return data_points def interpolate_volume_data(self, data_points: List[DataPoint], interpolation_interval: float = 0.1) -> List[DataPoint]: """ 通过mode求speed反推log两个体积数据点之间的数据 Args: data_points: 原始数据点列表 interpolation_interval: 插值间隔(秒) Returns: 插值后的数据点列表 """ if len(data_points) < 2: return data_points interpolated_points = [] for i in range(len(data_points)): # 添加原始数据点 interpolated_points.append(data_points[i]) # 如果不是最后一个点,进行插值 if i < len(data_points) - 1: current_point = data_points[i] next_point = data_points[i + 1] # 计算时间差和体积差 time_diff = next_point.relative_time - current_point.relative_time volume_diff = next_point.volume - current_point.volume # 如果时间差大于插值间隔,进行插值 if time_diff > interpolation_interval: # 根据当前模式获取速度 current_speed = self.mode_speeds.get(current_point.mode, 0.0) # 计算需要插值的点数 num_interpolations = int(time_diff / interpolation_interval) for j in range(1, num_interpolations + 1): # 计算插值时间 interp_time = current_point.relative_time + j * interpolation_interval # 如果插值时间超过下一个点的时间,跳出 if interp_time >= next_point.relative_time: break # 根据模式和时间计算体积 # 假设每次推进的时间间隔内,体积按照模式速度增长 elapsed_time = interp_time - current_point.relative_time # 估算体积增长(基于推进频率和速度) # 假设推进频率:FAST=1次/秒,SLOW=0.5次/秒,ABOUT=0.2次/秒 push_frequency = { 'FAST': 1.0, 'SLOW': 0.5, 'ABOUT': 0.2, 'CRAZY': 2.0, 'END': 0.0 } freq = push_frequency.get(current_point.mode, 1.0) estimated_volume = current_point.volume + elapsed_time * freq * current_speed # 确保体积不超过下一个点的体积 if estimated_volume > next_point.volume: estimated_volume = current_point.volume + (elapsed_time / time_diff) * volume_diff # 创建插值数据点 interp_point = DataPoint( timestamp=current_point.timestamp + elapsed_time, relative_time=interp_time, state=current_point.state, # 继承当前状态 rate=0.0, # 插值点的rate需要通过视频获得 volume=estimated_volume, mode=current_point.mode # 继承当前模式 ) interpolated_points.append(interp_point) self.logger.info(f"原始数据点: {len(data_points)}, 插值后数据点: {len(interpolated_points)}") return interpolated_points def process_video_with_predictor(self, video_path: Path, sample_interval: int = 5) -> List[DataPoint]: """使用predictor处理视频,提取rate数据""" # 导入main模块中的predictor方法 import sys sys.path.append(str(self.base_dir)) try: from main import MAT from utils import History except ImportError: self.logger.error("无法导入MAT类或History类") return [] cap = cv2.VideoCapture(str(video_path)) if not cap.isOpened(): self.logger.error(f"无法打开视频文件: {video_path}") return [] fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.logger.info(f"视频信息: FPS={fps}, 总帧数={total_frames}") # 创建MAT实例用于predictor,简化初始化 try: mat = MAT(videoSourceIndex=0, bounce_time=4, end_bounce_time=1) # 创建简化的历史记录管理器 mat.history = History(max_window_size=100.0, base_time=5.0) except Exception as e: self.logger.error(f"MAT初始化失败: {e}") return [] data_points = [] frame_count = 0 base_calculated = False rates_for_base = [] # 用于模式推断的变量 current_mode = "FAST" mode_change_frame = 0 while True: ret, frame = cap.read() if not ret: break # 更密集的采样 if frame_count % sample_interval == 0: try: # 调用predictor获取状态和rate state, rate = mat.predictor(frame) relative_time = frame_count / fps # 收集前50帧的rate值来计算base if not base_calculated and len(rates_for_base) < 50: if state == "transport": # 只用transport状态的rate计算base rates_for_base.append(rate) if len(rates_for_base) >= 50: base_rate = float(np.mean(rates_for_base)) if mat.history: mat.history.base = base_rate base_calculated = True self.logger.info(f"计算得到基准rate: {base_rate:.4f}") # 基于rate和状态推断模式 if base_calculated and mat.history and mat.history.base: base = mat.history.base thresholds = (base * 5, base * 13, base * 20) # 基于predictor返回的状态推断模式 if state == "transport": if current_mode != "FAST" and frame_count - mode_change_frame > fps * 2: # 2秒稳定期 current_mode = "FAST" mode_change_frame = frame_count elif state == "middle": if current_mode == "FAST": current_mode = "SLOW" mode_change_frame = frame_count elif state == "about": if current_mode in ["FAST", "SLOW"]: current_mode = "ABOUT" mode_change_frame = frame_count elif state == "colored": current_mode = "END" mode_change_frame = frame_count data_points.append(DataPoint( timestamp=time.time(), relative_time=relative_time, state=state, rate=rate, volume=0.0, # 视频中无法直接获得体积信息 mode=current_mode )) except Exception as e: self.logger.warning(f"处理帧 {frame_count} 时出错: {e}") continue frame_count += 1 # 更频繁的进度显示 if frame_count % (sample_interval * 50) == 0: progress = (frame_count / total_frames) * 100 self.logger.info(f"处理进度: {progress:.1f}% - 已处理 {len(data_points)} 个数据点") cap.release() cv2.destroyAllWindows() self.logger.info(f"视频处理完成,共获得 {len(data_points)} 个数据点") return data_points def _smooth_mode_changes(self, data_points: List[DataPoint]) -> List[DataPoint]: """平滑模式变化,避免频繁跳转""" if len(data_points) < 3: return data_points smoothed_points = data_points.copy() window_size = 5 # 平滑窗口大小 for i in range(window_size, len(data_points) - window_size): # 获取窗口内的模式 window_modes = [p.mode for p in data_points[i-window_size:i+window_size+1]] # 使用众数作为当前点的模式 mode_counts = {} for mode in window_modes: mode_counts[mode] = mode_counts.get(mode, 0) + 1 most_common_mode = max(mode_counts.items(), key=lambda x: x[1])[0] smoothed_points[i].mode = most_common_mode return smoothed_points def merge_log_and_video_data(self, log_data: List[DataPoint], video_data: List[DataPoint]) -> List[DataPoint]: """合并日志和视频数据""" if not log_data: return video_data if not video_data: return log_data # 以日志数据为基础,补充视频中的rate信息 merged_data = [] for log_point in log_data: # 找到时间最接近的视频数据点 closest_video_point = min(video_data, key=lambda x: abs(x.relative_time - log_point.relative_time)) # 如果时间差在合理范围内,使用视频的rate数据 if abs(closest_video_point.relative_time - log_point.relative_time) < 5.0: log_point.rate = closest_video_point.rate log_point.state = closest_video_point.state merged_data.append(log_point) return merged_data def create_visualization(self, data_points: List[DataPoint], save_path: Optional[str] = None) -> None: """创建可视化图表""" if not data_points: self.logger.error("没有数据点可供可视化") return # 提取数据 times = [point.relative_time for point in data_points] rates = [point.rate for point in data_points] volumes = [point.volume for point in data_points] modes = [point.mode for point in data_points] states = [point.state for point in data_points] # 创建图表 fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10)) # 上图:Rate随时间变化,按Mode着色 self._plot_rate_by_mode(ax1, times, rates, modes) # 下图:Volume随时间变化,按State着色 self._plot_volume_by_state(ax2, times, volumes, states) # 调整布局 plt.tight_layout() # 保存或显示 if save_path: plt.savefig(save_path, dpi=300, bbox_inches='tight') self.logger.info(f"图表已保存至: {save_path}") else: plt.show() def _plot_rate_by_mode(self, ax, times: List[float], rates: List[float], modes: List[str]) -> None: """绘制Rate随时间变化图,按Mode着色""" # 按模式分组绘制 current_mode = None current_times = [] current_rates = [] for i, (time, rate, mode) in enumerate(zip(times, rates, modes)): if mode != current_mode: # 绘制前一段 if current_times and current_mode: ax.plot(current_times, current_rates, color=self.mode_colors.get(current_mode, '#000000'), linewidth=2, label=f'{current_mode} Mode' if current_mode not in ax.get_legend_handles_labels()[1] else "") # 开始新的一段 current_mode = mode current_times = [time] current_rates = [rate] else: current_times.append(time) current_rates.append(rate) # 绘制最后一段 if current_times and current_mode: ax.plot(current_times, current_rates, color=self.mode_colors.get(current_mode, '#000000'), linewidth=2, label=f'{current_mode} Mode' if current_mode not in ax.get_legend_handles_labels()[1] else "") ax.set_xlabel('时间 (秒)') ax.set_ylabel('Rate') ax.set_title('Rate随时间变化 (按操作模式着色)') ax.legend(loc='upper right') ax.grid(True, alpha=0.3) def _plot_volume_by_state(self, ax, times: List[float], volumes: List[float], states: List[str]) -> None: """绘制Volume随时间变化图,按State着色""" # 过滤掉volume为0的点(来自视频数据) filtered_data = [(t, v, s) for t, v, s in zip(times, volumes, states) if v > 0] if not filtered_data: ax.text(0.5, 0.5, '无体积数据', ha='center', va='center', transform=ax.transAxes) ax.set_xlabel('时间 (秒)') ax.set_ylabel('体积 (ml)') ax.set_title('体积随时间变化 (按检测状态着色)') return f_times, f_volumes, f_states = zip(*filtered_data) # 按状态分组绘制 current_state = None current_times = [] current_volumes = [] for time, volume, state in zip(f_times, f_volumes, f_states): if state != current_state: # 绘制前一段 if current_times and current_state: ax.plot(current_times, current_volumes, color=self.state_colors.get(current_state, '#000000'), linewidth=2, marker='o', markersize=3, label=f'{current_state.title()} State' if current_state not in ax.get_legend_handles_labels()[1] else "") # 开始新的一段 current_state = state current_times = [time] current_volumes = [volume] else: current_times.append(time) current_volumes.append(volume) # 绘制最后一段 if current_times and current_state: ax.plot(current_times, current_volumes, color=self.state_colors.get(current_state, '#000000'), linewidth=2, marker='o', markersize=3, label=f'{current_state.title()} State' if current_state not in ax.get_legend_handles_labels()[1] else "") ax.set_xlabel('时间 (秒)') ax.set_ylabel('体积 (ml)') ax.set_title('体积随时间变化 (按检测状态着色)') ax.legend(loc='lower right') ax.grid(True, alpha=0.3) def run_demo(self, use_video: bool = True, sample_interval: int = 5) -> None: """运行demo程序""" self.logger.info("开始运行滴定分析Demo...") # 1. 获取最新视频 latest_video = self.get_latest_video() if not latest_video: return # 2. 获取对应日志 log_file = self.get_corresponding_log(latest_video) # 3. 解析数据 log_data = [] if log_file: self.logger.info("解析日志数据...") log_data = self.parse_log_data(log_file) video_data = [] if use_video: self.logger.info("处理视频数据...") video_data = self.process_video_with_predictor(latest_video, sample_interval) # 4. 合并数据 if log_data and video_data: self.logger.info("合并日志和视频数据...") final_data = self.merge_log_and_video_data(log_data, video_data) elif log_data: final_data = log_data elif video_data: final_data = video_data else: self.logger.error("没有可用的数据") return # 5. 创建可视化 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') save_path = self.base_dir / f"rate_analysis_{timestamp}.png" self.logger.info("创建可视化图表...") self.create_visualization(final_data, str(save_path)) self.logger.info("Demo运行完成!") # 打印统计信息 self._print_statistics(final_data) def _print_statistics(self, data_points: List[DataPoint]) -> None: """打印数据统计信息""" if not data_points: return print("\n=== 数据统计信息 ===") print(f"总数据点数: {len(data_points)}") print(f"时间范围: {data_points[0].relative_time:.1f}s - {data_points[-1].relative_time:.1f}s") # 模式统计 mode_counts = {} for point in data_points: mode_counts[point.mode] = mode_counts.get(point.mode, 0) + 1 print("\n模式分布:") for mode, count in mode_counts.items(): percentage = (count / len(data_points)) * 100 print(f" {mode}: {count} 次 ({percentage:.1f}%)") # Rate统计 rates = [point.rate for point in data_points if point.rate > 0] if rates: print(f"\nRate统计:") print(f" 最小值: {min(rates):.4f}") print(f" 最大值: {max(rates):.4f}") print(f" 平均值: {np.mean(rates):.4f}") # 体积统计 volumes = [point.volume for point in data_points if point.volume > 0] if volumes: print(f"\n体积统计:") print(f" 最小值: {min(volumes):.2f} ml") print(f" 最大值: {max(volumes):.2f} ml") def main(): """主函数""" demo = TitrationDemo() # 运行demo # use_video=True: 同时处理视频和日志数据 # use_video=False: 仅使用日志数据 # sample_interval: 视频采样间隔(每N帧处理一次)- 设置为更小的值获得更密集的数据点 demo.run_demo(use_video=True, sample_interval=1) # 每3帧采样一次,大幅增加数据点 if __name__ == "__main__": main()