Files
ai-titration/ch340.py
flt6 c1f33df66d 1
Former-commit-id: 1b34dc488f5306fcb5d20746c217255bb9ead3c5
2025-06-06 23:02:15 +08:00

206 lines
6.2 KiB
Python

import serial.tools.list_ports
import time
OFFLINE_DEBUG = False
def _list_ch340_ports():
ports = serial.tools.list_ports.comports()
ch340_ports_list = []
for port in ports:
if 'CH340' in port.description or 'CH340' in port.device:
ch340_ports_list.append(port.device)
print("Found CH340 ports:", port.device)
if ch340_ports_list:
return ch340_ports_list
else:
return []
if OFFLINE_DEBUG:
print("WARNING: Running in offline debug mode, no actual hardware interaction.\n"*3)
time.sleep(2)
class CH340:
def __init__(self):
self._speed = 0.0
self._time = (0,0,0)
self.start = 0
@property
def speed(self):
return self._speed
@speed.setter
def speed(self,x):
self._speed = x
@property
def time(self):
return self._time
@time.setter
def time(self,x):
self._time = x
@property
def running(self):
return time.time() - self.start < self._get_time()
def _get_time(self):
return (self.time[0]*3600 + self.time[1]*60 + self.time[2])
def _prepare(self,speed,t,vol):
if speed is not None:
self.speed = speed
if t is not None:
if isinstance(t,int):
self.time = (t//3600,t//60%60,t%60)
else:
self.time = t
if self.speed == 0:
raise ValueError("Speed must be set to non-zero.")
if vol is not None:
if t is not None:
raise ValueError("Cannot set both volume and time.")
# t =
t = vol/self.speed
h = int(t//3600)
m = int(t//60%60)
s = int(t%60)
self.time = (h,m,s)
def max_speed(self):
self.speed = 1.26
def push(self,speed=None,t=None,vol=None):
self._prepare(speed,t,vol)
self.start = time.time()
time.sleep(self._get_time())
def pull(self,speed=None,t=None,vol=None):
self._prepare(speed,t,vol)
self.start = time.time()
time.sleep(self._get_time()/4)
def push_async(self,speed=None,t=None,vol=None):
self._prepare(speed,t,vol)
self.start = time.time()
time.sleep(0.01)
def pull_async(self,speed=None,t=None,vol=None):
self._prepare(speed,t,vol)
self.start = time.time()
time.sleep(0.01)
def stop(self):
self.start = 0
print("Stop")
else:
class CH340:
def __init__(self,idx=0):
self.port = _list_ch340_ports()[idx] # 串口名
self.pump_ser = serial.Serial(self.port, 9600) # 初始化串口
self._speed = 0.0
self.speed = 0.0
self._time = (0,0,0)
self.time = (0,0,0)
self.start = 0
@property
def speed(self):
return self._speed
@speed.setter
def speed(self,x):
self._speed = x
x*=34
if x >= 43:
self._speed = 43
self._speed = 43/34
# x = x*30
# print(int((x-int(x))*100),int(x),x)
self.pump_ser.write(f"q1h{int(x)}d".encode('ascii'))
time.sleep(0.01)
self.pump_ser.write(f"q2h{int((x-int(x))*100)}d".encode('ascii'))
def max_speed(self):
self.speed = 1.26
@property
def time(self):
return self._time
@time.setter
def time(self,x):
self._time = x
self.pump_ser.write(f"q3h{x[0]}d".encode('ascii'))
time.sleep(0.01)
self.pump_ser.write(f"q4h{x[1]}d".encode('ascii'))
time.sleep(0.01)
self.pump_ser.write(f"q5h{x[2]}d".encode('ascii'))
time.sleep(0.01)
@property
def running(self):
return time.time() - self.start < self._get_time()
def _get_time(self):
return self.time[0]*3600 + self.time[1]*60 + self.time[2]
def _prepare(self,speed,t,vol):
if speed is not None:
self.speed = speed
if t is not None:
if isinstance(t,int):
self.time = (t//3600,t//60%60,t%60)
else:
self.time = t
if self.speed == 0:
raise ValueError("Speed must be set to non-zero.")
if vol is not None:
if t is not None:
raise ValueError("Cannot set both volume and time.")
# t =
t = vol/self.speed
h = int(t//3600)
m = int(t//60%60)
s = int(t%60)
self.time = (h,m,s)
def push(self,speed=None,t=None,vol=None):
self.push_async(speed,t,vol)
time.sleep(self._get_time())
def push_async(self,speed=None,t=None,vol=None):
self._prepare(speed,t,vol)
self.pump_ser.write(b"q6h2d")
self.start = time.time()
time.sleep(0.01)
def pull(self,speed=None,t=None,vol=None):
self.pull_async(speed,t,vol)
time.sleep(self._get_time())
def pull_async(self,speed=None,t=None,vol=None):
self._prepare(speed,t,vol)
self.pump_ser.write(b"q6h3d")
self.start = time.time()
time.sleep(0.01)
def stop(self):
self.pump_ser.write(b"q6h6d")
self.start = 0
time.sleep(0.01)
def __del__(self):
if self.pump_ser.is_open:
self.pump_ser.close()
if __name__ == "__main__":
ch340 = CH340()
# ch340.speed = 3.75
# ch340.time = (0,0,10)
# ch340.push(speed=1,t=1)
# ch340.pull(speed=0.8,t=1)
# ch340.pull(speed=1.2,vol=3)
ch340.push(speed=0.01,vol=0.02)