From af2bb6581fab83723fd8dd05dc164c2fed071f06 Mon Sep 17 00:00:00 2001 From: Flagabat Date: Mon, 27 Apr 2026 23:15:09 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20extract=5Fmcap=5Fsignals=20=E2=80=94=20?= =?UTF-8?q?pitch/roll/yaw,=20altitude,=20obstacle,=20battery?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- tools/extract_mcap_signals.py | 51 ++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/tools/extract_mcap_signals.py b/tools/extract_mcap_signals.py index 35ea7df..3f7e5b9 100644 --- a/tools/extract_mcap_signals.py +++ b/tools/extract_mcap_signals.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """Extract AUV signals from MCAP files: depth, PWM, state.""" -import argparse, glob, json, os, sys +import argparse, glob, json, math, os, sys def main(): parser = argparse.ArgumentParser() @@ -26,7 +26,16 @@ def main(): depth_raw = [] pwm_raw = [] state_raw = [] - TOPICS = ['/mavros/imu/static_pressure', '/mavros/rc/out', '/mavros/state'] + signals = {} + TOPICS = [ + '/mavros/imu/static_pressure', + '/mavros/rc/out', + '/mavros/state', + '/mavros/imu/data', + '/mavros/altitude', + '/mavros/battery', + '/mavros/distance_sensor/hrlv_ez4_pub', + ] for mcap_file in mcap_files: try: @@ -53,6 +62,40 @@ def main(): state_raw.append({'t': t_ms, 'mode': str(ros_msg.mode), 'armed': bool(ros_msg.armed)}) except Exception: pass + elif topic == '/mavros/imu/data': + try: + q = ros_msg.orientation + sinr = 2*(q.w*q.x + q.y*q.z) + cosr = 1 - 2*(q.x*q.x + q.y*q.y) + roll = math.degrees(math.atan2(sinr, cosr)) + sinp = 2*(q.w*q.y - q.z*q.x) + pitch = math.degrees(math.asin(max(-1, min(1, sinp)))) + siny = 2*(q.w*q.z + q.x*q.y) + cosy = 1 - 2*(q.y*q.y + q.z*q.z) + yaw = math.degrees(math.atan2(siny, cosy)) + signals.setdefault('pitch', []).append({'t_ms': t_ms, 'v': pitch}) + signals.setdefault('roll', []).append({'t_ms': t_ms, 'v': roll}) + signals.setdefault('yaw', []).append({'t_ms': t_ms, 'v': yaw}) + except Exception: + pass + elif topic == '/mavros/altitude': + try: + signals.setdefault('altitude', []).append( + {'t_ms': t_ms, 'v': ros_msg.relative}) + except Exception: + pass + elif topic == '/mavros/battery': + try: + signals.setdefault('battery_v', []).append( + {'t_ms': t_ms, 'v': ros_msg.voltage}) + except Exception: + pass + elif topic == '/mavros/distance_sensor/hrlv_ez4_pub': + try: + signals.setdefault('obstacle_dist', []).append( + {'t_ms': t_ms, 'v': ros_msg.range}) + except Exception: + pass except Exception as e: print(f" Skip {os.path.basename(mcap_file)}: {e}") @@ -69,7 +112,8 @@ def main(): pwm_samples = sample(pwm_raw, args.max_pts) state = state_raw # events, keep all - all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw] + signals_flat = [pt for pts in signals.values() for pt in pts] + all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw] + [p['t_ms'] for p in signals_flat] t_min = min(all_t) if all_t else 0 t_max = max(all_t) if all_t else 0 @@ -94,6 +138,7 @@ def main(): 'depth': depth, 'pwm_auv': {'channels': channels, 'samples': pwm_samples}, 'state': state, + 'signals': {k: sample(v, args.max_pts) for k, v in signals.items()}, } outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')