import torch from PIL import Image import torchvision.transforms as transforms import cv2 import time import os from model import resnet34 import serial from datetime import datetime import numpy as np import joblib import json import Find_COM from threading import Thread import atexit LOCAL_DEBUG = False if LOCAL_DEBUG: print("WARNING: Local debug mode is enabled. Serial communication will be skipped.") time.sleep(2) class MAT: def __init__(self, videoSourceIndex=0, weights_path = "resnet34-1Net.pth", json_path = 'class_indices.json', classes = 2,bounce_time=1): print('实验初始化中') self.data_root = os.getcwd() self.videoSourceIndex = videoSourceIndex # 摄像机编号 self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW) # 打开摄像头 self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") if not LOCAL_DEBUG: self.port = Find_COM.list_ch340_ports()[0] # 串口名 self.pump_ser = serial.Serial(self.port, 9600) # 初始化串口 self.classes = classes self.bounce_time = bounce_time # 防抖时间 self.total_volume = 0 # 记录总体积 self.now_volume = 0 # 记录当前注射泵内体积 self.volume_list = [] # 记录体积变化 self.color_list = [] # 记录颜色变化 self.start_time = time.time() # 记录实验开始时间 self.weights_path = os.path.join(self.data_root, weights_path) # 权重文件路径 self.json_path = os.path.join(self.data_root, json_path) # 类别文件路径 # 将开始时间转化为年月日时分秒的格式,后续文件命名都已此命名 self.formatted_time = datetime.fromtimestamp(self.start_time).strftime('%Y%m%d_%H%M%S') self.model = joblib.load("model.pkl") atexit.register(self.start_move_3) print("实验开始于", self.formatted_time) def start_move_1(self): # 抽料程序 if LOCAL_DEBUG:return self.start_move_init() data = b"q1h40d" # *2 self.pump_ser.write(data) time.sleep(0.01) data = b"q2h0d" self.pump_ser.write(data) time.sleep(0.01) data = b"q4h0d" self.pump_ser.write(data) time.sleep(0.01) data = b"q5h9d" self.pump_ser.write(data) time.sleep(0.01) data = b"q6h3d" self.pump_ser.write(data) time.sleep(9) print('完成抽取') def start_move_init(self): # init if LOCAL_DEBUG:return data = b"q1h15d" # *2 self.pump_ser.write(data) time.sleep(0.01) data = b"q2h0d" self.pump_ser.write(data) time.sleep(0.01) data = b"q4h0d" self.pump_ser.write(data) time.sleep(0.01) data = b"q5h2d" self.pump_ser.write(data) time.sleep(0.01) data = b"q6h2d" self.pump_ser.write(data) print("send1") time.sleep(2) data = b"q1h20d" # *2 self.pump_ser.write(data) time.sleep(0.01) data = b"q5h1d" self.pump_ser.write(data) time.sleep(0.1) data = b"q6h3d" self.pump_ser.write(data) print("send2") time.sleep(1) print('INITED') def start_move_2(self, speed=0.1): # 进料程序 if LOCAL_DEBUG: time.sleep(1) return # 计算单次滴定体积并传输至控制器 speed_min = speed * 30 speed_min_int = int(speed_min) speed_min_float = int((speed_min - speed_min_int) * 100) # print(speed_min_int, speed_min_float) data = f"q1h{speed_min_int}d" self.pump_ser.write(data.encode('ascii')) time.sleep(0.01) data = f"q2h{speed_min_float}d" self.pump_ser.write(data.encode('ascii')) time.sleep(0.01) data = b"q4h0d" self.pump_ser.write(data) time.sleep(0.01) data = b"q5h1d" self.pump_ser.write(data) time.sleep(0.01) # 进料 data = b"q6h2d" self.pump_ser.write(data) time.sleep(1) def start_move_3(self): # 进料急停 if LOCAL_DEBUG:return data = b"q6h6d" self.pump_ser.write(data) def preproc(self, im): try: hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV) mask = hsv[:,:,1] > 150 mask = mask[:,:,np.newaxis] cnt = np.count_nonzero(mask) if cnt == 0: return 0,0,0 hsv*=mask h = round(np.sum(hsv[:,:,0])/cnt) s = round(np.sum(hsv[:,:,1])/cnt) v = round(np.sum(hsv[:,:,2])/cnt) return h,s,v except Exception as e : import traceback traceback.print_exc() return None # name = f"{cl}_{h}_{s}_{v}.jpg" def _pred(self): suc,im = self.cap.read() if not suc: print("Failed to capture frame from camera.") return None ret = self.my_predictor(im) # print(ret) if ret is None: print("Fallback") self.thr = Thread(target=self._pred).start() else: # fps now = time.time() if now - self.last[0] > 1: print("FPS: ",self.last[1]) print(ret) self.last[0] = now self.last[1] = 0 else: self.last[1] += 1 # fps end now = time.time() if ret == self.end_kind: if self.debounce[0]: if self.debounce[1] and self.debounce[0]: # print(self.debounce) if now-self.debounce[0][-1] > self.bounce_time: print("Bounce check succeeded, val:",self.debounce[1][0]) self.running = False self.start_move_3() return else: print("Got stop flag, bounce check start, val:",self.total_volume) self.debounce[1].append((time.time(),self.total_volume)) else: if self.debounce[0]: # print(self.debounce) if self.debounce[1]: # print(self.debounce[1][0][0],now,self.bounce_time) # print(self.debounce[1][0][0] > now - self.bounce_time) while self.debounce[1] and self.debounce[1][0][0] > now - self.bounce_time: self.debounce[1].pop(0) while self.debounce[0] and self.debounce[0][0] < now - self.bounce_time: self.debounce[0].pop(0) self.debounce[0].append(now) self.thr = Thread(target=self._pred).start() return ret,0.9 def my_predictor(self,im): # im = cv2.imread(file) hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV) s = hsv[:,:,1] mask = s>100 # print(mask) tot = mask.shape[0]*mask.shape[1] val = np.sum(mask) # print(val/tot) if val>Visual Endpoint<<-----') print(f"Total Volume: {self.total_volume} ml") # print(f"Image File: {im_file}") print("Volume List:", self.volume_list) print("Color List:", self.color_list) if __name__ == "__main__": import warnings # 忽略所有警告 warnings.filterwarnings('ignore') # 创建MAT类的实例并运行 mat = MAT( videoSourceIndex = 1, weights_path = "resnet34-1Net.pth", json_path = 'class_indices.json', classes = 2, bounce_time=1 ) # exit() mat.run( quick_speed = 0.3, slow_speed = 0.2, expect = 10, end_kind = 'colored', )