diff --git a/ch340.py b/ch340.py index 4b0aa8a..bdeb4ea 100644 --- a/ch340.py +++ b/ch340.py @@ -2,7 +2,7 @@ import serial.tools.list_ports import time -OFFLINE_DEBUG = True +OFFLINE_DEBUG = False def _list_ch340_ports(): ports = serial.tools.list_ports.comports() @@ -17,10 +17,13 @@ def _list_ch340_ports(): return [] if OFFLINE_DEBUG: + print("WARNING: Running in offline debug mode, no actual hardware interaction.\n"*3) + time.sleep(2) class CH340: def __init__(self): self._speed = 0.0 self._time = (0,0,0) + self.start = 0 @property def speed(self): @@ -38,6 +41,13 @@ if OFFLINE_DEBUG: def time(self,x): self._time = x + @property + def running(self): + return time.time() - self.start < self._get_time() + + def _get_time(self): + return (self.time[0]*3600 + self.time[1]*60 + self.time[2])/2 + def _prepare(self,speed,t,vol): if speed is not None: self.speed = speed @@ -63,24 +73,37 @@ if OFFLINE_DEBUG: def push(self,speed=None,t=None,vol=None): self._prepare(speed,t,vol) - time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2]) + self.start = time.time() + time.sleep(self._get_time()) def pull(self,speed=None,t=None,vol=None): self._prepare(speed,t,vol) - time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2]) + self.start = time.time() + time.sleep(self._get_time()) + + def push_async(self,speed=None,t=None,vol=None): + self._prepare(speed,t,vol) + self.start = time.time() + time.sleep(0.01) + + def pull_async(self,speed=None,t=None,vol=None): + self._prepare(speed,t,vol) + self.start = time.time() + time.sleep(0.01) def stop(self): + self.start = 0 print("Stop") else: class CH340: def __init__(self,idx=0): - self.port = _list_ch340_ports()[idx] # 串口名 self.pump_ser = serial.Serial(self.port, 9600) # 初始化串口 self._speed = 0.0 self.speed = 0.0 self._time = (0,0,0) self.time = (0,0,0) + self.start = 0 @property def speed(self): @@ -116,6 +139,13 @@ else: self.pump_ser.write(f"q5h{x[2]}d".encode('ascii')) time.sleep(0.01) + @property + def running(self): + return time.time() - self.start < self._get_time() + + def _get_time(self): + return self.time[0]*3600 + self.time[1]*60 + self.time[2] + def _prepare(self,speed,t,vol): if speed is not None: self.speed = speed @@ -138,27 +168,27 @@ else: def push(self,speed=None,t=None,vol=None): self.push_async(speed,t,vol) - time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2]) + time.sleep(self._get_time()) def push_async(self,speed=None,t=None,vol=None): self._prepare(speed,t,vol) self.pump_ser.write(b"q6h2d") + self.start = time.time() time.sleep(0.01) def pull(self,speed=None,t=None,vol=None): self.pull_async(speed,t,vol) - time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2]) + time.sleep(self._get_time()) def pull_async(self,speed=None,t=None,vol=None): - print(1) - print(t,speed,vol) self._prepare(speed,t,vol) self.pump_ser.write(b"q6h3d") - print(self.time,self.speed) + self.start = time.time() time.sleep(0.01) def stop(self): self.pump_ser.write(b"q6h6d") + self.start = 0 time.sleep(0.01) def __del__(self): diff --git a/predictor_Syringe_Pump.py b/predictor_Syringe_Pump.py index 96f8ea0..d72eb35 100644 --- a/predictor_Syringe_Pump.py +++ b/predictor_Syringe_Pump.py @@ -14,15 +14,18 @@ class MAT: self.ch340 = ch340.CH340() self.bounce_time = bounce_time self.total_volume = 0 - self.now_volume = 0 self.start_time = time.time() # 将开始时间转化为年月日时分秒的格式,后续文件命名都已此命名 self.formatted_time = datetime.fromtimestamp(self.start_time).strftime('%Y%m%d_%H%M%S') atexit.register(self.ch340.stop) print("实验开始于", self.formatted_time) + self.history = [] # 滑动窗口历史记录 + self.colored_volume = None # 首次colored体积 + self.colored_time = None # 首次colored时间 def ch340_pull(self): + print("开始抽取12ml") self.ch340.max_speed() self.ch340.pull(vol=12) print('完成抽取') @@ -33,61 +36,90 @@ class MAT: print('CH340 INITED') def ch340_push(self, speed=0.1): - self.ch340.push(speed=speed, t=1) + self.ch340.push_async(speed=speed, t=1) def _pred(self): - suc,im = self.cap.read() + """预测当前图像状态,返回'transport'、'middle'或'colored'""" + suc, im = self.cap.read() if not suc: print("Failed to capture frame from camera.") return None - ret = self.my_predictor(im) - # print(ret) + ret,rate = self.predictor(im) if ret is None: print("Fallback") - self.thr = Thread(target=self._pred).start() - else: - # fps - now = time.time() - if now - self.last[0] > 1: - print("FPS: ",self.last[1]) - print(ret) - self.last[0] = now - self.last[1] = 0 - else: - self.last[1] += 1 - # fps end - now = time.time() - - if ret == "middle": - if self.debounce[0] and self.typ == 0: - if self.debounce[1] and self.debounce[0]: - # print(self.debounce) - if now-self.debounce[0][-1] > self.bounce_time: - print("Bounce check succeeded, val:",self.debounce[1][0]) - else: - print("Got middle flag, bounce check start, val:",self.total_volume) - self.typ = 1 - self.ch340.stop() - self.debounce[1].append((time.time(),self.total_volume)) - elif ret == "colored": - if self.debounce[1]: - print("Got stop flag, val:",self.total_volume) + return None + + now = time.time() + + # 记录历史状态,保留bounce_time时间内的记录 + self.history.append((now, ret, self.total_volume)) + while self.history and self.history[0][0] < now - self.bounce_time: + self.history.pop(0) + + if not self.history: + print("Warning: No history available.") + return ret + + # 处理检测结果 + if ret == "middle": + # 检测到middle立即切换到慢速模式 + if self.typ == 0: + print(f"检测到middle,切换到慢速模式,当前体积: {self.total_volume}") + self.typ = 1 + + elif ret == "colored": + # 检测到colored时记录当前体积 + if self.colored_volume is None: + self.colored_volume = self.total_volume + self.colored_time = now + print(f"检测到colored,记录体积: {self.colored_volume}") + + # 检查是否要停止 + if now - self.colored_time > self.bounce_time: + colored_count = sum(1 for _, state, _ in self.history if state == "colored") + if colored_count / len(self.history) > 0.9: + print(f"确认终点,最终体积: {self.colored_volume:0.2f} ml") self.running = False self.ch340.stop() - return + return "colored" + else: + print(f"colored比例小于90%,当前体积: {self.total_volume}, {colored_count / len(self.history)}") + self.colored_volume = self.total_volume + self.colored_time = now + else: # ret == "transport" + pass + + # 在慢速模式下检查是否要切回快速模式 + if self.typ == 1: + non_middle_count = sum(1 for _, state, _ in self.history if state == "transport") + if len(self.history) > 3 and non_middle_count / len(self.history) > 0.9: + print(f"非middle比例超过90%,切回快速模式,当前体积: {self.total_volume}") + self.typ = 0 + + # 如果已记录colored但在bounce_time内colored比例小于90%,重置 + if self.colored_volume is not None and now - self.colored_time > self.bounce_time: + colored_count = sum(1 for _, state, _ in self.history if state == "colored") + if colored_count / len(self.history) < 0.9: + print(f"colored比例小于90%,重置colored记录") + self.colored_volume = None + for t, state,vol in self.history: + if state == "colored": + print(f"滑动窗口到: {vol} at {t}") + self.colored_volume = vol + self.colored_time = t + break else: - if self.debounce[0]: - if self.debounce[1]: - while self.debounce[1] and self.debounce[1][0][0] > now - self.bounce_time: - self.debounce[1].pop(0) - while self.debounce[0] and self.debounce[0][0] < now - self.bounce_time: - self.debounce[0].pop(0) - self.debounce[0].append(now) - self.thr = Thread(target=self._pred).start() - return ret + print("Warning: Unusual condition, colored for over 90%% but last is transport") + print(f"疑似滴定终点: {self.colored_volume}") + cv2.putText(im, + f"State: {ret}, rate: {round(rate,2)}, Vol: {self.total_volume:.2f} ml, typ: {'slow' if self.typ else 'fast'}", + (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (10,215, 255) if self.typ else (255,255,255), 2) + cv2.imshow("Frame", im) + cv2.waitKey(1) + return ret - def my_predictor(self,im): + def predictor(self,im): hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV) s = hsv[:,:,1] mask = s>60 @@ -95,59 +127,42 @@ class MAT: val = np.sum(mask) rate = val/tot if rate < 0.01: - return "transport" + return "transport",rate elif rate <0.2: - return "middle" + return "middle",rate else: - return "colored" + return "colored",rate def __del__(self): self.cap.release() cv2.destroyAllWindows() print("Experiment finished.") - def save_img(self): - suc,im = self.cap.read() - if not suc: - print("Failed to capture frame from camera.") - return - cv2.imshow("new",im) - name = f"Imgs/{self.formatted_time}_{self.total_volume}.jpg" - if not cv2.imwrite(name,im): - print("Failed to save image",name) - - def run(self,quick_speed = 0.2, mid_speed=0.1,slow_speed = 0.05): + def run(self,quick_speed = 0.2,slow_speed = 0.05): self.running = True self.typ = 0 - self.last = [time.time(),0] - self.debounce = [[],[]] - self.thr = Thread(target=self._pred) - self.thr.start() + self.last = 0 while self.running: - if self.now_volume <= 0: + if self.total_volume % 12 == 0: self.ch340_pull() # 抽取12ml - self.now_volume += 12 + time.sleep(0.01) - if self.typ == 0: # 每次加0.2ml - speed = quick_speed - self.ch340_push(speed) - self.total_volume += speed - self.now_volume -= speed - else: - speed = slow_speed - self.ch340_push(speed) # 每次加0.05ml - self.total_volume += speed - self.now_volume -= speed - time.sleep(1) - - self.total_volume = round(self.total_volume, 3) + if not self.ch340.running: + print(f"Current Total Volume: {self.total_volume:0.2f} ml") + if self.typ == 0: # 每次加0.2ml + speed = quick_speed + self.ch340_push(speed) + self.total_volume += speed + else: + speed = slow_speed + self.ch340_push(speed) # 每次加0.05ml + self.total_volume += speed + if self._pred() is None: + print("Prediction failed, skipping frame.") + continue - self.save_img() - cv2.waitKey(1) - print(f"Current Total Volume: {self.total_volume} ml") - self.save_img() print('----->>Visual Endpoint<<-----') print(f"Total Volume: {self.total_volume} ml") @@ -159,10 +174,9 @@ if __name__ == "__main__": bounce_time=0.2 ) - # exit() mat.run( - quick_speed = 0.15, - slow_speed = 0.05, + slow_speed = 0.05, + quick_speed = 0.15, )