feat: extract_mcap — MCAP NavSatFix + depth -> HDF5 /auv_mcap
This commit is contained in:
68
extract/extract_mcap.py
Normal file
68
extract/extract_mcap.py
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
# extract/extract_mcap.py
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
import numpy as np
|
||||||
|
import h5py
|
||||||
|
from mcap_ros2.reader import read_ros2_messages
|
||||||
|
|
||||||
|
TOPIC_GPS = "/mavros/global_position/global" # sensor_msgs/NavSatFix
|
||||||
|
TOPIC_DEPTH = "/mavros/global_position/rel_alt" # std_msgs/Float64
|
||||||
|
|
||||||
|
|
||||||
|
def write_auv_mcap_group(h5_path: str,
|
||||||
|
t_ns: np.ndarray,
|
||||||
|
lat: np.ndarray,
|
||||||
|
lon: np.ndarray,
|
||||||
|
depth_m: np.ndarray) -> None:
|
||||||
|
with h5py.File(h5_path, "a") as f:
|
||||||
|
if "auv_mcap" in f:
|
||||||
|
del f["auv_mcap"]
|
||||||
|
grp = f.create_group("auv_mcap")
|
||||||
|
grp.create_dataset("t_ns", data=t_ns.astype(np.int64), compression="gzip")
|
||||||
|
grp.create_dataset("lat", data=lat.astype(np.float64), compression="gzip")
|
||||||
|
grp.create_dataset("lon", data=lon.astype(np.float64), compression="gzip")
|
||||||
|
grp.create_dataset("depth_m", data=depth_m.astype(np.float64), compression="gzip")
|
||||||
|
|
||||||
|
|
||||||
|
def _iter_topic(bag_dir: str, topic: str):
|
||||||
|
"""Yield messages for a topic across all .mcap files in bag_dir (sorted).
|
||||||
|
Skips corrupt/incomplete files with a warning."""
|
||||||
|
mcap_files = sorted(Path(bag_dir).glob("*.mcap"))
|
||||||
|
for mcap_file in mcap_files:
|
||||||
|
try:
|
||||||
|
yield from read_ros2_messages(str(mcap_file), topics=[topic])
|
||||||
|
except Exception as e:
|
||||||
|
print(f"WARNING: skipping {mcap_file.name}: {e}", file=sys.stderr)
|
||||||
|
|
||||||
|
|
||||||
|
def extract(bag_dir: str, out_h5: str) -> None:
|
||||||
|
ts, lats, lons = [], [], []
|
||||||
|
depth_by_t: dict[int, float] = {}
|
||||||
|
|
||||||
|
for msg in _iter_topic(bag_dir, TOPIC_DEPTH):
|
||||||
|
depth_by_t[msg.log_time_ns] = float(msg.ros_msg.data)
|
||||||
|
|
||||||
|
depth_times = sorted(depth_by_t.keys())
|
||||||
|
depth_vals = np.array([depth_by_t[k] for k in depth_times], dtype=np.float64)
|
||||||
|
depth_times_arr = np.array(depth_times, dtype=np.int64)
|
||||||
|
|
||||||
|
zero_count = 0
|
||||||
|
for msg in _iter_topic(bag_dir, TOPIC_GPS):
|
||||||
|
fix = msg.ros_msg
|
||||||
|
ts.append(msg.log_time_ns)
|
||||||
|
lats.append(fix.latitude)
|
||||||
|
lons.append(fix.longitude)
|
||||||
|
if fix.latitude == 0.0 and fix.longitude == 0.0:
|
||||||
|
zero_count += 1
|
||||||
|
if zero_count:
|
||||||
|
print(f"WARNING: {zero_count}/{len(ts)} fixes have lat=0/lon=0 (no GPS lock)", file=sys.stderr)
|
||||||
|
|
||||||
|
ts_arr = np.array(ts, dtype=np.int64)
|
||||||
|
depths = np.interp(ts_arr, depth_times_arr, depth_vals)
|
||||||
|
|
||||||
|
write_auv_mcap_group(out_h5, ts_arr, np.array(lats), np.array(lons), depths)
|
||||||
|
print(f"AUV MCAP: {len(ts)} fixes -> {out_h5} [/auv_mcap]")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
extract(sys.argv[1], sys.argv[2])
|
||||||
24
tests/test_extract_mcap.py
Normal file
24
tests/test_extract_mcap.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
import tempfile, os
|
||||||
|
import numpy as np
|
||||||
|
import h5py
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
def test_write_auv_mcap_group():
|
||||||
|
from extract.extract_mcap import write_auv_mcap_group
|
||||||
|
t = np.array([1000, 2000, 3000], dtype=np.int64)
|
||||||
|
lat = np.array([43.1, 43.2, 43.3])
|
||||||
|
lon = np.array([5.6, 5.61, 5.62])
|
||||||
|
depth = np.array([5.0, 5.5, 6.0])
|
||||||
|
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as tmp:
|
||||||
|
path = tmp.name
|
||||||
|
try:
|
||||||
|
write_auv_mcap_group(path, t, lat, lon, depth)
|
||||||
|
with h5py.File(path, "r") as f:
|
||||||
|
assert "auv_mcap" in f
|
||||||
|
assert np.allclose(f["auv_mcap/lat"][:], lat)
|
||||||
|
assert np.allclose(f["auv_mcap/lon"][:], lon)
|
||||||
|
assert np.allclose(f["auv_mcap/depth_m"][:], depth)
|
||||||
|
assert list(f["auv_mcap/t_ns"][:]) == list(t)
|
||||||
|
finally:
|
||||||
|
os.unlink(path)
|
||||||
Reference in New Issue
Block a user