From 90a54aae45d2251fe6584c5e121ce87b4432807f Mon Sep 17 00:00:00 2001 From: flt6 <1404262047@qq.com> Date: Mon, 26 May 2025 19:43:02 +0800 Subject: [PATCH] clean code and use ch340 Former-commit-id: 3e401751a24183eb1fa03df0a835624ed6ca6f9d --- Find_COM.py | 65 -------------- ch340.py | 176 ++++++++++++++++++++++++++++++++++++++ predictor_Syringe_Pump.py | 91 ++------------------ 3 files changed, 184 insertions(+), 148 deletions(-) delete mode 100644 Find_COM.py create mode 100644 ch340.py diff --git a/Find_COM.py b/Find_COM.py deleted file mode 100644 index 7a6b330..0000000 --- a/Find_COM.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -File: CH340.py -Author: Zinc Zou -Email: zinczou@163.com -Date: 2024/10/11 -Copyright: 慕乐网络科技(大连)有限公司 - www.mools.net - moolsnet@126.com -Description: -""" -import serial.tools.list_ports - - -def list_ch340_ports(): - - ports = serial.tools.list_ports.comports() - ch340_ports_list = [] - # print(ports) - for port in ports: - if 'CH340' in port.description or 'CH340' in port.device: - ch340_ports_list.append(port.device) - print("Found CH340 ports:", port.device) - if ch340_ports_list: - return ch340_ports_list - else: - return [] - - -def list_USB_ports(): - - ports = serial.tools.list_ports.comports() - USB_ports_list = [] - # print(ports) - for port in ports: - if '串行' in port.description or '串行' in port.device: - USB_ports_list.append(port.device) - print("Found USB ports:", port.device) - if USB_ports_list: - return USB_ports_list - else: - return [] - -if __name__ == "__main__": - - - ports = list(serial.tools.list_ports.comports()) - if len(ports) == 0: - print('No port available') - else: - for port in ports: - print(port) - port = list_ch340_ports()[0] # 串口名,根据实际情况修改 - baudrate = 9600 # 波特率,根据实际情况修改 - pump_ser = serial.Serial(port, baudrate) - port_USB = list_USB_ports()[0] # 串口名,根据实际情况修改 - baudrate = 115200 # 波特率,根据实际情况修改 - if port_USB: - USB_ser = serial.Serial(port, baudrate) - # ch340_ports = list_ch340_ports() - # if ch340_ports: - # print("Found CH340 ports:", ch340_ports) - # else: - # print("No CH340 ports found.") \ No newline at end of file diff --git a/ch340.py b/ch340.py new file mode 100644 index 0000000..4b0aa8a --- /dev/null +++ b/ch340.py @@ -0,0 +1,176 @@ + +import serial.tools.list_ports +import time + +OFFLINE_DEBUG = True + +def _list_ch340_ports(): + ports = serial.tools.list_ports.comports() + ch340_ports_list = [] + for port in ports: + if 'CH340' in port.description or 'CH340' in port.device: + ch340_ports_list.append(port.device) + print("Found CH340 ports:", port.device) + if ch340_ports_list: + return ch340_ports_list + else: + return [] + +if OFFLINE_DEBUG: + class CH340: + def __init__(self): + self._speed = 0.0 + self._time = (0,0,0) + + @property + def speed(self): + return self._speed + + @speed.setter + def speed(self,x): + self._speed = x + + @property + def time(self): + return self._time + + @time.setter + def time(self,x): + self._time = x + + def _prepare(self,speed,t,vol): + if speed is not None: + self.speed = speed + if t is not None: + if isinstance(t,int): + self.time = (t//3600,t//60%60,t%60) + else: + self.time = t + if self.speed == 0: + raise ValueError("Speed must be set to non-zero.") + if vol is not None: + if t is not None: + raise ValueError("Cannot set both volume and time.") + # t = + t = vol/self.speed + h = int(t//3600) + m = int(t//60%60) + s = int(t%60) + self.time = (h,m,s) + + def max_speed(self): + self.speed = 1.26 + + def push(self,speed=None,t=None,vol=None): + self._prepare(speed,t,vol) + time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2]) + + def pull(self,speed=None,t=None,vol=None): + self._prepare(speed,t,vol) + time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2]) + + def stop(self): + print("Stop") +else: + class CH340: + def __init__(self,idx=0): + + self.port = _list_ch340_ports()[idx] # 串口名 + self.pump_ser = serial.Serial(self.port, 9600) # 初始化串口 + self._speed = 0.0 + self.speed = 0.0 + self._time = (0,0,0) + self.time = (0,0,0) + + @property + def speed(self): + return self._speed + + @speed.setter + def speed(self,x): + self._speed = x + x*=34 + if x >= 43: + self._speed = 43 + self._speed = 43/34 + # x = x*30 + print(int((x-int(x))*100),int(x),x) + self.pump_ser.write(f"q1h{int(x)}d".encode('ascii')) + time.sleep(0.01) + self.pump_ser.write(f"q2h{int((x-int(x))*100)}d".encode('ascii')) + + def max_speed(self): + self.speed = 1.26 + + @property + def time(self): + return self._time + + @time.setter + def time(self,x): + self._time = x + self.pump_ser.write(f"q3h{x[0]}d".encode('ascii')) + time.sleep(0.01) + self.pump_ser.write(f"q4h{x[1]}d".encode('ascii')) + time.sleep(0.01) + self.pump_ser.write(f"q5h{x[2]}d".encode('ascii')) + time.sleep(0.01) + + def _prepare(self,speed,t,vol): + if speed is not None: + self.speed = speed + if t is not None: + if isinstance(t,int): + self.time = (t//3600,t//60%60,t%60) + else: + self.time = t + if self.speed == 0: + raise ValueError("Speed must be set to non-zero.") + if vol is not None: + if t is not None: + raise ValueError("Cannot set both volume and time.") + # t = + t = vol/self.speed + h = int(t//3600) + m = int(t//60%60) + s = int(t%60) + self.time = (h,m,s) + + def push(self,speed=None,t=None,vol=None): + self.push_async(speed,t,vol) + time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2]) + + def push_async(self,speed=None,t=None,vol=None): + self._prepare(speed,t,vol) + self.pump_ser.write(b"q6h2d") + time.sleep(0.01) + + def pull(self,speed=None,t=None,vol=None): + self.pull_async(speed,t,vol) + time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2]) + + def pull_async(self,speed=None,t=None,vol=None): + print(1) + print(t,speed,vol) + self._prepare(speed,t,vol) + self.pump_ser.write(b"q6h3d") + print(self.time,self.speed) + time.sleep(0.01) + + def stop(self): + self.pump_ser.write(b"q6h6d") + time.sleep(0.01) + + def __del__(self): + if self.pump_ser.is_open: + self.pump_ser.close() + +if __name__ == "__main__": + ch340 = CH340() + # ch340.speed = 3.75 + # ch340.time = (0,0,10) + # ch340.push(speed=1,t=1) + # ch340.pull(speed=0.8,t=1) + # ch340.pull(speed=1.2,vol=3) + + ch340.push(speed=0.01,vol=0.02) \ No newline at end of file diff --git a/predictor_Syringe_Pump.py b/predictor_Syringe_Pump.py index 2f9325e..021713b 100644 --- a/predictor_Syringe_Pump.py +++ b/predictor_Syringe_Pump.py @@ -6,7 +6,7 @@ import serial from datetime import datetime import numpy as np import joblib -import Find_COM +import ch340 from threading import Thread import atexit @@ -23,9 +23,7 @@ class MAT: self.videoSourceIndex = videoSourceIndex # 摄像机编号 self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW) # 打开摄像头 self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") - if not LOCAL_DEBUG: - self.port = Find_COM.list_ch340_ports()[0] # 串口名 - self.pump_ser = serial.Serial(self.port, 9600) # 初始化串口 + self.ch340 = ch340.CH340() self.classes = classes self.bounce_time = bounce_time # 防抖时间 self.total_volume = 0 # 记录总体积 @@ -38,90 +36,24 @@ class MAT: # 将开始时间转化为年月日时分秒的格式,后续文件命名都已此命名 self.formatted_time = datetime.fromtimestamp(self.start_time).strftime('%Y%m%d_%H%M%S') - self.model = joblib.load("model.pkl") atexit.register(self.start_move_3) print("实验开始于", self.formatted_time) def start_move_1(self): # 抽料程序 - if LOCAL_DEBUG:return - self.start_move_init() - data = b"q1h40d" # *2 - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q2h0d" - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q4h0d" - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q5h9d" - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q6h3d" - self.pump_ser.write(data) - time.sleep(9) + self.ch340.max_speed() + self.ch340.pull(vol=12) print('完成抽取') def start_move_init(self): # init - if LOCAL_DEBUG:return - data = b"q1h15d" # *2 - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q2h0d" - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q4h0d" - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q5h2d" - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q6h2d" - self.pump_ser.write(data) - print("send1") - time.sleep(2) - data = b"q1h20d" # *2 - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q5h1d" - self.pump_ser.write(data) - time.sleep(0.1) - data = b"q6h3d" - self.pump_ser.write(data) - print("send2") - time.sleep(1) + self.ch340.push(speed=1,t=1) + self.ch340.pull(speed=1.2,vol=3) print('INITED') def start_move_2(self, speed=0.1): # 进料程序 - if LOCAL_DEBUG: - time.sleep(1) - return - # 计算单次滴定体积并传输至控制器 - speed_min = speed * 30 - speed_min_int = int(speed_min) - speed_min_float = int((speed_min - speed_min_int) * 100) - # print(speed_min_int, speed_min_float) - data = f"q1h{speed_min_int}d" - self.pump_ser.write(data.encode('ascii')) - time.sleep(0.01) - data = f"q2h{speed_min_float}d" - self.pump_ser.write(data.encode('ascii')) - time.sleep(0.01) - data = b"q4h0d" - self.pump_ser.write(data) - time.sleep(0.01) - data = b"q5h1d" - self.pump_ser.write(data) - time.sleep(0.01) - # 进料 - data = b"q6h2d" - self.pump_ser.write(data) - time.sleep(1) + self.ch340.push(speed=speed, t=1) def start_move_3(self): # 进料急停 - if LOCAL_DEBUG:return - data = b"q6h6d" - self.pump_ser.write(data) + self.ch340.stop() def _pred(self): suc,im = self.cap.read() @@ -195,7 +127,6 @@ class MAT: return "colored" def __del__(self): - self.pump_ser.close() self.cap.release() cv2.destroyAllWindows() print("Experiment finished.") @@ -218,7 +149,6 @@ class MAT: self.debounce = [[],[]] self.thr = Thread(target=self._pred) self.thr.start() - switching_point = expect * 0.9 while self.running: if self.now_volume <= 0: self.start_move_1() # 抽取12ml @@ -247,16 +177,11 @@ class MAT: self.save_img() print('----->>Visual Endpoint<<-----') print(f"Total Volume: {self.total_volume} ml") - # print(f"Image File: {im_file}") print("Volume List:", self.volume_list) print("Color List:", self.color_list) if __name__ == "__main__": - import warnings - # 忽略所有警告 - warnings.filterwarnings('ignore') - # 创建MAT类的实例并运行 mat = MAT( videoSourceIndex = 1,