Files
ai-titration/main.py
flt6 c38984e7e7 0529_1
Former-commit-id: 74b47e62e52fe7724cbf096b1f67022110dd86e9
2025-05-29 17:14:55 +08:00

308 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import time
from datetime import datetime
import numpy as np
import ch340
import atexit
import utils
from enum import Enum
class Flags(Enum):
    """Bit flags describing which detection windows are currently active.

    The members are combined by OR-ing their ``value`` into a plain integer
    (see ``MAT.flags``), so they are used as a bit mask rather than as enum
    instances.
    """

    ENDCHK = 0b01  # a "colored" (endpoint) candidate is being checked
    MIDCHK = 0b10  # a "middle" detection is being checked

    @staticmethod
    def un(flag):
        """Return the mask that clears *flag* and keeps every other flag bit.

        Intended use is ``flags &= Flags.un(Flags.ENDCHK)``.

        Args:
            flag: A ``Flags`` member.

        Returns:
            An ``int`` with all known flag bits set except the one of *flag*.
        """
        # Build the "all bits" mask from the members instead of hard-coding
        # 0b11, so a future third flag is handled without touching this code.
        every_bit = 0
        for member in Flags:
            every_bit |= member.value
        return every_bit & ~flag.value
class MAT:
    """Camera-driven titration controller.

    Frames from a camera are classified by colour saturation and the result
    drives how much liquid the ``ch340`` device pushes per step: a fast rate
    while the solution is clear, a slow rate once colour starts to appear and
    an even slower rate while an endpoint candidate is being confirmed.
    """

    def __init__(self, videoSourceIndex=0, bounce_time=1, end_bounce_time=5):
        """Open the camera and the pump driver and reset all run state.

        Args:
            videoSourceIndex: Camera index handed to ``cv2.VideoCapture``.
            bounce_time: Seconds a "middle" detection is held before a switch
                back to the fast rate is considered.
            end_bounce_time: Length in seconds of the history window, and the
                time a "colored" candidate must persist before it is judged.
        """
        # Set up logging.
        utils.setup_logging()
        self.system_logger = utils.get_system_logger()
        self.control_logger = utils.get_control_logger()
        self.endpoint_logger = utils.get_endpoint_logger()
        self.volume_logger = utils.get_volume_logger()
        self.system_logger.info('正在初始化MAT系统...')

        self.videoSourceIndex = videoSourceIndex
        self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)
        self.ch340 = ch340.CH340()
        self.bounce_time = bounce_time
        self.end_bounce_time = end_bounce_time
        self.total_volume = 0
        self.middle_time = 0
        self.flags = 0
        # Make sure the pump is stopped even if the script exits abnormally.
        atexit.register(self.ch340.stop)

        # Sliding window of (timestamp, state, volume, frame) tuples.
        # NOTE(review): every frame of the last end_bounce_time seconds is
        # kept in memory; with long windows / large frames this adds up.
        self.history = []
        self.colored_volume = None  # volume at the current "colored" candidate
        self.colored_time = None  # timestamp of the current "colored" candidate
        self.colored_im = None  # frame of the current "colored" candidate

    def ch340_pull(self):
        """Draw 12 ml at maximum speed."""
        self.control_logger.info("开始抽取12ml")
        self.ch340.max_speed()
        self.ch340.pull(vol=12)
        self.control_logger.info('完成抽取')

    def ch340_init(self):
        """Run the short push/pull sequence used before a run starts."""
        self.ch340.push(speed=1, t=1)
        self.ch340.pull(speed=1.2, vol=1)
        self.control_logger.info('电极位置已初始化')

    def ch340_push(self, speed=0.1):
        """Start a non-blocking push of *speed* with ``t=1``."""
        self.ch340.push_async(speed=speed, t=1)

    def process_left(self, now: float, velue_only=False):
        """Account for the part of the current push that is not delivered yet.

        ``run`` credits ``total_volume`` with a whole push as soon as the push
        is issued, so the volume really delivered at *now* is ``total_volume``
        minus the undelivered share of that push.

        Args:
            now: Timestamp (same clock as ``ch340.start``) to evaluate at.
            velue_only: When true, only return the estimate. When false, stop
                the pump and store the estimate in ``total_volume``.

        Returns:
            The estimated delivered volume when ``velue_only`` is true,
            otherwise ``None``.
        """
        push_started = self.ch340.start
        if not velue_only:
            self.ch340.stop()
        # NOTE(review): a tolerance of 1e3 makes this check pass for almost any
        # value; 1e-3 may have been intended -- confirm against ch340._get_time().
        assert abs(self.ch340._get_time() - 1) < 1e3  # run time should be 1s
        elapsed = now - push_started
        # Share of the push still outstanding. Clamped to [0, 1]: once the
        # push has finished (elapsed > 1) nothing is left to subtract, and an
        # unclamped negative share would report more volume than was pushed.
        remaining = min(max(1 - elapsed, 0.0), 1.0)
        estimate = self.total_volume - remaining * self.speeds[self.typ]
        if velue_only:
            return estimate
        self.total_volume = estimate

    def _state_ratio(self, state):
        """Return the share of frames in the history window labelled *state*."""
        hits = sum(1 for _, label, _, _ in self.history if label == state)
        return hits / len(self.history)

    def _pred(self):
        """Grab one frame, classify it and update the control state.

        Returns:
            The state string produced by ``predictor`` ('transport', 'middle',
            'about' or 'colored'), or ``None`` when no frame could be read.
        """
        suc, im = self.cap.read()
        if not suc:
            self.control_logger.error("无法从摄像头捕获帧")
            return None

        # Write the frame to the video file before the overlay is drawn on it.
        if hasattr(self, 'capFile') and self.capFile.isOpened():
            self.capFile.write(im)

        ret, rate = self.predictor(im)
        if ret is None:
            return None

        now = time.time()
        val = self.process_left(now, velue_only=True)

        # Keep only the records of the last end_bounce_time seconds.
        self.history.append((now, ret, val, im))
        while self.history and self.history[0][0] < now - self.end_bounce_time:
            self.history.pop(0)
        if not self.history:
            self.control_logger.error("未预期的没有可用的历史记录")
            return ret

        # React to the classification result.
        if ret == "middle":
            # First "middle" while in fast mode: settle the running push and
            # switch to the slow rate immediately.
            if self.typ == 0:
                self.process_left(now)
                self.typ = 1
                self.middle_time = now
                self.flags |= Flags.MIDCHK.value
                self.control_logger.info(f"检测到middle切换到慢速模式当前体积: {val:.2f} ml")
        elif ret == "colored":
            # Remember the volume at which colour was first seen.
            if self.colored_volume is None:
                self.colored_volume = val
                self.colored_time = now
                self.colored_im = im
                self.flags |= Flags.ENDCHK.value
                self.endpoint_logger.info(f"检测到colored记录体积: {self.colored_volume:.2f} ml")
            # Once the candidate is old enough, decide whether to stop.
            if self.colored_time is not None and now - self.colored_time > self.end_bounce_time:
                colored_ratio = self._state_ratio("colored")
                if colored_ratio > 0.9:
                    self.endpoint_logger.info(f"确认终点,最终体积: {self.colored_volume:.2f} ml")
                    self.running = False
                    self.ch340.stop()
                    cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im)
                    return "colored"
                # Not stable enough: restart the candidate from this frame.
                self.flags &= Flags.un(Flags.ENDCHK)
                self.endpoint_logger.warning(f"colored比例小于90%,当前体积: {val:.2f} ml, 比例: {colored_ratio:.2%}")
                self.colored_volume = val
                self.colored_time = now
                # Keep the saved image in step with the re-anchored candidate.
                self.colored_im = im
        else:
            # "transport" (and "about") need no immediate action.
            pass

        # In slow mode, check whether to go back to the fast rate.
        if (self.typ == 1
                and now - self.middle_time > self.bounce_time
                and not self.flags & Flags.ENDCHK.value):
            if self._state_ratio("transport") > 0.8:
                # Settle the running push while typ still names the slow rate
                # it was issued with, then switch.
                self.process_left(now)
                self.typ = 0
                # TODO: slide to the first "middle" state instead of resetting.
                self.middle_time = 0
                self.control_logger.info(f"非middle比例超过80%,切回快速模式,当前体积: {val:.2f} ml")
                self.flags &= Flags.un(Flags.MIDCHK)

        # A recorded candidate that is old enough but whose window is less
        # than 90 % "colored" is dropped or moved forward.
        if (self.colored_volume is not None
                and self.colored_time is not None
                and now - self.colored_time > self.end_bounce_time):
            if self._state_ratio("colored") < 0.9:
                self.endpoint_logger.warning("colored比例小于90%重置colored记录")
                self.colored_volume = None
                # Re-anchor on the second "colored" record in the window, if
                # there is one (the first one is skipped).
                seen_first = False
                for stamp, state, vol, frame in self.history:
                    if state != "colored":
                        continue
                    if not seen_first:
                        seen_first = True
                        continue
                    self.endpoint_logger.debug(f"滑动窗口到: {vol:.2f} ml at {stamp}")
                    self.colored_volume = vol
                    self.colored_time = stamp
                    self.colored_im = frame
                    break
                if self.colored_volume is None:
                    self.flags &= Flags.un(Flags.ENDCHK)
                    self.typ = 0
            else:
                self.endpoint_logger.warning("异常情况colored超过90%但最后状态是transport")
                self.endpoint_logger.info(f"疑似滴定终点: {self.colored_volume:.2f} ml")

        # Status overlay; the check flags are re-asserted while their window
        # is still open.
        label = f"Stat: {ret}, rate: {round(rate,2)}, Vol: {val:.2f} ml"
        if self.middle_time > 0 and (now - self.middle_time) < self.bounce_time:
            self.flags |= Flags.MIDCHK.value
            label += ", MIDCHK"
        if self.colored_time is not None and (now - self.colored_time) < self.end_bounce_time:
            self.flags |= Flags.ENDCHK.value
            label += ", ENDCHK"
        text_colour = (10, 215, 255) if self.typ else (255, 255, 255)
        cv2.putText(im, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, text_colour, 2)
        cv2.imshow("Frame", im)
        cv2.waitKey(1)
        return ret

    def predictor(self, im):
        """Classify a BGR frame by the share of saturated pixels.

        A pixel counts as saturated when its HSV saturation exceeds 30.

        Returns:
            ``(state, rate)`` where *rate* is the saturated share of the frame
            and *state* is 'transport' (< 0.02), 'middle' (< 0.2),
            'about' (< 0.4) or 'colored' (otherwise).
        """
        hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
        saturation = hsv[:, :, 1]
        mask = saturation > 30
        cv2.imshow('mask', im * mask[:, :, np.newaxis])
        pixel_count = mask.shape[0] * mask.shape[1]
        rate = np.sum(mask) / pixel_count
        if rate < 0.02:
            return "transport", rate
        if rate < 0.2:
            return "middle", rate
        if rate < 0.4:
            return "about", rate
        return "colored", rate

    def __del__(self):
        """Release the camera, the video writer and the preview windows."""
        if not hasattr(self, 'cap'):
            # __init__ did not get as far as opening the camera, so there is
            # nothing of ours to clean up.
            return
        self.cap.release()
        if hasattr(self, 'capFile') and self.capFile.isOpened():
            self.capFile.release()
        cv2.destroyAllWindows()

    def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, end_time=1, cap_dir="Videos"):
        """Run the control loop until an endpoint has been confirmed.

        Args:
            quick_speed: Amount per push in fast mode (logged as "ml/次").
            slow_speed: Amount per push in slow mode.
            end_speed: Amount per push while an endpoint is being checked.
            mid_time: Minimum seconds since ``ch340.start`` before the next
                push in slow mode.
            end_time: Same limit for endpoint mode.
            cap_dir: Directory for the recorded video, or ``None`` to disable
                recording. The directory is created when missing.
        """
        import os  # only needed here, to create the recording directory

        self.running = True
        self.typ = 0
        self.start_time = time.time()
        self.speeds = (quick_speed, slow_speed, end_speed)
        if cap_dir is not None:
            vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
        else:
            vid_name = "Disabled"

        # Log the parameters of this experiment.
        experiment_params = {
            "视频源索引 ": self.videoSourceIndex,
            "防抖时间 ": self.bounce_time,
            "快速模式速度": f"{quick_speed} ml/次",
            "慢速模式速度": f"{slow_speed} ml/次",
            "录制视频 ": vid_name
        }
        self.system_logger.info("实验参数:")
        for key, value in experiment_params.items():
            self.system_logger.info(f" {key}: {value}")

        # Frame rate and resolution of the camera, for the recorder.
        fps = int(self.cap.get(cv2.CAP_PROP_FPS)) or 30
        width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Set up the video recorder.
        if vid_name != "Disabled":
            # The writer does not create missing directories itself.
            os.makedirs(cap_dir, exist_ok=True)
            fourcc = cv2.VideoWriter_fourcc(*'avc1')
            self.capFile = cv2.VideoWriter(vid_name, fourcc, fps, (width, height))
            if not self.capFile.isOpened():
                self.system_logger.error(f"无法打开视频录制器: {vid_name}")
                del self.capFile
            else:
                self.system_logger.info("视频录制已初始化")

        self.ch340_init()
        # Minimum time since the last push start, per slow mode.
        min_gap = {1: mid_time, 2: end_time}
        cnt = 0  # number of 12 ml refills drawn so far
        while self.running:
            # Derive the speed mode from the flags; ENDCHK wins over MIDCHK.
            if self.flags & Flags.ENDCHK.value:
                self.typ = 2
            elif self.flags == Flags.MIDCHK.value:
                self.typ = 1
            elif self.flags == 0:
                self.typ = 0

            if not self.ch340.running:
                # Refill when less than 0.5 ml of the drawn volume is left.
                if 12 * cnt - self.total_volume < 0.5:
                    self.ch340_pull()
                    cnt += 1
                    time.sleep(0.01)
                should_push = self.typ == 0 or (
                    self.typ in min_gap
                    and time.time() - self.ch340.start > min_gap[self.typ]
                )
                if should_push:
                    # Pick the amount first so the log shows what is actually
                    # about to be added, not the previous push.
                    speed = self.speeds[self.typ]
                    self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次")
                    self.ch340_push(speed)
                    self.total_volume += speed

            if self._pred() is None:
                self.control_logger.error("预测失败,跳过当前帧")
                continue

        # Close the video recorder.
        if hasattr(self, 'capFile') and self.capFile.isOpened():
            self.capFile.release()
            self.system_logger.info(f"视频录制完成: {vid_name}")

        # Log the results of the experiment.
        experiment_results = {
            "总体积 ": f"{self.total_volume:.2f} ml",
            "终点体积": f"{self.colored_volume:.2f} ml",
            "理论体积": f"{self.colored_volume:.2f} ml",
            "实验时长": f"{time.time() - self.start_time:.2f}"
        }
        self.system_logger.info("实验结果:")
        for key, value in experiment_results.items():
            self.system_logger.info(f" {key}: {value}")
if __name__ == "__main__":
    # Build a MAT controller on camera 1 and start one titration run.
    controller_options = {
        "videoSourceIndex": 1,
        "bounce_time": 3,
        "end_bounce_time": 20,
    }
    run_options = {
        "slow_speed": 0.05,
        "quick_speed": 0.15,
        "end_time": 2,
        # Add "cap_dir": None here to disable video recording.
    }
    mat = MAT(**controller_options)
    mat.run(**run_options)