diff --git a/test.py b/test.py new file mode 100644 index 0000000..f216d06 --- /dev/null +++ b/test.py @@ -0,0 +1,67 @@ +import cv2 +import numpy as np +from matplotlib import pyplot as plt +from scipy.signal import find_peaks + +cap = cv2.VideoCapture(1) # 使用摄像头0,通常更稳定 +cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) # 降低分辨率提高处理速度 +cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) + +# 预先创建图形窗口,避免重复创建 +fig, ax = plt.subplots(figsize=(10, 4)) +plt.ion() +ax.set_title('Saturation Channel Histogram') +ax.set_xlabel('Saturation Value') +ax.set_ylabel('Pixel Count') +ax.set_xlim(0, 255) + +while True: + ret, frame = cap.read() + if not ret: + print("Failed to grab frame") + break + + cv2.imshow("Camera Feed", frame) + + # 直接提取饱和度通道,避免完整HSV转换 + hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) + s = hsv[:, :, 1] + s = s[s > 0] # 只保留非零饱和度值,减少噪声 + # 使用更高效的直方图计算 + hist = cv2.calcHist([s], [0], None, [256], [0, 256]) + hist = hist.flatten() # 转换为一维数组 + + # 峰值检测 - 找到直方图中的峰值 + peaks, properties = find_peaks(hist, + # height=np.max(hist) * 0.1, # 峰值高度至少是最大值的10% + distance=5, # 峰值之间的最小距离 + prominence=np.max(hist) * 0.05) # 峰值的突出度 + + # 清除旧数据并绘制新直方图 + ax.clear() + ax.plot(hist, 'b-', linewidth=1) + + # 标注峰值 + if len(peaks) > 0: + ax.text(0.5, 1.05, f'Found {len(peaks)} peaks') + ax.plot(peaks, hist[peaks], 'ro', markersize=8, label=f'Peaks ({len(peaks)})') + # 在峰值处添加文字标注 + for i, peak in enumerate(peaks): + ax.annotate(f'Peak {i+1}\n({peak}, {int(hist[peak])})', + xy=(peak, hist[peak]), + xytext=(peak, hist[peak] + np.max(hist) * 0.1), + ha='center', va='bottom', + bbox=dict(boxstyle='round,pad=0.3', facecolor='yellow', alpha=0.7), + arrowprops=dict(arrowstyle='->', color='red')) + + + plt.draw() + plt.pause(0.1) # 确保图形更新 + + key = cv2.waitKey(1) & 0xFF + if key == ord('q'): + break + +cap.release() +cv2.destroyAllWindows() +plt.close('all') \ No newline at end of file diff --git a/tmp.py b/tmp.py new file mode 100644 index 0000000..086e846 --- /dev/null +++ b/tmp.py @@ -0,0 +1,80 @@ +import numpy as np + +class DistributionChangeDetector: + def __init__(self, baseline_windows: list[np.ndarray]): + """ + 参数 baseline_windows: List of arrays,代表初始稳定期的多个窗口 + """ + self.baseline = self._compute_baseline(baseline_windows) + + def _compute_stats(self, window: np.ndarray) -> tuple[float, float, float]: + """返回 (P_under30, std, mode)""" + p_under30 = np.mean(window < 30) + std = np.std(window, ddof=1) + + # 快速估计众数:最大 bin 的中心 + hist, bin_edges = np.histogram(window, bins=50) + max_bin_index = np.argmax(hist) + mode_est = (bin_edges[max_bin_index] + bin_edges[max_bin_index + 1]) / 2 + return p_under30, std, mode_est + + def _compute_baseline(self, windows: list[np.ndarray]) -> tuple[np.ndarray, np.ndarray]: + """ + 返回 baseline 向量 (P0, σ0, mode0) 和对应标准差(用于归一化) + """ + stats = np.array([self._compute_stats(w) for w in windows]) + mean = stats.mean(axis=0) + std = stats.std(axis=0) + 1e-6 # 防止除0 + return mean, std + + def update(self, window: np.ndarray) -> float: + """ + 输入:当前窗口数据(长度 = 窗口大小) + 输出:变化分数(越大表示分布越偏离基准) + """ + x = np.array(self._compute_stats(window)) + mean, std = self.baseline + norm_diff = (x - mean) / std + change_score = np.linalg.norm(norm_diff) + return float(change_score) + + +import cv2 +def gen_data(): + cap = cv2.VideoCapture() + cap.open(1) + while True: + ret, frame = cap.read() + cv2.imshow("Camera Feed", frame) + if not ret: + break + hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) + s = hsv[:, :, 1] # 直接提取饱和度通道 + s = s[s > 0] # 只保留非零饱和度值,减少噪声 + yield s + if cv2.waitKey(1) & 0xFF == ord('a'): + break + + +gen = gen_data() +baseline_data = [gen.__next__() for _ in range(5)] # 获取10个窗口作为基线 + +det = DistributionChangeDetector(baseline_data) + + + +results = [] +for x in gen: + out = det.update(x) + if out is not None: + results.append(out) + + +# 作图查看 +import matplotlib.pyplot as plt +plt.plot(results, label="ChangeScore") +plt.xlabel("Window index") +plt.ylabel("Score") +plt.title("Streaming Change Detection") +plt.legend() +plt.show() diff --git a/tmp2.py b/tmp2.py new file mode 100644 index 0000000..a231bb9 --- /dev/null +++ b/tmp2.py @@ -0,0 +1,144 @@ +import numpy as np +import cv2 +import matplotlib.pyplot as plt +from matplotlib.animation import FuncAnimation +from collections import deque +import threading +import time + +class DistributionChangeDetector: + def __init__(self, baseline_windows: list[np.ndarray]): + """ + 参数 baseline_windows: List of arrays,代表初始稳定期的多个窗口 + """ + self.baseline = self._compute_baseline(baseline_windows) + + def _compute_stats(self, window: np.ndarray) -> tuple[float, float, float]: + """返回 (P_under30, std, mode)""" + p_under30 = np.mean(window < 30) + std = np.std(window, ddof=1) + + # 快速估计众数:最大 bin 的中心 + hist, bin_edges = np.histogram(window, bins=50) + max_bin_index = np.argmax(hist) + mode_est = (bin_edges[max_bin_index] + bin_edges[max_bin_index + 1]) / 2 + return p_under30, std, mode_est + + def _compute_baseline(self, windows: list[np.ndarray]) -> tuple[np.ndarray, np.ndarray]: + """ + 返回 baseline 向量 (P0, σ0, mode0) 和对应标准差(用于归一化) + """ + stats = np.array([self._compute_stats(w) for w in windows]) + mean = stats.mean(axis=0) + std = stats.std(axis=0) + 1e-6 # 防止除0 + return mean, std + + def update(self, window: np.ndarray) -> float: + """ + 输入:当前窗口数据(长度 = 窗口大小) + 输出:变化分数(越大表示分布越偏离基准) + """ + x = np.array(self._compute_stats(window)) + mean, std = self.baseline + norm_diff = (x - mean) / std + change_score = np.linalg.norm(norm_diff) + return float(change_score) + +def hsv_score(s:np.ndarray): + mask = s>30 + tot = len(mask) + val = np.sum(mask) + rate = val/tot + return rate + +class RealTimePlotter: + def __init__(self, max_points=200): + self.max_points = max_points + self.scores = deque(maxlen=max_points) + self.scores2 = deque(maxlen=max_points) + self.times = deque(maxlen=max_points) + self.start_time = time.time() + + # 设置图形 + plt.ion() # 打开交互模式 + self.fig, (self.ax,self.ax2) = plt.subplots(1,2,figsize=(10, 6)) + self.line, = self.ax.plot([], [], 'b-', linewidth=2) + self.line2, = self.ax2.plot([], [], 'b-', linewidth=2) + self.ax.set_xlabel('Time (s)') + self.ax.set_ylabel('Change Score') + self.ax.set_title('Real-time Distribution Change Detection') + self.ax.grid(True) + self.ax2.grid(True) + + def update_plot(self, score,s_score): + current_time = time.time() - self.start_time + self.scores.append(score) + self.scores2.append(s_score) + self.times.append(current_time) + + # 更新数据 + self.line.set_data(list(self.times), list(self.scores)) + self.line2.set_data(list(self.times), list(self.scores2)) + + # 自动调整坐标轴 + if len(self.times) > 1: + self.ax.set_xlim(min(self.times), max(self.times)) + self.ax2.set_xlim(min(self.times), max(self.times)) + self.ax.set_ylim(0,100) + # self.ax.set_ylim(min(self.scores) * 0.95, max(self.scores) * 1.05) + self.ax2.set_ylim(0,1) + + # 刷新图形 + self.fig.canvas.draw() + self.fig.canvas.flush_events() + + +def gen_data(): + cap = cv2.VideoCapture(1) + while True: + ret, frame = cap.read() + if not ret: + break + cv2.imshow("Camera Feed", frame) + hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) + s = hsv[:, :, 1] # 直接提取饱和度通道 + s = s[s > 0] # 只保留非零饱和度值,减少噪声 + yield s + if cv2.waitKey(1) & 0xFF == ord('a'): + break + cap.release() + cv2.destroyAllWindows() + + +def main(): + # 初始化数据生成器 + gen = gen_data() + + # 获取基线数据 + print("收集基线数据...") + baseline_data = [next(gen) for _ in range(30*5)] + + # 初始化检测器和绘图器 + det = DistributionChangeDetector(baseline_data) + plotter = RealTimePlotter() + + print("开始实时检测和绘图...") + + try: + for x in gen: + score = det.update(x) + score2 = hsv_score(x) + plotter.update_plot(score,score2) + + # 小延时以控制更新频率 + time.sleep(0.01) + + except KeyboardInterrupt: + print("停止检测") + finally: + plt.ioff() # 关闭交互模式 + plt.show() # 保持最终图形显示 + + +if __name__ == "__main__": + main() \ No newline at end of file