import atexit
import os
import time
from datetime import datetime
from threading import Thread

import cv2
import joblib
import numpy as np
import serial
import torch

import Find_COM

# When True, every pump command is skipped so the camera/vision path can be
# exercised without the serial hardware attached.
LOCAL_DEBUG = False
if LOCAL_DEBUG:
    print("WARNING: Local debug mode is enabled. Serial communication will be skipped.")
    time.sleep(2)


class MAT:
    """Camera-guided titration controller driving a syringe pump over serial.

    A chain of background threads classifies camera frames by colour
    saturation while ``run`` doses liquid in small increments; the run stops
    once the configured end colour has been observed.
    """

    # Pixels whose HSV saturation exceeds this value count as "coloured".
    SATURATION_THRESHOLD = 60
    # Fraction of coloured pixels below which a frame is labelled "transport".
    MIDDLE_RATIO = 0.01
    # Fraction of coloured pixels from which a frame is labelled "colored".
    COLORED_RATIO = 0.2
    # Delay before retrying after the camera failed to deliver a frame.
    CAPTURE_RETRY_DELAY = 0.05
    # Directory (relative to the working directory) receiving the snapshots.
    IMAGE_DIR = "Imgs"

    def __init__(self, videoSourceIndex=0, weights_path="resnet34-1Net.pth",
                 json_path='class_indices.json', classes=2, bounce_time=1):
        """Open the camera and (unless ``LOCAL_DEBUG``) the pump serial port.

        Args:
            videoSourceIndex: Index of the camera handed to ``cv2.VideoCapture``.
            weights_path: Weights file name, resolved against the working
                directory and stored in ``self.weights_path``.
            json_path: Class-index file name, resolved against the working
                directory and stored in ``self.json_path``.
            classes: Number of classes; stored only.
            bounce_time: Debounce window in seconds used by ``_pred``.
        """
        print('实验初始化中')
        self.data_root = os.getcwd()
        self.videoSourceIndex = videoSourceIndex  # camera index
        self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)  # open the camera
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        if not LOCAL_DEBUG:
            # Raises IndexError when no port is reported.
            self.port = Find_COM.list_ch340_ports()[0]  # serial port name
            self.pump_ser = serial.Serial(self.port, 9600)  # open the serial port
        self.classes = classes
        self.bounce_time = bounce_time  # debounce window
        self.total_volume = 0  # total volume dispensed so far
        self.now_volume = 0  # volume currently held in the syringe
        self.volume_list = []  # history of total_volume after each dose
        self.color_list = []  # colour history (never filled by this class)
        self.start_time = time.time()  # experiment start time
        self.weights_path = os.path.join(self.data_root, weights_path)  # weights file path
        self.json_path = os.path.join(self.data_root, json_path)  # class-index file path
        # Start time as YYYYmmdd_HHMMSS; used to name the saved images.
        self.formatted_time = datetime.fromtimestamp(self.start_time).strftime('%Y%m%d_%H%M%S')
        # NOTE(review): joblib.load unpickles the file; only load a trusted
        # "model.pkl". The loaded model is not used by the methods below.
        self.model = joblib.load("model.pkl")
        # Make sure the pump receives the stop command when the interpreter exits.
        atexit.register(self.start_move_3)
        print("实验开始于", self.formatted_time)

    def _send(self, command, pause=0.0):
        """Write one command to the pump, then wait ``pause`` seconds if non-zero.

        ``command`` may be ``bytes`` or an ASCII ``str``.
        """
        if isinstance(command, str):
            command = command.encode('ascii')
        self.pump_ser.write(command)
        if pause:
            time.sleep(pause)

    def _spawn_pred(self):
        """Run the next ``_pred`` step on a new thread and keep a handle to it."""
        # Thread.start() returns None, so the thread must be stored before
        # it is started.
        self.thr = Thread(target=self._pred)
        self.thr.start()

    def start_move_1(self):
        """Draw liquid into the syringe (runs the init sequence first)."""
        if LOCAL_DEBUG:
            return
        self.start_move_init()
        self._send(b"q1h40d", 0.01)  # *2
        self._send(b"q2h0d", 0.01)
        self._send(b"q4h0d", 0.01)
        self._send(b"q5h9d", 0.01)
        self._send(b"q6h3d", 9)  # wait for the draw to complete
        print('完成抽取')

    def start_move_init(self):
        """Send the two-stage initialisation sequence to the pump."""
        if LOCAL_DEBUG:
            return
        self._send(b"q1h15d", 0.01)  # *2
        self._send(b"q2h0d", 0.01)
        self._send(b"q4h0d", 0.01)
        self._send(b"q5h2d", 0.01)
        self._send(b"q6h2d")
        print("send1")
        time.sleep(2)
        self._send(b"q1h20d", 0.01)  # *2
        self._send(b"q5h1d", 0.1)
        self._send(b"q6h3d")
        print("send2")
        time.sleep(1)
        print('INITED')

    def start_move_2(self, speed=0.1):
        """Dispense one dose.

        ``speed * 30`` is sent to the controller as an integer part (``q1``)
        and a hundredths part (``q2``), followed by the feed trigger.

        Args:
            speed: Dose size; ``run`` treats it as millilitres per step.
        """
        if LOCAL_DEBUG:
            time.sleep(1)
            return
        # Round to whole hundredths before splitting: truncating the
        # fractional part alone turns e.g. 0.11 * 30 = 3.2999... into 3 + 29
        # instead of 3 + 30.
        whole, hundredths = divmod(int(round(speed * 30 * 100)), 100)
        self._send(f"q1h{whole}d", 0.01)
        self._send(f"q2h{hundredths}d", 0.01)
        self._send(b"q4h0d", 0.01)
        self._send(b"q5h1d", 0.01)
        # Feed.
        self._send(b"q6h2d", 1)

    def start_move_3(self):
        """Send the emergency-stop command for the feed."""
        if LOCAL_DEBUG:
            return
        self._send(b"q6h6d")

    def _pred(self):
        """Classify one camera frame, update the debounce state, reschedule.

        State set up by ``run``:
            * ``self.debounce[0]``: timestamps of frames that were neither
              "middle" nor ``self.end_kind``; entries older than
              ``bounce_time`` are pruned.
            * ``self.debounce[1]``: ``(timestamp, total_volume)`` for each
              "middle" frame; leading entries newer than
              ``now - bounce_time`` are dropped when a frame of the first
              kind arrives.
            * ``self.typ``: 0 for fast dosing, 1 for slow dosing.
            * ``self.last``: ``[window_start, frame_count]`` for the FPS print.

        Returns:
            ``None`` when no frame could be read or the end colour stopped the
            run; otherwise ``(label, 0.9)`` where 0.9 is a constant
            (presumably a placeholder confidence).
        """
        suc, im = self.cap.read()
        if not suc:
            print("Failed to capture frame from camera.")
            # Keep endpoint detection alive: without a retry a single dropped
            # frame would end the chain while run() keeps dosing.
            if getattr(self, "running", False):
                time.sleep(self.CAPTURE_RETRY_DELAY)
                self._spawn_pred()
            return None

        ret = self.my_predictor(im)
        if ret is None:
            print("Fallback")
            self._spawn_pred()
            return ret, 0.9

        # FPS report, printed roughly once per second.
        now = time.time()
        if now - self.last[0] > 1:
            print("FPS: ", self.last[1])
            print(ret)
            self.last[0] = now
            self.last[1] = 0
        else:
            self.last[1] += 1

        now = time.time()
        clear_times, middle_hits = self.debounce
        if ret == "middle":
            if clear_times and self.typ == 0:
                if middle_hits and clear_times:
                    if now - clear_times[-1] > self.bounce_time:
                        print("Bounce check succeeded, val:", middle_hits[0])
                else:
                    # NOTE(review): the switch to slow dosing and the stop
                    # command are taken to belong to this first-"middle"
                    # branch -- confirm against the intended behaviour.
                    print("Got middle flag, bounce check start, val:", self.total_volume)
                    self.typ = 1
                    self.start_move_3()
            middle_hits.append((time.time(), self.total_volume))
        elif ret == self.end_kind:
            # The end colour only counts after a "middle" frame was recorded.
            if middle_hits:
                print("Got stop flag, val:", self.total_volume)
                self.running = False
                self.start_move_3()
                return
        else:
            if clear_times:
                # Drop "middle" records that are still inside the debounce
                # window when a frame of this kind arrives.
                while middle_hits and middle_hits[0][0] > now - self.bounce_time:
                    middle_hits.pop(0)
                # Forget timestamps that fell out of the window.
                while clear_times and clear_times[0] < now - self.bounce_time:
                    clear_times.pop(0)
            clear_times.append(now)

        self._spawn_pred()
        return ret, 0.9

    def my_predictor(self, im):
        """Label a BGR frame by the fraction of saturated pixels.

        Returns:
            "transport" below ``MIDDLE_RATIO``, "middle" below
            ``COLORED_RATIO``, otherwise "colored".
        """
        hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
        mask = hsv[:, :, 1] > self.SATURATION_THRESHOLD
        rate = np.sum(mask) / mask.size
        if rate < self.MIDDLE_RATIO:
            return "transport"
        if rate < self.COLORED_RATIO:
            return "middle"
        return "colored"

    def __del__(self):
        """Release the serial port, the camera and any OpenCV windows."""
        # Either handle may be missing: LOCAL_DEBUG never opens the port, and
        # __init__ can fail before the attributes are assigned.
        pump = getattr(self, "pump_ser", None)
        if pump is not None:
            pump.close()
        cap = getattr(self, "cap", None)
        if cap is not None:
            cap.release()
        cv2.destroyAllWindows()
        print("Experiment finished.")

    def save_img(self):
        """Grab a frame, show it in the "new" window and save it as a JPEG."""
        # NOTE(review): reads the same capture object as the _pred threads,
        # without any locking.
        suc, im = self.cap.read()
        if not suc:
            print("Failed to capture frame from camera.")
            return
        cv2.imshow("new", im)
        # cv2.imwrite does not create missing directories.
        os.makedirs(self.IMAGE_DIR, exist_ok=True)
        name = f"Imgs/{self.formatted_time}_{self.total_volume}.jpg"
        if not cv2.imwrite(name, im):
            print("Failed to save image", name)

    def run(self, quick_speed=0.2, mid_speed=0.1, slow_speed=0.05,
            expect: float | int = 5, end_kind='orange'):
        """Dose until ``_pred`` reports the end colour, then print a summary.

        Args:
            quick_speed: Dose per step while ``self.typ == 0``.
            mid_speed: Accepted for compatibility; currently unused.
            slow_speed: Dose per step once ``_pred`` has set ``self.typ`` to 1.
            expect: Accepted for compatibility; currently unused.
            end_kind: Label from ``my_predictor`` that ends the run.
        """
        self.running = True
        self.typ = 0
        self.end_kind = end_kind
        self.last = [time.time(), 0]
        self.debounce = [[], []]
        self._spawn_pred()

        while self.running:
            if self.now_volume <= 0:
                self.start_move_1()  # draw 12 ml
                self.now_volume += 12

            speed = quick_speed if self.typ == 0 else slow_speed
            self.start_move_2(speed)
            self.total_volume += speed
            self.now_volume -= speed
            # NOTE(review): this delay is applied after every dose in both
            # feed modes -- confirm it was not meant for slow mode only.
            time.sleep(1)

            self.total_volume = round(self.total_volume, 3)
            self.volume_list.append(self.total_volume)
            self.save_img()
            cv2.waitKey(1)
            print(f"Current Total Volume: {self.total_volume} ml")

        self.save_img()
        print('----->>Visual Endpoint<<-----')
        print(f"Total Volume: {self.total_volume} ml")
        print("Volume List:", self.volume_list)
        print("Color List:", self.color_list)


if __name__ == "__main__":
    import warnings

    # Silence all warnings.
    warnings.filterwarnings('ignore')

    # Create the MAT instance and run the experiment.
    mat = MAT(
        videoSourceIndex=1,
        weights_path="resnet34-1Net.pth",
        json_path='class_indices.json',
        classes=2,
        bounce_time=0.2,
    )
    mat.run(
        quick_speed=0.15,
        slow_speed=0.05,
        expect=10,
        end_kind='colored',
    )