Files
ai-titration/main.py
flt6 e0e1c649eb clean and format codebase
Former-commit-id: 5d0497ac67199a7ea475849a6ec3f28df46371cb
2025-07-07 18:52:50 +08:00

451 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import atexit
import logging
import os
import time
from datetime import datetime
from typing import Optional

import cv2
import numpy as np
from scipy.signal import find_peaks

import ch340
import utils
from utils import History, State, login_to_platform, send_data_to_platform
class MAT:
def __init__(self, videoSourceIndex=0, bounce_time=2, sensity=30):
# 初始化logging
utils.setup_logging()
self.system_logger = logging.getLogger("System")
self.control_logger = logging.getLogger("Control")
self.endpoint_logger = logging.getLogger("Endpoint")
self.volume_logger = logging.getLogger("Volume")
self.system_logger.info("正在初始化MAT系统...")
self.videoSourceIndex = videoSourceIndex
self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)
self.ch340 = ch340.CH340()
self.total_volume = 0
self.sensity = sensity
self.state = State(bounce_time)
atexit.register(self.ch340.stop)
self.history = History(bounce_time)
self.colored_volume = None
self.colored_time = None
self.colored_im = None
def ch340_pull(self):
"抽取12ml液体"
self.control_logger.info("开始抽取12ml")
self.ch340.max_speed()
self.ch340.pull(vol=12)
self.control_logger.info("完成抽取")
def ch340_init(self):
"初始化电机位置,防止过头导致抽取溶液不准。"
# self.ch340.push(speed=1,t=1)
self.ch340.pull(speed=1.2, vol=1.8)
self.control_logger.info("电极位置已初始化")
def ch340_push(self, speed: int | float = 0.1):
"常规推送保证推送时间为1s推送体积=speed"
self.ch340.push_async(speed=speed, t=1)
def process_left(
self, now: float, value_only: Optional[bool] = False
) -> Optional[float]:
"""
计算当前时刻剩余体积并更新到self.total_volume。
@param now: 当前时间戳
@param value_only: 如果为True则只返回剩余体积不更新状态
@return: 当前剩余体积仅当value_only为True时返回
"""
if self.state.mode == State.Mode.ABOUT:
return self.total_volume
st = self.ch340.start
if not value_only:
self.ch340.stop()
if not abs(self.ch340._get_time() - 1) < 1e-3:
self.control_logger.warning(
f"CH340 timing issue detected: {self.ch340._get_time()}"
)
return self.total_volume
r = now - st
ret = self.total_volume - (1 - r) * self.speeds[self.state.mode.value]
if value_only:
return ret
else:
self.total_volume = ret
def _pred(self):
"""预测当前图像状态,返回'transport''middle''about''colored'"""
suc, im = self.cap.read()
if not suc:
self.control_logger.error("无法从摄像头捕获帧")
return None
# 录制当前帧到视频文件
if hasattr(self, "capFile") and self.capFile.isOpened():
self.capFile.write(im)
ret, rate = self.predictor(im)
if ret is None:
return None
now = time.time()
val = self.process_left(now, value_only=True)
# 确保val不为None
if val is None:
self.control_logger.warning("体积计算返回None跳过本次记录")
return ret
# 更新滑动窗口历史记录 - 维护最近end_bounce_time内的状态
self.history.add_record(now, ret, rate, val, im)
if self.history.is_empty():
self.control_logger.error("未预期的没有可用的历史记录")
return ret
# === 状态进入逻辑 ===
if now - self._start_time < 10:
self._display_status(im, ret, rate, val)
return ret
if (
not self.edited
and self.history.base is not None
and rate > self.history.base * 2
):
self.edited = True
self.speeds[0] /= 2
# 1. middle: predictor返回middle立即进入slow状态
if ret == "middle":
if self.state.is_fast_mode():
self.process_left(now)
self.state.enter_middle_state(now)
self.control_logger.info(
f"检测到middle立即进入slow模式当前体积: {val:.2f} ml"
)
# 2. about: 返回about且处于middle则进入about状态
elif ret == "about":
if self.state.is_slow_mode():
self.process_left(now)
self.state.enter_about_state(now)
self.control_logger.info(
f"检测到about进入about模式当前体积: {val:.2f} ml"
)
# middle检查: 进入middle的bounce_time后在最近bounce_time内middle比例<70%返回fast状态
if self.state.should_check_middle_exit(now) and not self.history.last_end:
# 计算最近bounce_time内的middle比例
recent_records = self.history.get_recent_records(
self.state.bounce_time, now
)
if recent_records:
trans_ratio = self.history.get_state_ratio("transport", recent_records)
if trans_ratio > 0.4:
self.process_left(now)
self.state.exit_middle_check()
self.control_logger.info(
f"middle比例{trans_ratio:.2%}<60%退出middle检查返回fast模式"
)
if self.state.mode == State.Mode.ABOUT and self.state.about_check:
h = self.history.get_recent_records(self.about_time / 3, now)
ratio = self.history.get_state_ratio("transport", h)
self.history.about_history.append(ratio == 1)
while len(self.history.about_history) > 5:
self.history.about_history.pop(0)
if len(self.history.about_history) == 5 and self.history.last_end < 2:
rate = sum(self.history.about_history) / len(self.history.about_history)
if rate > 0.8:
self.state.exit_about()
self.control_logger.info(
f"about比例{ratio:.2%}<80%退出about检查返回middle模式"
)
ret = self.end_check()
if ret == "colored":
return ret
if not self.history.last_end:
self.history.last_end = max(self.history.last_end, ret)
self.state.about_check = False
# 显示状态信息
self._display_status(im, ret, rate, val)
return ret
def end_check(self, dep=0):
"""终点检测,通过对图像直方图峰值分析,确认是否全局变色。"""
if not self.running:
return "colored"
if dep == 1:
suc, im = self.cap.read()
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
# 降低饱和度通道 (S通道是索引1)
hsv[:, :, 1] = hsv[:, :, 1] * 0.8
# 转换回BGR颜色空间
result = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
name = f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
cv2.imwrite(name, result)
self.colored_im = name
# 维护一个递归检测重复4次后认定为终点。
if dep > 3:
self.colored_volume = self.total_volume
self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml")
self.running = False
self.ch340.stop()
return "colored"
suc, im = self.cap.read()
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
s = hsv[:, :, 0]
s = s[s > 0]
hist = cv2.calcHist([s], [0], None, [256], [0, 256])
hist = hist.flatten() # 转换为一维数组
# 峰值检测 - 找到直方图中的峰值
peaks, properties = find_peaks(
hist,
height=np.max(hist) * 0.2, # 峰值高度至少是最大值的10%
distance=10, # 峰值之间的最小距离
prominence=np.max(hist) * 0.01,
) # 峰值的突出度
if np.any(peaks > 130):
if dep == 0:
self.process_left(time.time())
self.history.end_history.append(True)
self.control_logger.info(f"检测到colored状态end_check at {dep}")
bgn = time.time()
while time.time() - bgn < 2:
suc, im = self.cap.read()
if hasattr(self, "capFile") and self.capFile.isOpened():
self.capFile.write(im)
cv2.putText(
im,
"ENDCHK",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(0, 255, 0),
2,
)
cv2.imshow("Frame", im)
return self.end_check(dep + 1)
# time.sleep(2)
else:
self.colored_im = None
return dep
def _display_status(self, im, detection_result, rate, volume):
"""显示状态信息到图像上"""
mode_color = {
State.Mode.FAST: (255, 255, 255),
State.Mode.SLOW: (10, 215, 255),
State.Mode.ABOUT: (228, 116, 167),
}
status_text = (
f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml"
)
status_text += self.state.get_status_text()
cv2.putText(
im,
status_text,
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
mode_color[self.state.mode],
2,
)
cv2.imshow("Frame", im)
cv2.waitKey(1)
def predictor(self, im):
"""主预测函数分析图像并返回当前状态。当前colored不使用这个函数。"""
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
s = hsv[:, :, 1]
mask = s > self.sensity
cv2.imshow("mask", im * mask[:, :, np.newaxis])
tot = mask.shape[0] * mask.shape[1]
val = np.sum(mask)
rate = val / tot
if self.history.base is not None:
base = self.history.base
thr = (min(0.05, base * 5), min(0.2, base * 13), 0.5)
# thr = (base*5, base*13, 0.5)
if rate < thr[0]:
return "transport", rate
elif rate < thr[1]:
return "middle", rate
elif rate < thr[2]:
return "about", rate
else:
return "colored", rate
else:
return "transport", rate
def __del__(self):
self.cap.release()
if hasattr(self, "capFile") and self.capFile.isOpened():
self.capFile.release()
cv2.destroyAllWindows()
def format_date_time(self, timestamp):
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
def run(
self,
quick_speed=0.2,
slow_speed=0.05,
end_speed=0.02,
mid_time=0.5,
about_time=1,
cap_dir="Videos",
):
self.running = True
self.start_time = time.time()
self.speeds: list[float] = [quick_speed, slow_speed, end_speed]
self.need_check = False
self.edited = False
self.volume_list = []
self.color_list = []
if cap_dir is not None:
vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mkv"
else:
vid_name = "Disabled"
# 记录实验开始
experiment_params = {
"视频源索引 ": self.videoSourceIndex,
"防抖时间 ": self.state.bounce_time,
"快速模式速度": f"{quick_speed} ml/次",
"慢速模式速度": f"{slow_speed} ml/次",
"录制视频 ": vid_name,
}
self.system_logger.info("实验参数:")
for key, value in experiment_params.items():
self.system_logger.info(f" {key}: {value}")
fps = int(self.cap.get(cv2.CAP_PROP_FPS)) or 30
width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 初始化视频录制器
if vid_name != "Disabled":
fourcc = cv2.VideoWriter.fourcc(*"x264") # 使用更兼容的编码器
self.capFile = cv2.VideoWriter(vid_name, fourcc, fps, (width, height))
if not self.capFile.isOpened():
self.system_logger.error(f"无法打开视频录制器: {vid_name}")
del self.capFile
else:
self.system_logger.info(f"视频录制已初始化")
self.ch340_init()
cnt = 0
self._start_time = time.time()
self.about_time = about_time
while self.running:
if not self.ch340.running:
if 12 * cnt - self.total_volume < 0.5:
self.ch340_pull()
cnt += 1
should_push = False
if self.state.is_fast_mode():
should_push = True
elif (
self.state.is_slow_mode()
and time.time() - self.ch340.start > mid_time
):
should_push = True
elif (
self.state.is_about_mode()
and time.time() - self.ch340.start > about_time
):
if not self.state.about_first_flag:
self.state.about_check = True
else:
self.state.about_first_flag = False
should_push = True
if should_push:
speed = self.speeds[self.state.mode.value]
self.volume_logger.info(
f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次"
)
self.ch340_push(speed)
self.total_volume += speed
self.volume_list.append(round(self.total_volume, 2))
try:
self.color_list.append(self.state.mode.name)
except Exception as e:
self.control_logger.error(f"报表失败: {e}")
if self._pred() is None:
self.control_logger.error("预测失败,跳过当前帧")
continue
# 释放视频录制器
if hasattr(self, "capFile") and self.capFile.isOpened():
self.capFile.release()
self.system_logger.info(f"视频录制完成: {vid_name}")
# 实验结束,记录结果
experiment_results = {
"总体积 ": f"{self.total_volume:.2f} ml",
"终点体积": f"{self.colored_volume:.2f} ml",
"理论体积": f"{self.colored_volume:.2f} ml",
"实验时长": f"{time.time() - self.start_time:.2f}",
"滴定速率": f"{self.total_volume / (time.time() - self.start_time):.2f} mL/s",
}
upload_data = {
"start_time": self.format_date_time(self.start_time),
"end_time": self.format_date_time(time.time()),
"volume_record": f"{self.volume_list}",
"voltage_record": f"{[]}",
"color_record": f"{self.color_list}",
"final_volume": f"{self.colored_volume}",
}
self.system_logger.info("实验结果:")
for key, value in experiment_results.items():
self.system_logger.info(f" {key}: {value}")
return upload_data, self.colored_im
if __name__ == "__main__":
    import json

    # NOTE(review): the account and password are hard-coded in the source;
    # consider loading them from configuration instead.
    token = login_to_platform("13504022184", "password")

    # Build the titration controller and start it in FAST mode.
    mat = MAT(videoSourceIndex=1, bounce_time=2, sensity=32)
    mat.state.mode = State.Mode.FAST

    # Pass cap_dir=None here as well to disable video recording.
    run_options = {
        "slow_speed": 0.05,
        "quick_speed": 1.0,
        "about_time": 3,
    }
    final_data, finish_picture = mat.run(**run_options)

    # Keep a local copy of the uploaded record.
    with open("log.json", "w") as log_file:
        json.dump(final_data, log_file, indent=4)

    send_data_to_platform(token, final_data, finish_picture)