Files
ai-titration/main.py
flt6 a5149bc21e rename and minimal requirements
Former-commit-id: 1faf0f8af9d143a3031da4bd117bc420ff6c64a8
2025-05-26 22:37:38 +08:00

206 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import time
from datetime import datetime
import numpy as np
import ch340
import atexit
class MAT:
    """Camera-supervised automatic titration loop.

    :meth:`run` repeatedly asks the pump object (``ch340.CH340``, a
    project-local module) to dispense one increment of liquid, grabs a camera
    frame and classifies it as ``"transport"``, ``"middle"`` or ``"colored"``
    via :meth:`predictor`.  A ``"middle"`` frame switches dosing to the slow
    increment; the run stops once ``"colored"`` has persisted for
    ``bounce_time`` seconds in more than 90 % of the recent frames.

    Args:
        videoSourceIndex: Index passed to ``cv2.VideoCapture``.
        bounce_time: Length in seconds of the sliding window used to debounce
            the frame classification.
    """

    # Volume in ml requested from the pump on every refill (see ch340_pull).
    _SYRINGE_ML = 12

    def __init__(self, videoSourceIndex=0, bounce_time=1):
        print('Initializing MAT...')
        self.videoSourceIndex = videoSourceIndex
        self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)
        self.ch340 = ch340.CH340()
        self.bounce_time = bounce_time
        self.total_volume = 0
        # total_volume value at which the current syringe load is used up;
        # 0.0 forces a refill before the very first push.
        self._refill_at = 0.0
        self.start_time = time.time()
        self.middle_time = time.time()
        # Format the start time as YYYYmmdd_HHMMSS; meant to be used for
        # naming output files of this run.
        self.formatted_time = datetime.fromtimestamp(self.start_time).strftime('%Y%m%d_%H%M%S')
        # Make sure the pump is stopped when the interpreter exits.
        atexit.register(self.ch340.stop)
        print("实验开始于", self.formatted_time)
        self.history = []  # sliding window of (timestamp, state, volume)
        self.colored_volume = None  # volume at the first "colored" frame
        self.colored_time = None  # timestamp of the first "colored" frame

    def ch340_pull(self):
        """Refill the syringe with ``_SYRINGE_ML`` ml at maximum speed."""
        print("开始抽取12ml")
        self.ch340.max_speed()
        self.ch340.pull(vol=self._SYRINGE_ML)
        print('完成抽取')

    def ch340_init(self):
        """Prime the pump with a short push followed by a 3 ml pull."""
        self.ch340.push(speed=1, t=1)
        self.ch340.pull(speed=1.2, vol=3)
        print('CH340 INITED')

    def ch340_push(self, speed=0.1):
        """Start a non-blocking push of ``speed`` for ``t=1``.

        NOTE(review): the rest of this class treats ``speed`` as the volume in
        ml dispensed by one such push (presumably ml/s for 1 s) - confirm
        against the ``ch340`` module.
        """
        self.ch340.push_async(speed=speed, t=1)

    def process_left(self, now: float, velue_only=False):
        """Account for the part of the current push that has not happened yet.

        ``total_volume`` is incremented by a whole increment when a push is
        started, so the volume really dispensed at time ``now`` is lower by
        the not-yet-executed fraction of that push.

        Args:
            now: Timestamp (``time.time()``) at which to evaluate the volume.
            velue_only: If true, only return the corrected volume.  If false,
                stop the pump and store the corrected volume in
                ``total_volume``.

        Returns:
            The corrected volume when ``velue_only`` is true, else ``None``.
        """
        st = self.ch340.start
        if not velue_only:
            self.ch340.stop()
        # NOTE(review): the tolerance 1e3 makes this check nearly vacuous; it
        # was probably meant to be 1e-3.  Left unchanged because the exact
        # return value of _get_time() is not visible here.
        assert abs(self.ch340._get_time()-1) < 1e3  # run time should be 1s
        elapsed = now - st
        # Clamp to [0, 1]: once the 1 s push has finished nothing is left to
        # subtract.  Without the clamp an elapsed time above 1 s turned the
        # correction negative and *added* volume.
        remaining = min(max(1 - elapsed, 0.0), 1.0)
        ret = self.total_volume - remaining * self.speeds[self.typ]  # subtract the unfinished volume
        if velue_only:
            return ret
        self.total_volume = ret

    def _needs_refill(self, next_volume):
        """Return True if pushing ``next_volume`` ml would exceed the current load.

        The comparison uses a small tolerance instead of an exact float
        equality, so accumulated rounding error in ``total_volume`` cannot
        suppress the refill.
        """
        return self.total_volume + next_volume > self._refill_at + 1e-9

    def _state_ratio(self, state):
        """Return the fraction of history entries whose state equals ``state``."""
        if not self.history:
            return 0.0
        hits = sum(1 for _, label, _ in self.history if label == state)
        return hits / len(self.history)

    def _update_state(self, ret, now, val):
        """Advance the dosing state machine with one classified frame.

        Args:
            ret: Frame state, ``"transport"``, ``"middle"`` or ``"colored"``.
            now: Timestamp of the frame.
            val: Volume dispensed at ``now``.

        Returns:
            True if the end point was confirmed (the run is stopped and the
            pump halted), otherwise False.
        """
        if ret == "middle":
            # A "middle" frame switches to slow mode immediately.
            if self.typ == 0:
                self.process_left(now)
                self.typ = 1
                self.middle_time = now
                print(f"检测到middle切换到慢速模式当前体积: {val}")
        elif ret == "colored":
            # Remember the volume of the first "colored" frame.
            if self.colored_volume is None:
                self.colored_volume = val
                self.colored_time = now
                print(f"检测到colored记录体积: {self.colored_volume}")
            # Check whether the colour has persisted long enough to stop.
            if now - self.colored_time > self.bounce_time:
                colored_ratio = self._state_ratio("colored")
                if colored_ratio > 0.9:
                    print(f"确认终点,最终体积: {self.colored_volume:0.2f} ml")
                    self.running = False
                    self.ch340.stop()
                    return True
                print(f"colored比例小于90%,当前体积: {val}, {colored_ratio}")
                self.colored_volume = val
                self.colored_time = now
        # Any other state ("transport") needs no immediate action.

        # In slow mode, check whether to switch back to fast mode.
        if self.typ == 1 and now - self.middle_time > self.bounce_time:
            if self._state_ratio("transport") > 0.9:
                # Settle the interrupted push while typ still selects the slow
                # increment; switching typ first subtracted the fast increment
                # from a push that was started with the slow one.
                self.process_left(now)
                self.typ = 0
                # TODO: slide to the first middle state
                self.middle_time = 0
                print(f"非middle比例超过90%,切回快速模式,当前体积: {val}")

        # A recorded "colored" volume whose window is less than 90 % colored
        # is discarded and re-anchored inside the window.
        if self.colored_volume is not None and now - self.colored_time > self.bounce_time:
            if self._state_ratio("colored") < 0.9:
                print(f"colored比例小于90%重置colored记录")
                self.colored_volume = None
                skipped_first = False
                for t, state, vol in self.history:
                    if state != "colored":
                        continue
                    if not skipped_first:
                        # The first colored entry is skipped; the second one
                        # becomes the new candidate.
                        skipped_first = True
                        continue
                    print(f"滑动窗口到: {vol} at {t}")
                    self.colored_volume = vol
                    self.colored_time = t
                    break
            else:
                print("Warning: Unusual condition, colored for over 90% but last is transport")
                print(f"疑似滴定终点: {self.colored_volume}")
        return False

    def _pred(self):
        """Grab a frame, classify it and update the dosing state.

        Returns:
            The frame state (``"transport"``, ``"middle"`` or ``"colored"``),
            or ``None`` if no frame could be captured or classified.
        """
        suc, im = self.cap.read()
        if not suc:
            print("Failed to capture frame from camera.")
            return None
        ret, rate = self.predictor(im)
        if ret is None:
            print("Fallback")
            return None
        now = time.time()
        val = self.process_left(now, velue_only=True)
        # Record the state and keep only entries from the last bounce_time
        # seconds.
        self.history.append((now, ret, val))
        while self.history and self.history[0][0] < now - self.bounce_time:
            self.history.pop(0)
        if not self.history:
            print("Warning: No history available.")
            return ret
        if self._update_state(ret, now, val):
            return "colored"
        cv2.putText(im,
                    f"State: {ret}, rate: {round(rate,2)}, Vol: {val:.2f} ml, typ: {'slow' if self.typ else 'fast'}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (10, 215, 255) if self.typ else (255, 255, 255), 2)
        cv2.imshow("Frame", im)
        cv2.waitKey(1)
        return ret

    def predictor(self, im):
        """Classify a BGR frame by its share of saturated pixels.

        Returns:
            ``(state, rate)`` where ``rate`` is the fraction of pixels whose
            HSV saturation exceeds 60 and ``state`` is ``"transport"`` for
            ``rate < 0.01``, ``"middle"`` for ``rate < 0.2`` and ``"colored"``
            otherwise.
        """
        hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
        s = hsv[:, :, 1]
        mask = s > 60
        tot = mask.shape[0] * mask.shape[1]
        val = np.sum(mask)
        rate = val / tot
        if rate < 0.01:
            return "transport", rate
        elif rate < 0.2:
            return "middle", rate
        else:
            return "colored", rate

    def __del__(self):
        # The capture may be missing if __init__ failed part-way; only clean
        # up what was actually created.
        cap = getattr(self, "cap", None)
        if cap is not None:
            cap.release()
            cv2.destroyAllWindows()
        print("Experiment finished.")

    def run(self, quick_speed=0.2, slow_speed=0.05):
        """Run the titration until the end point is confirmed.

        Args:
            quick_speed: Increment in ml dispensed per push in fast mode.
            slow_speed: Increment in ml dispensed per push in slow mode.
        """
        self.running = True
        self.typ = 0
        self.last = 0
        self.speeds = (quick_speed, slow_speed)
        while self.running:
            # Refill only while the pump is idle and only when the next
            # increment no longer fits into the current load.  The previous
            # exact test ``total_volume % 12 == 0`` practically never matched
            # again after the first refill.
            if not self.ch340.running and self._needs_refill(self.speeds[self.typ]):
                self.ch340_pull()  # draw 12 ml
                self._refill_at = self.total_volume + self._SYRINGE_ML
            time.sleep(0.01)
            if not self.ch340.running:
                print(f"Current Total Volume: {self.total_volume:0.2f} ml")
                # typ 0 dispenses the fast increment, typ 1 the slow one.
                speed = self.speeds[self.typ]
                self.ch340_push(speed)
                self.total_volume += speed
            if self._pred() is None:
                print("Prediction failed, skipping frame.")
                continue
        print('----->>Visual Endpoint<<-----')
        print(f"Total Volume: {self.total_volume} ml")
        print(f"End point volume: {self.colored_volume} ml")
        print(f"Theorical fixed volume: {self.colored_volume} ml")
if __name__ == "__main__":
    # Script entry point: build a MAT instance on camera index 1 with a
    # one-second debounce window, then run it with the chosen increments.
    camera_options = {"videoSourceIndex": 1, "bounce_time": 1}
    dosing_options = {"quick_speed": 0.15, "slow_speed": 0.05}
    mat = MAT(**camera_options)
    mat.run(**dosing_options)