Former-commit-id: 263f9973de5beda63d9df86abae5f08292466d42
This commit is contained in:
2025-06-03 12:24:32 +08:00
parent 5795b7c2da
commit 6cab5e12f4
2 changed files with 577 additions and 1 deletions

4
.gitignore vendored
View File

@ -7,4 +7,6 @@ Input
*.jpg
*.png
logs
Videos
Videos
*.build
*.dist

574
ch340_gui.py Normal file
View File

@ -0,0 +1,574 @@
import sys
import time
from PySide6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
QHBoxLayout, QGridLayout, QLabel, QDoubleSpinBox,
QSpinBox, QPushButton, QGroupBox, QRadioButton,
QButtonGroup, QProgressBar, QTextEdit, QComboBox,
QMessageBox)
from PySide6.QtCore import QTimer, Signal, QThread
from ch340 import CH340, _list_ch340_ports,OFFLINE_DEBUG
class PumpWorker(QThread):
"""用于执行泵操作的工作线程"""
finished = Signal()
error = Signal(str)
progress = Signal(int)
def __init__(self, pump, operation, speed=None, time_val=None, volume=None):
super().__init__()
self.pump = pump
self.operation = operation
self.speed = speed
self.time_val = time_val
self.volume = volume
def run(self):
try:
if self.operation == "push":
self.pump.push(speed=self.speed, t=self.time_val, vol=self.volume)
elif self.operation == "pull":
self.pump.pull(speed=self.speed, t=self.time_val, vol=self.volume)
elif self.operation == "push_async":
self.pump.push_async(speed=self.speed, t=self.time_val, vol=self.volume)
elif self.operation == "pull_async":
self.pump.pull_async(speed=self.speed, t=self.time_val, vol=self.volume)
self.finished.emit()
except Exception as e:
self.error.emit(str(e))
class CH340GUI(QMainWindow):
def __init__(self):
super().__init__()
self.pump = None
self.worker = None
self.status_timer = QTimer()
self.status_timer.timeout.connect(self.update_status)
self.init_ui()
self.init_pump()
if OFFLINE_DEBUG:
QMessageBox.warning(self, "警告", "处于调试模式,任何操作将无实际效果!")
def init_ui(self):
self.setWindowTitle("CH340 可视化控制 by 樊乐天")
self.setGeometry(100, 100, 400, 600)
# 设置样式
self.setStyleSheet("""
QMainWindow {
background-color: #f0f0f0;
}
QGroupBox {
font-weight: bold;
border: 2px solid #cccccc;
border-radius: 5px;
margin-top: 1ex;
padding-top: 10px;
}
QGroupBox::title {
subcontrol-origin: margin;
left: 10px;
padding: 0 5px 0 5px;
}
QPushButton {
background-color: #4CAF50;
border: none;
color: white;
padding: 8px 16px;
text-align: center;
font-size: 14px;
border-radius: 4px;
min-height: 30px;
}
QPushButton:hover {
background-color: #45a049;
}
QPushButton:pressed {
background-color: #3d8b40;
}
QPushButton:disabled {
background-color: #cccccc;
color: #666666;
}
.stop-button {
background-color: #f44336;
}
.stop-button:hover {
background-color: #da190b;
}
.emergency-button {
background-color: #ff9800;
}
.emergency-button:hover {
background-color: #e68900;
}
""")
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
# 日志区域
self.create_log_group(main_layout)
# 连接状态区域
self.create_connection_group(main_layout)
# 参数设置区域
self.create_parameter_group(main_layout)
# 控制按钮区域
self.create_control_group(main_layout)
# 状态显示区域
self.create_status_group(main_layout)
def create_connection_group(self, parent_layout):
"""创建连接状态组"""
group = QGroupBox("连接状态")
layout = QHBoxLayout(group)
# COM端口选择
self.port_combo = QComboBox()
self.refresh_ports()
layout.addWidget(QLabel("COM端口:"))
layout.addWidget(self.port_combo)
# 刷新端口按钮
self.refresh_btn = QPushButton("刷新端口")
self.refresh_btn.clicked.connect(self.refresh_ports)
layout.addWidget(self.refresh_btn)
# 连接按钮
self.connect_btn = QPushButton("连接")
self.connect_btn.clicked.connect(self.toggle_connection)
layout.addWidget(self.connect_btn)
# 连接状态指示
self.connection_status = QLabel("未连接")
self.connection_status.setStyleSheet("color: red; font-weight: bold;")
layout.addWidget(self.connection_status)
layout.addStretch()
parent_layout.addWidget(group)
def create_parameter_group(self, parent_layout):
"""创建参数设置组"""
group = QGroupBox("参数设置")
layout = QGridLayout(group)
# 速度设置
layout.addWidget(QLabel("速度 (ml/min):"), 0, 0)
self.speed_spinbox = QDoubleSpinBox()
self.speed_spinbox.setRange(0.01, 1.26)
self.speed_spinbox.setValue(1.0)
self.speed_spinbox.setDecimals(2)
self.speed_spinbox.setSingleStep(0.01)
layout.addWidget(self.speed_spinbox, 0, 1)
# 最大速度按钮
max_speed_btn = QPushButton("设为最大速度")
max_speed_btn.clicked.connect(self.set_max_speed)
layout.addWidget(max_speed_btn, 0, 2)
# 控制模式选择
layout.addWidget(QLabel("控制模式:"), 1, 0)
self.mode_group = QButtonGroup()
self.time_mode_radio = QRadioButton("按时间")
self.volume_mode_radio = QRadioButton("按体积")
self.time_mode_radio.setChecked(True)
self.mode_group.addButton(self.time_mode_radio)
self.mode_group.addButton(self.volume_mode_radio)
mode_layout = QHBoxLayout()
mode_layout.addWidget(self.time_mode_radio)
mode_layout.addWidget(self.volume_mode_radio)
layout.addLayout(mode_layout, 1, 1, 1, 2)
# 时间设置
layout.addWidget(QLabel("时间设置:"), 2, 0)
time_layout = QHBoxLayout()
self.hours_spinbox = QSpinBox()
self.hours_spinbox.setRange(0, 23)
self.hours_spinbox.setSuffix("")
time_layout.addWidget(self.hours_spinbox)
self.minutes_spinbox = QSpinBox()
self.minutes_spinbox.setRange(0, 59)
self.minutes_spinbox.setValue(0)
self.minutes_spinbox.setSuffix("")
time_layout.addWidget(self.minutes_spinbox)
self.seconds_spinbox = QSpinBox()
self.seconds_spinbox.setRange(0, 59)
self.seconds_spinbox.setValue(10)
self.seconds_spinbox.setSuffix("")
time_layout.addWidget(self.seconds_spinbox)
layout.addLayout(time_layout, 2, 1, 1, 2)
# 体积设置
layout.addWidget(QLabel("体积 (ml):"), 3, 0)
self.volume_spinbox = QDoubleSpinBox()
self.volume_spinbox.setRange(0.01, 1000.0)
self.volume_spinbox.setValue(5.0)
self.volume_spinbox.setDecimals(2)
self.volume_spinbox.setSingleStep(0.1)
self.volume_spinbox.setEnabled(False)
layout.addWidget(self.volume_spinbox, 3, 1)
# 连接模式切换信号
self.time_mode_radio.toggled.connect(self.on_mode_changed)
parent_layout.addWidget(group)
def create_control_group(self, parent_layout):
"""创建控制按钮组"""
group = QGroupBox("泵控制")
layout = QGridLayout(group)
# 推送按钮
self.push_btn = QPushButton("滴液 (Push)")
self.push_btn.clicked.connect(lambda: self.start_operation("push"))
layout.addWidget(self.push_btn, 0, 0)
# 拉取按钮
self.pull_btn = QPushButton("取液 (Pull)")
self.pull_btn.clicked.connect(lambda: self.start_operation("pull"))
layout.addWidget(self.pull_btn, 0, 1)
# 异步推送按钮
# self.push_async_btn = QPushButton("异步滴液")
# self.push_async_btn.clicked.connect(lambda: self.start_operation("push_async"))
# layout.addWidget(self.push_async_btn, 1, 0)
# 异步拉取按钮
# self.pull_async_btn = QPushButton("异步取液")
# self.pull_async_btn.clicked.connect(lambda: self.start_operation("pull_async"))
# layout.addWidget(self.pull_async_btn, 1, 1)
# 停止按钮
self.stop_btn = QPushButton("停止")
self.stop_btn.setProperty("class", "stop-button")
self.stop_btn.clicked.connect(self.stop_operation)
self.stop_btn.setStyleSheet("background-color: #f44336;")
layout.addWidget(self.stop_btn, 2, 0, 1, 2)
parent_layout.addWidget(group)
def create_status_group(self, parent_layout):
"""创建状态显示组"""
group = QGroupBox("运行状态")
layout = QGridLayout(group)
# 当前状态
layout.addWidget(QLabel("当前状态:"), 0, 0)
self.status_label = QLabel("未连接")
self.status_label.setStyleSheet("color: red; font-weight: bold;")
layout.addWidget(self.status_label, 0, 1)
# 当前速度
layout.addWidget(QLabel("当前速度:"), 1, 0)
self.current_speed_label = QLabel("0.00 ml/min")
layout.addWidget(self.current_speed_label, 1, 1)
# 运行时间
# layout.addWidget(QLabel("运行时间:"), 2, 0)
# self.running_time_label = QLabel("00:00:00")
# layout.addWidget(self.running_time_label, 2, 1)
# 剩余时间
layout.addWidget(QLabel("剩余时间:"), 3, 0)
self.remaining_time_label = QLabel("00:00:00")
layout.addWidget(self.remaining_time_label, 3, 1)
# 进度条
layout.addWidget(QLabel("进度:"), 4, 0)
self.progress_bar = QProgressBar()
self.progress_bar.setRange(0, 100)
layout.addWidget(self.progress_bar, 4, 1)
parent_layout.addWidget(group)
def create_log_group(self, parent_layout):
"""创建日志显示组"""
group = QGroupBox("操作日志")
layout = QVBoxLayout(group)
self.log_text = QTextEdit()
self.log_text.setMaximumHeight(150)
self.log_text.setReadOnly(True)
layout.addWidget(self.log_text)
# 清除日志按钮
clear_log_btn = QPushButton("清除日志")
clear_log_btn.clicked.connect(self.clear_log)
layout.addWidget(clear_log_btn)
parent_layout.addWidget(group)
def refresh_ports(self):
"""刷新COM端口列表"""
self.port_combo.clear()
ports = _list_ch340_ports()
if ports:
self.port_combo.addItems(ports)
self.log_message(f"发现 {len(ports)} 个CH340端口: {', '.join(ports)}")
else:
self.port_combo.addItem("未找到CH340设备")
self.log_message("未找到CH340设备")
def init_pump(self):
"""初始化泵对象"""
try:
self.pump = CH340()
if OFFLINE_DEBUG:
self.connection_status.setText("已连接 (调试模式)")
self.connection_status.setStyleSheet("color: #F78300; font-weight: bold;")
self.status_label.setText("待机")
self.status_label.setStyleSheet("color: blue; font-weight: bold;")
else:
self.connection_status.setText("已连接")
self.connection_status.setStyleSheet("color: green; font-weight: bold;")
self.status_label.setText("待机")
self.status_label.setStyleSheet("color: blue; font-weight: bold;")
self.connect_btn.setText("断开连接")
self.enable_controls(True)
self.log_message("泵初始化成功"+ " (调试模式)" if OFFLINE_DEBUG else "")
self.status_timer.start(1000) # 每秒更新状态
except Exception as e:
self.log_message(f"泵初始化失败: {str(e)}")
def toggle_connection(self):
"""切换连接状态"""
if self.pump is None:
self.init_pump()
else:
if hasattr(self.pump, 'pump_ser') and self.pump.pump_ser.is_open:
self.pump.pump_ser.close()
self.pump = None
self.connection_status.setText("未连接")
self.connection_status.setStyleSheet("color: red; font-weight: bold;")
self.status_label.setText("未连接")
self.status_label.setStyleSheet("color: red; font-weight: bold;")
self.connect_btn.setText("连接")
self.enable_controls(False)
self.status_timer.stop()
self.log_message("已断开连接")
def enable_controls(self, enabled):
"""启用或禁用控制按钮"""
self.push_btn.setEnabled(enabled)
self.pull_btn.setEnabled(enabled)
# self.push_async_btn.setEnabled(enabled)
# self.pull_async_btn.setEnabled(enabled)
self.stop_btn.setEnabled(enabled)
def on_mode_changed(self):
"""模式切换处理"""
if self.time_mode_radio.isChecked():
self.hours_spinbox.setEnabled(True)
self.minutes_spinbox.setEnabled(True)
self.seconds_spinbox.setEnabled(True)
self.volume_spinbox.setEnabled(False)
else:
self.hours_spinbox.setEnabled(False)
self.minutes_spinbox.setEnabled(False)
self.seconds_spinbox.setEnabled(False)
self.volume_spinbox.setEnabled(True)
def set_max_speed(self):
"""设置最大速度"""
self.speed_spinbox.setValue(1.26)
self.log_message("速度设置为最大值: 1.26 ml/min")
def get_parameters(self):
"""获取当前参数设置"""
speed = self.speed_spinbox.value()
if self.time_mode_radio.isChecked():
time_val = (self.hours_spinbox.value(),
self.minutes_spinbox.value(),
self.seconds_spinbox.value())
volume = None
else:
time_val = None
volume = self.volume_spinbox.value()
return speed, time_val, volume
def start_operation(self, operation):
"""开始泵操作"""
if self.pump is None:
QMessageBox.warning(self, "警告", "请先连接设备")
return
try:
speed, time_val, volume = self.get_parameters()
# 验证参数
if speed <= 0:
QMessageBox.warning(self, "参数错误", "速度必须大于0")
return
if self.time_mode_radio.isChecked():
if all(t == 0 for t in time_val):
QMessageBox.warning(self, "参数错误", "时间不能全为0")
return
else:
if volume <= 0:
QMessageBox.warning(self, "参数错误", "体积必须大于0")
return
# 停止之前的操作
if self.worker and self.worker.isRunning():
self.worker.terminate()
self.worker.wait()
# 创建工作线程
self.worker = PumpWorker(self.pump, operation, speed, time_val, volume)
self.worker.finished.connect(self.on_operation_finished)
self.worker.error.connect(self.on_operation_error)
# 更新界面状态
self.enable_controls(False)
self.stop_btn.setEnabled(True)
operation_names = {
"push": "滴液",
"pull": "取液",
"push_async": "异步滴定",
"pull_async": "异步取液"
}
self.status_label.setText(f"执行中: {operation_names.get(operation, operation)}")
self.status_label.setStyleSheet("color: green; font-weight: bold;")
# 记录日志
if self.time_mode_radio.isChecked():
time_str = f"{time_val[0]:02d}:{time_val[1]:02d}:{time_val[2]:02d}"
self.log_message(f"开始{operation_names.get(operation)}: 速度={speed:.2f}ml/min, 时间={time_str}")
else:
self.log_message(f"开始{operation_names.get(operation)}: 速度={speed:.2f}ml/min, 体积={volume:.2f}ml")
# 启动工作线程
self.worker.start()
except Exception as e:
QMessageBox.critical(self, "错误", f"操作失败: {str(e)}")
self.log_message(f"操作失败: {str(e)}")
def stop_operation(self):
"""停止泵操作"""
if self.pump:
try:
self.pump.stop()
self.log_message("手动停止操作")
if self.worker and self.worker.isRunning():
self.worker.terminate()
self.worker.wait()
self.on_operation_finished()
except Exception as e:
QMessageBox.critical(self, "错误", f"停止操作失败: {str(e)}")
self.log_message(f"停止操作失败: {str(e)}")
def on_operation_finished(self):
"""操作完成处理"""
self.status_label.setText("待机")
self.status_label.setStyleSheet("color: blue; font-weight: bold;")
self.enable_controls(True)
self.progress_bar.setValue(0)
self.log_message("操作完成")
def on_operation_error(self, error_msg):
"""操作错误处理"""
QMessageBox.critical(self, "操作错误", error_msg)
self.log_message(f"操作错误: {error_msg}")
self.on_operation_finished()
def update_status(self):
"""更新状态显示"""
if self.pump:
# 更新当前速度
current_speed = getattr(self.pump, 'speed', 0)
self.current_speed_label.setText(f"{current_speed:.2f} ml/min")
# 更新运行状态
if hasattr(self.pump, 'running') and self.pump.running:
# 计算运行时间和剩余时间
if hasattr(self.pump, 'start') and self.pump.start > 0:
running_time = time.time() - self.pump.start
total_time = self.pump._get_time()
remaining_time = max(0, total_time - running_time)
# 更新时间显示
# self.running_time_label.setText(self.format_time(running_time))
self.remaining_time_label.setText(self.format_time(remaining_time))
# 更新进度条
if total_time > 0:
progress = min(100, int((running_time / total_time) * 100))
self.progress_bar.setValue(progress)
else:
self.progress_bar.setValue(0)
else:
# self.running_time_label.setText("00:00:00")
self.remaining_time_label.setText("00:00:00")
self.progress_bar.setValue(0)
else:
# self.running_time_label.setText("00:00:00")
self.remaining_time_label.setText("00:00:00")
self.progress_bar.setValue(0)
def format_time(self, seconds):
"""格式化时间显示"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
def log_message(self, message):
"""添加日志消息"""
timestamp = time.strftime("%H:%M:%S")
self.log_text.append(f"[{timestamp}] {message}")
def clear_log(self):
"""清除日志"""
self.log_text.clear()
def closeEvent(self, event):
"""关闭事件处理"""
if self.worker and self.worker.isRunning():
self.worker.terminate()
self.worker.wait()
if self.pump and hasattr(self.pump, 'pump_ser'):
try:
self.pump.stop()
if self.pump.pump_ser.is_open:
self.pump.pump_ser.close()
except:
pass
event.accept()
def main():
app = QApplication(sys.argv)
# 设置应用程序图标和名称
app.setApplicationName("CH340 泵控制系统")
app.setApplicationVersion("1.0")
window = CH340GUI()
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()