clean code

Former-commit-id: 0c4a66b08f05c881992c7303fdebfd15d65d82df
This commit is contained in:
2025-07-07 18:51:02 +08:00
parent 27c1b08365
commit e0df489aee
2 changed files with 69 additions and 444 deletions

105
main.py
View File

@ -7,51 +7,61 @@ import atexit
import utils import utils
from utils import State, History,login_to_platform,send_data_to_platform from utils import State, History,login_to_platform,send_data_to_platform
from scipy.signal import find_peaks from scipy.signal import find_peaks
import tkinter as tk from typing import Optional
import logging
class MAT: class MAT:
def __init__(self, videoSourceIndex=0, bounce_time=2, end_bounce_time=5,k=30): def __init__(self, videoSourceIndex=0, bounce_time=2,sensity=30):
# 初始化logging # 初始化logging
utils.setup_logging() utils.setup_logging()
self.system_logger = utils.get_system_logger() self.system_logger = logging.getLogger("System")
self.control_logger = utils.get_control_logger() self.control_logger = logging.getLogger("Control")
self.endpoint_logger = utils.get_endpoint_logger() self.endpoint_logger = logging.getLogger("Endpoint")
self.volume_logger = utils.get_volume_logger() self.volume_logger = logging.getLogger("Volume")
self.system_logger.info('正在初始化MAT系统...') self.system_logger.info('正在初始化MAT系统...')
self.videoSourceIndex = videoSourceIndex self.videoSourceIndex = videoSourceIndex
self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW) self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)
self.ch340 = ch340.CH340() self.ch340 = ch340.CH340()
self.total_volume = 0 self.total_volume = 0
self.k = k self.sensity = sensity
self.state = State(bounce_time, end_bounce_time) self.state = State(bounce_time)
atexit.register(self.ch340.stop) atexit.register(self.ch340.stop)
self.history = History(max(bounce_time, end_bounce_time)) self.history = History(bounce_time)
self.first_about = 0
self.colored_volume = None self.colored_volume = None
self.colored_time = None self.colored_time = None
self.colored_im = None self.colored_im = None
def ch340_pull(self): def ch340_pull(self):
"抽取12ml液体"
self.control_logger.info("开始抽取12ml") self.control_logger.info("开始抽取12ml")
self.ch340.max_speed() self.ch340.max_speed()
self.ch340.pull(vol=12) self.ch340.pull(vol=12)
self.control_logger.info('完成抽取') self.control_logger.info('完成抽取')
def ch340_init(self): def ch340_init(self):
"初始化电机位置,防止过头导致抽取溶液不准。"
# self.ch340.push(speed=1,t=1) # self.ch340.push(speed=1,t=1)
self.ch340.pull(speed=1.2,vol=1.8) self.ch340.pull(speed=1.2,vol=1.8)
self.control_logger.info('电极位置已初始化') self.control_logger.info('电极位置已初始化')
def ch340_push(self, speed=0.1): def ch340_push(self, speed:int|float=0.1):
"常规推送保证推送时间为1s推送体积=speed"
self.ch340.push_async(speed=speed, t=1) self.ch340.push_async(speed=speed, t=1)
def process_left(self, now: float, value_only=False): def process_left(self, now: float, value_only:Optional[bool]=False) -> Optional[float]:
if self.state.mode == State.Mode.CRAZY: '''
value_only = True 计算当前时刻剩余体积并更新到self.total_volume。
@param now: 当前时间戳
@param value_only: 如果为True则只返回剩余体积不更新状态
@return: 当前剩余体积仅当value_only为True时返回
'''
if self.state.mode == State.Mode.ABOUT: if self.state.mode == State.Mode.ABOUT:
return self.total_volume return self.total_volume
st = self.ch340.start st = self.ch340.start
@ -95,9 +105,6 @@ class MAT:
if self.history.is_empty(): if self.history.is_empty():
self.control_logger.error("未预期的没有可用的历史记录") self.control_logger.error("未预期的没有可用的历史记录")
return ret return ret
if self.state.mode == State.Mode.CRAZY:
self._display_status(im, ret, rate, val)
return ret
# === 状态进入逻辑 === # === 状态进入逻辑 ===
@ -123,16 +130,6 @@ class MAT:
self.state.enter_about_state(now) self.state.enter_about_state(now)
self.control_logger.info(f"检测到about进入about模式当前体积: {val:.2f} ml") self.control_logger.info(f"检测到about进入about模式当前体积: {val:.2f} ml")
# 3. end返回colored置end_check标记开始检查
# elif ret == "colored":
# if not self.state.in_end_check:
# self.state.enter_end_check(now)
# if self.colored_volume is None:
# self.colored_volume = val
# self.colored_time = now
# self.colored_im = im.copy()
# self.endpoint_logger.info(f"检测到colored开始end检查记录体积: {val:.2f} ml")
# === 状态检查逻辑 ===
# middle检查: 进入middle的bounce_time后在最近bounce_time内middle比例<70%返回fast状态 # middle检查: 进入middle的bounce_time后在最近bounce_time内middle比例<70%返回fast状态
if self.state.should_check_middle_exit(now) and not self.history.last_end: if self.state.should_check_middle_exit(now) and not self.history.last_end:
@ -145,8 +142,6 @@ class MAT:
if trans_ratio > 0.4: if trans_ratio > 0.4:
self.process_left(now) self.process_left(now)
self.state.exit_middle_check() self.state.exit_middle_check()
# about状态随middle一起退出
# self.state.exit_about_with_middle()
self.control_logger.info(f"middle比例{trans_ratio:.2%}<60%退出middle检查返回fast模式") self.control_logger.info(f"middle比例{trans_ratio:.2%}<60%退出middle检查返回fast模式")
if self.state.mode == State.Mode.ABOUT and self.state.about_check: if self.state.mode == State.Mode.ABOUT and self.state.about_check:
@ -175,6 +170,7 @@ class MAT:
return ret return ret
def end_check(self,dep=0): def end_check(self,dep=0):
'''终点检测,通过对图像直方图峰值分析,确认是否全局变色。'''
if not self.running: if not self.running:
return "colored" return "colored"
if dep == 1: if dep == 1:
@ -189,14 +185,13 @@ class MAT:
name = f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg" name = f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
cv2.imwrite(name,result) cv2.imwrite(name,result)
self.colored_im = name self.colored_im = name
# 维护一个递归检测重复4次后认定为终点。
if dep > 3: if dep > 3:
# self.endpoint_logger.info(f"colored比例{.2%}>=70%,确认滴定终点")
self.colored_volume = self.total_volume self.colored_volume = self.total_volume
self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml") self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml")
self.running = False self.running = False
self.ch340.stop() self.ch340.stop()
# if self.colored_im is not None:
# cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im)
return "colored" return "colored"
suc,im = self.cap.read() suc,im = self.cap.read()
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV) hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
@ -235,8 +230,6 @@ class MAT:
State.Mode.FAST: (255, 255, 255), State.Mode.FAST: (255, 255, 255),
State.Mode.SLOW: (10, 215, 255), State.Mode.SLOW: (10, 215, 255),
State.Mode.ABOUT: (228,116,167), State.Mode.ABOUT: (228,116,167),
State.Mode.CRAZY: (0, 255, 0),
# State.Mode.END: (0, 0, 255)
} }
status_text = f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml" status_text = f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml"
@ -248,9 +241,10 @@ class MAT:
cv2.waitKey(1) cv2.waitKey(1)
def predictor(self,im): def predictor(self,im):
'''主预测函数分析图像并返回当前状态。当前colored不使用这个函数。'''
hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV) hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV)
s = hsv[:,:,1] s = hsv[:,:,1]
mask = s>self.k mask = s>self.sensity
cv2.imshow('mask',im*mask[:,:,np.newaxis]) cv2.imshow('mask',im*mask[:,:,np.newaxis])
tot = mask.shape[0]*mask.shape[1] tot = mask.shape[0]*mask.shape[1]
val = np.sum(mask) val = np.sum(mask)
@ -283,7 +277,7 @@ class MAT:
def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, about_time=1, cap_dir="Videos"): def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, about_time=1, cap_dir="Videos"):
self.running = True self.running = True
self.start_time = time.time() self.start_time = time.time()
self.speeds = [quick_speed, slow_speed, end_speed, 1.0] self.speeds: list[float] = [quick_speed, slow_speed, end_speed]
self.need_check = False self.need_check = False
self.edited = False self.edited = False
self.volume_list = [] self.volume_list = []
@ -324,39 +318,22 @@ class MAT:
self._start_time = time.time() self._start_time = time.time()
self.about_time=about_time self.about_time=about_time
self.crazy = 0
while self.running: while self.running:
if not self.ch340.running: if not self.ch340.running:
if self.state.mode == State.Mode.CRAZY:
match self.crazy:
case 0:
self.control_logger.info("crazy 0")
self.ch340_pull()
self.ch340.speed = 1.00
self.ch340.push_async(vol=12)
self.crazy = 1
case 1:
self.control_logger.info("crazy 1")
self.ch340_pull()
self.ch340.push_async(vol=8)
self.total_volume = 20
cnt = 2
self.crazy = 2
self.state.mode = State.Mode.FAST
else:
if 12 * cnt - self.total_volume < 0.5: if 12 * cnt - self.total_volume < 0.5:
self.ch340_pull() self.ch340_pull()
cnt += 1 cnt += 1
time.sleep(0.01)
# 简化的推送逻辑
should_push = False should_push = False
if self.state.is_fast_mode(): if self.state.is_fast_mode():
should_push = True should_push = True
elif self.state.is_slow_mode() and time.time() - self.ch340.start > mid_time: elif self.state.is_slow_mode() and \
time.time() - self.ch340.start > mid_time:
should_push = True should_push = True
elif self.state.is_about_mode() and time.time() - self.ch340.start > about_time: elif self.state.is_about_mode() and \
time.time() - self.ch340.start > about_time:
if not self.state.about_first_flag: if not self.state.about_first_flag:
self.state.about_check = True self.state.about_check = True
else: else:
@ -374,17 +351,10 @@ class MAT:
except Exception as e: except Exception as e:
self.control_logger.error(f"报表失败: {e}") self.control_logger.error(f"报表失败: {e}")
# if (self.state.is_about_mode() and time.time() - self.ch340.start > end_time) or not self.state.is_about_mode():
if self._pred() is None: if self._pred() is None:
self.control_logger.error("预测失败,跳过当前帧") self.control_logger.error("预测失败,跳过当前帧")
continue continue
# try:
# self.control_logger.info("推送安全体积")
# self.ch340_push(0.15)
# except Exception as e:
# self.control_logger.error(f"推送失败: {e}")
# 释放视频录制器 # 释放视频录制器
if hasattr(self, 'capFile') and self.capFile.isOpened(): if hasattr(self, 'capFile') and self.capFile.isOpened():
self.capFile.release() self.capFile.release()
@ -419,8 +389,7 @@ if __name__ == "__main__":
mat = MAT( mat = MAT(
videoSourceIndex = 1, videoSourceIndex = 1,
bounce_time=2, bounce_time=2,
end_bounce_time=0.01, sensity = 32
k = 32
) )
mat.state.mode = State.Mode.FAST mat.state.mode = State.Mode.FAST

