From e0df489aee0c183175db062b66d2122d0de0930e Mon Sep 17 00:00:00 2001 From: flt6 <1404262047@qq.com> Date: Mon, 7 Jul 2025 18:51:02 +0800 Subject: [PATCH] clean code Former-commit-id: 0c4a66b08f05c881992c7303fdebfd15d65d82df --- main.py | 157 ++++++++++-------------- utils.py | 356 +------------------------------------------------------ 2 files changed, 69 insertions(+), 444 deletions(-) diff --git a/main.py b/main.py index 94b6293..8faa985 100644 --- a/main.py +++ b/main.py @@ -7,51 +7,61 @@ import atexit import utils from utils import State, History,login_to_platform,send_data_to_platform from scipy.signal import find_peaks -import tkinter as tk +from typing import Optional +import logging + class MAT: - def __init__(self, videoSourceIndex=0, bounce_time=2, end_bounce_time=5,k=30): + def __init__(self, videoSourceIndex=0, bounce_time=2,sensity=30): # 初始化logging utils.setup_logging() - self.system_logger = utils.get_system_logger() - self.control_logger = utils.get_control_logger() - self.endpoint_logger = utils.get_endpoint_logger() - self.volume_logger = utils.get_volume_logger() - - self.system_logger.info('正在初始化MAT系统...') + self.system_logger = logging.getLogger("System") + self.control_logger = logging.getLogger("Control") + self.endpoint_logger = logging.getLogger("Endpoint") + self.volume_logger = logging.getLogger("Volume") + + self.system_logger.info('正在初始化MAT系统...') self.videoSourceIndex = videoSourceIndex self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW) self.ch340 = ch340.CH340() self.total_volume = 0 - self.k = k + self.sensity = sensity - self.state = State(bounce_time, end_bounce_time) + self.state = State(bounce_time) atexit.register(self.ch340.stop) - self.history = History(max(bounce_time, end_bounce_time)) - self.first_about = 0 + self.history = History(bounce_time) self.colored_volume = None self.colored_time = None self.colored_im = None def ch340_pull(self): + "抽取12ml液体" self.control_logger.info("开始抽取12ml") self.ch340.max_speed() self.ch340.pull(vol=12) self.control_logger.info('完成抽取') def ch340_init(self): + "初始化电机位置,防止过头导致抽取溶液不准。" # self.ch340.push(speed=1,t=1) self.ch340.pull(speed=1.2,vol=1.8) self.control_logger.info('电极位置已初始化') - def ch340_push(self, speed=0.1): + def ch340_push(self, speed:int|float=0.1): + "常规推送,保证推送时间为1s,推送体积=speed" self.ch340.push_async(speed=speed, t=1) - def process_left(self, now: float, value_only=False): - if self.state.mode == State.Mode.CRAZY: - value_only = True + def process_left(self, now: float, value_only:Optional[bool]=False) -> Optional[float]: + ''' + 计算当前时刻剩余体积,并更新到self.total_volume。 + + @param now: 当前时间戳 + @param value_only: 如果为True,则只返回剩余体积,不更新状态 + + @return: 当前剩余体积,仅当value_only为True时返回 + ''' if self.state.mode == State.Mode.ABOUT: return self.total_volume st = self.ch340.start @@ -95,9 +105,6 @@ class MAT: if self.history.is_empty(): self.control_logger.error("未预期的没有可用的历史记录") return ret - if self.state.mode == State.Mode.CRAZY: - self._display_status(im, ret, rate, val) - return ret # === 状态进入逻辑 === @@ -123,16 +130,6 @@ class MAT: self.state.enter_about_state(now) self.control_logger.info(f"检测到about,进入about模式,当前体积: {val:.2f} ml") - # 3. end:返回colored,置end_check标记开始检查 - # elif ret == "colored": - # if not self.state.in_end_check: - # self.state.enter_end_check(now) - # if self.colored_volume is None: - # self.colored_volume = val - # self.colored_time = now - # self.colored_im = im.copy() - # self.endpoint_logger.info(f"检测到colored,开始end检查,记录体积: {val:.2f} ml") - # === 状态检查逻辑 === # middle检查: 进入middle的bounce_time后,在最近bounce_time内middle比例<70%返回fast状态 if self.state.should_check_middle_exit(now) and not self.history.last_end: @@ -145,8 +142,6 @@ class MAT: if trans_ratio > 0.4: self.process_left(now) self.state.exit_middle_check() - # about状态随middle一起退出 - # self.state.exit_about_with_middle() self.control_logger.info(f"middle比例{trans_ratio:.2%}<60%,退出middle检查,返回fast模式") if self.state.mode == State.Mode.ABOUT and self.state.about_check: @@ -175,6 +170,7 @@ class MAT: return ret def end_check(self,dep=0): + '''终点检测,通过对图像直方图峰值分析,确认是否全局变色。''' if not self.running: return "colored" if dep == 1: @@ -189,14 +185,13 @@ class MAT: name = f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" cv2.imwrite(name,result) self.colored_im = name + + # 维护一个递归检测,重复4次后认定为终点。 if dep > 3: - # self.endpoint_logger.info(f"colored比例{.2%}>=70%,确认滴定终点") self.colored_volume = self.total_volume self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml") self.running = False self.ch340.stop() - # if self.colored_im is not None: - # cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im) return "colored" suc,im = self.cap.read() hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV) @@ -235,8 +230,6 @@ class MAT: State.Mode.FAST: (255, 255, 255), State.Mode.SLOW: (10, 215, 255), State.Mode.ABOUT: (228,116,167), - State.Mode.CRAZY: (0, 255, 0), - # State.Mode.END: (0, 0, 255) } status_text = f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml" @@ -248,9 +241,10 @@ class MAT: cv2.waitKey(1) def predictor(self,im): + '''主预测函数,分析图像并返回当前状态。当前,colored不使用这个函数。''' hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV) s = hsv[:,:,1] - mask = s>self.k + mask = s>self.sensity cv2.imshow('mask',im*mask[:,:,np.newaxis]) tot = mask.shape[0]*mask.shape[1] val = np.sum(mask) @@ -283,7 +277,7 @@ class MAT: def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, about_time=1, cap_dir="Videos"): self.running = True self.start_time = time.time() - self.speeds = [quick_speed, slow_speed, end_speed, 1.0] + self.speeds: list[float] = [quick_speed, slow_speed, end_speed] self.need_check = False self.edited = False self.volume_list = [] @@ -324,67 +318,43 @@ class MAT: self._start_time = time.time() self.about_time=about_time - self.crazy = 0 - while self.running: if not self.ch340.running: - if self.state.mode == State.Mode.CRAZY: - match self.crazy: - case 0: - self.control_logger.info("crazy 0") - self.ch340_pull() - self.ch340.speed = 1.00 - self.ch340.push_async(vol=12) - self.crazy = 1 - case 1: - self.control_logger.info("crazy 1") - self.ch340_pull() - self.ch340.push_async(vol=8) - self.total_volume = 20 - cnt = 2 - self.crazy = 2 - self.state.mode = State.Mode.FAST - else: - if 12 * cnt - self.total_volume < 0.5: - self.ch340_pull() - cnt += 1 - time.sleep(0.01) - - # 简化的推送逻辑 - should_push = False - if self.state.is_fast_mode(): - should_push = True - elif self.state.is_slow_mode() and time.time() - self.ch340.start > mid_time: - should_push = True - elif self.state.is_about_mode() and time.time() - self.ch340.start > about_time: - if not self.state.about_first_flag: - self.state.about_check = True - else: - self.state.about_first_flag = False - should_push = True - - if should_push: - speed = self.speeds[self.state.mode.value] - self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次") - self.ch340_push(speed) - self.total_volume += speed - self.volume_list.append(round(self.total_volume,2)) - try: - self.color_list.append(self.state.mode.name) - except Exception as e: - self.control_logger.error(f"报表失败: {e}") + if 12 * cnt - self.total_volume < 0.5: + self.ch340_pull() + cnt += 1 + + should_push = False + if self.state.is_fast_mode(): + should_push = True + elif self.state.is_slow_mode() and \ + time.time() - self.ch340.start > mid_time: + + should_push = True + elif self.state.is_about_mode() and \ + time.time() - self.ch340.start > about_time: + + if not self.state.about_first_flag: + self.state.about_check = True + else: + self.state.about_first_flag = False + should_push = True + + if should_push: + speed = self.speeds[self.state.mode.value] + self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次") + self.ch340_push(speed) + self.total_volume += speed + self.volume_list.append(round(self.total_volume,2)) + try: + self.color_list.append(self.state.mode.name) + except Exception as e: + self.control_logger.error(f"报表失败: {e}") - # if (self.state.is_about_mode() and time.time() - self.ch340.start > end_time) or not self.state.is_about_mode(): if self._pred() is None: self.control_logger.error("预测失败,跳过当前帧") continue - # try: - # self.control_logger.info("推送安全体积") - # self.ch340_push(0.15) - # except Exception as e: - # self.control_logger.error(f"推送失败: {e}") - # 释放视频录制器 if hasattr(self, 'capFile') and self.capFile.isOpened(): self.capFile.release() @@ -419,8 +389,7 @@ if __name__ == "__main__": mat = MAT( videoSourceIndex = 1, bounce_time=2, - end_bounce_time=0.01, - k = 32 + sensity = 32 ) mat.state.mode = State.Mode.FAST diff --git a/utils.py b/utils.py index f2af3be..06dd224 100644 --- a/utils.py +++ b/utils.py @@ -3,24 +3,18 @@ import os from datetime import datetime from enum import Enum from dataclasses import dataclass -from typing import List, Tuple, Optional, Any +from typing import List, Optional,Literal import numpy as np import time -import matplotlib.pyplot as plt -from matplotlib.animation import FuncAnimation -import threading -from collections import deque import requests import json -import tkinter as tk -from tkinter import messagebox import base64 @dataclass class HistoryRecord: """历史记录的单个条目""" timestamp: float - state: str # 'transport', 'middle', 'about', 'colored' + state: Literal["transport","middle","about","colored"] rate: float volume: float image: np.ndarray @@ -29,7 +23,7 @@ class HistoryRecord: class History: """滑动窗口历史记录管理类""" - def __init__(self, max_window_size: float = 5.0, base_time = 5.0, display: bool = False): + def __init__(self, max_window_size: float = 5.0, base_time = 5.0): """ 初始化历史记录管理器 @@ -50,17 +44,7 @@ class History: self._base_cnt = 0 self.end_history:list[bool] = [] self.last_end = 0 - self.display = display - self.fig: Any = None - self.ax: Any = None - self.line: Any = None - self.base_line: Any = None # 基准线的引用 - self.animation: Any = None - self._plot_thread: Optional[threading.Thread] = None - self._plot_running = False - if self.display: - self._setup_plot() def add_record(self, timestamp: float, state: str, rate: float, volume: float, image: np.ndarray): """添加新的历史记录""" record = HistoryRecord(timestamp, state, rate, volume, image) @@ -121,185 +105,18 @@ class History: def is_empty(self) -> bool: """检查历史记录是否为空""" return len(self.records) == 0 - - def __len__(self) -> int: - """返回历史记录的数量""" - return len(self.records) - - def clear(self): - """清空所有历史记录""" - self.records.clear() - if self.display: - self._stop_plot() - - def _setup_plot(self): - """设置matplotlib图表""" - plt.ion() # 交互式模式 - self.fig, self.ax = plt.subplots(figsize=(10, 6)) - self.ax.set_title('实时Rate值变化') - self.ax.set_xlabel('时间 (s)') - self.ax.set_ylabel('Rate值') - self.ax.grid(True, alpha=0.3) - - # 初始化空线条 - self.line, = self.ax.plot([], [], 'b-', linewidth=2, label='Rate') - self.base_line = None # 基准线的引用 - self.ax.legend() - - # 启动更新线程 - self._plot_running = True - self._plot_thread = threading.Thread(target=self._update_plot_loop, daemon=True) - self._plot_thread.start() - - def _update_plot_loop(self): - """绘图更新循环""" - while self._plot_running: - try: - self._update_plot() - time.sleep(0.1) # 100ms更新间隔 - except Exception as e: - get_vision_logger().error(f"Plot update error: {e}") - break - def _update_plot(self): - """更新图表数据 - 按照matplotlib推荐的方式重构""" - if not self.records: - return - - start_time = time.time() - - try: - # 获取时间和rate数据 - timestamps = [record.timestamp for record in self.records] - rates = [record.rate for record in self.records] - - if not timestamps: - return - - # 将时间戳转换为相对时间(从第一个记录开始) - start_timestamp = timestamps[0] - relative_times = [(t - start_timestamp) for t in timestamps] - - # 更新主线条数据 - self.line.set_data(relative_times, rates) - - # 处理base线 - self._update_base_line() - - # 自动调整坐标轴范围 - self._update_axis_limits(relative_times, rates) - - # 使用推荐的绘制方式 - self.ax.draw_artist(self.line) - self.fig.canvas.draw() # 使用draw_idle而不是draw - - except Exception as e: - get_vision_logger().error(f"Plot update error: {e}") - finally: - # 性能监控 - elapsed = time.time() - start_time - if elapsed > 0.1: - get_vision_logger().warning(f"Plot update took too long: {elapsed:.3f}s") - - def _update_base_line(self): - """更新或创建基准线""" - if self.base is not None: - if self.base_line is None: - # 首次创建基准线 - self.base_line = self.ax.axhline( - y=self.base, - color='r', - linestyle='--', - alpha=0.7, - label=f'Base Rate: {self.base:.2f}' - ) - # 更新图例 - self.ax.legend() - else: - # 更新现有基准线的位置 - self.base_line.set_ydata([self.base, self.base]) - # 更新标签 - self.base_line.set_label(f'Base Rate: {self.base:.2f}') - self.ax.legend() - elif self.base_line is not None: - # 如果base被重置,移除基准线 - self.base_line.remove() - self.base_line = None - self.ax.legend() - - def _update_axis_limits(self, relative_times, rates): - """智能更新坐标轴范围""" - if not relative_times or not rates: - return - - # X轴范围 - max_time = max(relative_times) - self.ax.set_xlim(0, max_time + max(1, max_time * 0.05)) # 添加5%的边距 - - # Y轴范围计算 - min_rate, max_rate = min(rates), max(rates) - - if self.base is not None: - # 如果有基准线,确保Y轴范围合理 - y_min = 0 - y_max = min(max(self.base * 35, max_rate * 1.2), 100) # 限制最大值为100 - else: - # 没有基准线时的默认范围 - margin = (max_rate - min_rate) * 0.1 if max_rate > min_rate else 1 - y_min = max(0, min_rate - margin) - y_max = max_rate + margin - - self.ax.set_ylim(y_min, y_max) - def _stop_plot(self): - """停止绘图""" - self._plot_running = False - if self._plot_thread and self._plot_thread.is_alive(): - self._plot_thread.join(timeout=1.0) - if self.fig: - plt.close(self.fig) - self.fig = None - self.ax = None - self.line = None - self.base_line = None - - def save_plot(self, filename: Optional[str] = None): - """保存当前图表""" - if not self.display or not self.fig: - get_vision_logger().warning("Plot not initialized, cannot save") - return - - if filename is None: - timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - filename = f"rate_plot_{timestamp}.png" - - try: - self.fig.savefig(filename, dpi=300, bbox_inches='tight') - get_vision_logger().info(f"Plot saved to {filename}") - except Exception as e: - get_vision_logger().error(f"Failed to save plot: {e}") - - def toggle_display(self): - """切换显示状态""" - if self.display: - self._stop_plot() - self.display = False - else: - self.display = True - self._setup_plot() - + class State: """滴定状态管理类""" class Mode(Enum): FAST = 0 # 快速模式 SLOW = 1 # 慢速模式 (middle) ABOUT = 2 # 接近终点模式 - CRAZY = 3 # CRAZY模式 - # END = 3 # 终点模式 - def __init__(self, bounce_time=1, end_bounce_time=5): + def __init__(self, bounce_time=1): self.mode = self.Mode.FAST self.bounce_time = bounce_time - self.end_bounce_time = end_bounce_time # 状态检查标志 self.in_middle_check = False @@ -308,8 +125,7 @@ class State: self.about_first_flag = False # 时间记录 - self.middle_detected_time = None - self.end_detected_time = None + self.middle_detected_time:Optional[float] = None def is_fast_mode(self): return self.mode == self.Mode.FAST @@ -320,9 +136,6 @@ class State: def is_about_mode(self): return self.mode == self.Mode.ABOUT - # def is_end_mode(self): - # return self.mode == self.Mode.END - def enter_middle_state(self, current_time): """进入middle状态 - 立即切换到slow模式并开始检查""" self.mode = self.Mode.SLOW @@ -334,12 +147,6 @@ class State: if self.mode == self.Mode.SLOW: self.mode = self.Mode.ABOUT - def enter_end_check(self, current_time): - """进入end检查状态""" - self.in_end_check = True - self.end_detected_time = current_time - self.mode = self.Mode.ABOUT - def exit_middle_check(self): """退出middle检查状态,返回fast模式""" self.in_middle_check = False @@ -360,25 +167,12 @@ class State: current_time - self.middle_detected_time > self.bounce_time and (self.mode == self.Mode.SLOW)) - def should_check_end_result(self, current_time): - """检查是否应该进行end结果检查""" - return (self.in_end_check and - self.end_detected_time is not None and - current_time - self.end_detected_time > self.end_bounce_time) - - def reset_end_check(self): - """重置end检查状态""" - self.in_end_check = False - self.end_detected_time = None - def get_status_text(self): """获取状态显示文本""" status = [] current_time = time.time() if self.in_middle_check and current_time - self.middle_detected_time > self.bounce_time: status.append("MIDCHK") - if self.in_end_check and current_time - self.end_detected_time > self.end_bounce_time: - status.append("ENDCHK") return ", " + ", ".join(status) if status else "" @@ -433,8 +227,6 @@ def send_data_to_platform(token, data, picture): } url = "https://jingsai.mools.net/api/upload-record" - # 合并 headers 和 data 为一个字典 - datas = {**headers, **data} # 准备 JSON 数据 json_data = json.dumps(data) # 发送 POST 请求 @@ -453,117 +245,6 @@ def send_data_to_platform(token, data, picture): print("错误", f"发送数据时出错:{e}") -class LoginApp: - def __init__(self, root): - self.root = root - self.root.title("Mlabs AI Titration 1.0") - # 设置窗口图标 - self.set_window_icon() - - # 定义变量 - self.username = tk.StringVar() - self.password = tk.StringVar() - self.token = '' - - self.un = '' - self.pw = '' - - # 创建登录界面 - self.create_login_interface() - - # 设置窗口居中 - self.center_window() - - def center_window(self, width=300, height=120): - """将窗口居中""" - # 获取屏幕宽度和高度 - screen_width = self.root.winfo_screenwidth() - screen_height = self.root.winfo_screenheight() - - # 计算窗口位置 - x = (screen_width - width) // 2 - y = (screen_height - height) // 2 - - # 设置窗口大小和位置 - self.root.geometry(f"{width}x{height}+{x}+{y}") - - def set_window_icon(self): - """设置窗口图标""" - try: - # 替换为你的图标文件路径 - icon_path = "logo.ico" # Windows系统使用.ico文件 - self.root.iconbitmap(icon_path) - except Exception as e: - print(f"设置窗口图标时出错:{e}") - - def create_login_interface(self): - # 用户名 - tk.Label(self.root, text="用户名:").grid(row=0, column=0, padx=10, pady=5) - self.username_entry = tk.Entry(self.root, textvariable=self.username, width=30) - self.username_entry.grid(row=0, column=1, padx=10, pady=5) - - # 密码 - tk.Label(self.root, text="密码:").grid(row=1, column=0, padx=10, pady=5) - self.password_entry = tk.Entry(self.root, textvariable=self.password, show="*", width=30) - self.password_entry.grid(row=1, column=1, padx=10, pady=5) - - # 登录按钮 - login_button = tk.Button(self.root, text="登录", command=self.login) - login_button.grid(row=3, column=0, columnspan=2, pady=10) - - # 检查info.json文件是否存在 - self.check_info_file() - self.username_entry.focus_set() - - def check_info_file(self): - login_folder = "login" - info_file = os.path.join(login_folder, "info.json") - - if os.path.exists(info_file): - with open(info_file, "r", encoding="utf-8") as file: - info = json.load(file) - self.username.set(info.get("username", "")) - self.password.set(info.get("password", "")) - else: - print("提示", "未找到本地登录信息,请手动输入登录信息。") - - def login(self): - self.un = self.username.get() - self.pw = self.password.get() - - if not self.un or not self.pw: - print("错误", "用户名、密码不能为空!") - return - - # 这里可以添加登录逻辑,例如发送请求到服务器验证登录信息 - self.token = login_to_platform(self.un, self.pw) - # print(self.token) - # 这里可以添加登录逻辑,例如发送请求到服务器验证登录信息 - - self.save_info_file() - self.root.destroy() - # return username, password - - - def save_info_file(self): - login_folder = "login" - #如果没有login文件夹,则创建一个 - if not os.path.exists(login_folder): - os.makedirs(login_folder) - info_file = os.path.join(login_folder, "info.json") - - if not os.path.exists(login_folder): - os.makedirs(login_folder) - - info = { - "username": self.username.get(), - "password": self.password.get(), - } - - with open(info_file, "w", encoding="utf-8") as file: - json.dump(info, file, ensure_ascii=False, indent=4) - - def setup_logging(log_level=logging.INFO, log_dir="logs"): """ 设置logging配置,创建不同模块的logger @@ -591,28 +272,3 @@ def setup_logging(log_level=logging.INFO, log_dir="logs"): ) return log_file - - -def get_system_logger(): - """系统初始化和控制相关的logger""" - return logging.getLogger("System") - -def get_hardware_logger(): - """硬件控制相关的logger(CH340等)""" - return logging.getLogger("Hardware") - -def get_vision_logger(): - """图像处理和预测相关的logger""" - return logging.getLogger("Vision") - -def get_control_logger(): - """控制逻辑相关的logger""" - return logging.getLogger("Control") - -def get_endpoint_logger(): - """终点检测相关的logger""" - return logging.getLogger("Endpoint") - -def get_volume_logger(): - """体积计算相关的logger""" - return logging.getLogger("Volume")