From 7330b230b86a44ebd2eb33916c71c0e09428fa86 Mon Sep 17 00:00:00 2001 From: Floppyrj45 Date: Fri, 24 Apr 2026 11:48:46 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20extract=5Fmcap=20=E2=80=94=20/navigatio?= =?UTF-8?q?n/altitude=20(Kogger)=20->=20HDF5=20altitude=5Fm=20+=20seafloor?= =?UTF-8?q?=5Fdepth=5Fm?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- extract/extract_mcap.py | 25 ++++++++++++++++++++++--- tests/test_extract_mcap.py | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/extract/extract_mcap.py b/extract/extract_mcap.py index d9de13c..7a9b262 100644 --- a/extract/extract_mcap.py +++ b/extract/extract_mcap.py @@ -7,13 +7,15 @@ from mcap_ros2.reader import read_ros2_messages TOPIC_GPS = "/mavros/global_position/global" # sensor_msgs/NavSatFix TOPIC_DEPTH = "/mavros/global_position/rel_alt" # std_msgs/Float64 +TOPIC_ALT = "/navigation/altitude" # std_msgs/Float64 - altitude above seafloor def write_auv_mcap_group(h5_path: str, t_ns: np.ndarray, lat: np.ndarray, lon: np.ndarray, - depth_m: np.ndarray) -> None: + depth_m: np.ndarray, + altitude_m: np.ndarray | None = None) -> None: with h5py.File(h5_path, "a") as f: if "auv_mcap" in f: del f["auv_mcap"] @@ -22,6 +24,9 @@ def write_auv_mcap_group(h5_path: str, grp.create_dataset("lat", data=lat.astype(np.float64), compression="gzip") grp.create_dataset("lon", data=lon.astype(np.float64), compression="gzip") grp.create_dataset("depth_m", data=depth_m.astype(np.float64), compression="gzip") + if altitude_m is not None: + grp.create_dataset("altitude_m", data=altitude_m.astype(np.float64), compression="gzip") + grp.create_dataset("seafloor_depth_m", data=(depth_m + altitude_m).astype(np.float64), compression="gzip") def _iter_topic(bag_dir: str, topic: str): @@ -60,8 +65,22 @@ def extract(bag_dir: str, out_h5: str) -> None: ts_arr = np.array(ts, dtype=np.int64) depths = np.interp(ts_arr, depth_times_arr, depth_vals) - write_auv_mcap_group(out_h5, ts_arr, np.array(lats), np.array(lons), depths) - print(f"AUV MCAP: {len(ts)} fixes -> {out_h5} [/auv_mcap]") + alt_by_t: dict[int, float] = {} + for msg in _iter_topic(bag_dir, TOPIC_ALT): + alt_by_t[msg.log_time_ns] = float(msg.ros_msg.data) + + if alt_by_t: + alt_times = sorted(alt_by_t.keys()) + alt_vals = np.array([alt_by_t[k] for k in alt_times], dtype=np.float64) + alt_times_arr = np.array(alt_times, dtype=np.int64) + altitudes = np.interp(ts_arr, alt_times_arr, alt_vals) + else: + altitudes = None + print("WARNING: /navigation/altitude not found in bags", file=sys.stderr) + + write_auv_mcap_group(out_h5, ts_arr, np.array(lats), np.array(lons), depths, altitudes) + alt_status = "ok" if altitudes is not None else "missing" + print(f"AUV MCAP: {len(ts)} fixes, depth ok, altitude {alt_status} -> {out_h5} [/auv_mcap]") if __name__ == "__main__": diff --git a/tests/test_extract_mcap.py b/tests/test_extract_mcap.py index 1846e57..aad625e 100644 --- a/tests/test_extract_mcap.py +++ b/tests/test_extract_mcap.py @@ -22,3 +22,41 @@ def test_write_auv_mcap_group(): assert list(f["auv_mcap/t_ns"][:]) == list(t) finally: os.unlink(path) + + +def test_write_auv_mcap_group_with_altitude(): + from extract.extract_mcap import write_auv_mcap_group + t = np.array([1000, 2000, 3000], dtype=np.int64) + lat = np.array([43.1, 43.2, 43.3]) + lon = np.array([5.6, 5.61, 5.62]) + depth = np.array([5.0, 5.5, 6.0]) + alt = np.array([1.2, 1.3, 1.1]) + + with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as tmp: + path = tmp.name + try: + write_auv_mcap_group(path, t, lat, lon, depth, alt) + with h5py.File(path, "r") as f: + assert "altitude_m" in f["auv_mcap"] + assert "seafloor_depth_m" in f["auv_mcap"] + assert np.allclose(f["auv_mcap/altitude_m"][:], alt) + assert np.allclose(f["auv_mcap/seafloor_depth_m"][:], depth + alt) + finally: + os.unlink(path) + + +def test_write_auv_mcap_group_without_altitude(): + from extract.extract_mcap import write_auv_mcap_group + t = np.array([1000, 2000], dtype=np.int64) + lat = np.zeros(2) + lon = np.zeros(2) + depth = np.array([3.0, 4.0]) + + with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as tmp: + path = tmp.name + try: + write_auv_mcap_group(path, t, lat, lon, depth, altitude_m=None) + with h5py.File(path, "r") as f: + assert "altitude_m" not in f["auv_mcap"] + finally: + os.unlink(path)