144 lines
4.6 KiB
Python
144 lines
4.6 KiB
Python
import numpy as np
|
||
import cv2
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib.animation import FuncAnimation
|
||
from collections import deque
|
||
import threading
|
||
import time
|
||
|
||
class DistributionChangeDetector:
    """Score how far a window of samples has drifted from a baseline.

    Every window is reduced to three summary statistics: the fraction of
    samples below ``threshold``, the sample standard deviation (``ddof=1``)
    and a histogram-based estimate of the mode. The baseline is the mean and
    spread of those summaries over a set of reference windows. ``update``
    returns the Euclidean norm of the z-scored difference between a new
    window's summary and that baseline.
    """

    def __init__(
        self,
        baseline_windows: list[np.ndarray],
        threshold: float = 30,
        bins: int = 50,
    ):
        """Build the baseline from windows captured during a stable period.

        Args:
            baseline_windows: Arrays of samples, one per reference window.
            threshold: Samples strictly below this value count towards the
                "fraction under threshold" statistic.
            bins: Number of histogram bins used for the mode estimate.

        Raises:
            ValueError: If no baseline window has at least two samples.
        """
        # Must be set before the baseline is computed: _compute_stats reads them.
        self.threshold = threshold
        self.bins = bins
        self.baseline = self._compute_baseline(baseline_windows)

    def _compute_stats(self, window: np.ndarray) -> tuple[float, float, float]:
        """Return ``(fraction_below_threshold, std, mode_estimate)`` for a window.

        A window with fewer than two samples has no defined sample standard
        deviation; all three statistics are then returned as NaN so the
        resulting score is NaN, without NumPy emitting runtime warnings.
        """
        samples = np.asarray(window).ravel()
        if samples.size < 2:
            nan = float("nan")
            return nan, nan, nan

        p_under = float(np.mean(samples < self.threshold))
        std = float(np.std(samples, ddof=1))

        # Cheap mode estimate: centre of the most populated histogram bin.
        hist, bin_edges = np.histogram(samples, bins=self.bins)
        peak = int(np.argmax(hist))
        mode_est = float((bin_edges[peak] + bin_edges[peak + 1]) / 2)
        return p_under, std, mode_est

    def _compute_baseline(
        self, windows: list[np.ndarray]
    ) -> tuple[np.ndarray, np.ndarray]:
        """Return the baseline ``(mean, std)`` vectors, each of length three.

        ``mean`` holds the average of each statistic over the usable windows;
        ``std`` holds the matching spread and is used to normalise later
        differences. Windows whose statistics are not finite (for example
        empty windows) are ignored so they cannot turn the baseline into NaN.

        Raises:
            ValueError: If no usable window remains.
        """
        stats = np.array(
            [self._compute_stats(w) for w in windows], dtype=float
        ).reshape(-1, 3)
        stats = stats[np.isfinite(stats).all(axis=1)]
        if stats.shape[0] == 0:
            raise ValueError(
                "baseline_windows must contain at least one window "
                "with two or more samples"
            )

        mean = stats.mean(axis=0)
        std = stats.std(axis=0) + 1e-6  # avoid division by zero in update()
        return mean, std

    def update(self, window: np.ndarray) -> float:
        """Return the change score of ``window`` relative to the baseline.

        Args:
            window: Samples of the current window.

        Returns:
            A non-negative score; larger means the window's statistics lie
            further from the baseline. NaN if the window has fewer than two
            samples.
        """
        x = np.array(self._compute_stats(window))
        mean, std = self.baseline
        norm_diff = (x - mean) / std
        return float(np.linalg.norm(norm_diff))
|
||
def hsv_score(s: np.ndarray, threshold: float = 30) -> float:
    """Return the fraction of values in ``s`` strictly above ``threshold``.

    The fraction is taken over every element of ``s`` regardless of its
    shape, so a 2-D channel and its flattened form give the same result.

    Args:
        s: Array of values (the caller in this file passes saturation values).
        threshold: Values strictly greater than this are counted.

    Returns:
        A value in ``[0, 1]``; ``0.0`` when ``s`` is empty, since no value
        exceeds the threshold.
    """
    values = np.asarray(s)
    if values.size == 0:
        # Guard against 0/0 on an empty input.
        return 0.0
    # The mean of a boolean mask is the share of True entries.
    return float(np.mean(values > threshold))