354
utils.py
View File

@ -3,24 +3,18 @@ import os
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from dataclasses import dataclass from dataclasses import dataclass
from typing import List, Tuple, Optional, Any from typing import List, Optional,Literal
import numpy as np import numpy as np
import time import time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import threading
from collections import deque
import requests import requests
import json import json
import tkinter as tk
from tkinter import messagebox
import base64 import base64
@dataclass @dataclass
class HistoryRecord: class HistoryRecord:
"""历史记录的单个条目""" """历史记录的单个条目"""
timestamp: float timestamp: float
state: str # 'transport', 'middle', 'about', 'colored' state: Literal["transport","middle","about","colored"]
rate: float rate: float
volume: float volume: float
image: np.ndarray image: np.ndarray
@ -29,7 +23,7 @@ class HistoryRecord:
class History: class History:
"""滑动窗口历史记录管理类""" """滑动窗口历史记录管理类"""
def __init__(self, max_window_size: float = 5.0, base_time = 5.0, display: bool = False): def __init__(self, max_window_size: float = 5.0, base_time = 5.0):
""" """
初始化历史记录管理器 初始化历史记录管理器
@ -50,17 +44,7 @@ class History:
self._base_cnt = 0 self._base_cnt = 0
self.end_history:list[bool] = [] self.end_history:list[bool] = []
self.last_end = 0 self.last_end = 0
self.display = display
self.fig: Any = None
self.ax: Any = None
self.line: Any = None
self.base_line: Any = None # 基准线的引用
self.animation: Any = None
self._plot_thread: Optional[threading.Thread] = None
self._plot_running = False
if self.display:
self._setup_plot()
def add_record(self, timestamp: float, state: str, rate: float, volume: float, image: np.ndarray): def add_record(self, timestamp: float, state: str, rate: float, volume: float, image: np.ndarray):
"""添加新的历史记录""" """添加新的历史记录"""
record = HistoryRecord(timestamp, state, rate, volume, image) record = HistoryRecord(timestamp, state, rate, volume, image)
@ -122,170 +106,6 @@ class History:
"""检查历史记录是否为空""" """检查历史记录是否为空"""
return len(self.records) == 0 return len(self.records) == 0
def __len__(self) -> int:
"""返回历史记录的数量"""
return len(self.records)
def clear(self):
"""清空所有历史记录"""
self.records.clear()
if self.display:
self._stop_plot()
def _setup_plot(self):
"""设置matplotlib图表"""
plt.ion() # 交互式模式
self.fig, self.ax = plt.subplots(figsize=(10, 6))
self.ax.set_title('实时Rate值变化')
self.ax.set_xlabel('时间 (s)')
self.ax.set_ylabel('Rate值')
self.ax.grid(True, alpha=0.3)
# 初始化空线条
self.line, = self.ax.plot([], [], 'b-', linewidth=2, label='Rate')
self.base_line = None # 基准线的引用
self.ax.legend()
# 启动更新线程
self._plot_running = True
self._plot_thread = threading.Thread(target=self._update_plot_loop, daemon=True)
self._plot_thread.start()
def _update_plot_loop(self):
"""绘图更新循环"""
while self._plot_running:
try:
self._update_plot()
time.sleep(0.1) # 100ms更新间隔
except Exception as e:
get_vision_logger().error(f"Plot update error: {e}")
break
def _update_plot(self):
"""更新图表数据 - 按照matplotlib推荐的方式重构"""
if not self.records:
return
start_time = time.time()
try:
# 获取时间和rate数据
timestamps = [record.timestamp for record in self.records]
rates = [record.rate for record in self.records]
if not timestamps:
return
# 将时间戳转换为相对时间(从第一个记录开始)
start_timestamp = timestamps[0]
relative_times = [(t - start_timestamp) for t in timestamps]
# 更新主线条数据
self.line.set_data(relative_times, rates)
# 处理base线
self._update_base_line()
# 自动调整坐标轴范围
self._update_axis_limits(relative_times, rates)
# 使用推荐的绘制方式
self.ax.draw_artist(self.line)
self.fig.canvas.draw() # 使用draw_idle而不是draw
except Exception as e:
get_vision_logger().error(f"Plot update error: {e}")
finally:
# 性能监控
elapsed = time.time() - start_time
if elapsed > 0.1:
get_vision_logger().warning(f"Plot update took too long: {elapsed:.3f}s")
def _update_base_line(self):
"""更新或创建基准线"""
if self.base is not None:
if self.base_line is None:
# 首次创建基准线
self.base_line = self.ax.axhline(
y=self.base,
color='r',
linestyle='--',
alpha=0.7,
label=f'Base Rate: {self.base:.2f}'
)
# 更新图例
self.ax.legend()
else:
# 更新现有基准线的位置
self.base_line.set_ydata([self.base, self.base])
# 更新标签
self.base_line.set_label(f'Base Rate: {self.base:.2f}')
self.ax.legend()
elif self.base_line is not None:
# 如果base被重置移除基准线
self.base_line.remove()
self.base_line = None
self.ax.legend()
def _update_axis_limits(self, relative_times, rates):
"""智能更新坐标轴范围"""
if not relative_times or not rates:
return
# X轴范围
max_time = max(relative_times)
self.ax.set_xlim(0, max_time + max(1, max_time * 0.05)) # 添加5%的边距
# Y轴范围计算
min_rate, max_rate = min(rates), max(rates)
if self.base is not None:
# 如果有基准线确保Y轴范围合理
y_min = 0
y_max = min(max(self.base * 35, max_rate * 1.2), 100) # 限制最大值为100
else:
# 没有基准线时的默认范围
margin = (max_rate - min_rate) * 0.1 if max_rate > min_rate else 1
y_min = max(0, min_rate - margin)
y_max = max_rate + margin
self.ax.set_ylim(y_min, y_max)
def _stop_plot(self):
"""停止绘图"""
self._plot_running = False
if self._plot_thread and self._plot_thread.is_alive():
self._plot_thread.join(timeout=1.0)
if self.fig:
plt.close(self.fig)
self.fig = None
self.ax = None
self.line = None
self.base_line = None
def save_plot(self, filename: Optional[str] = None):
"""保存当前图表"""
if not self.display or not self.fig:
get_vision_logger().warning("Plot not initialized, cannot save")
return
if filename is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"rate_plot_{timestamp}.png"
try:
self.fig.savefig(filename, dpi=300, bbox_inches='tight')
get_vision_logger().info(f"Plot saved to {filename}")
except Exception as e:
get_vision_logger().error(f"Failed to save plot: {e}")
def toggle_display(self):
"""切换显示状态"""
if self.display:
self._stop_plot()
self.display = False
else:
self.display = True
self._setup_plot()
class State: class State:
"""滴定状态管理类""" """滴定状态管理类"""
@ -293,13 +113,10 @@ class State:
FAST = 0 # 快速模式 FAST = 0 # 快速模式
SLOW = 1 # 慢速模式 (middle) SLOW = 1 # 慢速模式 (middle)
ABOUT = 2 # 接近终点模式 ABOUT = 2 # 接近终点模式
CRAZY = 3 # CRAZY模式
# END = 3 # 终点模式
def __init__(self, bounce_time=1, end_bounce_time=5): def __init__(self, bounce_time=1):
self.mode = self.Mode.FAST self.mode = self.Mode.FAST
self.bounce_time = bounce_time self.bounce_time = bounce_time
self.end_bounce_time = end_bounce_time
# 状态检查标志 # 状态检查标志
self.in_middle_check = False self.in_middle_check = False
@ -308,8 +125,7 @@ class State:
self.about_first_flag = False self.about_first_flag = False
# 时间记录 # 时间记录
self.middle_detected_time = None self.middle_detected_time:Optional[float] = None
self.end_detected_time = None
def is_fast_mode(self): def is_fast_mode(self):
return self.mode == self.Mode.FAST return self.mode == self.Mode.FAST
@ -320,9 +136,6 @@ class State:
def is_about_mode(self): def is_about_mode(self):
return self.mode == self.Mode.ABOUT return self.mode == self.Mode.ABOUT
# def is_end_mode(self):
# return self.mode == self.Mode.END
def enter_middle_state(self, current_time): def enter_middle_state(self, current_time):
"""进入middle状态 - 立即切换到slow模式并开始检查""" """进入middle状态 - 立即切换到slow模式并开始检查"""
self.mode = self.Mode.SLOW self.mode = self.Mode.SLOW
@ -334,12 +147,6 @@ class State:
if self.mode == self.Mode.SLOW: if self.mode == self.Mode.SLOW:
self.mode = self.Mode.ABOUT self.mode = self.Mode.ABOUT
def enter_end_check(self, current_time):
"""进入end检查状态"""
self.in_end_check = True
self.end_detected_time = current_time
self.mode = self.Mode.ABOUT
def exit_middle_check(self): def exit_middle_check(self):
"""退出middle检查状态返回fast模式""" """退出middle检查状态返回fast模式"""
self.in_middle_check = False self.in_middle_check = False
@ -360,25 +167,12 @@ class State:
current_time - self.middle_detected_time > self.bounce_time and current_time - self.middle_detected_time > self.bounce_time and
(self.mode == self.Mode.SLOW)) (self.mode == self.Mode.SLOW))
def should_check_end_result(self, current_time):
"""检查是否应该进行end结果检查"""
return (self.in_end_check and
self.end_detected_time is not None and
current_time - self.end_detected_time > self.end_bounce_time)
def reset_end_check(self):
"""重置end检查状态"""
self.in_end_check = False
self.end_detected_time = None
def get_status_text(self): def get_status_text(self):
"""获取状态显示文本""" """获取状态显示文本"""
status = [] status = []
current_time = time.time() current_time = time.time()
if self.in_middle_check and current_time - self.middle_detected_time > self.bounce_time: if self.in_middle_check and current_time - self.middle_detected_time > self.bounce_time:
status.append("MIDCHK") status.append("MIDCHK")
if self.in_end_check and current_time - self.end_detected_time > self.end_bounce_time:
status.append("ENDCHK")
return ", " + ", ".join(status) if status else "" return ", " + ", ".join(status) if status else ""
@ -433,8 +227,6 @@ def send_data_to_platform(token, data, picture):
} }
url = "https://jingsai.mools.net/api/upload-record" url = "https://jingsai.mools.net/api/upload-record"
# 合并 headers 和 data 为一个字典
datas = {**headers, **data}
# 准备 JSON 数据 # 准备 JSON 数据
json_data = json.dumps(data) json_data = json.dumps(data)
# 发送 POST 请求 # 发送 POST 请求
@ -453,117 +245,6 @@ def send_data_to_platform(token, data, picture):
print("错误", f"发送数据时出错:{e}") print("错误", f"发送数据时出错:{e}")
class LoginApp:
def __init__(self, root):
self.root = root
self.root.title("Mlabs AI Titration 1.0")
# 设置窗口图标
self.set_window_icon()
# 定义变量
self.username = tk.StringVar()
self.password = tk.StringVar()
self.token = ''
self.un = ''
self.pw = ''
# 创建登录界面
self.create_login_interface()
# 设置窗口居中
self.center_window()
def center_window(self, width=300, height=120):
"""将窗口居中"""
# 获取屏幕宽度和高度
screen_width = self.root.winfo_screenwidth()
screen_height = self.root.winfo_screenheight()
# 计算窗口位置
x = (screen_width - width) // 2
y = (screen_height - height) // 2
# 设置窗口大小和位置
self.root.geometry(f"{width}x{height}+{x}+{y}")
def set_window_icon(self):
"""设置窗口图标"""
try:
# 替换为你的图标文件路径
icon_path = "logo.ico" # Windows系统使用.ico文件
self.root.iconbitmap(icon_path)
except Exception as e:
print(f"设置窗口图标时出错:{e}")
def create_login_interface(self):
# 用户名
tk.Label(self.root, text="用户名:").grid(row=0, column=0, padx=10, pady=5)
self.username_entry = tk.Entry(self.root, textvariable=self.username, width=30)
self.username_entry.grid(row=0, column=1, padx=10, pady=5)
# 密码
tk.Label(self.root, text="密码:").grid(row=1, column=0, padx=10, pady=5)
self.password_entry = tk.Entry(self.root, textvariable=self.password, show="*", width=30)
self.password_entry.grid(row=1, column=1, padx=10, pady=5)
# 登录按钮
login_button = tk.Button(self.root, text="登录", command=self.login)
login_button.grid(row=3, column=0, columnspan=2, pady=10)
# 检查info.json文件是否存在
self.check_info_file()
self.username_entry.focus_set()
def check_info_file(self):
login_folder = "login"
info_file = os.path.join(login_folder, "info.json")
if os.path.exists(info_file):
with open(info_file, "r", encoding="utf-8") as file:
info = json.load(file)
self.username.set(info.get("username", ""))
self.password.set(info.get("password", ""))
else:
print("提示", "未找到本地登录信息,请手动输入登录信息。")
def login(self):
self.un = self.username.get()
self.pw = self.password.get()
if not self.un or not self.pw:
print("错误", "用户名、密码不能为空!")
return
# 这里可以添加登录逻辑,例如发送请求到服务器验证登录信息
self.token = login_to_platform(self.un, self.pw)
# print(self.token)
# 这里可以添加登录逻辑,例如发送请求到服务器验证登录信息
self.save_info_file()
self.root.destroy()
# return username, password
def save_info_file(self):
login_folder = "login"
#如果没有login文件夹则创建一个
if not os.path.exists(login_folder):
os.makedirs(login_folder)
info_file = os.path.join(login_folder, "info.json")
if not os.path.exists(login_folder):
os.makedirs(login_folder)
info = {
"username": self.username.get(),
"password": self.password.get(),
}
with open(info_file, "w", encoding="utf-8") as file:
json.dump(info, file, ensure_ascii=False, indent=4)
def setup_logging(log_level=logging.INFO, log_dir="logs"): def setup_logging(log_level=logging.INFO, log_dir="logs"):
""" """
设置logging配置创建不同模块的logger 设置logging配置创建不同模块的logger
@ -591,28 +272,3 @@ def setup_logging(log_level=logging.INFO, log_dir="logs"):
) )
return log_file return log_file
def get_system_logger():
"""系统初始化和控制相关的logger"""
return logging.getLogger("System")
def get_hardware_logger():
"""硬件控制相关的loggerCH340等"""
return logging.getLogger("Hardware")
def get_vision_logger():
"""图像处理和预测相关的logger"""
return logging.getLogger("Vision")
def get_control_logger():
"""控制逻辑相关的logger"""
return logging.getLogger("Control")
def get_endpoint_logger():
"""终点检测相关的logger"""
return logging.getLogger("Endpoint")
def get_volume_logger():
"""体积计算相关的logger"""
return logging.getLogger("Volume")