# extract/extract_mcap.py import sys from pathlib import Path import numpy as np import h5py from mcap_ros2.reader import read_ros2_messages TOPIC_GPS = "/mavros/global_position/global" # sensor_msgs/NavSatFix TOPIC_DEPTH = "/mavros/global_position/rel_alt" # std_msgs/Float64 TOPIC_ALT = "/navigation/altitude" # std_msgs/Float64 - altitude above seafloor def write_auv_mcap_group(h5_path: str, t_ns: np.ndarray, lat: np.ndarray, lon: np.ndarray, depth_m: np.ndarray, altitude_m: np.ndarray | None = None) -> None: with h5py.File(h5_path, "a") as f: if "auv_mcap" in f: del f["auv_mcap"] grp = f.create_group("auv_mcap") grp.create_dataset("t_ns", data=t_ns.astype(np.int64), compression="gzip") grp.create_dataset("lat", data=lat.astype(np.float64), compression="gzip") grp.create_dataset("lon", data=lon.astype(np.float64), compression="gzip") grp.create_dataset("depth_m", data=depth_m.astype(np.float64), compression="gzip") if altitude_m is not None: grp.create_dataset("altitude_m", data=altitude_m.astype(np.float64), compression="gzip") grp.create_dataset("seafloor_depth_m", data=(depth_m + altitude_m).astype(np.float64), compression="gzip") def _iter_topic(bag_dir: str, topic: str): """Yield messages for a topic across all .mcap files in bag_dir (sorted). Skips corrupt/incomplete files with a warning.""" mcap_files = sorted(Path(bag_dir).glob("*.mcap")) for mcap_file in mcap_files: try: yield from read_ros2_messages(str(mcap_file), topics=[topic]) except Exception as e: print(f"WARNING: skipping {mcap_file.name}: {e}", file=sys.stderr) def extract(bag_dir: str, out_h5: str) -> None: ts, lats, lons = [], [], [] depth_by_t: dict[int, float] = {} for msg in _iter_topic(bag_dir, TOPIC_DEPTH): depth_by_t[msg.log_time_ns] = float(msg.ros_msg.data) depth_times = sorted(depth_by_t.keys()) depth_vals = np.array([depth_by_t[k] for k in depth_times], dtype=np.float64) depth_times_arr = np.array(depth_times, dtype=np.int64) zero_count = 0 for msg in _iter_topic(bag_dir, TOPIC_GPS): fix = msg.ros_msg ts.append(msg.log_time_ns) lats.append(fix.latitude) lons.append(fix.longitude) if fix.latitude == 0.0 and fix.longitude == 0.0: zero_count += 1 if zero_count: print(f"WARNING: {zero_count}/{len(ts)} fixes have lat=0/lon=0 (no GPS lock)", file=sys.stderr) ts_arr = np.array(ts, dtype=np.int64) depths = np.interp(ts_arr, depth_times_arr, depth_vals) alt_by_t: dict[int, float] = {} for msg in _iter_topic(bag_dir, TOPIC_ALT): alt_by_t[msg.log_time_ns] = float(msg.ros_msg.data) if alt_by_t: alt_times = sorted(alt_by_t.keys()) alt_vals = np.array([alt_by_t[k] for k in alt_times], dtype=np.float64) alt_times_arr = np.array(alt_times, dtype=np.int64) altitudes = np.interp(ts_arr, alt_times_arr, alt_vals) else: altitudes = None print("WARNING: /navigation/altitude not found in bags", file=sys.stderr) write_auv_mcap_group(out_h5, ts_arr, np.array(lats), np.array(lons), depths, altitudes) alt_status = "ok" if altitudes is not None else "missing" print(f"AUV MCAP: {len(ts)} fixes, depth ok, altitude {alt_status} -> {out_h5} [/auv_mcap]") if __name__ == "__main__": extract(sys.argv[1], sys.argv[2])