206 lines
6.2 KiB
Python
206 lines
6.2 KiB
Python
|
|
import serial.tools.list_ports
|
|
import time
|
|
|
|
OFFLINE_DEBUG = False
|
|
|
|
def _list_ch340_ports():
|
|
ports = serial.tools.list_ports.comports()
|
|
ch340_ports_list = []
|
|
for port in ports:
|
|
if 'CH340' in port.description or 'CH340' in port.device:
|
|
ch340_ports_list.append(port.device)
|
|
print("Found CH340 ports:", port.device)
|
|
if ch340_ports_list:
|
|
return ch340_ports_list
|
|
else:
|
|
return []
|
|
|
|
if OFFLINE_DEBUG:
|
|
print("WARNING: Running in offline debug mode, no actual hardware interaction.\n"*3)
|
|
time.sleep(2)
|
|
class CH340:
|
|
def __init__(self):
|
|
self._speed = 0.0
|
|
self._time = (0,0,0)
|
|
self.start = 0
|
|
|
|
@property
|
|
def speed(self):
|
|
return self._speed
|
|
|
|
@speed.setter
|
|
def speed(self,x):
|
|
self._speed = x
|
|
|
|
@property
|
|
def time(self):
|
|
return self._time
|
|
|
|
@time.setter
|
|
def time(self,x):
|
|
self._time = x
|
|
|
|
@property
|
|
def running(self):
|
|
return time.time() - self.start < self._get_time()
|
|
|
|
def _get_time(self):
|
|
return (self.time[0]*3600 + self.time[1]*60 + self.time[2])
|
|
|
|
def _prepare(self,speed,t,vol):
|
|
if speed is not None:
|
|
self.speed = speed
|
|
if t is not None:
|
|
if isinstance(t,int):
|
|
self.time = (t//3600,t//60%60,t%60)
|
|
else:
|
|
self.time = t
|
|
if self.speed == 0:
|
|
raise ValueError("Speed must be set to non-zero.")
|
|
if vol is not None:
|
|
if t is not None:
|
|
raise ValueError("Cannot set both volume and time.")
|
|
# t =
|
|
t = vol/self.speed
|
|
h = int(t//3600)
|
|
m = int(t//60%60)
|
|
s = int(t%60)
|
|
self.time = (h,m,s)
|
|
|
|
def max_speed(self):
|
|
self.speed = 1.26
|
|
|
|
def push(self,speed=None,t=None,vol=None):
|
|
self._prepare(speed,t,vol)
|
|
self.start = time.time()
|
|
time.sleep(self._get_time())
|
|
|
|
def pull(self,speed=None,t=None,vol=None):
|
|
self._prepare(speed,t,vol)
|
|
self.start = time.time()
|
|
time.sleep(self._get_time()/4)
|
|
|
|
def push_async(self,speed=None,t=None,vol=None):
|
|
self._prepare(speed,t,vol)
|
|
self.start = time.time()
|
|
time.sleep(0.01)
|
|
|
|
def pull_async(self,speed=None,t=None,vol=None):
|
|
self._prepare(speed,t,vol)
|
|
self.start = time.time()
|
|
time.sleep(0.01)
|
|
|
|
def stop(self):
|
|
self.start = 0
|
|
print("Stop")
|
|
else:
|
|
class CH340:
|
|
def __init__(self,idx=0):
|
|
self.port = _list_ch340_ports()[idx] # 串口名
|
|
self.pump_ser = serial.Serial(self.port, 9600) # 初始化串口
|
|
self._speed = 0.0
|
|
self.speed = 0.0
|
|
self._time = (0,0,0)
|
|
self.time = (0,0,0)
|
|
self.start = 0
|
|
|
|
@property
|
|
def speed(self):
|
|
return self._speed
|
|
|
|
@speed.setter
|
|
def speed(self,x):
|
|
self._speed = x
|
|
x*=34
|
|
if x >= 43:
|
|
self._speed = 43
|
|
self._speed = 43/34
|
|
# x = x*30
|
|
# print(int((x-int(x))*100),int(x),x)
|
|
self.pump_ser.write(f"q1h{int(x)}d".encode('ascii'))
|
|
time.sleep(0.01)
|
|
self.pump_ser.write(f"q2h{int((x-int(x))*100)}d".encode('ascii'))
|
|
|
|
def max_speed(self):
|
|
self.speed = 1.26
|
|
|
|
@property
|
|
def time(self):
|
|
return self._time
|
|
|
|
@time.setter
|
|
def time(self,x):
|
|
self._time = x
|
|
self.pump_ser.write(f"q3h{x[0]}d".encode('ascii'))
|
|
time.sleep(0.01)
|
|
self.pump_ser.write(f"q4h{x[1]}d".encode('ascii'))
|
|
time.sleep(0.01)
|
|
self.pump_ser.write(f"q5h{x[2]}d".encode('ascii'))
|
|
time.sleep(0.01)
|
|
|
|
@property
|
|
def running(self):
|
|
return time.time() - self.start < self._get_time()
|
|
|
|
def _get_time(self):
|
|
return self.time[0]*3600 + self.time[1]*60 + self.time[2]
|
|
|
|
def _prepare(self,speed,t,vol):
|
|
if speed is not None:
|
|
self.speed = speed
|
|
if t is not None:
|
|
if isinstance(t,int):
|
|
self.time = (t//3600,t//60%60,t%60)
|
|
else:
|
|
self.time = t
|
|
if self.speed == 0:
|
|
raise ValueError("Speed must be set to non-zero.")
|
|
if vol is not None:
|
|
if t is not None:
|
|
raise ValueError("Cannot set both volume and time.")
|
|
# t =
|
|
t = vol/self.speed
|
|
h = int(t//3600)
|
|
m = int(t//60%60)
|
|
s = int(t%60)
|
|
self.time = (h,m,s)
|
|
|
|
def push(self,speed=None,t=None,vol=None):
|
|
self.push_async(speed,t,vol)
|
|
time.sleep(self._get_time())
|
|
|
|
def push_async(self,speed=None,t=None,vol=None):
|
|
self._prepare(speed,t,vol)
|
|
self.pump_ser.write(b"q6h2d")
|
|
self.start = time.time()
|
|
time.sleep(0.01)
|
|
|
|
def pull(self,speed=None,t=None,vol=None):
|
|
self.pull_async(speed,t,vol)
|
|
time.sleep(self._get_time())
|
|
|
|
def pull_async(self,speed=None,t=None,vol=None):
|
|
self._prepare(speed,t,vol)
|
|
self.pump_ser.write(b"q6h3d")
|
|
self.start = time.time()
|
|
time.sleep(0.01)
|
|
|
|
def stop(self):
|
|
self.pump_ser.write(b"q6h6d")
|
|
self.start = 0
|
|
time.sleep(0.01)
|
|
|
|
def __del__(self):
|
|
if self.pump_ser.is_open:
|
|
self.pump_ser.close()
|
|
|
|
if __name__ == "__main__":
|
|
ch340 = CH340()
|
|
# ch340.speed = 3.75
|
|
# ch340.time = (0,0,10)
|
|
# ch340.push(speed=1,t=1)
|
|
# ch340.pull(speed=0.8,t=1)
|
|
# ch340.pull(speed=1.2,vol=3)
|
|
|
|
ch340.push(speed=0.01,vol=0.02) |