Compare commits

...

10 Commits

Author SHA1 Message Date
29eea0817e write README
Former-commit-id: 634e6fd710478b787c344c3bbe85b6b2cbddf9d2
2025-07-07 19:14:54 +08:00
b696cb71be fix
Former-commit-id: e60e29fb767ab2b7d8152eced7a50255ce6b06ad
2025-07-07 18:56:33 +08:00
e0e1c649eb clean and format codebase
Former-commit-id: 5d0497ac67199a7ea475849a6ec3f28df46371cb
2025-07-07 18:52:50 +08:00
e0df489aee clean code
Former-commit-id: 0c4a66b08f05c881992c7303fdebfd15d65d82df
2025-07-07 18:51:02 +08:00
27c1b08365 final
Former-commit-id: 251b57f490efdc292b0c4c019431219b75ee890c
2025-07-03 22:29:31 +08:00
42460ad72f upload
Former-commit-id: 3af1a19b5b0e0bb4dbc367468b36c62aa8762a91
2025-06-20 22:38:46 +08:00
492b37c912 final 1
Former-commit-id: dc04af028eb2ecbda74eef38f377be0989e21756
2025-06-12 18:50:25 +08:00
6ad826c17f visual test 1
Former-commit-id: fd87c16bba13b15e8ebc82d816cbf289ec9118f8
2025-06-08 22:03:50 +08:00
c1f33df66d 1
Former-commit-id: 1b34dc488f5306fcb5d20746c217255bb9ead3c5
2025-06-06 23:02:15 +08:00
8184179a17 alter
Former-commit-id: 0d9b33e7625efe7bee422d1514d8453ff689553d
2025-06-05 22:23:28 +08:00
12 changed files with 558 additions and 739 deletions

1
.gitignore vendored
View File

@ -12,3 +12,4 @@ Videos
*.dist
HCHO
upx.exe
media

View File

