Former-commit-id: 98742561a1e9f3a19111762d29c5d1fbec4920c6
This commit is contained in:
2025-06-04 16:54:04 +08:00
parent 4e387287eb
commit 6e1682e59a

183
utils.py
View File

@ -3,9 +3,12 @@ import os
from datetime import datetime
from enum import Enum
from dataclasses import dataclass
from typing import List, Tuple, Optional
from typing import List, Tuple, Optional, Any
import numpy as np
import time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import threading
@dataclass
class HistoryRecord:
@ -20,12 +23,14 @@ class HistoryRecord:
class History:
"""滑动窗口历史记录管理类"""
def __init__(self, max_window_size: float = 5.0, base_time = 5.0):
def __init__(self, max_window_size: float = 5.0, base_time = 5.0, display: bool = True):
"""
初始化历史记录管理器
Args:
max_window_size: 最大窗口时间长度(秒)
base_time: 基准时间长度(秒)
display: 是否开启实时可视化显示
"""
if base_time > max_window_size:
max_window_size = base_time+0.2
@ -34,11 +39,21 @@ class History:
self.max_window_size = max_window_size
self.fulled = False
self.base = None
self._base_time = base_time
self._base_time = base_time # 可视化相关属性
self.display = display
self.fig: Any = None
self.ax: Any = None
self.line: Any = None
self.base_line: Any = None # 基准线的引用
self.animation: Any = None
self._plot_thread: Optional[threading.Thread] = None
self._plot_running = False
if self.display:
self._setup_plot()
def add_record(self, timestamp: float, state: str, rate: float, volume: float, image: np.ndarray):
"""添加新的历史记录"""
record = HistoryRecord(timestamp, state,rate, volume, image)
record = HistoryRecord(timestamp, state, rate, volume, image)
self.records.append(record)
self._cleanup_old_records(timestamp)
if self.base is None and (timestamp-self._base_time)>= self.records[0].timestamp:
@ -101,7 +116,163 @@ class History:
def clear(self):
"""清空所有历史记录"""
self.records.clear()
if self.display:
self._stop_plot()
def _setup_plot(self):
"""设置matplotlib图表"""
plt.ion() # 交互式模式
self.fig, self.ax = plt.subplots(figsize=(10, 6))
self.ax.set_title('实时Rate值变化')
self.ax.set_xlabel('时间 (s)')
self.ax.set_ylabel('Rate值')
self.ax.grid(True, alpha=0.3)
# 初始化空线条
self.line, = self.ax.plot([], [], 'b-', linewidth=2, label='Rate')
self.base_line = None # 基准线的引用
self.ax.legend()
# 启动更新线程
self._plot_running = True
self._plot_thread = threading.Thread(target=self._update_plot_loop, daemon=True)
self._plot_thread.start()
def _update_plot_loop(self):
"""绘图更新循环"""
while self._plot_running:
try:
self._update_plot()
time.sleep(0.1) # 100ms更新间隔
except Exception as e:
get_vision_logger().error(f"Plot update error: {e}")
break
def _update_plot(self):
"""更新图表数据 - 按照matplotlib推荐的方式重构"""
if not self.records:
return
start_time = time.time()
try:
# 获取时间和rate数据
timestamps = [record.timestamp for record in self.records]
rates = [record.rate for record in self.records]
if not timestamps:
return
# 将时间戳转换为相对时间(从第一个记录开始)
start_timestamp = timestamps[0]
relative_times = [(t - start_timestamp) for t in timestamps]
# 更新主线条数据
self.line.set_data(relative_times, rates)
# 处理base线
self._update_base_line()
# 自动调整坐标轴范围
self._update_axis_limits(relative_times, rates)
# 使用推荐的绘制方式
self.ax.draw_artist(self.line)
self.fig.canvas.draw() # 使用draw_idle而不是draw
except Exception as e:
get_vision_logger().error(f"Plot update error: {e}")
finally:
# 性能监控
elapsed = time.time() - start_time
if elapsed > 0.1:
get_vision_logger().warning(f"Plot update took too long: {elapsed:.3f}s")
def _update_base_line(self):
"""更新或创建基准线"""
if self.base is not None:
if self.base_line is None:
# 首次创建基准线
self.base_line = self.ax.axhline(
y=self.base,
color='r',
linestyle='--',
alpha=0.7,
label=f'Base Rate: {self.base:.2f}'
)
# 更新图例
self.ax.legend()
else:
# 更新现有基准线的位置
self.base_line.set_ydata([self.base, self.base])
# 更新标签
self.base_line.set_label(f'Base Rate: {self.base:.2f}')
self.ax.legend()
elif self.base_line is not None:
# 如果base被重置移除基准线
self.base_line.remove()
self.base_line = None
self.ax.legend()
def _update_axis_limits(self, relative_times, rates):
"""智能更新坐标轴范围"""
if not relative_times or not rates:
return
# X轴范围
max_time = max(relative_times)
self.ax.set_xlim(0, max_time + max(1, max_time * 0.05)) # 添加5%的边距
# Y轴范围计算
min_rate, max_rate = min(rates), max(rates)
if self.base is not None:
# 如果有基准线确保Y轴范围合理
y_min = 0
y_max = min(max(self.base * 35, max_rate * 1.2), 100) # 限制最大值为100
else:
# 没有基准线时的默认范围
margin = (max_rate - min_rate) * 0.1 if max_rate > min_rate else 1
y_min = max(0, min_rate - margin)
y_max = max_rate + margin
self.ax.set_ylim(y_min, y_max)
def _stop_plot(self):
"""停止绘图"""
self._plot_running = False
if self._plot_thread and self._plot_thread.is_alive():
self._plot_thread.join(timeout=1.0)
if self.fig:
plt.close(self.fig)
self.fig = None
self.ax = None
self.line = None
self.base_line = None
def save_plot(self, filename: Optional[str] = None):
"""保存当前图表"""
if not self.display or not self.fig:
get_vision_logger().warning("Plot not initialized, cannot save")
return
if filename is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"rate_plot_{timestamp}.png"
try:
self.fig.savefig(filename, dpi=300, bbox_inches='tight')
get_vision_logger().info(f"Plot saved to {filename}")
except Exception as e:
get_vision_logger().error(f"Failed to save plot: {e}")
def toggle_display(self):
"""切换显示状态"""
if self.display:
self._stop_plot()
self.display = False
else:
self.display = True
self._setup_plot()
class State:
"""滴定状态管理类"""