visual test 1

Former-commit-id: fd87c16bba13b15e8ebc82d816cbf289ec9118f8
Date: 2025-06-08 22:03:50 +08:00
parent c1f33df66d
commit 6ad826c17f
3 changed files with 614 additions and 40 deletions

.gitignore (vendored): 3 changed lines

@@ -11,4 +11,5 @@ Videos
*.build
*.dist
HCHO
upx.exe
upx.exe
media

demo_rate_visualization.py (new file): 611 added lines

@@ -0,0 +1,611 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Titration analysis demo.

Picks the most recent video from the Videos directory, parses the matching
log file (or processes the video directly) and renders a visualization of
how the reaction rate changes over time.
"""
import os
import re
import cv2
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Tuple, Dict, Optional
import logging
from dataclasses import dataclass

# Use a CJK-capable font so any Chinese text in the figures renders correctly
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
plt.rcParams['axes.unicode_minus'] = False


@dataclass
class DataPoint:
    """A single sample of the titration run."""
    timestamp: float
    relative_time: float  # seconds since the start of the run
    state: str
    rate: float
    volume: float
    mode: str = "FAST"  # FAST, SLOW, ABOUT, CRAZY

class TitrationDemo:
    def __init__(self, base_dir: str = "c:/expiriment/ai-titration-main"):
        """Initialize the demo."""
        self.base_dir = Path(base_dir)
        self.videos_dir = self.base_dir / "Videos"
        self.logs_dir = self.base_dir / "logs"
        # Volume dispensed per push for each mode (ml per push)
        self.mode_speeds = {
            'FAST': 0.45,   # fast mode
            'SLOW': 0.05,   # slow mode
            'ABOUT': 0.02,  # fine (near-endpoint) mode
            'CRAZY': 1.0,   # crazy mode
            'END': 0.0      # no dispensing once the endpoint is reached
        }
        # Plot colors per mode
        self.mode_colors = {
            'FAST': '#FF6B6B',   # red
            'SLOW': '#4ECDC4',   # teal
            'ABOUT': '#45B7D1',  # blue
            'CRAZY': '#96CEB4',  # green
            'END': '#FFEAA7'     # yellow
        }
        # Plot colors per detected state
        self.state_colors = {
            'transport': '#95A5A6',  # grey
            'middle': '#F39C12',     # orange
            'about': '#E74C3C',      # dark red
            'colored': '#8E44AD'     # purple
        }
        # Logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def get_latest_video(self) -> Optional[Path]:
        """Return the most recent video file, or None if there is none."""
        video_files = []
        for ext in ['*.mp4', '*.mkv', '*.avi']:
            video_files.extend(self.videos_dir.glob(ext))
        if not video_files:
            self.logger.error("No video files found")
            return None

        # Sort by the timestamp embedded in the file name and pick the newest
        def extract_timestamp(filename: str) -> datetime:
            # Timestamps look like 20250606_200940
            match = re.search(r'(\d{8}_\d{6})', filename)
            if match:
                return datetime.strptime(match.group(1), '%Y%m%d_%H%M%S')
            return datetime.min

        latest_video = max(video_files, key=lambda x: extract_timestamp(x.name))
        self.logger.info(f"Selected latest video: {latest_video.name}")
        return latest_video

    def get_corresponding_log(self, video_path: Path) -> Optional[Path]:
        """Return the log file matching the video's timestamp, if any."""
        # Extract the timestamp from the video file name
        timestamp_match = re.search(r'(\d{8}_\d{6})', video_path.name)
        if not timestamp_match:
            return None
        timestamp = timestamp_match.group(1)
        log_file = self.logs_dir / f"titration_{timestamp}.log"
        if log_file.exists():
            self.logger.info(f"Found matching log file: {log_file.name}")
            return log_file
        else:
            self.logger.warning(f"No matching log file: {log_file.name}")
            return None

    def parse_log_data(self, log_path: Path) -> List[DataPoint]:
        """Parse the log file and extract data points."""
        data_points = []
        start_time = None
        current_mode = "FAST"
        with open(log_path, 'r', encoding='utf-8') as f:
            for line in f:
                # Parse the timestamp
                timestamp_match = re.match(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})', line)
                if not timestamp_match:
                    continue
                timestamp_str = timestamp_match.group(1)
                timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S,%f')
                timestamp_float = timestamp.timestamp()
                if start_time is None:
                    start_time = timestamp_float
                relative_time = timestamp_float - start_time
                # Track mode changes. These patterns match the Chinese log
                # messages written by main.py and must stay as-is.
                if "进入slow模式" in line or "检测到middle" in line:
                    current_mode = "SLOW"
                elif "进入about模式" in line or "检测到about" in line:
                    current_mode = "ABOUT"
                elif "返回fast模式" in line or "退出middle检查" in line:
                    current_mode = "FAST"
                elif "检测到colored" in line:
                    current_mode = "END"
                # Parse the dispensed volume ("当前体积: X ml")
                volume_match = re.search(r'当前体积:\s*([\d.]+)\s*ml', line)
                if volume_match:
                    volume = float(volume_match.group(1))
                    data_points.append(DataPoint(
                        timestamp=timestamp_float,
                        relative_time=relative_time,
                        state="transport",  # default state
                        rate=0.0,           # rate is filled in from the video later
                        volume=volume,
                        mode=current_mode
                    ))
        return data_points

    def interpolate_volume_data(self, data_points: List[DataPoint],
                                interpolation_interval: float = 0.1) -> List[DataPoint]:
        """
        Interpolate between consecutive logged volume points, using the
        per-mode dispensing speed to estimate intermediate volumes.

        Args:
            data_points: original data points
            interpolation_interval: interpolation step in seconds

        Returns:
            Data points with interpolated samples inserted.
        """
        if len(data_points) < 2:
            return data_points
        interpolated_points = []
        # Assumed push frequency per mode (pushes per second):
        # FAST = 1/s, SLOW = 0.5/s, ABOUT = 0.2/s
        push_frequency = {
            'FAST': 1.0,
            'SLOW': 0.5,
            'ABOUT': 0.2,
            'CRAZY': 2.0,
            'END': 0.0
        }
        for i in range(len(data_points)):
            # Keep the original data point
            interpolated_points.append(data_points[i])
            # Interpolate towards the next point (except after the last one)
            if i < len(data_points) - 1:
                current_point = data_points[i]
                next_point = data_points[i + 1]
                # Time and volume gaps between the two points
                time_diff = next_point.relative_time - current_point.relative_time
                volume_diff = next_point.volume - current_point.volume
                # Only interpolate if the gap is larger than the step
                if time_diff > interpolation_interval:
                    # Dispensing speed for the current mode
                    current_speed = self.mode_speeds.get(current_point.mode, 0.0)
                    # Number of interpolated samples
                    num_interpolations = int(time_diff / interpolation_interval)
                    for j in range(1, num_interpolations + 1):
                        interp_time = current_point.relative_time + j * interpolation_interval
                        # Stop once we reach the next original point
                        if interp_time >= next_point.relative_time:
                            break
                        # Estimate the volume dispensed so far in this segment,
                        # assuming it grows at push_frequency * speed
                        elapsed_time = interp_time - current_point.relative_time
                        freq = push_frequency.get(current_point.mode, 1.0)
                        estimated_volume = current_point.volume + elapsed_time * freq * current_speed
                        # Never exceed the next logged volume; fall back to
                        # linear interpolation if the estimate overshoots
                        if estimated_volume > next_point.volume:
                            estimated_volume = current_point.volume + (elapsed_time / time_diff) * volume_diff
                        # Create the interpolated data point
                        interp_point = DataPoint(
                            timestamp=current_point.timestamp + elapsed_time,
                            relative_time=interp_time,
                            state=current_point.state,  # inherit the current state
                            rate=0.0,                   # rate comes from the video
                            volume=estimated_volume,
                            mode=current_point.mode     # inherit the current mode
                        )
                        interpolated_points.append(interp_point)
        self.logger.info(f"Original points: {len(data_points)}, after interpolation: {len(interpolated_points)}")
        return interpolated_points

    def process_video_with_predictor(self, video_path: Path, sample_interval: int = 5) -> List[DataPoint]:
        """Run the predictor over the video and extract rate data."""
        # Import the predictor from the main module
        import sys
        sys.path.append(str(self.base_dir))
        try:
            from main import MAT
            from utils import History
        except ImportError:
            self.logger.error("Could not import the MAT or History class")
            return []
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            self.logger.error(f"Could not open video file: {video_path}")
            return []
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.logger.info(f"Video info: FPS={fps}, total frames={total_frames}")
        # Create a MAT instance for the predictor (simplified initialization)
        try:
            mat = MAT(videoSourceIndex=0, bounce_time=4, end_bounce_time=1)
            # Simplified history manager
            mat.history = History(max_window_size=100.0, base_time=5.0)
        except Exception as e:
            self.logger.error(f"MAT initialization failed: {e}")
            return []
        data_points = []
        frame_count = 0
        base_calculated = False
        rates_for_base = []
        # State used for mode inference
        current_mode = "FAST"
        mode_change_frame = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Sample every `sample_interval` frames
            if frame_count % sample_interval == 0:
                try:
                    # Ask the predictor for this frame's state and rate
                    state, rate = mat.predictor(frame)
                    relative_time = frame_count / fps
                    # Use the first 50 "transport" rates to compute the baseline
                    if not base_calculated and len(rates_for_base) < 50:
                        if state == "transport":
                            rates_for_base.append(rate)
                        if len(rates_for_base) >= 50:
                            base_rate = float(np.mean(rates_for_base))
                            if mat.history:
                                mat.history.base = base_rate
                            base_calculated = True
                            self.logger.info(f"Baseline rate: {base_rate:.4f}")
                    # Infer the operating mode from the detected state
                    if base_calculated and mat.history and mat.history.base:
                        base = mat.history.base
                        thresholds = (base * 5, base * 13, base * 20)
                        if state == "transport":
                            # Require a 2-second stable period before switching back to FAST
                            if current_mode != "FAST" and frame_count - mode_change_frame > fps * 2:
                                current_mode = "FAST"
                                mode_change_frame = frame_count
                        elif state == "middle":
                            if current_mode == "FAST":
                                current_mode = "SLOW"
                                mode_change_frame = frame_count
                        elif state == "about":
                            if current_mode in ["FAST", "SLOW"]:
                                current_mode = "ABOUT"
                                mode_change_frame = frame_count
                        elif state == "colored":
                            current_mode = "END"
                            mode_change_frame = frame_count
                    data_points.append(DataPoint(
                        timestamp=time.time(),
                        relative_time=relative_time,
                        state=state,
                        rate=rate,
                        volume=0.0,  # the volume is not available from the video
                        mode=current_mode
                    ))
                except Exception as e:
                    # No `continue` here: frame_count must still be incremented below
                    self.logger.warning(f"Error while processing frame {frame_count}: {e}")
            frame_count += 1
            # Progress report
            if frame_count % (sample_interval * 50) == 0:
                progress = (frame_count / total_frames) * 100
                self.logger.info(f"Progress: {progress:.1f}% - {len(data_points)} data points collected")
        cap.release()
        cv2.destroyAllWindows()
        self.logger.info(f"Video processing finished with {len(data_points)} data points")
        return data_points

    def _smooth_mode_changes(self, data_points: List[DataPoint]) -> List[DataPoint]:
        """Smooth mode changes to avoid rapid back-and-forth switching."""
        if len(data_points) < 3:
            return data_points
        smoothed_points = data_points.copy()
        window_size = 5  # smoothing window size
        for i in range(window_size, len(data_points) - window_size):
            # Collect the modes inside the window
            window_modes = [p.mode for p in data_points[i - window_size:i + window_size + 1]]
            # Use the most common mode within the window
            mode_counts = {}
            for mode in window_modes:
                mode_counts[mode] = mode_counts.get(mode, 0) + 1
            most_common_mode = max(mode_counts.items(), key=lambda x: x[1])[0]
            smoothed_points[i].mode = most_common_mode
        return smoothed_points

    def merge_log_and_video_data(self, log_data: List[DataPoint],
                                 video_data: List[DataPoint]) -> List[DataPoint]:
        """Merge log-based and video-based data points."""
        if not log_data:
            return video_data
        if not video_data:
            return log_data
        # Use the log data as the base and fill in the rate from the video
        merged_data = []
        for log_point in log_data:
            # Find the video data point closest in time
            closest_video_point = min(video_data,
                                      key=lambda x: abs(x.relative_time - log_point.relative_time))
            # If the time difference is reasonable, take the video's rate and state
            if abs(closest_video_point.relative_time - log_point.relative_time) < 5.0:
                log_point.rate = closest_video_point.rate
                log_point.state = closest_video_point.state
            merged_data.append(log_point)
        return merged_data

    def create_visualization(self, data_points: List[DataPoint],
                             save_path: Optional[str] = None) -> None:
        """Create the visualization figure."""
        if not data_points:
            self.logger.error("No data points to visualize")
            return
        # Unpack the data
        times = [point.relative_time for point in data_points]
        rates = [point.rate for point in data_points]
        volumes = [point.volume for point in data_points]
        modes = [point.mode for point in data_points]
        states = [point.state for point in data_points]
        # Two stacked subplots
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
        # Top: rate over time, colored by mode
        self._plot_rate_by_mode(ax1, times, rates, modes)
        # Bottom: volume over time, colored by state
        self._plot_volume_by_state(ax2, times, volumes, states)
        plt.tight_layout()
        # Save or show
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            self.logger.info(f"Figure saved to: {save_path}")
        else:
            plt.show()

    def _plot_rate_by_mode(self, ax, times: List[float], rates: List[float],
                           modes: List[str]) -> None:
        """Plot rate over time, colored by operating mode."""
        # Draw one line segment per contiguous run of the same mode
        current_mode = None
        current_times = []
        current_rates = []
        for t, rate, mode in zip(times, rates, modes):
            if mode != current_mode:
                # Draw the previous segment
                if current_times and current_mode:
                    ax.plot(current_times, current_rates,
                            color=self.mode_colors.get(current_mode, '#000000'),
                            linewidth=2,
                            label=f'{current_mode} Mode' if f'{current_mode} Mode' not in ax.get_legend_handles_labels()[1] else "")
                # Start a new segment
                current_mode = mode
                current_times = [t]
                current_rates = [rate]
            else:
                current_times.append(t)
                current_rates.append(rate)
        # Draw the final segment
        if current_times and current_mode:
            ax.plot(current_times, current_rates,
                    color=self.mode_colors.get(current_mode, '#000000'),
                    linewidth=2,
                    label=f'{current_mode} Mode' if f'{current_mode} Mode' not in ax.get_legend_handles_labels()[1] else "")
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Rate')
        ax.set_title('Rate over time (colored by operating mode)')
        ax.legend(loc='upper right')
        ax.grid(True, alpha=0.3)

    def _plot_volume_by_state(self, ax, times: List[float], volumes: List[float],
                              states: List[str]) -> None:
        """Plot volume over time, colored by detected state."""
        # Drop points with zero volume (those come from video-only data)
        filtered_data = [(t, v, s) for t, v, s in zip(times, volumes, states) if v > 0]
        if not filtered_data:
            ax.text(0.5, 0.5, 'No volume data', ha='center', va='center', transform=ax.transAxes)
            ax.set_xlabel('Time (s)')
            ax.set_ylabel('Volume (ml)')
            ax.set_title('Volume over time (colored by detected state)')
            return
        f_times, f_volumes, f_states = zip(*filtered_data)
        # Draw one line segment per contiguous run of the same state
        current_state = None
        current_times = []
        current_volumes = []
        for t, volume, state in zip(f_times, f_volumes, f_states):
            if state != current_state:
                # Draw the previous segment
                if current_times and current_state:
                    ax.plot(current_times, current_volumes,
                            color=self.state_colors.get(current_state, '#000000'),
                            linewidth=2, marker='o', markersize=3,
                            label=f'{current_state.title()} State' if f'{current_state.title()} State' not in ax.get_legend_handles_labels()[1] else "")
                # Start a new segment
                current_state = state
                current_times = [t]
                current_volumes = [volume]
            else:
                current_times.append(t)
                current_volumes.append(volume)
        # Draw the final segment
        if current_times and current_state:
            ax.plot(current_times, current_volumes,
                    color=self.state_colors.get(current_state, '#000000'),
                    linewidth=2, marker='o', markersize=3,
                    label=f'{current_state.title()} State' if f'{current_state.title()} State' not in ax.get_legend_handles_labels()[1] else "")
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Volume (ml)')
        ax.set_title('Volume over time (colored by detected state)')
        ax.legend(loc='lower right')
        ax.grid(True, alpha=0.3)

    def run_demo(self, use_video: bool = True, sample_interval: int = 5) -> None:
        """Run the demo end to end."""
        self.logger.info("Starting the titration analysis demo...")
        # 1. Pick the latest video
        latest_video = self.get_latest_video()
        if not latest_video:
            return
        # 2. Find the matching log file
        log_file = self.get_corresponding_log(latest_video)
        # 3. Parse the data
        log_data = []
        if log_file:
            self.logger.info("Parsing log data...")
            log_data = self.parse_log_data(log_file)
        video_data = []
        if use_video:
            self.logger.info("Processing video data...")
            video_data = self.process_video_with_predictor(latest_video, sample_interval)
        # 4. Merge the two sources
        if log_data and video_data:
            self.logger.info("Merging log and video data...")
            final_data = self.merge_log_and_video_data(log_data, video_data)
        elif log_data:
            final_data = log_data
        elif video_data:
            final_data = video_data
        else:
            self.logger.error("No usable data")
            return
        # 5. Create the visualization
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        save_path = self.base_dir / f"rate_analysis_{timestamp}.png"
        self.logger.info("Creating the visualization...")
        self.create_visualization(final_data, str(save_path))
        self.logger.info("Demo finished!")
        # Print summary statistics
        self._print_statistics(final_data)

    def _print_statistics(self, data_points: List[DataPoint]) -> None:
        """Print summary statistics for the collected data."""
        if not data_points:
            return
        print("\n=== Data statistics ===")
        print(f"Total data points: {len(data_points)}")
        print(f"Time range: {data_points[0].relative_time:.1f}s - {data_points[-1].relative_time:.1f}s")
        # Mode distribution
        mode_counts = {}
        for point in data_points:
            mode_counts[point.mode] = mode_counts.get(point.mode, 0) + 1
        print("\nMode distribution:")
        for mode, count in mode_counts.items():
            percentage = (count / len(data_points)) * 100
            print(f"  {mode}: {count} samples ({percentage:.1f}%)")
        # Rate statistics
        rates = [point.rate for point in data_points if point.rate > 0]
        if rates:
            print("\nRate statistics:")
            print(f"  min: {min(rates):.4f}")
            print(f"  max: {max(rates):.4f}")
            print(f"  mean: {np.mean(rates):.4f}")
        # Volume statistics
        volumes = [point.volume for point in data_points if point.volume > 0]
        if volumes:
            print("\nVolume statistics:")
            print(f"  min: {min(volumes):.2f} ml")
            print(f"  max: {max(volumes):.2f} ml")


def main():
    """Entry point."""
    demo = TitrationDemo()
    # Run the demo:
    #   use_video=True  -> process both the video and the log data
    #   use_video=False -> use the log data only
    #   sample_interval -> process every Nth video frame; smaller values give denser data
    demo.run_demo(use_video=True, sample_interval=1)  # sample every frame for the densest data


if __name__ == "__main__":
    main()

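For reference, the volume estimate inside interpolate_volume_data works like this: starting from a logged reading, the volume is assumed to grow at (pushes per second) x (ml per push) for the current mode, and the estimate is capped by plain linear interpolation towards the next logged reading whenever it would overshoot. Below is a minimal standalone sketch with illustrative numbers; the helper name estimate_volume and the values are made up for this note, not part of the repository.

# Illustrative sketch of the volume estimate used in interpolate_volume_data above.
def estimate_volume(v0: float, v1: float, dt_total: float, elapsed: float,
                    pushes_per_s: float, ml_per_push: float) -> float:
    """Estimate the volume `elapsed` seconds after a logged reading v0, assuming a
    constant push frequency and volume per push; fall back to linear interpolation
    towards the next logged reading v1 if the estimate would overshoot it."""
    estimate = v0 + elapsed * pushes_per_s * ml_per_push
    if estimate > v1:  # never exceed the next logged value
        estimate = v0 + (elapsed / dt_total) * (v1 - v0)
    return estimate

# SLOW-mode assumptions: 0.5 pushes/s at 0.05 ml per push, 2 s into a 10 s gap
# between logged readings of 10.00 ml and 10.40 ml.
print(estimate_volume(10.00, 10.40, 10.0, 2.0, 0.5, 0.05))  # -> 10.05
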
main.py: 40 changed lines

@@ -174,45 +174,7 @@ class MAT:
        self.state.about_check = False
        # End check: within end_bounce_time after entering END, reset if the
        # colored ratio is < 80%; otherwise confirm the endpoint and stop the experiment.
        # if self.state.should_check_end_result(now):
        #     colored_ratio = self.history.get_state_ratio("colored", self.history.get_recent_records(self.state.end_bounce_time, now))
        #     if colored_ratio < 0.8:
        #         # colored ratio < 80%: find the second colored point in the history and keep checking
        #         self.endpoint_logger.warning(f"colored ratio {colored_ratio:.2%} < 80%, looking for the next colored point")
        #         # Find the second-to-last colored state in the history
        #         colored_times = self.history.get_states_by_type("colored")
        #         if len(colored_times) >= 2:
        #             # Restart the check from the second-to-last colored time
        #             second_last_colored_time = colored_times[1]
        #             self.state.end_detected_time = second_last_colored_time
        #             # Update the colored record to the matching volume
        #             record = self.history.find_record_by_timestamp(second_last_colored_time)
        #             if record:
        #                 self.colored_volume = record.volume
        #                 self.colored_time = record.timestamp
        #                 self.colored_im = record.image.copy()
        #                 self.endpoint_logger.info(f"Reset to the second colored point: {self.colored_volume:.2f} ml")
        #         else:
        #             # Not enough colored points: reset the end check
        #             self.state.reset_end_check()
        #             self.colored_volume = None
        #             self.colored_time = None
        #             self.endpoint_logger.info("Not enough colored points, resetting the end check")
        #     else:  # colored ratio >= 80%: endpoint confirmed, stop the experiment
        #         self.endpoint_logger.info(f"colored ratio {colored_ratio:.2%} >= 80%, titration endpoint confirmed")
        #         self.endpoint_logger.info(f"Final volume: {self.colored_volume:.2f} ml")
        #         self.running = False
        #         self.ch340.stop()
        #         if self.colored_im is not None:
        #             cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im)
        #         return "colored"
        # Display status info
        self._display_status(im, ret, rate, val)
        return ret
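
The commented-out block removed above describes the endpoint confirmation rule: once end_bounce_time has elapsed after a colored detection, the endpoint is confirmed only if the colored ratio over the recent window reaches 80%; otherwise the check restarts from an earlier colored point or resets. A minimal standalone sketch of that ratio test follows, assuming records are simple (timestamp, state) pairs; the function name and record format are illustrative and not the repository's API.

from typing import List, Tuple

def endpoint_confirmed(records: List[Tuple[float, str]], end_detected_time: float,
                       end_bounce_time: float, now: float, threshold: float = 0.8) -> bool:
    """Return True when the bounce window has elapsed and the 'colored' ratio
    over the last end_bounce_time seconds reaches the threshold."""
    if now - end_detected_time < end_bounce_time:
        return False  # still inside the bounce window, keep observing
    recent = [state for t, state in records if now - t <= end_bounce_time]
    if not recent:
        return False
    colored_ratio = sum(state == "colored" for state in recent) / len(recent)
    return colored_ratio >= threshold

# Example: every record inside the last 10 s window is "colored", so the endpoint is confirmed.
recs = [(100.0 + i, "colored" if i > 0 else "about") for i in range(10)]
print(endpoint_confirmed(recs, end_detected_time=100.0, end_bounce_time=10.0, now=111.0))  # True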