Compare commits
10 Commits
6f56255cbf
...
29eea0817e
Author | SHA1 | Date | |
---|---|---|---|
29eea0817e | |||
b696cb71be | |||
e0e1c649eb | |||
e0df489aee | |||
27c1b08365 | |||
42460ad72f | |||
492b37c912 | |||
6ad826c17f | |||
c1f33df66d | |||
8184179a17 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -12,3 +12,4 @@ Videos
|
||||
*.dist
|
||||
HCHO
|
||||
upx.exe
|
||||
media
|
50
README.md
50
README.md
@ -1,14 +1,40 @@
|
||||
状态管理逻辑:
|
||||
- 滑动窗口
|
||||
维护一个滑动窗口history,记录最近end_bounce_time内的状态。
|
||||
状态包括:时间,浓度,状态,当前截图
|
||||
这是一个基于传统图像处理方法的AI视觉滴定控制程序。
|
||||
|
||||
- 进入
|
||||
1. middle: predictor返回middle立即进入slow状态。
|
||||
2. about: 返回about,且处于middle则进入about状态
|
||||
3. end:返回end,置end_check标记开始检查
|
||||
## 操作步骤
|
||||
|
||||
- check
|
||||
1. middle: 进入middle的bounce_time后,在最近bounce_time内middle比例<70%返回fast状态。
|
||||
2. about:随着middle退出一起退出,不单独处理
|
||||
3. end:进入end之后的end_bounce_time,如果end_bounce_time内end比例<80%,则实验终止逻辑。否则从history中找到第二个end并继续check逻辑。
|
||||
1. 安装依赖
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
2. 校准光线
|
||||
1. 运行`vid_chk.py`,按任意键直到显示为目标摄像头,按下`q`
|
||||
2. 按`w`和`s`直到左上角参数在$0.01\sim 0.03$
|
||||
3. 按`q`后将控制台输出参数输入`main.py`
|
||||
3. 运行主程序
|
||||
|
||||
## 技术说明
|
||||
|
||||
### 变色预测
|
||||
|
||||
1. 前期分析
|
||||
透明的溶液,饱和度较低。粉色饱和度较高。故取HSV中的S分析,筛选$S_{\rm{fil}}S>\rm{sensity}$,再根据$\frac{S_{\rm{fil}}}{S}$判断变色区域大小,并避免背景干扰
|
||||
|
||||
2. 终点检测
|
||||
透明的溶液色相和变色后不同,即HSV中的H不同。但是任何单点包括平均值不足以反应溶液是否完全变色,故对图像H通道作直方图分析。刚到突变点时,直方图的最高峰位置右移,且位置和透明显著不同。随着继续滴加,峰值继续右移,并且从尖峰变为宽峰。
|
||||
|
||||
### 滴定控制
|
||||
|
||||
1. 前期
|
||||
滴入后溶液完全没有变色,可以采用高速滴加。当发现溶液有小区域变色时,稍微降低速度,避免气泡、搅匀等因素干扰。
|
||||
2. 中期
|
||||
滴入后,溶液发生变色,且在数秒内褪色。这时候距离终点约$1\sim 2\,\rm{mL}$,采用较慢的滴定速度($0.05\,\rm{mL/s}$),且每次滴入后等待$1\sim 2\,\rm{s}$。
|
||||
3. 末期
|
||||
滴入后,溶液大范围变色,且在数秒至十秒左右褪色。这时继续减慢滴入速率($0.02\,\rm{mL/s}$),并每次滴入后终点检测,如认为变色则等待数秒后重新判断。持续4次后,认为到达滴定终点,汇报数据。
|
||||
|
||||
**意外处理**
|
||||
|
||||
中期检测可能存在误判。检测符合中期条件后,立即进入中期模式。如果在`bounce_time`内判定为中期的样本过少,则退出。`bounce_time`使用滑动窗口处理。
|
||||
|
||||
特别地,为了避免在末期意外退出,连续2次判定终点成功后,禁用该退出。
|
||||
|
||||
类似地,末期也有退出处理,连续判定终点3次以上,禁用退出。
|
@ -1,223 +0,0 @@
|
||||
import os
|
||||
import cv2
|
||||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
import re
|
||||
from datetime import datetime
|
||||
import main as _main
|
||||
import argparse
|
||||
|
||||
class RateVolumeAnalyzer:
|
||||
def __init__(self):
|
||||
self.mat = _main.MAT() # 创建MAT实例以使用predictor方法
|
||||
|
||||
def parse_log_file(self, log_path):
|
||||
"""解析日志文件,提取体积和时间信息"""
|
||||
volumes = []
|
||||
timestamps = []
|
||||
|
||||
with open(log_path, 'r', encoding='utf-8') as f:
|
||||
for line in f:
|
||||
# 匹配体积信息的行
|
||||
if '当前体积:' in line:
|
||||
# 提取时间戳
|
||||
time_match = re.search(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d+', line)
|
||||
# 提取体积
|
||||
volume_match = re.search(r'当前体积: ([\d.]+) ml', line)
|
||||
|
||||
if time_match and volume_match:
|
||||
timestamp = datetime.strptime(time_match.group(1), '%Y-%m-%d %H:%M:%S')
|
||||
volume = float(volume_match.group(1))
|
||||
timestamps.append(timestamp)
|
||||
volumes.append(volume)
|
||||
|
||||
return timestamps, volumes
|
||||
|
||||
def extract_frames_from_video(self, video_path, timestamps, start_time):
|
||||
"""从视频中提取对应时间点的帧"""
|
||||
cap = cv2.VideoCapture(video_path)
|
||||
if not cap.isOpened():
|
||||
print(f"无法打开视频文件: {video_path}")
|
||||
return []
|
||||
|
||||
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||
frames = []
|
||||
|
||||
for timestamp in timestamps:
|
||||
# 计算相对于实验开始的秒数
|
||||
seconds_from_start = (timestamp - start_time).total_seconds()
|
||||
frame_number = int(seconds_from_start * fps)
|
||||
|
||||
# 设置视频位置到指定帧
|
||||
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
|
||||
ret, frame = cap.read()
|
||||
|
||||
if ret:
|
||||
frames.append(frame)
|
||||
else:
|
||||
frames.append(None) # 如果无法读取帧,添加None
|
||||
|
||||
cap.release()
|
||||
return frames
|
||||
|
||||
def calculate_rates(self, frames):
|
||||
"""使用predictor方法计算每帧的rate"""
|
||||
rates = []
|
||||
states = []
|
||||
|
||||
for frame in frames:
|
||||
if frame is not None:
|
||||
try:
|
||||
state, rate = self.mat.predictor(frame)
|
||||
rates.append(rate)
|
||||
states.append(state)
|
||||
except Exception as e:
|
||||
print(f"计算rate时发生错误: {e}")
|
||||
rates.append(None)
|
||||
states.append(None)
|
||||
else:
|
||||
rates.append(None)
|
||||
states.append(None)
|
||||
|
||||
return rates, states
|
||||
|
||||
def plot_rate_volume_curve(self, volumes, rates, states, log_filename):
|
||||
"""绘制rate-体积曲线"""
|
||||
# 过滤掉None值
|
||||
valid_data = [(v, r, s) for v, r, s in zip(volumes, rates, states)
|
||||
if r is not None and s is not None]
|
||||
|
||||
if not valid_data:
|
||||
print("没有有效的数据点可以绘制")
|
||||
return
|
||||
|
||||
volumes_valid, rates_valid, states_valid = zip(*valid_data)
|
||||
|
||||
plt.figure(figsize=(12, 8))
|
||||
|
||||
# 根据状态用不同颜色绘制点
|
||||
colors = {'transport': 'blue', 'middle': 'orange', 'about': 'purple', 'colored': 'red'}
|
||||
|
||||
for state in colors:
|
||||
state_volumes = [v for v, s in zip(volumes_valid, states_valid) if s == state]
|
||||
state_rates = [r for r, s in zip(rates_valid, states_valid) if s == state]
|
||||
|
||||
if state_volumes:
|
||||
plt.scatter(state_volumes, state_rates,
|
||||
c=colors[state], label=state, alpha=0.7, s=50)
|
||||
|
||||
# 绘制连接线
|
||||
plt.plot(volumes_valid, rates_valid, 'k-', alpha=0.3, linewidth=1)
|
||||
|
||||
plt.xlabel('体积 (ml)', fontsize=12)
|
||||
plt.ylabel('Rate', fontsize=12)
|
||||
plt.title(f'Rate-体积曲线 ({log_filename})', fontsize=14)
|
||||
plt.legend()
|
||||
plt.grid(True, alpha=0.3)
|
||||
|
||||
# 保存图片
|
||||
output_filename = f"rate_volume_curve_{log_filename.replace('.log', '.png')}"
|
||||
plt.savefig(output_filename, dpi=300, bbox_inches='tight')
|
||||
plt.show()
|
||||
|
||||
print(f"图片已保存为: {output_filename}")
|
||||
|
||||
# 打印统计信息
|
||||
print(f"\n统计信息:")
|
||||
print(f"总数据点: {len(valid_data)}")
|
||||
for state in colors:
|
||||
count = sum(1 for s in states_valid if s == state)
|
||||
if count > 0:
|
||||
print(f"{state}: {count} 个点")
|
||||
|
||||
def analyze_experiment(self, timestamp_str=None):
|
||||
"""分析指定时间戳的实验,如果不指定则分析最新的实验"""
|
||||
logs_dir = "logs"
|
||||
videos_dir = "Videos"
|
||||
|
||||
if timestamp_str:
|
||||
log_file = f"titration_{timestamp_str}.log"
|
||||
video_file = f"{timestamp_str}.mp4"
|
||||
else:
|
||||
# 找到最新的日志文件
|
||||
log_files = [f for f in os.listdir(logs_dir) if f.endswith('.log')]
|
||||
if not log_files:
|
||||
print("没有找到日志文件")
|
||||
return
|
||||
|
||||
log_files.sort()
|
||||
log_file = log_files[-1]
|
||||
|
||||
# 提取时间戳来找对应的视频文件
|
||||
timestamp_match = re.search(r'titration_(\d{8}_\d{6})\.log', log_file)
|
||||
if timestamp_match:
|
||||
video_file = f"{timestamp_match.group(1)}.mp4"
|
||||
else:
|
||||
print("无法从日志文件名提取时间戳")
|
||||
return
|
||||
|
||||
log_path = os.path.join(logs_dir, "titration_20250529_191634.log")
|
||||
video_path = os.path.join(videos_dir, "tmp.mp4")
|
||||
|
||||
if not os.path.exists(log_path):
|
||||
print(f"日志文件不存在: {log_path}")
|
||||
return
|
||||
|
||||
if not os.path.exists(video_path):
|
||||
print(f"视频文件不存在: {video_path}")
|
||||
return
|
||||
|
||||
print(f"分析实验: {log_file}")
|
||||
print(f"对应视频: {video_file}")
|
||||
|
||||
# 解析日志文件
|
||||
timestamps, volumes = self.parse_log_file(log_path)
|
||||
|
||||
if not timestamps:
|
||||
print("日志文件中没有找到体积数据")
|
||||
return
|
||||
|
||||
print(f"找到 {len(timestamps)} 个数据点")
|
||||
|
||||
# 获取实验开始时间
|
||||
start_time = timestamps[0]
|
||||
|
||||
# 从视频中提取帧
|
||||
print("正在从视频中提取帧...")
|
||||
frames = self.extract_frames_from_video(video_path, timestamps, start_time)
|
||||
|
||||
# 计算rate
|
||||
print("正在计算rate值...")
|
||||
rates, states = self.calculate_rates(frames)
|
||||
|
||||
# 绘制曲线
|
||||
print("正在绘制rate-体积曲线...")
|
||||
self.plot_rate_volume_curve(volumes, rates, states, log_file)
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='分析滴定实验的rate-体积曲线')
|
||||
parser.add_argument('--timestamp', type=str, help='指定实验时间戳 (格式: YYYYMMDD_HHMMSS)')
|
||||
parser.add_argument('--list', action='store_true', help='列出所有可用的实验')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
analyzer = RateVolumeAnalyzer()
|
||||
|
||||
if args.list:
|
||||
# 列出所有可用的实验
|
||||
logs_dir = "logs"
|
||||
if os.path.exists(logs_dir):
|
||||
log_files = [f for f in os.listdir(logs_dir) if f.endswith('.log')]
|
||||
log_files.sort()
|
||||
print("可用的实验:")
|
||||
for log_file in log_files:
|
||||
timestamp_match = re.search(r'titration_(\d{8}_\d{6})\.log', log_file)
|
||||
if timestamp_match:
|
||||
timestamp = timestamp_match.group(1)
|
||||
print(f" {timestamp}")
|
||||
return
|
||||
|
||||
analyzer.analyze_experiment(args.timestamp)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
2
ch340.py
2
ch340.py
@ -117,7 +117,7 @@ else:
|
||||
self._speed = 43
|
||||
self._speed = 43/34
|
||||
# x = x*30
|
||||
print(int((x-int(x))*100),int(x),x)
|
||||
# print(int((x-int(x))*100),int(x),x)
|
||||
self.pump_ser.write(f"q1h{int(x)}d".encode('ascii'))
|
||||
time.sleep(0.01)
|
||||
self.pump_ser.write(f"q2h{int((x-int(x))*100)}d".encode('ascii'))
|
||||
|
BIN
ch340_gui.exe
BIN
ch340_gui.exe
Binary file not shown.
BIN
colored.zip
Normal file
BIN
colored.zip
Normal file
Binary file not shown.
4
login/info.json
Normal file
4
login/info.json
Normal file
@ -0,0 +1,4 @@
|
||||
{
|
||||
"username": "13504022184",
|
||||
"password": "password"
|
||||
}
|
385
main.py
385
main.py
@ -1,62 +1,82 @@
|
||||
import cv2
|
||||
import atexit
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from scipy.signal import find_peaks
|
||||
|
||||
import ch340
|
||||
import atexit
|
||||
import utils
|
||||
from utils import State, History
|
||||
from utils import History, State, login_to_platform, send_data_to_platform
|
||||
|
||||
|
||||
class MAT:
|
||||
def __init__(self, videoSourceIndex=0, bounce_time=2, end_bounce_time=5):
|
||||
def __init__(self, videoSourceIndex=0, bounce_time=2, sensity=30):
|
||||
# 初始化logging
|
||||
utils.setup_logging()
|
||||
self.system_logger = utils.get_system_logger()
|
||||
self.control_logger = utils.get_control_logger()
|
||||
self.endpoint_logger = utils.get_endpoint_logger()
|
||||
self.volume_logger = utils.get_volume_logger()
|
||||
self.system_logger = logging.getLogger("System")
|
||||
self.control_logger = logging.getLogger("Control")
|
||||
self.endpoint_logger = logging.getLogger("Endpoint")
|
||||
self.volume_logger = logging.getLogger("Volume")
|
||||
|
||||
self.system_logger.info('正在初始化MAT系统...')
|
||||
self.system_logger.info("正在初始化MAT系统...")
|
||||
self.videoSourceIndex = videoSourceIndex
|
||||
self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)
|
||||
self.ch340 = ch340.CH340()
|
||||
self.total_volume = 0
|
||||
self.sensity = sensity
|
||||
|
||||
self.state = State(bounce_time, end_bounce_time)
|
||||
self.state = State(bounce_time)
|
||||
|
||||
atexit.register(self.ch340.stop)
|
||||
self.history = History(max(bounce_time, end_bounce_time))
|
||||
self.first_about = 0
|
||||
self.history = History(bounce_time)
|
||||
self.colored_volume = None
|
||||
self.colored_time = None
|
||||
self.colored_im = None
|
||||
|
||||
def ch340_pull(self):
|
||||
"抽取12ml液体"
|
||||
self.control_logger.info("开始抽取12ml")
|
||||
self.ch340.max_speed()
|
||||
self.ch340.pull(vol=12)
|
||||
self.control_logger.info('完成抽取')
|
||||
self.control_logger.info("完成抽取")
|
||||
|
||||
def ch340_init(self):
|
||||
self.ch340.push(speed=1,t=1)
|
||||
self.ch340.pull(speed=1.2,vol=1)
|
||||
self.control_logger.info('电极位置已初始化')
|
||||
"初始化电机位置,防止过头导致抽取溶液不准。"
|
||||
# self.ch340.push(speed=1,t=1)
|
||||
self.ch340.pull(speed=1.2, vol=1.8)
|
||||
self.control_logger.info("电极位置已初始化")
|
||||
|
||||
def ch340_push(self, speed=0.1):
|
||||
def ch340_push(self, speed: int | float = 0.1):
|
||||
"常规推送,保证推送时间为1s,推送体积=speed"
|
||||
self.ch340.push_async(speed=speed, t=1)
|
||||
|
||||
def process_left(self, now: float, value_only=False):
|
||||
if self.state.mode == State.Mode.CRAZY:
|
||||
value_only = True
|
||||
def process_left(
|
||||
self, now: float, value_only: Optional[bool] = False
|
||||
) -> Optional[float]:
|
||||
"""
|
||||
计算当前时刻剩余体积,并更新到self.total_volume。
|
||||
|
||||
@param now: 当前时间戳
|
||||
@param value_only: 如果为True,则只返回剩余体积,不更新状态
|
||||
|
||||
@return: 当前剩余体积,仅当value_only为True时返回
|
||||
"""
|
||||
if self.state.mode == State.Mode.ABOUT:
|
||||
return self.total_volume
|
||||
st = self.ch340.start
|
||||
if not value_only:
|
||||
self.ch340.stop()
|
||||
assert abs(self.ch340._get_time()-1) < 1e-3
|
||||
if not abs(self.ch340._get_time() - 1) < 1e-3:
|
||||
self.control_logger.warning(
|
||||
f"CH340 timing issue detected: {self.ch340._get_time()}"
|
||||
)
|
||||
return self.total_volume
|
||||
r = now - st
|
||||
ret = self.total_volume - (1-r) * self.speeds[self.state.mode.value]
|
||||
ret = self.total_volume - (1 - r) * self.speeds[self.state.mode.value]
|
||||
if value_only:
|
||||
return ret
|
||||
else:
|
||||
@ -70,7 +90,7 @@ class MAT:
|
||||
return None
|
||||
|
||||
# 录制当前帧到视频文件
|
||||
if hasattr(self, 'capFile') and self.capFile.isOpened():
|
||||
if hasattr(self, "capFile") and self.capFile.isOpened():
|
||||
self.capFile.write(im)
|
||||
|
||||
ret, rate = self.predictor(im)
|
||||
@ -90,44 +110,44 @@ class MAT:
|
||||
if self.history.is_empty():
|
||||
self.control_logger.error("未预期的没有可用的历史记录")
|
||||
return ret
|
||||
if self.state.mode == State.Mode.CRAZY:
|
||||
self._display_status(im, ret, rate, val)
|
||||
return ret
|
||||
|
||||
|
||||
# === 状态进入逻辑 ===
|
||||
if now - self._start_time<10:
|
||||
if now - self._start_time < 10:
|
||||
self._display_status(im, ret, rate, val)
|
||||
return ret
|
||||
|
||||
if (
|
||||
not self.edited
|
||||
and self.history.base is not None
|
||||
and rate > self.history.base * 2
|
||||
):
|
||||
self.edited = True
|
||||
self.speeds[0] /= 2
|
||||
|
||||
# 1. middle: predictor返回middle立即进入slow状态
|
||||
if ret == "middle":
|
||||
if self.state.is_fast_mode():
|
||||
self.process_left(now)
|
||||
self.state.enter_middle_state(now)
|
||||
self.control_logger.info(f"检测到middle,立即进入slow模式,当前体积: {val:.2f} ml")
|
||||
self.control_logger.info(
|
||||
f"检测到middle,立即进入slow模式,当前体积: {val:.2f} ml"
|
||||
)
|
||||
|
||||
# 2. about: 返回about,且处于middle则进入about状态
|
||||
elif ret == "about":
|
||||
if self.state.is_slow_mode():
|
||||
self.process_left(now)
|
||||
self.state.enter_about_state(now)
|
||||
self.control_logger.info(f"检测到about,进入about模式,当前体积: {val:.2f} ml")
|
||||
|
||||
# 3. end:返回colored,置end_check标记开始检查
|
||||
elif ret == "colored":
|
||||
if not self.state.in_end_check:
|
||||
self.state.enter_end_check(now)
|
||||
if self.colored_volume is None:
|
||||
self.colored_volume = val
|
||||
self.colored_time = now
|
||||
self.colored_im = im.copy()
|
||||
self.endpoint_logger.info(f"检测到colored,开始end检查,记录体积: {val:.2f} ml")
|
||||
# === 状态检查逻辑 ===
|
||||
self.control_logger.info(
|
||||
f"检测到about,进入about模式,当前体积: {val:.2f} ml"
|
||||
)
|
||||
|
||||
# middle检查: 进入middle的bounce_time后,在最近bounce_time内middle比例<70%返回fast状态
|
||||
if self.state.should_check_middle_exit(now):
|
||||
if self.state.should_check_middle_exit(now) and not self.history.last_end:
|
||||
# 计算最近bounce_time内的middle比例
|
||||
recent_records = self.history.get_recent_records(self.state.bounce_time, now)
|
||||
recent_records = self.history.get_recent_records(
|
||||
self.state.bounce_time, now
|
||||
)
|
||||
|
||||
if recent_records:
|
||||
trans_ratio = self.history.get_state_ratio("transport", recent_records)
|
||||
@ -135,119 +155,173 @@ class MAT:
|
||||
if trans_ratio > 0.4:
|
||||
self.process_left(now)
|
||||
self.state.exit_middle_check()
|
||||
# about状态随middle一起退出
|
||||
# self.state.exit_about_with_middle()
|
||||
self.control_logger.info(f"middle比例{trans_ratio:.2%}<60%,退出middle检查,返回fast模式")
|
||||
self.control_logger.info(
|
||||
f"middle比例{trans_ratio:.2%}<60%,退出middle检查,返回fast模式"
|
||||
)
|
||||
|
||||
if self.state.mode == State.Mode.ABOUT and self.state.about_check:
|
||||
h = self.history.get_recent_records(self.about_time/3,now)
|
||||
ratio = self.history.get_state_ratio("about", h)
|
||||
self.history.about_history.append(ratio<0.3)
|
||||
h = self.history.get_recent_records(self.about_time / 3, now)
|
||||
ratio = self.history.get_state_ratio("transport", h)
|
||||
self.history.about_history.append(ratio == 1)
|
||||
while len(self.history.about_history) > 5:
|
||||
self.history.about_history.pop(0)
|
||||
if len(self.history.about_history) == 5:
|
||||
if len(self.history.about_history) == 5 and self.history.last_end < 2:
|
||||
rate = sum(self.history.about_history) / len(self.history.about_history)
|
||||
if rate > 0.8:
|
||||
self.state.exit_about()
|
||||
self.control_logger.info(f"about比例{ratio:.2%}<80%,退出about检查,返回middle模式")
|
||||
self.control_logger.info(
|
||||
f"about比例{ratio:.2%}<80%,退出about检查,返回middle模式"
|
||||
)
|
||||
|
||||
ret = self.end_check()
|
||||
if ret == "colored":
|
||||
return ret
|
||||
if not self.history.last_end:
|
||||
self.history.last_end = max(self.history.last_end, ret)
|
||||
self.state.about_check = False
|
||||
|
||||
|
||||
# end检查: 进入end之后的end_bounce_time,如果end比例<80%,则重置;否则终止实验
|
||||
if self.state.should_check_end_result(now):
|
||||
colored_ratio = self.history.get_state_ratio("colored", self.history.get_recent_records(self.state.end_bounce_time, now))
|
||||
|
||||
if colored_ratio < 0.8:
|
||||
# end比例<80%,从history中找到第二个end并继续check逻辑
|
||||
self.endpoint_logger.warning(f"colored比例{colored_ratio:.2%}<80%,寻找下一个colored点")
|
||||
|
||||
# 寻找历史中倒数第二个colored状态
|
||||
colored_times = self.history.get_states_by_type("colored")
|
||||
if len(colored_times) >= 2:
|
||||
# 使用倒数第二个colored时间重新开始检查
|
||||
second_last_colored_time = colored_times[1]
|
||||
self.state.end_detected_time = second_last_colored_time
|
||||
|
||||
# 更新colored记录为对应的体积
|
||||
record = self.history.find_record_by_timestamp(second_last_colored_time)
|
||||
if record:
|
||||
self.colored_volume = record.volume
|
||||
self.colored_time = record.timestamp
|
||||
self.colored_im = record.image.copy()
|
||||
|
||||
self.endpoint_logger.info(f"重置到第二个colored点: {self.colored_volume:.2f} ml")
|
||||
else:
|
||||
# 没有足够的colored点,重置end检查
|
||||
self.state.reset_end_check()
|
||||
self.colored_volume = None
|
||||
self.colored_time = None
|
||||
self.endpoint_logger.info("没有足够的colored点,重置end检查")
|
||||
else: # end比例>=80%,确认终点,终止实验
|
||||
self.endpoint_logger.info(f"colored比例{colored_ratio:.2%}>=80%,确认滴定终点")
|
||||
self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml")
|
||||
self.running = False
|
||||
self.ch340.stop()
|
||||
if self.colored_im is not None:
|
||||
cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im)
|
||||
return "colored"
|
||||
|
||||
# 显示状态信息
|
||||
self._display_status(im, ret, rate, val)
|
||||
return ret
|
||||
|
||||
def end_check(self, dep=0):
|
||||
"""终点检测,通过对图像直方图峰值分析,确认是否全局变色。"""
|
||||
if not self.running:
|
||||
return "colored"
|
||||
if dep == 1:
|
||||
suc, im = self.cap.read()
|
||||
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
|
||||
|
||||
# 降低饱和度通道 (S通道是索引1)
|
||||
hsv[:, :, 1] = hsv[:, :, 1] * 0.8
|
||||
|
||||
# 转换回BGR颜色空间
|
||||
result = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
|
||||
name = f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
|
||||
cv2.imwrite(name, result)
|
||||
self.colored_im = name
|
||||
|
||||
# 维护一个递归检测,重复4次后认定为终点。
|
||||
if dep > 3:
|
||||
self.colored_volume = self.total_volume
|
||||
self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml")
|
||||
self.running = False
|
||||
self.ch340.stop()
|
||||
return "colored"
|
||||
suc, im = self.cap.read()
|
||||
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
|
||||
s = hsv[:, :, 0]
|
||||
s = s[s > 0]
|
||||
hist = cv2.calcHist([s], [0], None, [256], [0, 256])
|
||||
hist = hist.flatten() # 转换为一维数组
|
||||
|
||||
# 峰值检测 - 找到直方图中的峰值
|
||||
peaks, properties = find_peaks(
|
||||
hist,
|
||||
height=np.max(hist) * 0.2, # 峰值高度至少是最大值的10%
|
||||
distance=10, # 峰值之间的最小距离
|
||||
prominence=np.max(hist) * 0.01,
|
||||
) # 峰值的突出度
|
||||
if np.any(peaks > 130):
|
||||
if dep == 0:
|
||||
self.process_left(time.time())
|
||||
self.history.end_history.append(True)
|
||||
self.control_logger.info(f"检测到colored状态,end_check at {dep}")
|
||||
bgn = time.time()
|
||||
while time.time() - bgn < 2:
|
||||
suc, im = self.cap.read()
|
||||
if hasattr(self, "capFile") and self.capFile.isOpened():
|
||||
self.capFile.write(im)
|
||||
cv2.putText(
|
||||
im,
|
||||
"ENDCHK",
|
||||
(10, 30),
|
||||
cv2.FONT_HERSHEY_SIMPLEX,
|
||||
0.7,
|
||||
(0, 255, 0),
|
||||
2,
|
||||
)
|
||||
cv2.imshow("Frame", im)
|
||||
return self.end_check(dep + 1)
|
||||
# time.sleep(2)
|
||||
else:
|
||||
self.colored_im = None
|
||||
return dep
|
||||
|
||||
def _display_status(self, im, detection_result, rate, volume):
|
||||
"""显示状态信息到图像上"""
|
||||
mode_color = {
|
||||
State.Mode.FAST: (255, 255, 255),
|
||||
State.Mode.SLOW: (10, 215, 255),
|
||||
State.Mode.ABOUT: (228,116,167),
|
||||
State.Mode.CRAZY: (0, 255, 0),
|
||||
# State.Mode.END: (0, 0, 255)
|
||||
State.Mode.ABOUT: (228, 116, 167),
|
||||
}
|
||||
|
||||
status_text = f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml"
|
||||
status_text = (
|
||||
f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml"
|
||||
)
|
||||
status_text += self.state.get_status_text()
|
||||
|
||||
cv2.putText(im, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7,
|
||||
mode_color[self.state.mode], 2)
|
||||
cv2.putText(
|
||||
im,
|
||||
status_text,
|
||||
(10, 30),
|
||||
cv2.FONT_HERSHEY_SIMPLEX,
|
||||
0.7,
|
||||
mode_color[self.state.mode],
|
||||
2,
|
||||
)
|
||||
cv2.imshow("Frame", im)
|
||||
cv2.waitKey(1)
|
||||
|
||||
def predictor(self,im):
|
||||
hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV)
|
||||
s = hsv[:,:,1]
|
||||
mask = s>30
|
||||
cv2.imshow('mask',im*mask[:,:,np.newaxis])
|
||||
tot = mask.shape[0]*mask.shape[1]
|
||||
def predictor(self, im):
|
||||
"""主预测函数,分析图像并返回当前状态。当前,colored不使用这个函数。"""
|
||||
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
|
||||
s = hsv[:, :, 1]
|
||||
mask = s > self.sensity
|
||||
cv2.imshow("mask", im * mask[:, :, np.newaxis])
|
||||
tot = mask.shape[0] * mask.shape[1]
|
||||
val = np.sum(mask)
|
||||
rate = val/tot
|
||||
rate = val / tot
|
||||
if self.history.base is not None:
|
||||
base = self.history.base
|
||||
thr = (min(0.05,base*5), min(0.4,base*9), 0.75)
|
||||
thr = (min(0.05, base * 5), min(0.2, base * 13), 0.5)
|
||||
# thr = (base*5, base*13, 0.5)
|
||||
if rate < thr[0]:
|
||||
return "transport",rate
|
||||
elif rate <thr[1]:
|
||||
return "middle",rate
|
||||
return "transport", rate
|
||||
elif rate < thr[1]:
|
||||
return "middle", rate
|
||||
elif rate < thr[2]:
|
||||
return "about",rate
|
||||
return "about", rate
|
||||
else:
|
||||
return "colored",rate
|
||||
return "colored", rate
|
||||
else:
|
||||
return "transport",rate
|
||||
|
||||
return "transport", rate
|
||||
|
||||
def __del__(self):
|
||||
self.cap.release()
|
||||
if hasattr(self, 'capFile') and self.capFile.isOpened():
|
||||
if hasattr(self, "capFile") and self.capFile.isOpened():
|
||||
self.capFile.release()
|
||||
cv2.destroyAllWindows()
|
||||
|
||||
def format_date_time(self, timestamp):
|
||||
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, about_time=1, cap_dir="Videos"):
|
||||
def run(
|
||||
self,
|
||||
quick_speed=0.2,
|
||||
slow_speed=0.05,
|
||||
end_speed=0.02,
|
||||
mid_time=0.5,
|
||||
about_time=1,
|
||||
cap_dir="Videos",
|
||||
):
|
||||
self.running = True
|
||||
self.start_time = time.time()
|
||||
self.speeds = (quick_speed, slow_speed, end_speed, 1.0)
|
||||
self.speeds: list[float] = [quick_speed, slow_speed, end_speed]
|
||||
self.need_check = False
|
||||
self.edited = False
|
||||
self.volume_list = []
|
||||
self.color_list = []
|
||||
|
||||
if cap_dir is not None:
|
||||
vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mkv"
|
||||
@ -260,7 +334,7 @@ class MAT:
|
||||
"防抖时间 ": self.state.bounce_time,
|
||||
"快速模式速度": f"{quick_speed} ml/次",
|
||||
"慢速模式速度": f"{slow_speed} ml/次",
|
||||
"录制视频 ": vid_name
|
||||
"录制视频 ": vid_name,
|
||||
}
|
||||
self.system_logger.info("实验参数:")
|
||||
for key, value in experiment_params.items():
|
||||
@ -271,7 +345,7 @@ class MAT:
|
||||
|
||||
# 初始化视频录制器
|
||||
if vid_name != "Disabled":
|
||||
fourcc = cv2.VideoWriter.fourcc(*'x264') # 使用更兼容的编码器
|
||||
fourcc = cv2.VideoWriter.fourcc(*"x264") # 使用更兼容的编码器
|
||||
self.capFile = cv2.VideoWriter(vid_name, fourcc, fps, (width, height))
|
||||
if not self.capFile.isOpened():
|
||||
self.system_logger.error(f"无法打开视频录制器: {vid_name}")
|
||||
@ -282,41 +356,28 @@ class MAT:
|
||||
self.ch340_init()
|
||||
cnt = 0
|
||||
self._start_time = time.time()
|
||||
self.about_time=about_time
|
||||
|
||||
self.crazy = 0
|
||||
self.about_time = about_time
|
||||
|
||||
while self.running:
|
||||
if not self.ch340.running:
|
||||
if self.state.mode == State.Mode.CRAZY:
|
||||
match self.crazy:
|
||||
case 0:
|
||||
self.control_logger.info("crazy 0")
|
||||
self.ch340_pull()
|
||||
self.ch340.speed = 1.00
|
||||
self.ch340.push_async(vol=12)
|
||||
self.crazy = 1
|
||||
case 1:
|
||||
self.control_logger.info("crazy 1")
|
||||
self.ch340_pull()
|
||||
self.ch340.push_async(vol=8)
|
||||
self.total_volume = 20
|
||||
cnt = 2
|
||||
self.crazy = 2
|
||||
self.state.mode = State.Mode.FAST
|
||||
else:
|
||||
if 12 * cnt - self.total_volume < 0.5:
|
||||
self.ch340_pull()
|
||||
cnt += 1
|
||||
time.sleep(0.01)
|
||||
|
||||
# 简化的推送逻辑
|
||||
should_push = False
|
||||
if self.state.is_fast_mode():
|
||||
should_push = True
|
||||
elif self.state.is_slow_mode() and time.time() - self.ch340.start > mid_time:
|
||||
elif (
|
||||
self.state.is_slow_mode()
|
||||
and time.time() - self.ch340.start > mid_time
|
||||
):
|
||||
|
||||
should_push = True
|
||||
elif self.state.is_about_mode() and time.time() - self.ch340.start > about_time:
|
||||
elif (
|
||||
self.state.is_about_mode()
|
||||
and time.time() - self.ch340.start > about_time
|
||||
):
|
||||
|
||||
if not self.state.about_first_flag:
|
||||
self.state.about_check = True
|
||||
else:
|
||||
@ -325,17 +386,23 @@ class MAT:
|
||||
|
||||
if should_push:
|
||||
speed = self.speeds[self.state.mode.value]
|
||||
self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次")
|
||||
self.volume_logger.info(
|
||||
f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次"
|
||||
)
|
||||
self.ch340_push(speed)
|
||||
self.total_volume += speed
|
||||
self.volume_list.append(round(self.total_volume, 2))
|
||||
try:
|
||||
self.color_list.append(self.state.mode.name)
|
||||
except Exception as e:
|
||||
self.control_logger.error(f"报表失败: {e}")
|
||||
|
||||
# if (self.state.is_about_mode() and time.time() - self.ch340.start > end_time) or not self.state.is_about_mode():
|
||||
if self._pred() is None:
|
||||
self.control_logger.error("预测失败,跳过当前帧")
|
||||
continue
|
||||
|
||||
# 释放视频录制器
|
||||
if hasattr(self, 'capFile') and self.capFile.isOpened():
|
||||
if hasattr(self, "capFile") and self.capFile.isOpened():
|
||||
self.capFile.release()
|
||||
self.system_logger.info(f"视频录制完成: {vid_name}")
|
||||
|
||||
@ -344,28 +411,40 @@ class MAT:
|
||||
"总体积 ": f"{self.total_volume:.2f} ml",
|
||||
"终点体积": f"{self.colored_volume:.2f} ml",
|
||||
"理论体积": f"{self.colored_volume:.2f} ml",
|
||||
"实验时长": f"{time.time() - self.start_time:.2f} 秒"
|
||||
"实验时长": f"{time.time() - self.start_time:.2f} 秒",
|
||||
"滴定速率": f"{self.total_volume / (time.time() - self.start_time):.2f} mL/s",
|
||||
}
|
||||
upload_data = {
|
||||
"start_time": self.format_date_time(self.start_time),
|
||||
"end_time": self.format_date_time(time.time()),
|
||||
"volume_record": f"{self.volume_list}",
|
||||
"voltage_record": f"{[]}",
|
||||
"color_record": f"{self.color_list}",
|
||||
"final_volume": f"{self.colored_volume}",
|
||||
}
|
||||
self.system_logger.info("实验结果:")
|
||||
for key, value in experiment_results.items():
|
||||
self.system_logger.info(f" {key}: {value}")
|
||||
|
||||
return upload_data, self.colored_im
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
token = login_to_platform("13504022184", "password")
|
||||
|
||||
# 创建MAT类的实例并运行
|
||||
mat = MAT(
|
||||
videoSourceIndex = 1,
|
||||
bounce_time=4,
|
||||
end_bounce_time=0.01
|
||||
)
|
||||
mat = MAT(videoSourceIndex=1, bounce_time=2, sensity=32)
|
||||
|
||||
mat.state.mode = State.Mode.FAST
|
||||
|
||||
mat.run(
|
||||
slow_speed = 0.05,
|
||||
quick_speed = 0.15,
|
||||
final_data, finish_picture = mat.run(
|
||||
slow_speed=0.05,
|
||||
quick_speed=1.0,
|
||||
about_time=3,
|
||||
# cap_dir=None
|
||||
)
|
||||
with open("log.json", "w") as f:
|
||||
import json
|
||||
|
||||
|
||||
json.dump(final_data, f, indent=4)
|
||||
send_data_to_platform(token, final_data, finish_picture)
|
||||
|
@ -1,5 +1,5 @@
|
||||
numpy
|
||||
opencv-python
|
||||
pyserial
|
||||
matplotlib
|
||||
PySide6
|
||||
scipy
|
||||
requests
|
399
utils.py
399
utils.py
@ -1,21 +1,25 @@
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Tuple, Optional, Any
|
||||
from typing import List, Literal, Optional
|
||||
|
||||
import numpy as np
|
||||
import time
|
||||
import matplotlib.pyplot as plt
|
||||
from matplotlib.animation import FuncAnimation
|
||||
import threading
|
||||
from collections import deque
|
||||
import requests
|
||||
|
||||
stateType = Literal["transport", "middle", "about", "colored"]
|
||||
|
||||
|
||||
@dataclass
|
||||
class HistoryRecord:
|
||||
"""历史记录的单个条目"""
|
||||
|
||||
timestamp: float
|
||||
state: str # 'transport', 'middle', 'about', 'colored'
|
||||
state: stateType
|
||||
rate: float
|
||||
volume: float
|
||||
image: np.ndarray
|
||||
@ -24,7 +28,7 @@ class HistoryRecord:
|
||||
class History:
|
||||
"""滑动窗口历史记录管理类"""
|
||||
|
||||
def __init__(self, max_window_size: float = 5.0, base_time = 5.0, display: bool = False):
|
||||
def __init__(self, max_window_size: float = 5.0, base_time=5.0):
|
||||
"""
|
||||
初始化历史记录管理器
|
||||
|
||||
@ -34,7 +38,7 @@ class History:
|
||||
display: 是否开启实时可视化显示
|
||||
"""
|
||||
if base_time > max_window_size:
|
||||
max_window_size = base_time+0.2
|
||||
max_window_size = base_time + 0.2
|
||||
# raise ValueError("Base time must be less than or equal to max window size.")
|
||||
self.records: List[HistoryRecord] = []
|
||||
self.max_window_size = max_window_size
|
||||
@ -43,29 +47,31 @@ class History:
|
||||
self.base = None
|
||||
self._base_time = base_time
|
||||
self._base_cnt = 0
|
||||
self.display = display
|
||||
self.fig: Any = None
|
||||
self.ax: Any = None
|
||||
self.line: Any = None
|
||||
self.base_line: Any = None # 基准线的引用
|
||||
self.animation: Any = None
|
||||
self._plot_thread: Optional[threading.Thread] = None
|
||||
self._plot_running = False
|
||||
self.end_history: list[bool] = []
|
||||
self.last_end = 0
|
||||
|
||||
if self.display:
|
||||
self._setup_plot()
|
||||
def add_record(self, timestamp: float, state: str, rate: float, volume: float, image: np.ndarray):
|
||||
def add_record(
|
||||
self,
|
||||
timestamp: float,
|
||||
state: stateType,
|
||||
rate: float,
|
||||
volume: float,
|
||||
image: np.ndarray,
|
||||
):
|
||||
"""添加新的历史记录"""
|
||||
record = HistoryRecord(timestamp, state, rate, volume, image)
|
||||
self.records.append(record)
|
||||
self._cleanup_old_records(timestamp)
|
||||
if self.base is None and (timestamp-self._base_time)>= self.records[0].timestamp:
|
||||
if (
|
||||
self.base is None
|
||||
and (timestamp - self._base_time) >= self.records[0].timestamp
|
||||
):
|
||||
if self._base_cnt < 30:
|
||||
self._base_cnt+=1
|
||||
self._base_cnt += 1
|
||||
return
|
||||
base_records = self.get_recent_records(self._base_time, timestamp)
|
||||
self.base = sum([rec.rate for rec in base_records]) / len(base_records)
|
||||
get_endpoint_logger().info("Base rate calculated: %.2f", self.base)
|
||||
logging.getLogger("Endpoint").info("Base rate calculated: %.2f", self.base)
|
||||
|
||||
def _cleanup_old_records(self, current_time: float):
|
||||
"""清理过期的历史记录"""
|
||||
@ -76,20 +82,29 @@ class History:
|
||||
self.records.pop(0)
|
||||
return True
|
||||
|
||||
def get_records_in_timespan(self, start_time: float, end_time: Optional[float] = None) -> List[HistoryRecord]:
|
||||
def get_records_in_timespan(
|
||||
self, start_time: float, end_time: Optional[float] = None
|
||||
) -> List[HistoryRecord]:
|
||||
"""获取指定时间段内的记录"""
|
||||
if end_time is None:
|
||||
return [record for record in self.records if record.timestamp >= start_time]
|
||||
else:
|
||||
return [record for record in self.records
|
||||
if start_time <= record.timestamp <= end_time]
|
||||
return [
|
||||
record
|
||||
for record in self.records
|
||||
if start_time <= record.timestamp <= end_time
|
||||
]
|
||||
|
||||
def get_recent_records(self, duration: float, current_time: float) -> List[HistoryRecord]:
|
||||
def get_recent_records(
|
||||
self, duration: float, current_time: float
|
||||
) -> List[HistoryRecord]:
|
||||
"""获取最近指定时间长度内的记录"""
|
||||
start_time = current_time - duration
|
||||
return self.get_records_in_timespan(start_time, current_time)
|
||||
|
||||
def get_state_ratio(self, target_state: str, records: Optional[List[HistoryRecord]] = None) -> float:
|
||||
def get_state_ratio(
|
||||
self, target_state: str, records: Optional[List[HistoryRecord]] = None
|
||||
) -> float:
|
||||
"""计算指定状态在记录中的比例"""
|
||||
if records is None:
|
||||
records = self.records
|
||||
@ -102,9 +117,13 @@ class History:
|
||||
|
||||
def get_states_by_type(self, target_state: str) -> List[float]:
|
||||
"""获取所有指定状态的时间戳"""
|
||||
return [record.timestamp for record in self.records if record.state == target_state]
|
||||
return [
|
||||
record.timestamp for record in self.records if record.state == target_state
|
||||
]
|
||||
|
||||
def find_record_by_timestamp(self, target_timestamp: float) -> Optional[HistoryRecord]:
|
||||
def find_record_by_timestamp(
|
||||
self, target_timestamp: float
|
||||
) -> Optional[HistoryRecord]:
|
||||
"""根据时间戳查找记录"""
|
||||
for record in self.records:
|
||||
if record.timestamp == target_timestamp:
|
||||
@ -115,184 +134,18 @@ class History:
|
||||
"""检查历史记录是否为空"""
|
||||
return len(self.records) == 0
|
||||
|
||||
def __len__(self) -> int:
|
||||
"""返回历史记录的数量"""
|
||||
return len(self.records)
|
||||
|
||||
def clear(self):
|
||||
"""清空所有历史记录"""
|
||||
self.records.clear()
|
||||
if self.display:
|
||||
self._stop_plot()
|
||||
|
||||
def _setup_plot(self):
|
||||
"""设置matplotlib图表"""
|
||||
plt.ion() # 交互式模式
|
||||
self.fig, self.ax = plt.subplots(figsize=(10, 6))
|
||||
self.ax.set_title('实时Rate值变化')
|
||||
self.ax.set_xlabel('时间 (s)')
|
||||
self.ax.set_ylabel('Rate值')
|
||||
self.ax.grid(True, alpha=0.3)
|
||||
|
||||
# 初始化空线条
|
||||
self.line, = self.ax.plot([], [], 'b-', linewidth=2, label='Rate')
|
||||
self.base_line = None # 基准线的引用
|
||||
self.ax.legend()
|
||||
|
||||
# 启动更新线程
|
||||
self._plot_running = True
|
||||
self._plot_thread = threading.Thread(target=self._update_plot_loop, daemon=True)
|
||||
self._plot_thread.start()
|
||||
|
||||
def _update_plot_loop(self):
|
||||
"""绘图更新循环"""
|
||||
while self._plot_running:
|
||||
try:
|
||||
self._update_plot()
|
||||
time.sleep(0.1) # 100ms更新间隔
|
||||
except Exception as e:
|
||||
get_vision_logger().error(f"Plot update error: {e}")
|
||||
break
|
||||
def _update_plot(self):
|
||||
"""更新图表数据 - 按照matplotlib推荐的方式重构"""
|
||||
if not self.records:
|
||||
return
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
# 获取时间和rate数据
|
||||
timestamps = [record.timestamp for record in self.records]
|
||||
rates = [record.rate for record in self.records]
|
||||
|
||||
if not timestamps:
|
||||
return
|
||||
|
||||
# 将时间戳转换为相对时间(从第一个记录开始)
|
||||
start_timestamp = timestamps[0]
|
||||
relative_times = [(t - start_timestamp) for t in timestamps]
|
||||
|
||||
# 更新主线条数据
|
||||
self.line.set_data(relative_times, rates)
|
||||
|
||||
# 处理base线
|
||||
self._update_base_line()
|
||||
|
||||
# 自动调整坐标轴范围
|
||||
self._update_axis_limits(relative_times, rates)
|
||||
|
||||
# 使用推荐的绘制方式
|
||||
self.ax.draw_artist(self.line)
|
||||
self.fig.canvas.draw() # 使用draw_idle而不是draw
|
||||
|
||||
except Exception as e:
|
||||
get_vision_logger().error(f"Plot update error: {e}")
|
||||
finally:
|
||||
# 性能监控
|
||||
elapsed = time.time() - start_time
|
||||
if elapsed > 0.1:
|
||||
get_vision_logger().warning(f"Plot update took too long: {elapsed:.3f}s")
|
||||
|
||||
def _update_base_line(self):
|
||||
"""更新或创建基准线"""
|
||||
if self.base is not None:
|
||||
if self.base_line is None:
|
||||
# 首次创建基准线
|
||||
self.base_line = self.ax.axhline(
|
||||
y=self.base,
|
||||
color='r',
|
||||
linestyle='--',
|
||||
alpha=0.7,
|
||||
label=f'Base Rate: {self.base:.2f}'
|
||||
)
|
||||
# 更新图例
|
||||
self.ax.legend()
|
||||
else:
|
||||
# 更新现有基准线的位置
|
||||
self.base_line.set_ydata([self.base, self.base])
|
||||
# 更新标签
|
||||
self.base_line.set_label(f'Base Rate: {self.base:.2f}')
|
||||
self.ax.legend()
|
||||
elif self.base_line is not None:
|
||||
# 如果base被重置,移除基准线
|
||||
self.base_line.remove()
|
||||
self.base_line = None
|
||||
self.ax.legend()
|
||||
|
||||
def _update_axis_limits(self, relative_times, rates):
|
||||
"""智能更新坐标轴范围"""
|
||||
if not relative_times or not rates:
|
||||
return
|
||||
|
||||
# X轴范围
|
||||
max_time = max(relative_times)
|
||||
self.ax.set_xlim(0, max_time + max(1, max_time * 0.05)) # 添加5%的边距
|
||||
|
||||
# Y轴范围计算
|
||||
min_rate, max_rate = min(rates), max(rates)
|
||||
|
||||
if self.base is not None:
|
||||
# 如果有基准线,确保Y轴范围合理
|
||||
y_min = 0
|
||||
y_max = min(max(self.base * 35, max_rate * 1.2), 100) # 限制最大值为100
|
||||
else:
|
||||
# 没有基准线时的默认范围
|
||||
margin = (max_rate - min_rate) * 0.1 if max_rate > min_rate else 1
|
||||
y_min = max(0, min_rate - margin)
|
||||
y_max = max_rate + margin
|
||||
|
||||
self.ax.set_ylim(y_min, y_max)
|
||||
def _stop_plot(self):
|
||||
"""停止绘图"""
|
||||
self._plot_running = False
|
||||
if self._plot_thread and self._plot_thread.is_alive():
|
||||
self._plot_thread.join(timeout=1.0)
|
||||
if self.fig:
|
||||
plt.close(self.fig)
|
||||
self.fig = None
|
||||
self.ax = None
|
||||
self.line = None
|
||||
self.base_line = None
|
||||
|
||||
def save_plot(self, filename: Optional[str] = None):
|
||||
"""保存当前图表"""
|
||||
if not self.display or not self.fig:
|
||||
get_vision_logger().warning("Plot not initialized, cannot save")
|
||||
return
|
||||
|
||||
if filename is None:
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
filename = f"rate_plot_{timestamp}.png"
|
||||
|
||||
try:
|
||||
self.fig.savefig(filename, dpi=300, bbox_inches='tight')
|
||||
get_vision_logger().info(f"Plot saved to {filename}")
|
||||
except Exception as e:
|
||||
get_vision_logger().error(f"Failed to save plot: {e}")
|
||||
|
||||
def toggle_display(self):
|
||||
"""切换显示状态"""
|
||||
if self.display:
|
||||
self._stop_plot()
|
||||
self.display = False
|
||||
else:
|
||||
self.display = True
|
||||
self._setup_plot()
|
||||
|
||||
|
||||
class State:
|
||||
"""滴定状态管理类"""
|
||||
|
||||
class Mode(Enum):
|
||||
FAST = 0 # 快速模式
|
||||
SLOW = 1 # 慢速模式 (middle)
|
||||
ABOUT = 2 # 接近终点模式
|
||||
CRAZY = 3 # CRAZY模式
|
||||
# END = 3 # 终点模式
|
||||
|
||||
def __init__(self, bounce_time=1, end_bounce_time=5):
|
||||
def __init__(self, bounce_time=1):
|
||||
self.mode = self.Mode.FAST
|
||||
self.bounce_time = bounce_time
|
||||
self.end_bounce_time = end_bounce_time
|
||||
|
||||
# 状态检查标志
|
||||
self.in_middle_check = False
|
||||
@ -301,8 +154,7 @@ class State:
|
||||
self.about_first_flag = False
|
||||
|
||||
# 时间记录
|
||||
self.middle_detected_time = None
|
||||
self.end_detected_time = None
|
||||
self.middle_detected_time: Optional[float] = None
|
||||
|
||||
def is_fast_mode(self):
|
||||
return self.mode == self.Mode.FAST
|
||||
@ -313,9 +165,6 @@ class State:
|
||||
def is_about_mode(self):
|
||||
return self.mode == self.Mode.ABOUT
|
||||
|
||||
# def is_end_mode(self):
|
||||
# return self.mode == self.Mode.END
|
||||
|
||||
def enter_middle_state(self, current_time):
|
||||
"""进入middle状态 - 立即切换到slow模式并开始检查"""
|
||||
self.mode = self.Mode.SLOW
|
||||
@ -327,12 +176,6 @@ class State:
|
||||
if self.mode == self.Mode.SLOW:
|
||||
self.mode = self.Mode.ABOUT
|
||||
|
||||
def enter_end_check(self, current_time):
|
||||
"""进入end检查状态"""
|
||||
self.in_end_check = True
|
||||
self.end_detected_time = current_time
|
||||
self.mode = self.Mode.ABOUT
|
||||
|
||||
def exit_middle_check(self):
|
||||
"""退出middle检查状态,返回fast模式"""
|
||||
self.in_middle_check = False
|
||||
@ -348,33 +191,98 @@ class State:
|
||||
|
||||
def should_check_middle_exit(self, current_time):
|
||||
"""检查是否应该进行middle退出检查"""
|
||||
return (self.in_middle_check and
|
||||
self.middle_detected_time is not None and
|
||||
current_time - self.middle_detected_time > self.bounce_time and
|
||||
(self.mode == self.Mode.SLOW))
|
||||
|
||||
def should_check_end_result(self, current_time):
|
||||
"""检查是否应该进行end结果检查"""
|
||||
return (self.in_end_check and
|
||||
self.end_detected_time is not None and
|
||||
current_time - self.end_detected_time > self.end_bounce_time)
|
||||
|
||||
def reset_end_check(self):
|
||||
"""重置end检查状态"""
|
||||
self.in_end_check = False
|
||||
self.end_detected_time = None
|
||||
return (
|
||||
self.in_middle_check
|
||||
and self.middle_detected_time is not None
|
||||
and current_time - self.middle_detected_time > self.bounce_time
|
||||
and (self.mode == self.Mode.SLOW)
|
||||
)
|
||||
|
||||
def get_status_text(self):
|
||||
"""获取状态显示文本"""
|
||||
status = []
|
||||
current_time = time.time()
|
||||
if self.in_middle_check and current_time - self.middle_detected_time > self.bounce_time:
|
||||
if (
|
||||
self.in_middle_check
|
||||
and current_time - self.middle_detected_time > self.bounce_time
|
||||
):
|
||||
status.append("MIDCHK")
|
||||
if self.in_end_check and current_time - self.end_detected_time > self.end_bounce_time:
|
||||
status.append("ENDCHK")
|
||||
return ", " + ", ".join(status) if status else ""
|
||||
|
||||
|
||||
def login_to_platform(username, password):
|
||||
"""登录到平台获取token"""
|
||||
try:
|
||||
wwwF = {"userName": username, "password": password}
|
||||
url = "https://jingsai.mools.net/api/login"
|
||||
|
||||
response = requests.post(url, wwwF, timeout=2)
|
||||
if response is None:
|
||||
print("错误", "网络连接失败 ")
|
||||
return None
|
||||
request = json.loads(response.text)
|
||||
if request["code"] == 1:
|
||||
# 登陆成功
|
||||
# print("登陆成功",'登陆成功')
|
||||
# 从服务器获取到的数据中找到token
|
||||
token = request["token"]
|
||||
print("成功", "登录成功!")
|
||||
return token
|
||||
elif request["code"] == 2:
|
||||
print("错误", "用户名或密码错误!")
|
||||
return None
|
||||
else:
|
||||
print("错误", "登陆失败 ")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("错误", f"发送数据时出错:{e}")
|
||||
return None
|
||||
|
||||
|
||||
def send_data_to_platform(token, data, picture):
|
||||
"""将数据发送到平台"""
|
||||
try:
|
||||
# if 1:
|
||||
# 打开图片文件并转换为 Base64 编码
|
||||
with open(picture, "rb") as picture_file:
|
||||
picture_data = picture_file.read()
|
||||
base64_encoded_picture = base64.b64encode(picture_data).decode("utf-8")
|
||||
|
||||
# print(base64_encoded_picture)
|
||||
|
||||
# 更新数据字典,添加 Base64 编码的图片
|
||||
data["final_image"] = base64_encoded_picture
|
||||
# print(data)
|
||||
|
||||
# 设置请求头
|
||||
headers = {
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
url = "https://jingsai.mools.net/api/upload-record"
|
||||
# 准备 JSON 数据
|
||||
json_data = json.dumps(data)
|
||||
# 发送 POST 请求
|
||||
response = requests.post(url, headers=headers, data=json_data)
|
||||
request = json.loads(response.text)
|
||||
# print(request['code'])
|
||||
# 检查响应
|
||||
if request["code"] == 1:
|
||||
print("提交成功", "提交成功")
|
||||
|
||||
else:
|
||||
print(
|
||||
"错误",
|
||||
f"网络请求失败,状态码:{response.status_code}\n错误信息:{response.text}",
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
raise e
|
||||
print("错误", f"发送数据时出错:{e}")
|
||||
|
||||
|
||||
def setup_logging(log_level=logging.INFO, log_dir="logs"):
|
||||
"""
|
||||
设置logging配置,创建不同模块的logger
|
||||
@ -388,42 +296,17 @@ def setup_logging(log_level=logging.INFO, log_dir="logs"):
|
||||
os.makedirs(log_dir)
|
||||
|
||||
# 获取当前时间作为日志文件名
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
log_file = os.path.join(log_dir, f"titration_{timestamp}.log")
|
||||
|
||||
# 配置根logger
|
||||
logging.basicConfig(
|
||||
level=log_level,
|
||||
format='%(asctime)s - %(name)8s - %(levelname)7s - %(message)s',
|
||||
format="%(asctime)s - %(name)8s - %(levelname)7s - %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler(log_file, encoding='utf-8'),
|
||||
logging.StreamHandler() # 同时输出到控制台
|
||||
]
|
||||
logging.FileHandler(log_file, encoding="utf-8"),
|
||||
logging.StreamHandler(), # 同时输出到控制台
|
||||
],
|
||||
)
|
||||
|
||||
return log_file
|
||||
|
||||
|
||||
def get_system_logger():
|
||||
"""系统初始化和控制相关的logger"""
|
||||
return logging.getLogger("System")
|
||||
|
||||
def get_hardware_logger():
|
||||
"""硬件控制相关的logger(CH340等)"""
|
||||
return logging.getLogger("Hardware")
|
||||
|
||||
def get_vision_logger():
|
||||
"""图像处理和预测相关的logger"""
|
||||
return logging.getLogger("Vision")
|
||||
|
||||
def get_control_logger():
|
||||
"""控制逻辑相关的logger"""
|
||||
return logging.getLogger("Control")
|
||||
|
||||
def get_endpoint_logger():
|
||||
"""终点检测相关的logger"""
|
||||
return logging.getLogger("Endpoint")
|
||||
|
||||
def get_volume_logger():
|
||||
"""体积计算相关的logger"""
|
||||
return logging.getLogger("Volume")
|
||||
|
63
vid_chk.py
63
vid_chk.py
@ -1,18 +1,67 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
import ch340
|
||||
|
||||
flag = False
|
||||
vidId = 1
|
||||
cap = cv2.VideoCapture(vidId, cv2.CAP_DSHOW)
|
||||
while not flag:
|
||||
suc,im = cap.read()
|
||||
cv2.imshow("image",im)
|
||||
suc, im = cap.read()
|
||||
cv2.imshow("image", im)
|
||||
k = cv2.waitKey(0)
|
||||
if k & 0xff == ord('q'):
|
||||
cap.release()
|
||||
cv2.destroyAllWindows()
|
||||
flag=True
|
||||
if k & 0xFF == ord("q"):
|
||||
# cap.release()
|
||||
# cv2.destroyAllWindows()
|
||||
flag = True
|
||||
else:
|
||||
vidId+=1
|
||||
vidId += 1
|
||||
cap.release()
|
||||
cap.open(vidId, cv2.CAP_DSHOW)
|
||||
print(f"使用摄像头索引: {vidId}")
|
||||
|
||||
|
||||
base = 30
|
||||
pump = ch340.CH340()
|
||||
while True:
|
||||
suc, im = cap.read()
|
||||
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
|
||||
s = hsv[:, :, 1]
|
||||
mask = s > base
|
||||
cv2.imshow("mask", im * mask[:, :, np.newaxis])
|
||||
tot = mask.shape[0] * mask.shape[1]
|
||||
val = np.sum(mask)
|
||||
rate = val / tot
|
||||
thr = (0.05, 0.2, 0.5)
|
||||
ret = ""
|
||||
if rate < thr[0]:
|
||||
ret = "transport"
|
||||
elif rate < thr[1]:
|
||||
ret = "middle"
|
||||
elif rate < thr[2]:
|
||||
ret = "about"
|
||||
else:
|
||||
ret = "colored"
|
||||
im = cv2.putText(
|
||||
im,
|
||||
f"Rate: {rate:.2},base={base},{ret}",
|
||||
(10, 30),
|
||||
cv2.FONT_HERSHEY_SIMPLEX,
|
||||
1,
|
||||
(0, 255, 0),
|
||||
2,
|
||||
)
|
||||
cv2.imshow("image", im)
|
||||
k = cv2.waitKey(1)
|
||||
if k & 0xFF == ord("w"):
|
||||
base += 1
|
||||
elif k & 0xFF == ord("s"):
|
||||
base -= 1
|
||||
elif k & 0xFF == ord("a"):
|
||||
pump.push_async(speed=0.1, vol=0.1)
|
||||
elif k & 0xFF == ord("q"):
|
||||
print("Camera index = ", vidId)
|
||||
print("k =", base)
|
||||
cap.release()
|
||||
cv2.destroyAllWindows()
|
||||
break
|
||||
|
Reference in New Issue
Block a user