Files
ai-titration/analyze_rate_volume.py
flt6 3c32c03c45 0603-1
Former-commit-id: 79f9eff524ee3778b6e010e5fc56d2060118c0fd
2025-06-03 12:26:07 +08:00

224 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import re
from datetime import datetime
import main as _main
import argparse
class RateVolumeAnalyzer:
    """Relate the volumes logged during a titration to predictor output.

    The workflow implemented by :meth:`analyze_experiment` is: parse a log
    file for (timestamp, volume) samples, read the video frame recorded at
    each sample time, run ``self.mat.predictor`` on every frame and plot the
    resulting rate against the volume.
    """

    # Compiled once because they are applied to every line / file name.
    _TIME_PATTERN = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d+')
    _VOLUME_PATTERN = re.compile(r'当前体积: ([\d.]+) ml')
    _LOG_NAME_PATTERN = re.compile(r'titration_(\d{8}_\d{6})\.log')

    def __init__(self):
        # A MAT instance is kept only so that its predictor() can be called.
        self.mat = _main.MAT()

    def parse_log_file(self, log_path):
        """Extract volume samples and their timestamps from a log file.

        Only lines containing the volume marker are considered, and a line
        is used only when both a timestamp and a volume can be read from it.

        Args:
            log_path: Path of the UTF-8 encoded log file.

        Returns:
            A ``(timestamps, volumes)`` tuple of equally long lists holding
            ``datetime`` objects (second resolution; the fractional part in
            the log is matched but not used) and ``float`` volumes in ml.
        """
        volumes = []
        timestamps = []
        with open(log_path, 'r', encoding='utf-8') as f:
            for line in f:
                if '当前体积:' not in line:
                    continue
                time_match = self._TIME_PATTERN.search(line)
                volume_match = self._VOLUME_PATTERN.search(line)
                if not (time_match and volume_match):
                    continue
                try:
                    # "[\d.]+" also matches text such as "1.2.3"; skip such
                    # a malformed sample instead of aborting the whole parse.
                    volume = float(volume_match.group(1))
                except ValueError:
                    continue
                timestamps.append(
                    datetime.strptime(time_match.group(1), '%Y-%m-%d %H:%M:%S')
                )
                volumes.append(volume)
        return timestamps, volumes

    def extract_frames_from_video(self, video_path, timestamps, start_time):
        """Read the video frame that corresponds to each timestamp.

        Args:
            video_path: Path of the video file.
            timestamps: ``datetime`` objects of the samples to look up.
            start_time: ``datetime`` treated as the first frame of the video.

        Returns:
            A list with one entry per timestamp: the frame returned by
            ``cap.read()``, or ``None`` when no frame could be read. An empty
            list is returned when the video cannot be opened or reports no
            usable frame rate.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                print(f"无法打开视频文件: {video_path}")
                return []

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:
                # Without a positive frame rate every sample would map to
                # frame 0 and the curve would be silently wrong.
                print(f"无法获取视频帧率: {video_path}")
                return []

            frames = []
            for timestamp in timestamps:
                # Offset of this sample from the start of the experiment.
                seconds_from_start = (timestamp - start_time).total_seconds()
                frame_number = int(seconds_from_start * fps)
                if frame_number < 0:
                    # The sample precedes the start time: there is no frame.
                    frames.append(None)
                    continue
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
                ret, frame = cap.read()
                frames.append(frame if ret else None)
            return frames
        finally:
            # Release the capture even if seeking or reading raises.
            cap.release()

    def calculate_rates(self, frames):
        """Run the predictor on every frame.

        Args:
            frames: Frames as returned by :meth:`extract_frames_from_video`;
                ``None`` entries are passed through as missing values.

        Returns:
            A ``(rates, states)`` tuple of lists aligned with ``frames``.
            Entries are ``None`` for missing frames and for frames on which
            the predictor raised.
        """
        rates = []
        states = []
        for frame in frames:
            state = rate = None
            if frame is not None:
                try:
                    state, rate = self.mat.predictor(frame)
                except Exception as e:
                    # Best effort: one bad frame must not abort the analysis.
                    print(f"计算rate时发生错误: {e}")
                    state = rate = None
            rates.append(rate)
            states.append(state)
        return rates, states

    def plot_rate_volume_curve(self, volumes, rates, states, log_filename):
        """Plot rate against volume, save the figure and print statistics.

        Samples whose rate or state is ``None`` are dropped. The image is
        written to the current working directory as
        ``rate_volume_curve_<log name without extension>.png``.

        Args:
            volumes: Volumes in ml, aligned with ``rates`` and ``states``.
            rates: Rate values, ``None`` for missing samples.
            states: State labels, ``None`` for missing samples.
            log_filename: Log file name, used for the title and output name.
        """
        valid_data = [
            (v, r, s)
            for v, r, s in zip(volumes, rates, states)
            if r is not None and s is not None
        ]
        if not valid_data:
            print("没有有效的数据点可以绘制")
            return

        volumes_valid, rates_valid, states_valid = zip(*valid_data)
        fig = plt.figure(figsize=(12, 8))

        # One scatter series per known state. Samples with any other state
        # still appear in the connecting line and in the total count below.
        colors = {'transport': 'blue', 'middle': 'orange', 'about': 'purple', 'colored': 'red'}
        for state, color in colors.items():
            points = [(v, r) for v, r, s in valid_data if s == state]
            if points:
                state_volumes, state_rates = zip(*points)
                plt.scatter(state_volumes, state_rates,
                            c=color, label=state, alpha=0.7, s=50)

        # Faint line connecting the samples in their original order.
        plt.plot(volumes_valid, rates_valid, 'k-', alpha=0.3, linewidth=1)
        plt.xlabel('体积 (ml)', fontsize=12)
        plt.ylabel('Rate', fontsize=12)
        plt.title(f'Rate-体积曲线 ({log_filename})', fontsize=14)
        plt.legend()
        plt.grid(True, alpha=0.3)

        # Swap only the extension; str.replace would touch every ".log".
        output_filename = f"rate_volume_curve_{os.path.splitext(log_filename)[0]}.png"
        plt.savefig(output_filename, dpi=300, bbox_inches='tight')
        plt.show()
        # Free the figure so repeated calls do not accumulate open figures.
        plt.close(fig)
        print(f"图片已保存为: {output_filename}")

        print(f"\n统计信息:")
        print(f"总数据点: {len(valid_data)}")
        for state in colors:
            count = sum(1 for s in states_valid if s == state)
            if count > 0:
                print(f"{state}: {count} 个点")

    def analyze_experiment(self, timestamp_str=None):
        """Analyze one experiment and plot its rate-volume curve.

        Logs are looked up as ``logs/titration_<timestamp>.log`` and videos as
        ``Videos/<timestamp>.mp4``, both relative to the working directory.

        Args:
            timestamp_str: Experiment timestamp (``YYYYMMDD_HHMMSS``). When
                omitted, the titration log with the latest timestamp in its
                file name is used.
        """
        logs_dir = "logs"
        videos_dir = "Videos"

        if timestamp_str:
            log_file = f"titration_{timestamp_str}.log"
            video_file = f"{timestamp_str}.mp4"
        else:
            # Pick the most recent titration log.
            log_files = []
            if os.path.isdir(logs_dir):
                log_files = [f for f in os.listdir(logs_dir) if f.endswith('.log')]
            if not log_files:
                print("没有找到日志文件")
                return
            # Ignore unrelated *.log files so that one of them sorting last
            # does not hide the real experiments.
            matches = [m for m in map(self._LOG_NAME_PATTERN.search, log_files) if m]
            if not matches:
                print("无法从日志文件名提取时间戳")
                return
            latest = max(matches, key=lambda m: m.group(1))
            log_file = latest.string
            video_file = f"{latest.group(1)}.mp4"

        # Use the files resolved above, i.e. the ones reported to the user.
        log_path = os.path.join(logs_dir, log_file)
        video_path = os.path.join(videos_dir, video_file)

        if not os.path.exists(log_path):
            print(f"日志文件不存在: {log_path}")
            return
        if not os.path.exists(video_path):
            print(f"视频文件不存在: {video_path}")
            return

        print(f"分析实验: {log_file}")
        print(f"对应视频: {video_file}")

        timestamps, volumes = self.parse_log_file(log_path)
        if not timestamps:
            print("日志文件中没有找到体积数据")
            return
        print(f"找到 {len(timestamps)} 个数据点")

        # The first logged sample is treated as the start of the video.
        start_time = timestamps[0]

        print("正在从视频中提取帧...")
        frames = self.extract_frames_from_video(video_path, timestamps, start_time)

        print("正在计算rate值...")
        rates, states = self.calculate_rates(frames)

        print("正在绘制rate-体积曲线...")
        self.plot_rate_volume_curve(volumes, rates, states, log_file)
def main():
    """Command-line entry point.

    ``--list`` prints the timestamps of all titration logs found in the
    ``logs`` directory and exits. Otherwise the experiment given by
    ``--timestamp`` (or the latest one when omitted) is analyzed.
    """
    parser = argparse.ArgumentParser(description='分析滴定实验的rate-体积曲线')
    parser.add_argument('--timestamp', type=str, help='指定实验时间戳 (格式: YYYYMMDD_HHMMSS)')
    parser.add_argument('--list', action='store_true', help='列出所有可用的实验')
    args = parser.parse_args()

    if args.list:
        # Listing only reads file names, so the analyzer (whose constructor
        # instantiates _main.MAT) is deliberately not created on this path.
        logs_dir = "logs"
        if not os.path.isdir(logs_dir):
            print(f"日志目录不存在: {logs_dir}")
            return
        log_files = sorted(f for f in os.listdir(logs_dir) if f.endswith('.log'))
        print("可用的实验:")
        for log_file in log_files:
            timestamp_match = re.search(r'titration_(\d{8}_\d{6})\.log', log_file)
            if timestamp_match:
                print(f" {timestamp_match.group(1)}")
        return

    analyzer = RateVolumeAnalyzer()
    analyzer.analyze_experiment(args.timestamp)


if __name__ == "__main__":
    main()