#!/usr/bin/env python3 """Extract AUV signals from MCAP files: depth, PWM, state.""" import argparse, glob, json, math, os, sys def main(): parser = argparse.ArgumentParser() parser.add_argument('--session-dir', required=True) parser.add_argument('--max-pts', type=int, default=5000) args = parser.parse_args() session_name = os.path.basename(args.session_dir.rstrip('/')) pattern = os.path.join(args.session_dir, '*.mcap') mcap_files = sorted(glob.glob(pattern)) if not mcap_files: print(f"No MCAP files in {args.session_dir}", file=sys.stderr) sys.exit(1) print(f"Found {len(mcap_files)} MCAP files") try: from mcap.reader import make_reader from mcap_ros2.decoder import DecoderFactory except ImportError as e: print(f"Import error: {e}", file=sys.stderr) sys.exit(1) depth_raw = [] pwm_raw = [] state_raw = [] signals = {} TOPICS = [ '/mavros/imu/static_pressure', '/mavros/rc/out', '/mavros/state', '/mavros/imu/data', '/mavros/altitude', '/mavros/battery', '/mavros/distance_sensor/hrlv_ez4_pub', ] for mcap_file in mcap_files: try: with open(mcap_file, 'rb') as f: reader = make_reader(f, decoder_factories=[DecoderFactory()]) for schema, channel, message, ros_msg in reader.iter_decoded_messages(topics=TOPICS): t_ms = message.publish_time // 1_000_000 topic = channel.topic if topic == '/mavros/imu/static_pressure': try: p = float(ros_msg.fluid_pressure) depth_m = (p - 101325.0) / (1025.0 * 9.80665) depth_raw.append({'t': t_ms, 'v': round(depth_m, 4)}) except Exception: pass elif topic == '/mavros/rc/out': try: ch = list(ros_msg.channels) pwm_raw.append({'t': t_ms, 'v': ch}) except Exception: pass elif topic == '/mavros/state': try: state_raw.append({'t': t_ms, 'mode': str(ros_msg.mode), 'armed': bool(ros_msg.armed)}) except Exception: pass elif topic == '/mavros/imu/data': try: q = ros_msg.orientation sinr = 2*(q.w*q.x + q.y*q.z) cosr = 1 - 2*(q.x*q.x + q.y*q.y) roll = math.degrees(math.atan2(sinr, cosr)) sinp = 2*(q.w*q.y - q.z*q.x) pitch = math.degrees(math.asin(max(-1, min(1, sinp)))) siny = 2*(q.w*q.z + q.x*q.y) cosy = 1 - 2*(q.y*q.y + q.z*q.z) yaw = math.degrees(math.atan2(siny, cosy)) signals.setdefault('pitch', []).append({'t': t_ms, 'v': pitch}) signals.setdefault('roll', []).append({'t': t_ms, 'v': roll}) signals.setdefault('yaw', []).append({'t': t_ms, 'v': yaw}) except Exception: pass elif topic == '/mavros/altitude': try: signals.setdefault('altitude', []).append( {'t': t_ms, 'v': ros_msg.relative}) except Exception: pass elif topic == '/mavros/battery': try: signals.setdefault('battery_v', []).append( {'t': t_ms, 'v': ros_msg.voltage}) except Exception: pass elif topic == '/mavros/distance_sensor/hrlv_ez4_pub': try: signals.setdefault('obstacle_dist', []).append( {'t': t_ms, 'v': ros_msg.range}) except Exception: pass except Exception as e: print(f" Skip {os.path.basename(mcap_file)}: {e}") def sample(lst, max_pts): if len(lst) <= max_pts: return lst stride = len(lst) // max_pts sampled = lst[::stride] if sampled[-1] is not lst[-1]: sampled.append(lst[-1]) return sampled depth = sample(depth_raw, args.max_pts) pwm_samples = sample(pwm_raw, args.max_pts) state = state_raw # events, keep all signals_flat = [pt for pts in signals.values() for pt in pts] all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw + signals_flat] t_min = min(all_t) if all_t else 0 t_max = max(all_t) if all_t else 0 n_ch = max((len(s['v']) for s in pwm_raw), default=0) channels = list(range(n_ch)) from datetime import datetime, timezone fmt = lambda ms: datetime.fromtimestamp(ms/1000, tz=timezone.utc).isoformat() print(f"depth: {len(depth)} pts (raw {len(depth_raw)})") if depth: dvals = [p['v'] for p in depth] print(f" depth range: {min(dvals):.3f} .. {max(dvals):.3f} m") print(f"pwm_auv: {len(pwm_samples)} samples (raw {len(pwm_raw)}), {n_ch} channels") print(f"state: {len(state)} events") print(f"t_min: {fmt(t_min)}") print(f"t_max: {fmt(t_max)}") out = { 'session': session_name, 't_min_utc_ms': t_min, 't_max_utc_ms': t_max, 'depth': depth, 'pwm_auv': {'channels': channels, 'samples': pwm_samples}, 'state': state, **{k: sample(v, args.max_pts) for k, v in signals.items()}, } outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output') os.makedirs(outdir, exist_ok=True) outpath = os.path.join(outdir, 'mcap_signals.json') with open(outpath, 'w') as f: json.dump(out, f) print(f"Written: {outpath}") if __name__ == '__main__': main()