# extract/parse_usv_gps.py
"""Parse a USV navigation-log CSV and store the position fixes in HDF5.

The log is read as a "long" CSV with ``timestamp``, ``data`` and ``value``
columns: each row carries one named quantity, and all rows that share the
same timestamp string are merged into a single fix.
"""

import csv
import glob
import sys
from datetime import datetime, timedelta, timezone
from typing import IO, Optional, Union

import numpy as np

try:
    import h5py
except ImportError:
    # Only write_usv_gps_group() needs h5py; parsing stays usable without it.
    h5py = None

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Values of the "data" column (compared lower-cased) accepted as a heading.
_HEADING_NAMES = frozenset({
    "heading", "hdg", "cog", "course",
    "yaw", "yaw_deg", "heading_deg", "course_deg",
})

# Values of the "data" column (compared lower-cased) accepted as a fix type.
_FIX_TYPE_NAMES = frozenset({
    "fix_type", "gps_fix_type", "rtk_type", "fix_status", "gps_status",
})


def _ts_to_ns(ts_str: str) -> int:
    """Convert '2026-04-08 09:31:10.123456' to nanoseconds since epoch.

    The timestamp is interpreted as UTC.  The fractional part is optional.

    Raises:
        ValueError: if the text matches neither accepted format.
    """
    text = ts_str.strip()
    try:
        dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        # Also accept timestamps written without fractional seconds.
        dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    # Stay in integer arithmetic: a float cannot represent nanosecond epoch
    # values (~1.7e18) exactly, so `timestamp() * 1e9` would round them.
    micros = (dt.replace(tzinfo=timezone.utc) - _EPOCH) // timedelta(microseconds=1)
    return micros * 1000


def _to_float(text: str) -> Optional[float]:
    """Return ``float(text)``, or None when the text is not a number."""
    try:
        return float(text)
    except ValueError:
        return None


def _to_int(text: str) -> Optional[int]:
    """Return ``int(float(text))``, or None when that conversion fails."""
    try:
        return int(float(text))
    except (ValueError, OverflowError):
        # OverflowError covers "inf"; ValueError covers junk and "nan".
        return None


def _rtk_status_from_fix_type(raw_fix: Optional[int]) -> int:
    """Collapse a raw fix type into the 3-level RTK status (0, 1 or 2)."""
    # Map ArduPilot fix_type (0-6) -> our 3-level RTK status
    #   0=NoGPS,1=NoFix,2=2D,3=3D,4=DGPS -> 0 (3D/none)
    #   5=RTK_Float -> 1, 6=RTK_Fixed -> 2
    if raw_fix is None:
        return 0  # no fix-type row seen for this timestamp
    if raw_fix >= 6:
        return 2
    if raw_fix == 5:
        return 1
    return 0


def _parse_rows(lines) -> list[dict]:
    """Merge the long-format rows from *lines* into one record per timestamp."""
    pending: dict[str, dict] = {}  # key = timestamp string
    for row in csv.DictReader(lines):
        # DictReader fills columns missing from a short row with None,
        # so `row.get(key, "")` alone is not enough to guarantee a string.
        name = (row.get("data") or "").strip()
        ts = (row.get("timestamp") or "").strip()
        text = (row.get("value") or "").strip()
        if not ts:
            # Without a timestamp the row cannot be attached to any fix.
            continue

        rec = pending.get(ts)
        if rec is None:
            rec = pending[ts] = {
                "ts_str": ts,
                "easting": None,
                "northing": None,
                "utm_num": None,
                "utm_let": None,
                "heading_deg": None,
                "fix_type_raw": None,
            }

        if name == "Easting":
            field, parsed = "easting", _to_float(text)
        elif name == "Northing":
            field, parsed = "northing", _to_float(text)
        elif name == "UTM_number":
            number = _to_int(text)
            field, parsed = "utm_num", None if number is None else str(number)
        elif name == "UTM_letter":
            field, parsed = "utm_let", text
        elif name.lower() in _HEADING_NAMES:
            field, parsed = "heading_deg", _to_float(text)
        elif name.lower() in _FIX_TYPE_NAMES:
            field, parsed = "fix_type_raw", _to_int(text)
        else:
            continue

        # An unparsable value leaves any earlier value for the field intact.
        if parsed is not None:
            rec[field] = parsed

    fixes = []
    for rec in pending.values():
        # A fix is only usable when both coordinates were seen.
        if rec["easting"] is None or rec["northing"] is None:
            continue
        fixes.append({
            "t_ns": _ts_to_ns(rec["ts_str"]),
            "easting": rec["easting"],
            "northing": rec["northing"],
            "utm_zone": (rec["utm_num"] or "") + (rec["utm_let"] or ""),
            "rtk_status": _rtk_status_from_fix_type(rec["fix_type_raw"]),
            "heading_deg": rec["heading_deg"],  # float or None
        })
    return sorted(fixes, key=lambda fix: fix["t_ns"])


