# extract/parse_usv_gps.py
"""Extract USV GPS fixes from a navigation-log CSV into an HDF5 group.

The navigation log is read as a "long" CSV with ``timestamp``, ``data`` and
``value`` columns: every row carries one named quantity for one timestamp.
Rows sharing a timestamp are merged into a single fix, and the fixes are
written to the ``/usv_gps`` group of an HDF5 file.
"""
import contextlib
import csv
import glob
import os
import sys
from datetime import datetime, timedelta, timezone
from typing import IO, Union

import numpy as np

# Reference instant for the nanosecond timestamps.
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Accepted timestamp layouts, tried in order (with and without a fraction).
_TS_FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")


def _ts_to_ns(ts_str: str) -> int:
    """Convert '2026-04-08 09:31:10.123456' to nanoseconds since epoch.

    The timestamp is interpreted as UTC. The fractional part is optional.

    Raises:
        ValueError: if ``ts_str`` matches none of the accepted layouts.
    """
    text = ts_str.strip()
    for fmt in _TS_FORMATS:
        try:
            dt = datetime.strptime(text, fmt)
        except ValueError:
            continue
        break
    else:
        raise ValueError(f"unrecognised timestamp: {ts_str!r}")
    dt = dt.replace(tzinfo=timezone.utc)
    # Pure integer arithmetic: going through float seconds (timestamp() * 1e9)
    # cannot represent microseconds exactly at present-day epoch values.
    return (dt - _EPOCH) // timedelta(microseconds=1) * 1000


def parse_nav_log(source: Union[str, os.PathLike, IO]) -> list[dict]:
    """Parse a navigation log into a time-sorted list of GPS fixes.

    Args:
        source: path to the CSV file, or an already-open text file object.
            A path is opened (and closed) here; a file object is left open
            for the caller to manage.

    Returns:
        One dict per timestamp that has both an easting and a northing, with
        keys ``t_ns``, ``easting``, ``northing``, ``utm_zone`` and
        ``rtk_status``, sorted by ``t_ns``. ``utm_zone`` is the UTM number
        followed by the UTM letter ("" when neither was logged for that
        timestamp). ``rtk_status`` is always 0: no ``data`` label handled
        here fills it in.

    Raises:
        ValueError: if a numeric value or the timestamp of a complete fix
            cannot be parsed.
    """
    if isinstance(source, (str, os.PathLike)):
        ctx = open(source, newline="", encoding="utf-8", errors="replace")
    else:
        # Borrowed file object: use it without taking ownership of closing.
        ctx = contextlib.nullcontext(source)

    buf: dict[str, dict] = {}  # key = timestamp string
    with ctx as f:
        for row in csv.DictReader(f):
            # DictReader fills the missing columns of a short (e.g. truncated)
            # row with None, so normalise before stripping.
            data = (row.get("data") or "").strip()
            ts = (row.get("timestamp") or "").strip()
            val = (row.get("value") or "").strip()

            rec = buf.get(ts)
            if rec is None:
                rec = buf[ts] = {
                    "ts_str": ts,
                    "easting": None,
                    "northing": None,
                    "utm_num": None,
                    "utm_let": None,
                    "rtk_status": 0,
                }

            if data == "Easting":
                rec["easting"] = float(val)
            elif data == "Northing":
                rec["northing"] = float(val)
            elif data == "UTM_number":
                # Logged as a float-looking string such as "33.0".
                rec["utm_num"] = str(int(float(val)))
            elif data == "UTM_letter":
                rec["utm_let"] = val

    rows = []
    for rec in buf.values():
        # A fix needs both coordinates; partial timestamps are dropped.
        if rec["easting"] is None or rec["northing"] is None:
            continue
        rows.append({
            "t_ns": _ts_to_ns(rec["ts_str"]),
            "easting": rec["easting"],
            "northing": rec["northing"],
            "utm_zone": (rec["utm_num"] or "") + (rec["utm_let"] or ""),
            "rtk_status": rec["rtk_status"],
        })
    rows.sort(key=lambda r: r["t_ns"])
    return rows


def write_usv_gps_group(h5_path: str, rows: list[dict]) -> None:
    """Write the fixes to the ``usv_gps`` group of an HDF5 file.

    The file is opened in append mode; an existing ``usv_gps`` group is
    deleted and recreated. The group receives gzip-compressed datasets
    ``t_ns`` (int64), ``easting`` and ``northing`` (float64) and
    ``rtk_status`` (int8), plus a ``utm_zone`` attribute.

    Args:
        h5_path: path of the HDF5 file to create or update.
        rows: fixes as returned by ``parse_nav_log``.
    """
    # Imported here so the CSV parsing above stays usable without h5py.
    import h5py

    columns = (
        ("t_ns", np.array([r["t_ns"] for r in rows], dtype=np.int64)),
        ("easting", np.array([r["easting"] for r in rows], dtype=np.float64)),
        ("northing", np.array([r["northing"] for r in rows], dtype=np.float64)),
        ("rtk_status", np.array([r["rtk_status"] for r in rows], dtype=np.int8)),
    )
    # The earliest fix may lack zone information, so take the first fix that
    # has one rather than blindly using rows[0].
    zone = next((r["utm_zone"] for r in rows if r.get("utm_zone")), "")

    with h5py.File(h5_path, "a") as f:
        if "usv_gps" in f:
            del f["usv_gps"]
        grp = f.create_group("usv_gps")
        for name, values in columns:
            grp.create_dataset(name, data=values, compression="gzip")
        grp.attrs["utm_zone"] = zone


def extract(csv_dir: str, out_h5: str) -> None:
    """Convert the navigation log found in ``csv_dir`` into ``out_h5``.

    Looks for files named ``*_navigation_log.csv`` in ``csv_dir``, parses
    the first one in sorted order, writes it to ``/usv_gps`` in ``out_h5``
    and prints a one-line summary.

    Raises:
        FileNotFoundError: if no matching file exists in ``csv_dir``.
    """
    # glob order is arbitrary; sort so the chosen file is reproducible.
    nav_files = sorted(glob.glob(os.path.join(csv_dir, "*_navigation_log.csv")))
    if not nav_files:
        raise FileNotFoundError(f"No navigation_log.csv in {csv_dir}")
    if len(nav_files) > 1:
        print(
            f"USV GPS: {len(nav_files)} navigation logs found, "
            f"using {nav_files[0]}",
            file=sys.stderr,
        )
    rows = parse_nav_log(nav_files[0])
    write_usv_gps_group(out_h5, rows)
    print(f"USV GPS: {len(rows)} fixes (UTM) -> {out_h5} [/usv_gps]")


if __name__ == "__main__":
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} CSV_DIR OUT_H5")
    extract(sys.argv[1], sys.argv[2])