Files
ai-titration/tmp.py
flt6 8184179a17 alter
Former-commit-id: 0d9b33e7625efe7bee422d1514d8453ff689553d
2025-06-05 22:23:28 +08:00

81 lines
2.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
class DistributionChangeDetector:
def __init__(self, baseline_windows: list[np.ndarray]):
"""
参数 baseline_windows: List of arrays代表初始稳定期的多个窗口
"""
self.baseline = self._compute_baseline(baseline_windows)
def _compute_stats(self, window: np.ndarray) -> tuple[float, float, float]:
"""返回 (P_under30, std, mode)"""
p_under30 = np.mean(window < 30)
std = np.std(window, ddof=1)
# 快速估计众数:最大 bin 的中心
hist, bin_edges = np.histogram(window, bins=50)
max_bin_index = np.argmax(hist)
mode_est = (bin_edges[max_bin_index] + bin_edges[max_bin_index + 1]) / 2
return p_under30, std, mode_est
def _compute_baseline(self, windows: list[np.ndarray]) -> tuple[np.ndarray, np.ndarray]:
"""
返回 baseline 向量 (P0, σ0, mode0) 和对应标准差(用于归一化)
"""
stats = np.array([self._compute_stats(w) for w in windows])
mean = stats.mean(axis=0)
std = stats.std(axis=0) + 1e-6 # 防止除0
return mean, std
def update(self, window: np.ndarray) -> float:
"""
输入:当前窗口数据(长度 = 窗口大小)
输出:变化分数(越大表示分布越偏离基准)
"""
x = np.array(self._compute_stats(window))
mean, std = self.baseline
norm_diff = (x - mean) / std
change_score = np.linalg.norm(norm_diff)
return float(change_score)
import cv2
def gen_data():
    """Yield the non-zero saturation values of each frame from camera index 1.

    Each successfully read frame is shown in a window titled "Camera Feed",
    converted to HSV, and its saturation channel is yielded as a flat array
    with zero values removed. The generator stops when a frame cannot be
    read, or when the 'a' key is detected after the consumer asks for the
    next frame. The capture device is released when the generator finishes
    or is closed.
    """
    cap = cv2.VideoCapture()
    cap.open(1)
    try:
        while True:
            ret, frame = cap.read()
            # Check the read result before using the frame: a failed read
            # does not provide a usable image to display or convert.
            if not ret:
                break
            cv2.imshow("Camera Feed", frame)
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            s = hsv[:, :, 1]  # saturation channel
            s = s[s > 0]  # keep only non-zero saturation values to reduce noise
            yield s
            if cv2.waitKey(1) & 0xFF == ord('a'):
                break
    finally:
        # Runs on normal exhaustion and on generator close, so the camera
        # handle is not left open.
        cap.release()
from itertools import islice

BASELINE_WINDOWS = 5  # number of initial frames used to build the baseline

gen = gen_data()
# islice ends cleanly if the stream yields fewer frames than requested,
# instead of letting StopIteration escape from a list comprehension.
baseline_data = list(islice(gen, BASELINE_WINDOWS))
if len(baseline_data) < BASELINE_WINDOWS:
    raise SystemExit(
        f"Only {len(baseline_data)} of {BASELINE_WINDOWS} baseline windows captured"
    )
det = DistributionChangeDetector(baseline_data)

# update() always returns a float, so every score is kept.
results = [det.update(x) for x in gen]

# Plot the scores for inspection.
import matplotlib.pyplot as plt
plt.plot(results, label="ChangeScore")
plt.xlabel("Window index")
plt.ylabel("Score")
plt.title("Streaming Change Detection")
plt.legend()
plt.show()