|
||
class RealTimePlotter:
    """Live two-panel line plot of two score series against elapsed time.

    The left panel shows the first score on a fixed 0-100 y-range; the right
    panel shows the second score on a fixed 0-1 y-range. Only the most recent
    ``max_points`` samples are kept.
    """

    def __init__(self, max_points=200):
        """Create the figure, both axes and the (initially empty) lines."""
        self.max_points = max_points

        # Bounded histories: the oldest samples fall off automatically.
        self.times = deque(maxlen=max_points)
        self.scores = deque(maxlen=max_points)
        self.scores2 = deque(maxlen=max_points)
        self.start_time = time.time()

        # Interactive mode lets the figure refresh without blocking.
        plt.ion()
        self.fig, axes = plt.subplots(1, 2, figsize=(10, 6))
        self.ax, self.ax2 = axes
        self.line, = self.ax.plot([], [], 'b-', linewidth=2)
        self.line2, = self.ax2.plot([], [], 'b-', linewidth=2)

        self.ax.set_xlabel('Time (s)')
        self.ax.set_ylabel('Change Score')
        self.ax.set_title('Real-time Distribution Change Detection')
        for axis in (self.ax, self.ax2):
            axis.grid(True)

    def update_plot(self, score, s_score):
        """Append one sample to each series and redraw the figure.

        Args:
            score: Value for the left panel.
            s_score: Value for the right panel.
        """
        elapsed = time.time() - self.start_time
        self.scores.append(score)
        self.scores2.append(s_score)
        self.times.append(elapsed)

        # Push the current histories into the two line artists.
        xs = list(self.times)
        self.line.set_data(xs, list(self.scores))
        self.line2.set_data(xs, list(self.scores2))

        # Axis limits need at least two points to span a non-empty x-range.
        if len(self.times) > 1:
            x_lo, x_hi = min(self.times), max(self.times)
            self.ax.set_xlim(x_lo, x_hi)
            self.ax2.set_xlim(x_lo, x_hi)
            self.ax.set_ylim(0, 100)
            self.ax2.set_ylim(0, 1)

        # Redraw and let the GUI event loop process pending events.
        canvas = self.fig.canvas
        canvas.draw()
        canvas.flush_events()
|
||
|
||
def gen_data(camera_index: int = 1, quit_key: str = 'a'):
    """Yield the non-zero saturation values of each frame from a camera.

    Each captured frame is shown in a "Camera Feed" window, converted from
    BGR to HSV, and its saturation channel (index 1) is reduced to a 1-D
    array of the values greater than zero, which is then yielded.

    The generator stops when a frame cannot be read or when ``quit_key`` is
    pressed in the preview window. The camera and the preview window are
    released in a ``finally`` block, so cleanup also runs when the consumer
    closes the generator early or an exception propagates through it.

    Args:
        camera_index: Device index passed to ``cv2.VideoCapture``.
        quit_key: Single character that ends the capture loop.

    Yields:
        1-D array of saturation values > 0 for one frame (may be empty).
    """
    cap = cv2.VideoCapture(camera_index)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            cv2.imshow("Camera Feed", frame)

            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            s = hsv[:, :, 1]  # saturation channel
            s = s[s > 0]  # keep only non-zero saturation to reduce noise
            yield s

            # Key polling happens when the consumer asks for the next frame.
            if cv2.waitKey(1) & 0xFF == ord(quit_key):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
|
||
|
||
def main():
    """Capture a baseline, then plot change scores for the live stream.

    The first ``30 * 5`` windows from ``gen_data`` form the baseline for a
    ``DistributionChangeDetector``. Each later window is scored by the
    detector and by ``hsv_score``, and both values are pushed to a
    ``RealTimePlotter``. Ctrl-C stops the detection loop; the final figure
    is then left on screen until it is closed.

    If the stream ends before the baseline is complete, a message is printed
    and the function returns instead of failing with ``StopIteration``.
    """
    # NOTE(review): presumably about 5 s at 30 fps; confirm against the camera.
    baseline_frames = 30 * 5

    gen = gen_data()
    try:
        print("收集基线数据...")
        baseline_data = []
        for window in gen:
            baseline_data.append(window)
            if len(baseline_data) >= baseline_frames:
                break

        if len(baseline_data) < baseline_frames:
            # The stream ended early, so there is nothing left to monitor.
            print("基线数据不足,无法开始检测")
            return

        det = DistributionChangeDetector(baseline_data)
        plotter = RealTimePlotter()

        print("开始实时检测和绘图...")

        try:
            for x in gen:
                score = det.update(x)
                score2 = hsv_score(x)
                plotter.update_plot(score, score2)

                # Short pause to limit the update rate.
                time.sleep(0.01)
        except KeyboardInterrupt:
            print("停止检测")
        finally:
            # Close the stream before the blocking show() below so it is
            # not held open while the final figure is displayed.
            gen.close()
            plt.ioff()  # leave interactive mode
            plt.show()  # keep the final figure on screen
    finally:
        # Covers early returns and errors; closing twice is harmless.
        gen.close()
|
||
|
||
# Run the live detection demo only when this file is executed as a script.
if __name__ == "__main__":
    main()