@ -1,14 +1,40 @@
状态管理逻辑:
- 滑动窗口
维护一个滑动窗口history记录最近end_bounce_time内的状态。
状态包括:时间,浓度,状态,当前截图
这是一个基于传统图像处理方法的AI视觉滴定控制程序。
- 进入
1. middle: predictor返回middle立即进入slow状态。
2. about: 返回about且处于middle则进入about状态
3. end返回end置end_check标记开始检查
## 操作步骤
- check
1. middle: 进入middle的bounce_time后在最近bounce_time内middle比例<70%返回fast状态
2. about随着middle退出一起退出不单独处理
3. end进入end之后的end_bounce_time如果end_bounce_time内end比例<80%则实验终止逻辑否则从history中找到第二个end并继续check逻辑
1. 安装依赖
```bash
pip install -r requirements.txt
```
2. 校准光线
1. 运行`vid_chk.py`,按任意键直到显示为目标摄像头,按下`q`
2.`w``s`直到左上角参数在$0.01\sim 0.03$
3.`q`后将控制台输出参数输入`main.py`
3. 运行主程序
## 技术说明
### 变色预测
1. 前期分析
透明的溶液饱和度较低。粉色饱和度较高。故取HSV中的S分析筛选$S_{\rm{fil}}S>\rm{sensity}$,再根据$\frac{S_{\rm{fil}}}{S}$判断变色区域大小,并避免背景干扰
2. 终点检测
透明的溶液色相和变色后不同即HSV中的H不同。但是任何单点包括平均值不足以反应溶液是否完全变色故对图像H通道作直方图分析。刚到突变点时直方图的最高峰位置右移且位置和透明显著不同。随着继续滴加峰值继续右移并且从尖峰变为宽峰。
### 滴定控制
1. 前期
滴入后溶液完全没有变色,可以采用高速滴加。当发现溶液有小区域变色时,稍微降低速度,避免气泡、搅匀等因素干扰。
2. 中期
滴入后,溶液发生变色,且在数秒内褪色。这时候距离终点约$1\sim 2\,\rm{mL}$,采用较慢的滴定速度($0.05\,\rm{mL/s}$),且每次滴入后等待$1\sim 2\,\rm{s}$。
3. 末期
滴入后,溶液大范围变色,且在数秒至十秒左右褪色。这时继续减慢滴入速率($0.02\,\rm{mL/s}$并每次滴入后终点检测如认为变色则等待数秒后重新判断。持续4次后认为到达滴定终点汇报数据。
**意外处理**
中期检测可能存在误判。检测符合中期条件后,立即进入中期模式。如果在`bounce_time`内判定为中期的样本过少,则退出。`bounce_time`使用滑动窗口处理。
特别地为了避免在末期意外退出连续2次判定终点成功后禁用该退出。
类似地末期也有退出处理连续判定终点3次以上禁用退出。

View File

View File

@ -1,223 +0,0 @@
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import re
from datetime import datetime
import main as _main
import argparse
class RateVolumeAnalyzer:
def __init__(self):
self.mat = _main.MAT() # 创建MAT实例以使用predictor方法
def parse_log_file(self, log_path):
"""解析日志文件,提取体积和时间信息"""
volumes = []
timestamps = []
with open(log_path, 'r', encoding='utf-8') as f:
for line in f:
# 匹配体积信息的行
if '当前体积:' in line:
# 提取时间戳
time_match = re.search(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d+', line)
# 提取体积
volume_match = re.search(r'当前体积: ([\d.]+) ml', line)
if time_match and volume_match:
timestamp = datetime.strptime(time_match.group(1), '%Y-%m-%d %H:%M:%S')
volume = float(volume_match.group(1))
timestamps.append(timestamp)
volumes.append(volume)
return timestamps, volumes
def extract_frames_from_video(self, video_path, timestamps, start_time):
"""从视频中提取对应时间点的帧"""
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"无法打开视频文件: {video_path}")
return []
fps = cap.get(cv2.CAP_PROP_FPS)
frames = []
for timestamp in timestamps:
# 计算相对于实验开始的秒数
seconds_from_start = (timestamp - start_time).total_seconds()
frame_number = int(seconds_from_start * fps)
# 设置视频位置到指定帧
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = cap.read()
if ret:
frames.append(frame)
else:
frames.append(None) # 如果无法读取帧添加None
cap.release()
return frames
def calculate_rates(self, frames):
"""使用predictor方法计算每帧的rate"""
rates = []
states = []
for frame in frames:
if frame is not None:
try:
state, rate = self.mat.predictor(frame)
rates.append(rate)
states.append(state)
except Exception as e:
print(f"计算rate时发生错误: {e}")
rates.append(None)
states.append(None)
else:
rates.append(None)
states.append(None)
return rates, states
def plot_rate_volume_curve(self, volumes, rates, states, log_filename):
"""绘制rate-体积曲线"""
# 过滤掉None值
valid_data = [(v, r, s) for v, r, s in zip(volumes, rates, states)
if r is not None and s is not None]
if not valid_data:
print("没有有效的数据点可以绘制")
return
volumes_valid, rates_valid, states_valid = zip(*valid_data)
plt.figure(figsize=(12, 8))
# 根据状态用不同颜色绘制点
colors = {'transport': 'blue', 'middle': 'orange', 'about': 'purple', 'colored': 'red'}
for state in colors:
state_volumes = [v for v, s in zip(volumes_valid, states_valid) if s == state]
state_rates = [r for r, s in zip(rates_valid, states_valid) if s == state]
if state_volumes:
plt.scatter(state_volumes, state_rates,
c=colors[state], label=state, alpha=0.7, s=50)
# 绘制连接线
plt.plot(volumes_valid, rates_valid, 'k-', alpha=0.3, linewidth=1)
plt.xlabel('体积 (ml)', fontsize=12)
plt.ylabel('Rate', fontsize=12)
plt.title(f'Rate-体积曲线 ({log_filename})', fontsize=14)
plt.legend()
plt.grid(True, alpha=0.3)
# 保存图片
output_filename = f"rate_volume_curve_{log_filename.replace('.log', '.png')}"
plt.savefig(output_filename, dpi=300, bbox_inches='tight')
plt.show()
print(f"图片已保存为: {output_filename}")
# 打印统计信息
print(f"\n统计信息:")
print(f"总数据点: {len(valid_data)}")
for state in colors:
count = sum(1 for s in states_valid if s == state)
if count > 0:
print(f"{state}: {count} 个点")
def analyze_experiment(self, timestamp_str=None):
"""分析指定时间戳的实验,如果不指定则分析最新的实验"""
logs_dir = "logs"
videos_dir = "Videos"
if timestamp_str:
log_file = f"titration_{timestamp_str}.log"
video_file = f"{timestamp_str}.mp4"
else:
# 找到最新的日志文件
log_files = [f for f in os.listdir(logs_dir) if f.endswith('.log')]
if not log_files:
print("没有找到日志文件")
return
log_files.sort()
log_file = log_files[-1]
# 提取时间戳来找对应的视频文件
timestamp_match = re.search(r'titration_(\d{8}_\d{6})\.log', log_file)
if timestamp_match:
video_file = f"{timestamp_match.group(1)}.mp4"
else:
print("无法从日志文件名提取时间戳")
return
log_path = os.path.join(logs_dir, "titration_20250529_191634.log")
video_path = os.path.join(videos_dir, "tmp.mp4")
if not os.path.exists(log_path):
print(f"日志文件不存在: {log_path}")
return
if not os.path.exists(video_path):
print(f"视频文件不存在: {video_path}")
return
print(f"分析实验: {log_file}")
print(f"对应视频: {video_file}")
# 解析日志文件
timestamps, volumes = self.parse_log_file(log_path)
if not timestamps:
print("日志文件中没有找到体积数据")
return
print(f"找到 {len(timestamps)} 个数据点")
# 获取实验开始时间
start_time = timestamps[0]
# 从视频中提取帧
print("正在从视频中提取帧...")
frames = self.extract_frames_from_video(video_path, timestamps, start_time)
# 计算rate
print("正在计算rate值...")
rates, states = self.calculate_rates(frames)
# 绘制曲线
print("正在绘制rate-体积曲线...")
self.plot_rate_volume_curve(volumes, rates, states, log_file)
def main():
parser = argparse.ArgumentParser(description='分析滴定实验的rate-体积曲线')
parser.add_argument('--timestamp', type=str, help='指定实验时间戳 (格式: YYYYMMDD_HHMMSS)')
parser.add_argument('--list', action='store_true', help='列出所有可用的实验')
args = parser.parse_args()
analyzer = RateVolumeAnalyzer()
if args.list:
# 列出所有可用的实验
logs_dir = "logs"
if os.path.exists(logs_dir):
log_files = [f for f in os.listdir(logs_dir) if f.endswith('.log')]
log_files.sort()
print("可用的实验:")
for log_file in log_files:
timestamp_match = re.search(r'titration_(\d{8}_\d{6})\.log', log_file)
if timestamp_match:
timestamp = timestamp_match.group(1)
print(f" {timestamp}")
return
analyzer.analyze_experiment(args.timestamp)
if __name__ == "__main__":
main()

View File

@ -117,7 +117,7 @@ else:
self._speed = 43
self._speed = 43/34
# x = x*30
print(int((x-int(x))*100),int(x),x)
# print(int((x-int(x))*100),int(x),x)
self.pump_ser.write(f"q1h{int(x)}d".encode('ascii'))
time.sleep(0.01)
self.pump_ser.write(f"q2h{int((x-int(x))*100)}d".encode('ascii'))

Binary file not shown.

BIN
colored.zip Normal file

Binary file not shown.

4
login/info.json Normal file
View File

@ -0,0 +1,4 @@
{
"username": "13504022184",
"password": "password"
}

385
main.py
View File

@ -1,62 +1,82 @@
import cv2
import atexit
import logging
import time
from datetime import datetime
from typing import Optional
import cv2
import numpy as np
from scipy.signal import find_peaks
import ch340
import atexit
import utils
from utils import State, History
from utils import History, State, login_to_platform, send_data_to_platform
class MAT:
def __init__(self, videoSourceIndex=0, bounce_time=2, end_bounce_time=5):
def __init__(self, videoSourceIndex=0, bounce_time=2, sensity=30):
# 初始化logging
utils.setup_logging()
self.system_logger = utils.get_system_logger()
self.control_logger = utils.get_control_logger()
self.endpoint_logger = utils.get_endpoint_logger()
self.volume_logger = utils.get_volume_logger()
self.system_logger = logging.getLogger("System")
self.control_logger = logging.getLogger("Control")
self.endpoint_logger = logging.getLogger("Endpoint")
self.volume_logger = logging.getLogger("Volume")
self.system_logger.info('正在初始化MAT系统...')
self.system_logger.info("正在初始化MAT系统...")
self.videoSourceIndex = videoSourceIndex
self.cap = cv2.VideoCapture(videoSourceIndex, cv2.CAP_DSHOW)
self.ch340 = ch340.CH340()
self.total_volume = 0
self.sensity = sensity
self.state = State(bounce_time, end_bounce_time)
self.state = State(bounce_time)
atexit.register(self.ch340.stop)
self.history = History(max(bounce_time, end_bounce_time))
self.first_about = 0
self.history = History(bounce_time)
self.colored_volume = None
self.colored_time = None
self.colored_im = None
def ch340_pull(self):
"抽取12ml液体"
self.control_logger.info("开始抽取12ml")
self.ch340.max_speed()
self.ch340.pull(vol=12)
self.control_logger.info('完成抽取')
self.control_logger.info("完成抽取")
def ch340_init(self):
self.ch340.push(speed=1,t=1)
self.ch340.pull(speed=1.2,vol=1)
self.control_logger.info('电极位置已初始化')
"初始化电机位置,防止过头导致抽取溶液不准。"
# self.ch340.push(speed=1,t=1)
self.ch340.pull(speed=1.2, vol=1.8)
self.control_logger.info("电极位置已初始化")
def ch340_push(self, speed=0.1):
def ch340_push(self, speed: int | float = 0.1):
"常规推送保证推送时间为1s推送体积=speed"
self.ch340.push_async(speed=speed, t=1)
def process_left(self, now: float, value_only=False):
if self.state.mode == State.Mode.CRAZY:
value_only = True
def process_left(
self, now: float, value_only: Optional[bool] = False
) -> Optional[float]:
"""
计算当前时刻剩余体积并更新到self.total_volume。
@param now: 当前时间戳
@param value_only: 如果为True则只返回剩余体积不更新状态
@return: 当前剩余体积仅当value_only为True时返回
"""
if self.state.mode == State.Mode.ABOUT:
return self.total_volume
st = self.ch340.start
if not value_only:
self.ch340.stop()
assert abs(self.ch340._get_time()-1) < 1e-3
if not abs(self.ch340._get_time() - 1) < 1e-3:
self.control_logger.warning(
f"CH340 timing issue detected: {self.ch340._get_time()}"
)
return self.total_volume
r = now - st
ret = self.total_volume - (1-r) * self.speeds[self.state.mode.value]
ret = self.total_volume - (1 - r) * self.speeds[self.state.mode.value]
if value_only:
return ret
else:
@ -70,7 +90,7 @@ class MAT:
return None
# 录制当前帧到视频文件
if hasattr(self, 'capFile') and self.capFile.isOpened():
if hasattr(self, "capFile") and self.capFile.isOpened():
self.capFile.write(im)
ret, rate = self.predictor(im)
@ -90,44 +110,44 @@ class MAT:
if self.history.is_empty():
self.control_logger.error("未预期的没有可用的历史记录")
return ret
if self.state.mode == State.Mode.CRAZY:
self._display_status(im, ret, rate, val)
return ret
# === 状态进入逻辑 ===
if now - self._start_time<10:
if now - self._start_time < 10:
self._display_status(im, ret, rate, val)
return ret
if (
not self.edited
and self.history.base is not None
and rate > self.history.base * 2
):
self.edited = True
self.speeds[0] /= 2
# 1. middle: predictor返回middle立即进入slow状态
if ret == "middle":
if self.state.is_fast_mode():
self.process_left(now)
self.state.enter_middle_state(now)
self.control_logger.info(f"检测到middle立即进入slow模式当前体积: {val:.2f} ml")
self.control_logger.info(
f"检测到middle立即进入slow模式当前体积: {val:.2f} ml"
)
# 2. about: 返回about且处于middle则进入about状态
elif ret == "about":
if self.state.is_slow_mode():
self.process_left(now)
self.state.enter_about_state(now)
self.control_logger.info(f"检测到about进入about模式当前体积: {val:.2f} ml")
# 3. end返回colored置end_check标记开始检查
elif ret == "colored":
if not self.state.in_end_check:
self.state.enter_end_check(now)
if self.colored_volume is None:
self.colored_volume = val
self.colored_time = now
self.colored_im = im.copy()
self.endpoint_logger.info(f"检测到colored开始end检查记录体积: {val:.2f} ml")
# === 状态检查逻辑 ===
self.control_logger.info(
f"检测到about进入about模式当前体积: {val:.2f} ml"
)
# middle检查: 进入middle的bounce_time后在最近bounce_time内middle比例<70%返回fast状态
if self.state.should_check_middle_exit(now):
if self.state.should_check_middle_exit(now) and not self.history.last_end:
# 计算最近bounce_time内的middle比例
recent_records = self.history.get_recent_records(self.state.bounce_time, now)
recent_records = self.history.get_recent_records(
self.state.bounce_time, now
)
if recent_records:
trans_ratio = self.history.get_state_ratio("transport", recent_records)
@ -135,119 +155,173 @@ class MAT:
if trans_ratio > 0.4:
self.process_left(now)
self.state.exit_middle_check()
# about状态随middle一起退出
# self.state.exit_about_with_middle()
self.control_logger.info(f"middle比例{trans_ratio:.2%}<60%退出middle检查返回fast模式")
self.control_logger.info(
f"middle比例{trans_ratio:.2%}<60%退出middle检查返回fast模式"
)
if self.state.mode == State.Mode.ABOUT and self.state.about_check:
h = self.history.get_recent_records(self.about_time/3,now)
ratio = self.history.get_state_ratio("about", h)
self.history.about_history.append(ratio<0.3)
h = self.history.get_recent_records(self.about_time / 3, now)
ratio = self.history.get_state_ratio("transport", h)
self.history.about_history.append(ratio == 1)
while len(self.history.about_history) > 5:
self.history.about_history.pop(0)
if len(self.history.about_history) == 5:
if len(self.history.about_history) == 5 and self.history.last_end < 2:
rate = sum(self.history.about_history) / len(self.history.about_history)
if rate > 0.8:
self.state.exit_about()
self.control_logger.info(f"about比例{ratio:.2%}<80%退出about检查返回middle模式")
self.control_logger.info(
f"about比例{ratio:.2%}<80%退出about检查返回middle模式"
)
ret = self.end_check()
if ret == "colored":
return ret
if not self.history.last_end:
self.history.last_end = max(self.history.last_end, ret)
self.state.about_check = False
# end检查: 进入end之后的end_bounce_time如果end比例<80%,则重置;否则终止实验
if self.state.should_check_end_result(now):
colored_ratio = self.history.get_state_ratio("colored", self.history.get_recent_records(self.state.end_bounce_time, now))
if colored_ratio < 0.8:
# end比例<80%从history中找到第二个end并继续check逻辑
self.endpoint_logger.warning(f"colored比例{colored_ratio:.2%}<80%寻找下一个colored点")
# 寻找历史中倒数第二个colored状态
colored_times = self.history.get_states_by_type("colored")
if len(colored_times) >= 2:
# 使用倒数第二个colored时间重新开始检查
second_last_colored_time = colored_times[1]
self.state.end_detected_time = second_last_colored_time
# 更新colored记录为对应的体积
record = self.history.find_record_by_timestamp(second_last_colored_time)
if record:
self.colored_volume = record.volume
self.colored_time = record.timestamp
self.colored_im = record.image.copy()
self.endpoint_logger.info(f"重置到第二个colored点: {self.colored_volume:.2f} ml")
else:
# 没有足够的colored点重置end检查
self.state.reset_end_check()
self.colored_volume = None
self.colored_time = None
self.endpoint_logger.info("没有足够的colored点重置end检查")
else: # end比例>=80%,确认终点,终止实验
self.endpoint_logger.info(f"colored比例{colored_ratio:.2%}>=80%,确认滴定终点")
self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml")
self.running = False
self.ch340.stop()
if self.colored_im is not None:
cv2.imwrite(f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg", self.colored_im)
return "colored"
# 显示状态信息
self._display_status(im, ret, rate, val)
return ret
def end_check(self, dep=0):
"""终点检测,通过对图像直方图峰值分析,确认是否全局变色。"""
if not self.running:
return "colored"
if dep == 1:
suc, im = self.cap.read()
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
# 降低饱和度通道 (S通道是索引1)
hsv[:, :, 1] = hsv[:, :, 1] * 0.8
# 转换回BGR颜色空间
result = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
name = f"colored_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
cv2.imwrite(name, result)
self.colored_im = name
# 维护一个递归检测重复4次后认定为终点。
if dep > 3:
self.colored_volume = self.total_volume
self.endpoint_logger.info(f"最终体积: {self.colored_volume:.2f} ml")
self.running = False
self.ch340.stop()
return "colored"
suc, im = self.cap.read()
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
s = hsv[:, :, 0]
s = s[s > 0]
hist = cv2.calcHist([s], [0], None, [256], [0, 256])
hist = hist.flatten() # 转换为一维数组
# 峰值检测 - 找到直方图中的峰值
peaks, properties = find_peaks(
hist,
height=np.max(hist) * 0.2, # 峰值高度至少是最大值的10%
distance=10, # 峰值之间的最小距离
prominence=np.max(hist) * 0.01,
) # 峰值的突出度
if np.any(peaks > 130):
if dep == 0:
self.process_left(time.time())
self.history.end_history.append(True)
self.control_logger.info(f"检测到colored状态end_check at {dep}")
bgn = time.time()
while time.time() - bgn < 2:
suc, im = self.cap.read()
if hasattr(self, "capFile") and self.capFile.isOpened():
self.capFile.write(im)
cv2.putText(
im,
"ENDCHK",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(0, 255, 0),
2,
)
cv2.imshow("Frame", im)
return self.end_check(dep + 1)
# time.sleep(2)
else:
self.colored_im = None
return dep
def _display_status(self, im, detection_result, rate, volume):
"""显示状态信息到图像上"""
mode_color = {
State.Mode.FAST: (255, 255, 255),
State.Mode.SLOW: (10, 215, 255),
State.Mode.ABOUT: (228,116,167),
State.Mode.CRAZY: (0, 255, 0),
# State.Mode.END: (0, 0, 255)
State.Mode.ABOUT: (228, 116, 167),
}
status_text = f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml"
status_text = (
f"Stat: {detection_result}, rate: {round(rate,2)}, Vol: {volume:.2f} ml"
)
status_text += self.state.get_status_text()
cv2.putText(im, status_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7,
mode_color[self.state.mode], 2)
cv2.putText(
im,
status_text,
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
mode_color[self.state.mode],
2,
)
cv2.imshow("Frame", im)
cv2.waitKey(1)
def predictor(self,im):
hsv = cv2.cvtColor(im,cv2.COLOR_BGR2HSV)
s = hsv[:,:,1]
mask = s>30
cv2.imshow('mask',im*mask[:,:,np.newaxis])
tot = mask.shape[0]*mask.shape[1]
def predictor(self, im):
"""主预测函数分析图像并返回当前状态。当前colored不使用这个函数。"""
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
s = hsv[:, :, 1]
mask = s > self.sensity
cv2.imshow("mask", im * mask[:, :, np.newaxis])
tot = mask.shape[0] * mask.shape[1]
val = np.sum(mask)
rate = val/tot
rate = val / tot
if self.history.base is not None:
base = self.history.base
thr = (min(0.05,base*5), min(0.4,base*9), 0.75)
thr = (min(0.05, base * 5), min(0.2, base * 13), 0.5)
# thr = (base*5, base*13, 0.5)
if rate < thr[0]:
return "transport",rate
elif rate <thr[1]:
return "middle",rate
return "transport", rate
elif rate < thr[1]:
return "middle", rate
elif rate < thr[2]:
return "about",rate
return "about", rate
else:
return "colored",rate
return "colored", rate
else:
return "transport",rate
return "transport", rate
def __del__(self):
self.cap.release()
if hasattr(self, 'capFile') and self.capFile.isOpened():
if hasattr(self, "capFile") and self.capFile.isOpened():
self.capFile.release()
cv2.destroyAllWindows()
def format_date_time(self, timestamp):
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, about_time=1, cap_dir="Videos"):
def run(
self,
quick_speed=0.2,
slow_speed=0.05,
end_speed=0.02,
mid_time=0.5,
about_time=1,
cap_dir="Videos",
):
self.running = True
self.start_time = time.time()
self.speeds = (quick_speed, slow_speed, end_speed, 1.0)
self.speeds: list[float] = [quick_speed, slow_speed, end_speed]
self.need_check = False
self.edited = False
self.volume_list = []
self.color_list = []
if cap_dir is not None:
vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mkv"
@ -260,7 +334,7 @@ class MAT:
"防抖时间 ": self.state.bounce_time,
"快速模式速度": f"{quick_speed} ml/次",
"慢速模式速度": f"{slow_speed} ml/次",
"录制视频 ": vid_name
"录制视频 ": vid_name,
}
self.system_logger.info("实验参数:")
for key, value in experiment_params.items():
@ -271,7 +345,7 @@ class MAT:
# 初始化视频录制器
if vid_name != "Disabled":
fourcc = cv2.VideoWriter.fourcc(*'x264') # 使用更兼容的编码器
fourcc = cv2.VideoWriter.fourcc(*"x264") # 使用更兼容的编码器
self.capFile = cv2.VideoWriter(vid_name, fourcc, fps, (width, height))
if not self.capFile.isOpened():
self.system_logger.error(f"无法打开视频录制器: {vid_name}")
@ -282,41 +356,28 @@ class MAT:
self.ch340_init()
cnt = 0
self._start_time = time.time()
self.about_time=about_time
self.crazy = 0
self.about_time = about_time
while self.running:
if not self.ch340.running:
if self.state.mode == State.Mode.CRAZY:
match self.crazy:
case 0:
self.control_logger.info("crazy 0")
self.ch340_pull()
self.ch340.speed = 1.00
self.ch340.push_async(vol=12)
self.crazy = 1
case 1:
self.control_logger.info("crazy 1")
self.ch340_pull()
self.ch340.push_async(vol=8)
self.total_volume = 20
cnt = 2
self.crazy = 2
self.state.mode = State.Mode.FAST
else:
if 12 * cnt - self.total_volume < 0.5:
self.ch340_pull()
cnt += 1
time.sleep(0.01)
# 简化的推送逻辑
should_push = False
if self.state.is_fast_mode():
should_push = True
elif self.state.is_slow_mode() and time.time() - self.ch340.start > mid_time:
elif (
self.state.is_slow_mode()
and time.time() - self.ch340.start > mid_time
):
should_push = True
elif self.state.is_about_mode() and time.time() - self.ch340.start > about_time:
elif (
self.state.is_about_mode()
and time.time() - self.ch340.start > about_time
):
if not self.state.about_first_flag:
self.state.about_check = True
else:
@ -325,17 +386,23 @@ class MAT:
if should_push:
speed = self.speeds[self.state.mode.value]
self.volume_logger.info(f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次")
self.volume_logger.info(
f"当前体积: {self.total_volume:.2f} ml, 加入速度: {speed:.2f} ml/次"
)
self.ch340_push(speed)
self.total_volume += speed
self.volume_list.append(round(self.total_volume, 2))
try:
self.color_list.append(self.state.mode.name)
except Exception as e:
self.control_logger.error(f"报表失败: {e}")
# if (self.state.is_about_mode() and time.time() - self.ch340.start > end_time) or not self.state.is_about_mode():
if self._pred() is None:
self.control_logger.error("预测失败,跳过当前帧")
continue
# 释放视频录制器
if hasattr(self, 'capFile') and self.capFile.isOpened():
if hasattr(self, "capFile") and self.capFile.isOpened():
self.capFile.release()
self.system_logger.info(f"视频录制完成: {vid_name}")
@ -344,28 +411,40 @@ class MAT:
"总体积 ": f"{self.total_volume:.2f} ml",
"终点体积": f"{self.colored_volume:.2f} ml",
"理论体积": f"{self.colored_volume:.2f} ml",
"实验时长": f"{time.time() - self.start_time:.2f}"
"实验时长": f"{time.time() - self.start_time:.2f}",
"滴定速率": f"{self.total_volume / (time.time() - self.start_time):.2f} mL/s",
}
upload_data = {
"start_time": self.format_date_time(self.start_time),
"end_time": self.format_date_time(time.time()),
"volume_record": f"{self.volume_list}",
"voltage_record": f"{[]}",
"color_record": f"{self.color_list}",
"final_volume": f"{self.colored_volume}",
}
self.system_logger.info("实验结果:")
for key, value in experiment_results.items():
self.system_logger.info(f" {key}: {value}")
return upload_data, self.colored_im
if __name__ == "__main__":
token = login_to_platform("13504022184", "password")
# 创建MAT类的实例并运行
mat = MAT(
videoSourceIndex = 1,
bounce_time=4,
end_bounce_time=0.01
)
mat = MAT(videoSourceIndex=1, bounce_time=2, sensity=32)
mat.state.mode = State.Mode.FAST
mat.run(
slow_speed = 0.05,
quick_speed = 0.15,
final_data, finish_picture = mat.run(
slow_speed=0.05,
quick_speed=1.0,
about_time=3,
# cap_dir=None
)
with open("log.json", "w") as f:
import json
json.dump(final_data, f, indent=4)
send_data_to_platform(token, final_data, finish_picture)

View File

@ -1,5 +1,5 @@
numpy
opencv-python
pyserial
matplotlib
PySide6
scipy
requests

399
utils.py
View File

@ -1,21 +1,25 @@
import base64
import json
import logging
import os
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from dataclasses import dataclass
from typing import List, Tuple, Optional, Any
from typing import List, Literal, Optional
import numpy as np
import time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import threading
from collections import deque
import requests
stateType = Literal["transport", "middle", "about", "colored"]
@dataclass
class HistoryRecord:
"""历史记录的单个条目"""
timestamp: float
state: str # 'transport', 'middle', 'about', 'colored'
state: stateType
rate: float
volume: float
image: np.ndarray
@ -24,7 +28,7 @@ class HistoryRecord:
class History:
"""滑动窗口历史记录管理类"""
def __init__(self, max_window_size: float = 5.0, base_time = 5.0, display: bool = False):
def __init__(self, max_window_size: float = 5.0, base_time=5.0):
"""
初始化历史记录管理器
@ -34,7 +38,7 @@ class History:
display: 是否开启实时可视化显示
"""
if base_time > max_window_size:
max_window_size = base_time+0.2
max_window_size = base_time + 0.2
# raise ValueError("Base time must be less than or equal to max window size.")
self.records: List[HistoryRecord] = []
self.max_window_size = max_window_size
@ -43,29 +47,31 @@ class History:
self.base = None
self._base_time = base_time
self._base_cnt = 0
self.display = display
self.fig: Any = None
self.ax: Any = None
self.line: Any = None
self.base_line: Any = None # 基准线的引用
self.animation: Any = None
self._plot_thread: Optional[threading.Thread] = None
self._plot_running = False
self.end_history: list[bool] = []
self.last_end = 0
if self.display:
self._setup_plot()
def add_record(self, timestamp: float, state: str, rate: float, volume: float, image: np.ndarray):
def add_record(
self,
timestamp: float,
state: stateType,
rate: float,
volume: float,
image: np.ndarray,
):
"""添加新的历史记录"""
record = HistoryRecord(timestamp, state, rate, volume, image)
self.records.append(record)
self._cleanup_old_records(timestamp)
if self.base is None and (timestamp-self._base_time)>= self.records[0].timestamp:
if (
self.base is None
and (timestamp - self._base_time) >= self.records[0].timestamp
):
if self._base_cnt < 30:
self._base_cnt+=1
self._base_cnt += 1
return
base_records = self.get_recent_records(self._base_time, timestamp)
self.base = sum([rec.rate for rec in base_records]) / len(base_records)
get_endpoint_logger().info("Base rate calculated: %.2f", self.base)
logging.getLogger("Endpoint").info("Base rate calculated: %.2f", self.base)
def _cleanup_old_records(self, current_time: float):
"""清理过期的历史记录"""
@ -76,20 +82,29 @@ class History:
self.records.pop(0)
return True
def get_records_in_timespan(self, start_time: float, end_time: Optional[float] = None) -> List[HistoryRecord]:
def get_records_in_timespan(
self, start_time: float, end_time: Optional[float] = None
) -> List[HistoryRecord]:
"""获取指定时间段内的记录"""
if end_time is None:
return [record for record in self.records if record.timestamp >= start_time]
else:
return [record for record in self.records
if start_time <= record.timestamp <= end_time]
return [
record
for record in self.records
if start_time <= record.timestamp <= end_time
]
def get_recent_records(self, duration: float, current_time: float) -> List[HistoryRecord]:
def get_recent_records(
self, duration: float, current_time: float
) -> List[HistoryRecord]:
"""获取最近指定时间长度内的记录"""
start_time = current_time - duration
return self.get_records_in_timespan(start_time, current_time)
def get_state_ratio(self, target_state: str, records: Optional[List[HistoryRecord]] = None) -> float:
def get_state_ratio(
self, target_state: str, records: Optional[List[HistoryRecord]] = None
) -> float:
"""计算指定状态在记录中的比例"""
if records is None:
records = self.records
@ -102,9 +117,13 @@ class History:
def get_states_by_type(self, target_state: str) -> List[float]:
"""获取所有指定状态的时间戳"""
return [record.timestamp for record in self.records if record.state == target_state]
return [
record.timestamp for record in self.records if record.state == target_state
]
def find_record_by_timestamp(self, target_timestamp: float) -> Optional[HistoryRecord]:
def find_record_by_timestamp(
self, target_timestamp: float
) -> Optional[HistoryRecord]:
"""根据时间戳查找记录"""
for record in self.records:
if record.timestamp == target_timestamp:
@ -115,184 +134,18 @@ class History:
"""检查历史记录是否为空"""
return len(self.records) == 0
def __len__(self) -> int:
"""返回历史记录的数量"""
return len(self.records)
def clear(self):
"""清空所有历史记录"""
self.records.clear()
if self.display:
self._stop_plot()
def _setup_plot(self):
"""设置matplotlib图表"""
plt.ion() # 交互式模式
self.fig, self.ax = plt.subplots(figsize=(10, 6))
self.ax.set_title('实时Rate值变化')
self.ax.set_xlabel('时间 (s)')
self.ax.set_ylabel('Rate值')
self.ax.grid(True, alpha=0.3)
# 初始化空线条
self.line, = self.ax.plot([], [], 'b-', linewidth=2, label='Rate')
self.base_line = None # 基准线的引用
self.ax.legend()
# 启动更新线程
self._plot_running = True
self._plot_thread = threading.Thread(target=self._update_plot_loop, daemon=True)
self._plot_thread.start()
def _update_plot_loop(self):
"""绘图更新循环"""
while self._plot_running:
try:
self._update_plot()
time.sleep(0.1) # 100ms更新间隔
except Exception as e:
get_vision_logger().error(f"Plot update error: {e}")
break
def _update_plot(self):
"""更新图表数据 - 按照matplotlib推荐的方式重构"""
if not self.records:
return
start_time = time.time()
try:
# 获取时间和rate数据
timestamps = [record.timestamp for record in self.records]
rates = [record.rate for record in self.records]
if not timestamps:
return
# 将时间戳转换为相对时间(从第一个记录开始)
start_timestamp = timestamps[0]
relative_times = [(t - start_timestamp) for t in timestamps]
# 更新主线条数据
self.line.set_data(relative_times, rates)
# 处理base线
self._update_base_line()
# 自动调整坐标轴范围
self._update_axis_limits(relative_times, rates)
# 使用推荐的绘制方式
self.ax.draw_artist(self.line)
self.fig.canvas.draw() # 使用draw_idle而不是draw
except Exception as e:
get_vision_logger().error(f"Plot update error: {e}")
finally:
# 性能监控
elapsed = time.time() - start_time
if elapsed > 0.1:
get_vision_logger().warning(f"Plot update took too long: {elapsed:.3f}s")
def _update_base_line(self):
"""更新或创建基准线"""
if self.base is not None:
if self.base_line is None:
# 首次创建基准线
self.base_line = self.ax.axhline(
y=self.base,
color='r',
linestyle='--',
alpha=0.7,
label=f'Base Rate: {self.base:.2f}'
)
# 更新图例
self.ax.legend()
else:
# 更新现有基准线的位置
self.base_line.set_ydata([self.base, self.base])
# 更新标签
self.base_line.set_label(f'Base Rate: {self.base:.2f}')
self.ax.legend()
elif self.base_line is not None:
# 如果base被重置移除基准线
self.base_line.remove()
self.base_line = None
self.ax.legend()
def _update_axis_limits(self, relative_times, rates):
"""智能更新坐标轴范围"""
if not relative_times or not rates:
return
# X轴范围
max_time = max(relative_times)
self.ax.set_xlim(0, max_time + max(1, max_time * 0.05)) # 添加5%的边距
# Y轴范围计算
min_rate, max_rate = min(rates), max(rates)
if self.base is not None:
# 如果有基准线确保Y轴范围合理
y_min = 0
y_max = min(max(self.base * 35, max_rate * 1.2), 100) # 限制最大值为100
else:
# 没有基准线时的默认范围
margin = (max_rate - min_rate) * 0.1 if max_rate > min_rate else 1
y_min = max(0, min_rate - margin)
y_max = max_rate + margin
self.ax.set_ylim(y_min, y_max)
def _stop_plot(self):
"""停止绘图"""
self._plot_running = False
if self._plot_thread and self._plot_thread.is_alive():
self._plot_thread.join(timeout=1.0)
if self.fig:
plt.close(self.fig)
self.fig = None
self.ax = None
self.line = None
self.base_line = None
def save_plot(self, filename: Optional[str] = None):
"""保存当前图表"""
if not self.display or not self.fig:
get_vision_logger().warning("Plot not initialized, cannot save")
return
if filename is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"rate_plot_{timestamp}.png"
try:
self.fig.savefig(filename, dpi=300, bbox_inches='tight')
get_vision_logger().info(f"Plot saved to {filename}")
except Exception as e:
get_vision_logger().error(f"Failed to save plot: {e}")
def toggle_display(self):
"""切换显示状态"""
if self.display:
self._stop_plot()
self.display = False
else:
self.display = True
self._setup_plot()
class State:
"""滴定状态管理类"""
class Mode(Enum):
FAST = 0 # 快速模式
SLOW = 1 # 慢速模式 (middle)
ABOUT = 2 # 接近终点模式
CRAZY = 3 # CRAZY模式
# END = 3 # 终点模式
def __init__(self, bounce_time=1, end_bounce_time=5):
def __init__(self, bounce_time=1):
self.mode = self.Mode.FAST
self.bounce_time = bounce_time
self.end_bounce_time = end_bounce_time
# 状态检查标志
self.in_middle_check = False
@ -301,8 +154,7 @@ class State:
self.about_first_flag = False
# 时间记录
self.middle_detected_time = None
self.end_detected_time = None
self.middle_detected_time: Optional[float] = None
def is_fast_mode(self):
return self.mode == self.Mode.FAST
@ -313,9 +165,6 @@ class State:
def is_about_mode(self):
return self.mode == self.Mode.ABOUT
# def is_end_mode(self):
# return self.mode == self.Mode.END
def enter_middle_state(self, current_time):
"""进入middle状态 - 立即切换到slow模式并开始检查"""
self.mode = self.Mode.SLOW
@ -327,12 +176,6 @@ class State:
if self.mode == self.Mode.SLOW:
self.mode = self.Mode.ABOUT
def enter_end_check(self, current_time):
"""进入end检查状态"""
self.in_end_check = True
self.end_detected_time = current_time
self.mode = self.Mode.ABOUT
def exit_middle_check(self):
"""退出middle检查状态返回fast模式"""
self.in_middle_check = False
@ -348,33 +191,98 @@ class State:
def should_check_middle_exit(self, current_time):
"""检查是否应该进行middle退出检查"""
return (self.in_middle_check and
self.middle_detected_time is not None and
current_time - self.middle_detected_time > self.bounce_time and
(self.mode == self.Mode.SLOW))
def should_check_end_result(self, current_time):
"""检查是否应该进行end结果检查"""
return (self.in_end_check and
self.end_detected_time is not None and
current_time - self.end_detected_time > self.end_bounce_time)
def reset_end_check(self):
"""重置end检查状态"""
self.in_end_check = False
self.end_detected_time = None
return (
self.in_middle_check
and self.middle_detected_time is not None
and current_time - self.middle_detected_time > self.bounce_time
and (self.mode == self.Mode.SLOW)
)
def get_status_text(self):
"""获取状态显示文本"""
status = []
current_time = time.time()
if self.in_middle_check and current_time - self.middle_detected_time > self.bounce_time:
if (
self.in_middle_check
and current_time - self.middle_detected_time > self.bounce_time
):
status.append("MIDCHK")
if self.in_end_check and current_time - self.end_detected_time > self.end_bounce_time:
status.append("ENDCHK")
return ", " + ", ".join(status) if status else ""
def login_to_platform(username, password):
"""登录到平台获取token"""
try:
wwwF = {"userName": username, "password": password}
url = "https://jingsai.mools.net/api/login"
response = requests.post(url, wwwF, timeout=2)
if response is None:
print("错误", "网络连接失败 ")
return None
request = json.loads(response.text)
if request["code"] == 1:
# 登陆成功
# print("登陆成功",'登陆成功')
# 从服务器获取到的数据中找到token
token = request["token"]
print("成功", "登录成功!")
return token
elif request["code"] == 2:
print("错误", "用户名或密码错误!")
return None
else:
print("错误", "登陆失败 ")
return None
except Exception as e:
print(e)
print("错误", f"发送数据时出错:{e}")
return None
def send_data_to_platform(token, data, picture):
"""将数据发送到平台"""
try:
# if 1:
# 打开图片文件并转换为 Base64 编码
with open(picture, "rb") as picture_file:
picture_data = picture_file.read()
base64_encoded_picture = base64.b64encode(picture_data).decode("utf-8")
# print(base64_encoded_picture)
# 更新数据字典,添加 Base64 编码的图片
data["final_image"] = base64_encoded_picture
# print(data)
# 设置请求头
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
url = "https://jingsai.mools.net/api/upload-record"
# 准备 JSON 数据
json_data = json.dumps(data)
# 发送 POST 请求
response = requests.post(url, headers=headers, data=json_data)
request = json.loads(response.text)
# print(request['code'])
# 检查响应
if request["code"] == 1:
print("提交成功", "提交成功")
else:
print(
"错误",
f"网络请求失败,状态码:{response.status_code}\n错误信息:{response.text}",
)
except Exception as e:
raise e
print("错误", f"发送数据时出错:{e}")
def setup_logging(log_level=logging.INFO, log_dir="logs"):
"""
设置logging配置创建不同模块的logger
@ -388,42 +296,17 @@ def setup_logging(log_level=logging.INFO, log_dir="logs"):
os.makedirs(log_dir)
# 获取当前时间作为日志文件名
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = os.path.join(log_dir, f"titration_{timestamp}.log")
# 配置根logger
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)8s - %(levelname)7s - %(message)s',
format="%(asctime)s - %(name)8s - %(levelname)7s - %(message)s",
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler() # 同时输出到控制台
]
logging.FileHandler(log_file, encoding="utf-8"),
logging.StreamHandler(), # 同时输出到控制台
],
)
return log_file
def get_system_logger():
"""系统初始化和控制相关的logger"""
return logging.getLogger("System")
def get_hardware_logger():
"""硬件控制相关的loggerCH340等"""
return logging.getLogger("Hardware")
def get_vision_logger():
"""图像处理和预测相关的logger"""
return logging.getLogger("Vision")
def get_control_logger():
"""控制逻辑相关的logger"""
return logging.getLogger("Control")
def get_endpoint_logger():
"""终点检测相关的logger"""
return logging.getLogger("Endpoint")
def get_volume_logger():
"""体积计算相关的logger"""
return logging.getLogger("Volume")

View File

@ -1,18 +1,67 @@
import cv2
import numpy as np
import ch340
flag = False
vidId = 1
cap = cv2.VideoCapture(vidId, cv2.CAP_DSHOW)
while not flag:
suc,im = cap.read()
cv2.imshow("image",im)
suc, im = cap.read()
cv2.imshow("image", im)
k = cv2.waitKey(0)
if k & 0xff == ord('q'):
cap.release()
cv2.destroyAllWindows()
flag=True
if k & 0xFF == ord("q"):
# cap.release()
# cv2.destroyAllWindows()
flag = True
else:
vidId+=1
vidId += 1
cap.release()
cap.open(vidId, cv2.CAP_DSHOW)
print(f"使用摄像头索引: {vidId}")
base = 30
pump = ch340.CH340()
while True:
suc, im = cap.read()
hsv = cv2.cvtColor(im, cv2.COLOR_BGR2HSV)
s = hsv[:, :, 1]
mask = s > base
cv2.imshow("mask", im * mask[:, :, np.newaxis])
tot = mask.shape[0] * mask.shape[1]
val = np.sum(mask)
rate = val / tot
thr = (0.05, 0.2, 0.5)
ret = ""
if rate < thr[0]:
ret = "transport"
elif rate < thr[1]:
ret = "middle"
elif rate < thr[2]:
ret = "about"
else:
ret = "colored"
im = cv2.putText(
im,
f"Rate: {rate:.2},base={base},{ret}",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
1,
(0, 255, 0),
2,
)
cv2.imshow("image", im)
k = cv2.waitKey(1)
if k & 0xFF == ord("w"):
base += 1
elif k & 0xFF == ord("s"):
base -= 1
elif k & 0xFF == ord("a"):
pump.push_async(speed=0.1, vol=0.1)
elif k & 0xFF == ord("q"):
print("Camera index = ", vidId)
print("k =", base)
cap.release()
cv2.destroyAllWindows()
break