clean code and use ch340
Former-commit-id: 3e401751a24183eb1fa03df0a835624ed6ca6f9d
This commit is contained in:
176
ch340.py
Normal file
176
ch340.py
Normal file
@ -0,0 +1,176 @@
|
||||
|
||||
import serial.tools.list_ports
|
||||
import time
|
||||
|
||||
OFFLINE_DEBUG = True
|
||||
|
||||
def _list_ch340_ports():
|
||||
ports = serial.tools.list_ports.comports()
|
||||
ch340_ports_list = []
|
||||
for port in ports:
|
||||
if 'CH340' in port.description or 'CH340' in port.device:
|
||||
ch340_ports_list.append(port.device)
|
||||
print("Found CH340 ports:", port.device)
|
||||
if ch340_ports_list:
|
||||
return ch340_ports_list
|
||||
else:
|
||||
return []
|
||||
|
||||
if OFFLINE_DEBUG:
|
||||
class CH340:
|
||||
def __init__(self):
|
||||
self._speed = 0.0
|
||||
self._time = (0,0,0)
|
||||
|
||||
@property
|
||||
def speed(self):
|
||||
return self._speed
|
||||
|
||||
@speed.setter
|
||||
def speed(self,x):
|
||||
self._speed = x
|
||||
|
||||
@property
|
||||
def time(self):
|
||||
return self._time
|
||||
|
||||
@time.setter
|
||||
def time(self,x):
|
||||
self._time = x
|
||||
|
||||
def _prepare(self,speed,t,vol):
|
||||
if speed is not None:
|
||||
self.speed = speed
|
||||
if t is not None:
|
||||
if isinstance(t,int):
|
||||
self.time = (t//3600,t//60%60,t%60)
|
||||
else:
|
||||
self.time = t
|
||||
if self.speed == 0:
|
||||
raise ValueError("Speed must be set to non-zero.")
|
||||
if vol is not None:
|
||||
if t is not None:
|
||||
raise ValueError("Cannot set both volume and time.")
|
||||
# t =
|
||||
t = vol/self.speed
|
||||
h = int(t//3600)
|
||||
m = int(t//60%60)
|
||||
s = int(t%60)
|
||||
self.time = (h,m,s)
|
||||
|
||||
def max_speed(self):
|
||||
self.speed = 1.26
|
||||
|
||||
def push(self,speed=None,t=None,vol=None):
|
||||
self._prepare(speed,t,vol)
|
||||
time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2])
|
||||
|
||||
def pull(self,speed=None,t=None,vol=None):
|
||||
self._prepare(speed,t,vol)
|
||||
time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2])
|
||||
|
||||
def stop(self):
|
||||
print("Stop")
|
||||
else:
|
||||
class CH340:
|
||||
def __init__(self,idx=0):
|
||||
|
||||
self.port = _list_ch340_ports()[idx] # 串口名
|
||||
self.pump_ser = serial.Serial(self.port, 9600) # 初始化串口
|
||||
self._speed = 0.0
|
||||
self.speed = 0.0
|
||||
self._time = (0,0,0)
|
||||
self.time = (0,0,0)
|
||||
|
||||
@property
|
||||
def speed(self):
|
||||
return self._speed
|
||||
|
||||
@speed.setter
|
||||
def speed(self,x):
|
||||
self._speed = x
|
||||
x*=34
|
||||
if x >= 43:
|
||||
self._speed = 43
|
||||
self._speed = 43/34
|
||||
# x = x*30
|
||||
print(int((x-int(x))*100),int(x),x)
|
||||
self.pump_ser.write(f"q1h{int(x)}d".encode('ascii'))
|
||||
time.sleep(0.01)
|
||||
self.pump_ser.write(f"q2h{int((x-int(x))*100)}d".encode('ascii'))
|
||||
|
||||
def max_speed(self):
|
||||
self.speed = 1.26
|
||||
|
||||
@property
|
||||
def time(self):
|
||||
return self._time
|
||||
|
||||
@time.setter
|
||||
def time(self,x):
|
||||
self._time = x
|
||||
self.pump_ser.write(f"q3h{x[0]}d".encode('ascii'))
|
||||
time.sleep(0.01)
|
||||
self.pump_ser.write(f"q4h{x[1]}d".encode('ascii'))
|
||||
time.sleep(0.01)
|
||||
self.pump_ser.write(f"q5h{x[2]}d".encode('ascii'))
|
||||
time.sleep(0.01)
|
||||
|
||||
def _prepare(self,speed,t,vol):
|
||||
if speed is not None:
|
||||
self.speed = speed
|
||||
if t is not None:
|
||||
if isinstance(t,int):
|
||||
self.time = (t//3600,t//60%60,t%60)
|
||||
else:
|
||||
self.time = t
|
||||
if self.speed == 0:
|
||||
raise ValueError("Speed must be set to non-zero.")
|
||||
if vol is not None:
|
||||
if t is not None:
|
||||
raise ValueError("Cannot set both volume and time.")
|
||||
# t =
|
||||
t = vol/self.speed
|
||||
h = int(t//3600)
|
||||
m = int(t//60%60)
|
||||
s = int(t%60)
|
||||
self.time = (h,m,s)
|
||||
|
||||
def push(self,speed=None,t=None,vol=None):
|
||||
self.push_async(speed,t,vol)
|
||||
time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2])
|
||||
|
||||
def push_async(self,speed=None,t=None,vol=None):
|
||||
self._prepare(speed,t,vol)
|
||||
self.pump_ser.write(b"q6h2d")
|
||||
time.sleep(0.01)
|
||||
|
||||
def pull(self,speed=None,t=None,vol=None):
|
||||
self.pull_async(speed,t,vol)
|
||||
time.sleep(self.time[0]*3600 + self.time[1]*60 + self.time[2])
|
||||
|
||||
def pull_async(self,speed=None,t=None,vol=None):
|
||||
print(1)
|
||||
print(t,speed,vol)
|
||||
self._prepare(speed,t,vol)
|
||||
self.pump_ser.write(b"q6h3d")
|
||||
print(self.time,self.speed)
|
||||
time.sleep(0.01)
|
||||
|
||||
def stop(self):
|
||||
self.pump_ser.write(b"q6h6d")
|
||||
time.sleep(0.01)
|
||||
|
||||
def __del__(self):
|
||||
if self.pump_ser.is_open:
|
||||
self.pump_ser.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
ch340 = CH340()
|
||||
# ch340.speed = 3.75
|
||||
# ch340.time = (0,0,10)
|
||||
# ch340.push(speed=1,t=1)
|
||||
# ch340.pull(speed=0.8,t=1)
|
||||
# ch340.pull(speed=1.2,vol=3)
|
||||
|
||||
ch340.push(speed=0.01,vol=0.02)
|
Reference in New Issue
Block a user