import os import cv2 import numpy as np import matplotlib.pyplot as plt import re from datetime import datetime import main as _main import argparse class RateVolumeAnalyzer: def __init__(self): self.mat = _main.MAT() # 创建MAT实例以使用predictor方法 def parse_log_file(self, log_path): """解析日志文件,提取体积和时间信息""" volumes = [] timestamps = [] with open(log_path, 'r', encoding='utf-8') as f: for line in f: # 匹配体积信息的行 if '当前体积:' in line: # 提取时间戳 time_match = re.search(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d+', line) # 提取体积 volume_match = re.search(r'当前体积: ([\d.]+) ml', line) if time_match and volume_match: timestamp = datetime.strptime(time_match.group(1), '%Y-%m-%d %H:%M:%S') volume = float(volume_match.group(1)) timestamps.append(timestamp) volumes.append(volume) return timestamps, volumes def extract_frames_from_video(self, video_path, timestamps, start_time): """从视频中提取对应时间点的帧""" cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"无法打开视频文件: {video_path}") return [] fps = cap.get(cv2.CAP_PROP_FPS) frames = [] for timestamp in timestamps: # 计算相对于实验开始的秒数 seconds_from_start = (timestamp - start_time).total_seconds() frame_number = int(seconds_from_start * fps) # 设置视频位置到指定帧 cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) ret, frame = cap.read() if ret: frames.append(frame) else: frames.append(None) # 如果无法读取帧,添加None cap.release() return frames def calculate_rates(self, frames): """使用predictor方法计算每帧的rate""" rates = [] states = [] for frame in frames: if frame is not None: try: state, rate = self.mat.predictor(frame) rates.append(rate) states.append(state) except Exception as e: print(f"计算rate时发生错误: {e}") rates.append(None) states.append(None) else: rates.append(None) states.append(None) return rates, states def plot_rate_volume_curve(self, volumes, rates, states, log_filename): """绘制rate-体积曲线""" # 过滤掉None值 valid_data = [(v, r, s) for v, r, s in zip(volumes, rates, states) if r is not None and s is not None] if not valid_data: print("没有有效的数据点可以绘制") return volumes_valid, rates_valid, states_valid = zip(*valid_data) plt.figure(figsize=(12, 8)) # 根据状态用不同颜色绘制点 colors = {'transport': 'blue', 'middle': 'orange', 'about': 'purple', 'colored': 'red'} for state in colors: state_volumes = [v for v, s in zip(volumes_valid, states_valid) if s == state] state_rates = [r for r, s in zip(rates_valid, states_valid) if s == state] if state_volumes: plt.scatter(state_volumes, state_rates, c=colors[state], label=state, alpha=0.7, s=50) # 绘制连接线 plt.plot(volumes_valid, rates_valid, 'k-', alpha=0.3, linewidth=1) plt.xlabel('体积 (ml)', fontsize=12) plt.ylabel('Rate', fontsize=12) plt.title(f'Rate-体积曲线 ({log_filename})', fontsize=14) plt.legend() plt.grid(True, alpha=0.3) # 保存图片 output_filename = f"rate_volume_curve_{log_filename.replace('.log', '.png')}" plt.savefig(output_filename, dpi=300, bbox_inches='tight') plt.show() print(f"图片已保存为: {output_filename}") # 打印统计信息 print(f"\n统计信息:") print(f"总数据点: {len(valid_data)}") for state in colors: count = sum(1 for s in states_valid if s == state) if count > 0: print(f"{state}: {count} 个点") def analyze_experiment(self, timestamp_str=None): """分析指定时间戳的实验,如果不指定则分析最新的实验""" logs_dir = "logs" videos_dir = "Videos" if timestamp_str: log_file = f"titration_{timestamp_str}.log" video_file = f"{timestamp_str}.mp4" else: # 找到最新的日志文件 log_files = [f for f in os.listdir(logs_dir) if f.endswith('.log')] if not log_files: print("没有找到日志文件") return log_files.sort() log_file = log_files[-1] # 提取时间戳来找对应的视频文件 timestamp_match = re.search(r'titration_(\d{8}_\d{6})\.log', log_file) if timestamp_match: video_file = f"{timestamp_match.group(1)}.mp4" else: print("无法从日志文件名提取时间戳") return log_path = os.path.join(logs_dir, "titration_20250529_191634.log") video_path = os.path.join(videos_dir, "tmp.mp4") if not os.path.exists(log_path): print(f"日志文件不存在: {log_path}") return if not os.path.exists(video_path): print(f"视频文件不存在: {video_path}") return print(f"分析实验: {log_file}") print(f"对应视频: {video_file}") # 解析日志文件 timestamps, volumes = self.parse_log_file(log_path) if not timestamps: print("日志文件中没有找到体积数据") return print(f"找到 {len(timestamps)} 个数据点") # 获取实验开始时间 start_time = timestamps[0] # 从视频中提取帧 print("正在从视频中提取帧...") frames = self.extract_frames_from_video(video_path, timestamps, start_time) # 计算rate print("正在计算rate值...") rates, states = self.calculate_rates(frames) # 绘制曲线 print("正在绘制rate-体积曲线...") self.plot_rate_volume_curve(volumes, rates, states, log_file) def main(): parser = argparse.ArgumentParser(description='分析滴定实验的rate-体积曲线') parser.add_argument('--timestamp', type=str, help='指定实验时间戳 (格式: YYYYMMDD_HHMMSS)') parser.add_argument('--list', action='store_true', help='列出所有可用的实验') args = parser.parse_args() analyzer = RateVolumeAnalyzer() if args.list: # 列出所有可用的实验 logs_dir = "logs" if os.path.exists(logs_dir): log_files = [f for f in os.listdir(logs_dir) if f.endswith('.log')] log_files.sort() print("可用的实验:") for log_file in log_files: timestamp_match = re.search(r'titration_(\d{8}_\d{6})\.log', log_file) if timestamp_match: timestamp = timestamp_match.group(1) print(f" {timestamp}") return analyzer.analyze_experiment(args.timestamp) if __name__ == "__main__": main()