Former-commit-id: 717973133faeb79fa6ed667f3fcf7e614a3f310d
This commit is contained in:
2025-06-05 14:24:39 +08:00
parent 668bae72fa
commit cdc96e4ee3
2 changed files with 54 additions and 23 deletions

65
main.py
View File

@ -9,7 +9,7 @@ from utils import State, History
class MAT:
def __init__(self, videoSourceIndex=0, bounce_time=1, end_bounce_time=5):
def __init__(self, videoSourceIndex=0, bounce_time=2, end_bounce_time=5):
# 初始化logging
utils.setup_logging()
self.system_logger = utils.get_system_logger()
@ -27,6 +27,7 @@ class MAT:
atexit.register(self.ch340.stop)
self.history = History(max(bounce_time, end_bounce_time))
self.first_about = 0
self.colored_volume = None
self.colored_time = None
self.colored_im = None
@ -46,6 +47,8 @@ class MAT:
self.ch340.push_async(speed=speed, t=1)
def process_left(self, now: float, value_only=False):
if self.state.mode == State.Mode.CRAZY:
value_only = True
st = self.ch340.start
if not value_only:
self.ch340.stop()
@ -80,12 +83,12 @@ class MAT:
return ret
# 更新滑动窗口历史记录 - 维护最近end_bounce_time内的状态
self.history.add_record(now, ret,rate, val, im)
self.history.add_record(now, ret, rate, val, im)
if self.history.is_empty():
self.control_logger.error("未预期的没有可用的历史记录")
return ret
if self.crazy < 3:
if self.state.mode == State.Mode.CRAZY:
self._display_status(im, ret, rate, val)
return ret
@ -127,12 +130,26 @@ class MAT:
if recent_records:
trans_ratio = self.history.get_state_ratio("transport", recent_records)
if trans_ratio > 0.3:
if trans_ratio > 0.4:
self.process_left(now)
self.state.exit_middle_check()
# about状态随middle一起退出
self.state.exit_about_with_middle()
self.control_logger.info(f"middle比例{trans_ratio:.2%}<70%退出middle检查返回fast模式")
# self.state.exit_about_with_middle()
self.control_logger.info(f"middle比例{trans_ratio:.2%}<60%退出middle检查返回fast模式")
if self.state.mode == State.Mode.ABOUT and self.state.about_check:
h = self.history.get_recent_records(self.about_time,now)
ratio = self.history.get_state_ratio("about", h)
self.history.about_history.append(ratio<0.3)
while len(self.history.about_history) > 10:
self.history.about_history.pop(0)
if len(self.history.about_history) == 10:
rate = sum(self.history.about_history) / len(self.history.about_history)
if rate > 0.8:
self.state.exit_about()
self.control_logger.info(f"about比例{ratio:.2%}<80%退出about检查返回middle模式")
self.state.about_check = False
# end检查: 进入end之后的end_bounce_time如果end比例<80%,则重置;否则终止实验
if self.state.should_check_end_result(now):
@ -182,6 +199,7 @@ class MAT:
State.Mode.FAST: (255, 255, 255),
State.Mode.SLOW: (10, 215, 255),
State.Mode.ABOUT: (228,116,167),
State.Mode.CRAZY: (0, 255, 0),
# State.Mode.END: (0, 0, 255)
}
@ -203,7 +221,7 @@ class MAT:
rate = val/tot
if self.history.base is not None:
base = self.history.base
thr = (base*5, base*9, 0.75)
thr = (min(0.05,base*5), min(0.4,base*9), 0.75)
if rate < thr[0]:
return "transport",rate
elif rate <thr[1]:
@ -223,10 +241,11 @@ class MAT:
cv2.destroyAllWindows()
def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, end_time=1, cap_dir="Videos"):
def run(self, quick_speed=0.2, slow_speed=0.05, end_speed=0.02, mid_time=0.5, about_time=1, cap_dir="Videos"):
self.running = True
self.start_time = time.time()
self.speeds = (quick_speed, slow_speed, end_speed,end_speed)
self.speeds = (quick_speed, slow_speed, end_speed, 1.0)
self.need_check = False
if cap_dir is not None:
vid_name = f"{cap_dir}/{datetime.now().strftime('%Y%m%d_%H%M%S')}.mkv"
@ -261,11 +280,14 @@ class MAT:
self.ch340_init()
cnt = 0
self._start_time = time.time()
self.crazy = 4
self.about_time=about_time
self.crazy = 0
self.state.mode = State.Mode.CRAZY
while self.running:
if not self.ch340.running:
if self.crazy < 3:
if self.state.mode == State.Mode.CRAZY:
match self.crazy:
case 0:
self.control_logger.info("crazy 0")
@ -275,14 +297,12 @@ class MAT:
self.crazy = 1
case 1:
self.control_logger.info("crazy 1")
self.ch340.pull_async(vol=12)
self.crazy = 2
case 2:
self.control_logger.info("crazy 2")
self.ch340_pull()
self.ch340.push_async(vol=8)
self.total_volume = 20
cnt = 2
self.crazy = 3
self.crazy = 2
self.state.mode = State.Mode.FAST
else:
if 12 * cnt - self.total_volume < 0.5:
self.ch340_pull()
@ -295,7 +315,11 @@ class MAT:
should_push = True
elif self.state.is_slow_mode() and time.time() - self.ch340.start > mid_time:
should_push = True
elif self.state.is_about_mode() and time.time() - self.ch340.start > end_time:
elif self.state.is_about_mode() and time.time() - self.ch340.start > about_time:
if not self.state.about_first_flag:
self.state.about_check = True
else:
self.state.about_first_flag = False
should_push = True
if should_push:
@ -304,6 +328,7 @@ class MAT:
self.ch340_push(speed)
self.total_volume += speed
# if (self.state.is_about_mode() and time.time() - self.ch340.start > end_time) or not self.state.is_about_mode():
if self._pred() is None:
self.control_logger.error("预测失败,跳过当前帧")
continue
@ -329,15 +354,15 @@ if __name__ == "__main__":
# 创建MAT类的实例并运行
mat = MAT(
videoSourceIndex = 1,
bounce_time=3,
bounce_time=4,
end_bounce_time=0.01
)
mat.run(
slow_speed = 0.05,
quick_speed = 0.15,
end_time= 2,
cap_dir=None
about_time=3,
# cap_dir=None
)