def parse_nav_log(source: Union[str, IO]) -> list[dict]:
    """Parse a navigation log into a time-sorted list of position fixes.

    Args:
        source: path of the CSV file, or an already-open text stream (any
            iterable of CSV lines).  A stream passed in is not closed.

    Returns:
        One dict per timestamp that has both an Easting and a Northing row,
        sorted by ``t_ns``, with keys ``t_ns``, ``easting``, ``northing``,
        ``utm_zone``, ``rtk_status`` (0, 1 or 2) and ``heading_deg``
        (float or None).

    Rows with no timestamp and values that cannot be parsed as numbers are
    skipped rather than aborting the whole parse.

    Raises:
        ValueError: if the timestamp of a complete fix cannot be parsed.
    """
    if isinstance(source, str):
        with open(source, newline="", encoding="utf-8", errors="replace") as fh:
            return _parse_rows(fh)
    return _parse_rows(source)


def write_usv_gps_group(h5_path: str, rows: list[dict]) -> None:
    """Write *rows* to the ``usv_gps`` group of the HDF5 file *h5_path*.

    The file is opened in append mode and an existing ``usv_gps`` group is
    replaced.  Each column becomes a gzip-compressed dataset; a missing
    heading is stored as NaN.  The group attribute ``utm_zone`` is taken
    from the first row (empty string when *rows* is empty).

    Raises:
        ImportError: if h5py is not installed.
    """
    if h5py is None:
        raise ImportError("h5py is required to write the usv_gps group")

    columns = {
        "t_ns": np.array([r["t_ns"] for r in rows], dtype=np.int64),
        "easting": np.array([r["easting"] for r in rows], dtype=np.float64),
        "northing": np.array([r["northing"] for r in rows], dtype=np.float64),
        "rtk_status": np.array([r["rtk_status"] for r in rows], dtype=np.int8),
        "heading_deg": np.array(
            [
                r["heading_deg"] if r["heading_deg"] is not None else np.nan
                for r in rows
            ],
            dtype=np.float64,
        ),
    }

    with h5py.File(h5_path, "a") as f:
        if "usv_gps" in f:
            del f["usv_gps"]
        grp = f.create_group("usv_gps")
        for name, values in columns.items():
            grp.create_dataset(name, data=values, compression="gzip")
        grp.attrs["utm_zone"] = rows[0]["utm_zone"] if rows else ""


def extract(csv_dir: str, out_h5: str) -> None:
    """Parse the navigation log found in *csv_dir* and write it to *out_h5*.

    Only one ``*_navigation_log.csv`` file is processed: the first match in
    sorted order.

    Raises:
        FileNotFoundError: if *csv_dir* contains no matching file.
    """
    # glob returns matches in arbitrary order; sort so the choice is stable.
    nav_files = sorted(glob.glob(f"{csv_dir}/*_navigation_log.csv"))
    if not nav_files:
        raise FileNotFoundError(f"No navigation_log.csv in {csv_dir}")
    rows = parse_nav_log(nav_files[0])
    write_usv_gps_group(out_h5, rows)
    print(f"USV GPS: {len(rows)} fixes (UTM) -> {out_h5}  [/usv_gps]")


if __name__ == "__main__":
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} CSV_DIR OUT_H5")
    extract(sys.argv[1], sys.argv[2])