import cv2 import time from datetime import datetime import numpy as np import ch340 from threading import Thread import atexit class MAT: def __init__(self, videoSourceIndex=0, bounce_time=1): print('Initializing MAT...') self.videoSourceIndex = videoSourceIndex self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW) self.ch340 = ch340.CH340() self.bounce_time = bounce_time self.total_volume = 0 self.now_volume = 0 self.start_time = time.time() # 将开始时间转化为年月日时分秒的格式,后续文件命名都已此命名 self.formatted_time = datetime.fromtimestamp(self.start_time).strftime('%Y%m%d_%H%M%S') atexit.register(self.ch340.stop) print("实验开始于", self.formatted_time) def ch340_pull(self): self.ch340.max_speed() self.ch340.pull(vol=12) print('完成抽取') def ch340_init(self): self.ch340.push(speed=1,t=1) self.ch340.pull(speed=1.2,vol=3) print('CH340 INITED') def ch340_push(self, speed=0.1): self.ch340.push(speed=speed, t=1) def _pred(self): suc,im = self.cap.read() if not suc: print("Failed to capture frame from camera.") return None ret = self.my_predictor(im) # print(ret) if ret is None: print("Fallback") self.thr = Thread(target=self._pred).start() else: # fps now = time.time() if now - self.last[0] > 1: print("FPS: ",self.last[1]) print(ret) self.last[0] = now self.last[1] = 0 else: self.last[1] += 1 # fps end now = time.time() if ret == "middle": if self.debounce[0] and self.typ == 0: if self.debounce[1] and self.debounce[0]: # print(self.debounce) if now-self.debounce[0][-1] > self.bounce_time: print("Bounce check succeeded, val:",self.debounce[1][0]) else: print("Got middle flag, bounce check start, val:",self.total_volume) self.typ = 1 self.ch340.stop() self.debounce[1].append((time.time(),self.total_volume)) elif ret == "colored": if self.debounce[1]: print("Got stop flag, val:",self.total_volume) self.running = False self.ch340.stop() return else: if self.debounce[0]: if self.debounce[1]: while self.debounce[1] and self.debounce[1][0][0] > now - self.bounce_time: self.debounce[1].pop(0) while self.debounce[0] and self.debounce[0][0] < now - self.bounce_time: self.debounce[0].pop(0) self.debounce[0].append(now) self.thr = Thread(target=self._pred).start() return ret def my_predictor(self,im): hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV) s = hsv[:,:,1] mask = s>60 tot = mask.shape[0]*mask.shape[1] val = np.sum(mask) rate = val/tot if rate < 0.01: return "transport" elif rate <0.2: return "middle" else: return "colored" def __del__(self): self.cap.release() cv2.destroyAllWindows() print("Experiment finished.") def save_img(self): suc,im = self.cap.read() if not suc: print("Failed to capture frame from camera.") return cv2.imshow("new",im) name = f"Imgs/{self.formatted_time}_{self.total_volume}.jpg" if not cv2.imwrite(name,im): print("Failed to save image",name) def run(self,quick_speed = 0.2, mid_speed=0.1,slow_speed = 0.05): self.running = True self.typ = 0 self.last = [time.time(),0] self.debounce = [[],[]] self.thr = Thread(target=self._pred) self.thr.start() while self.running: if self.now_volume <= 0: self.ch340_pull() # 抽取12ml self.now_volume += 12 if self.typ == 0: # 每次加0.2ml speed = quick_speed self.ch340_push(speed) self.total_volume += speed self.now_volume -= speed else: speed = slow_speed self.ch340_push(speed) # 每次加0.05ml self.total_volume += speed self.now_volume -= speed time.sleep(1) self.total_volume = round(self.total_volume, 3) self.save_img() cv2.waitKey(1) print(f"Current Total Volume: {self.total_volume} ml") self.save_img() print('----->>Visual Endpoint<<-----') print(f"Total Volume: {self.total_volume} ml") if __name__ == "__main__": # 创建MAT类的实例并运行 mat = MAT( videoSourceIndex = 1, bounce_time=0.2 ) # exit() mat.run( quick_speed = 0.15, slow_speed = 0.05, )