View File

@ -9,6 +9,7 @@ import time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import threading
from collections import deque
@dataclass
class HistoryRecord:
@ -38,6 +39,7 @@ class History:
self.records: List[HistoryRecord] = []
self.max_window_size = max_window_size
self.fulled = False
self.about_history = [] # 用于存储最近的about状态记录
self.base = None
self._base_time = base_time # 可视化相关属性
self.display = display
@ -280,6 +282,7 @@ class State:
FAST = 0 # 快速模式
SLOW = 1 # 慢速模式 (middle)
ABOUT = 2 # 接近终点模式
CRAZY = 2 # CRAZY模式
# END = 3 # 终点模式
def __init__(self, bounce_time=1, end_bounce_time=5):
@ -332,8 +335,10 @@ class State:
self.middle_detected_time = None
self.mode = self.Mode.FAST
def exit_about_with_middle(self):
"""about状态随middle一起退出"""
def exit_about(self):
"""about状态退出"""
self.about_check = False
self.about_first_flag = True
if self.mode == self.Mode.ABOUT:
self.mode = self.Mode.SLOW
@ -341,7 +346,8 @@ class State:
"""检查是否应该进行middle退出检查"""
return (self.in_middle_check and
self.middle_detected_time is not None and
current_time - self.middle_detected_time > self.bounce_time)
current_time - self.middle_detected_time > self.bounce_time and
(self.mode == self.Mode.SLOW))
def should_check_end_result(self, current_time):
"""检查是否应该进行end结果检查"""