From 28fa6ddb0d1f15963e17b13595acf185a39438ed Mon Sep 17 00:00:00 2001 From: Floppyrj45 Date: Fri, 24 Apr 2026 01:46:46 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20parse=5Fusv=5Fgps=20=E2=80=94=20navigat?= =?UTF-8?q?ion=5Flog=20CSV=20UTM=20->=20HDF5=20/usv=5Fgps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- extract/parse_usv_gps.py | 85 +++++++++++++++++++++++++++++++++++++ tests/test_parse_usv_gps.py | 35 +++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 extract/parse_usv_gps.py create mode 100644 tests/test_parse_usv_gps.py diff --git a/extract/parse_usv_gps.py b/extract/parse_usv_gps.py new file mode 100644 index 0000000..1feabb5 --- /dev/null +++ b/extract/parse_usv_gps.py @@ -0,0 +1,85 @@ +# extract/parse_usv_gps.py +import csv, sys, glob +from datetime import datetime, timezone +from typing import Union, IO +import numpy as np +import h5py + + +def _ts_to_ns(ts_str: str) -> int: + """Convert '2026-04-08 09:31:10.123456' to nanoseconds since epoch.""" + dt = datetime.strptime(ts_str.strip(), "%Y-%m-%d %H:%M:%S.%f") + dt = dt.replace(tzinfo=timezone.utc) + return int(dt.timestamp() * 1e9) + + +def parse_nav_log(source: Union[str, IO]) -> list[dict]: + if isinstance(source, str): + f = open(source, newline="", encoding="utf-8", errors="replace") + close = True + else: + f = source + close = False + try: + reader = csv.DictReader(f) + buf: dict[str, dict] = {} # key = timestamp string + for row in reader: + data = row.get("data", "").strip() + ts = row.get("timestamp", "").strip() + val = row.get("value", "").strip() + if ts not in buf: + buf[ts] = {"ts_str": ts, "easting": None, "northing": None, + "utm_num": None, "utm_let": None, "rtk_status": 0} + if data == "Easting": + buf[ts]["easting"] = float(val) + elif data == "Northing": + buf[ts]["northing"] = float(val) + elif data == "UTM_number": + buf[ts]["utm_num"] = str(int(float(val))) + elif data == "UTM_letter": + buf[ts]["utm_let"] = val + rows = [] + for r in buf.values(): + if r["easting"] is None or r["northing"] is None: + continue + zone = (r["utm_num"] or "") + (r["utm_let"] or "") + rows.append({ + "t_ns": _ts_to_ns(r["ts_str"]), + "easting": r["easting"], + "northing": r["northing"], + "utm_zone": zone, + "rtk_status": r["rtk_status"], + }) + return sorted(rows, key=lambda r: r["t_ns"]) + finally: + if close: + f.close() + + +def write_usv_gps_group(h5_path: str, rows: list[dict]) -> None: + t = np.array([r["t_ns"] for r in rows], dtype=np.int64) + e = np.array([r["easting"] for r in rows], dtype=np.float64) + n = np.array([r["northing"] for r in rows], dtype=np.float64) + rtk = np.array([r["rtk_status"] for r in rows], dtype=np.int8) + with h5py.File(h5_path, "a") as f: + if "usv_gps" in f: + del f["usv_gps"] + grp = f.create_group("usv_gps") + grp.create_dataset("t_ns", data=t, compression="gzip") + grp.create_dataset("easting", data=e, compression="gzip") + grp.create_dataset("northing", data=n, compression="gzip") + grp.create_dataset("rtk_status", data=rtk, compression="gzip") + grp.attrs["utm_zone"] = rows[0]["utm_zone"] if rows else "" + + +def extract(csv_dir: str, out_h5: str) -> None: + nav_files = glob.glob(f"{csv_dir}/*_navigation_log.csv") + if not nav_files: + raise FileNotFoundError(f"No navigation_log.csv in {csv_dir}") + rows = parse_nav_log(nav_files[0]) + write_usv_gps_group(out_h5, rows) + print(f"USV GPS: {len(rows)} fixes (UTM) -> {out_h5} [/usv_gps]") + + +if __name__ == "__main__": + extract(sys.argv[1], sys.argv[2]) diff --git a/tests/test_parse_usv_gps.py b/tests/test_parse_usv_gps.py new file mode 100644 index 0000000..1430297 --- /dev/null +++ b/tests/test_parse_usv_gps.py @@ -0,0 +1,35 @@ +import tempfile, os, io +import numpy as np +import h5py + +SAMPLE_CSV = """timestamp,data,value +2026-04-08 09:31:10.100000,Easting,318234.5 +2026-04-08 09:31:10.100000,Northing,4823456.7 +2026-04-08 09:31:10.100000,UTM_number,32 +2026-04-08 09:31:10.100000,UTM_letter,T +2026-04-08 09:31:11.200000,Easting,318235.1 +2026-04-08 09:31:11.200000,Northing,4823457.2 +2026-04-08 09:31:11.200000,UTM_number,32 +2026-04-08 09:31:11.200000,UTM_letter,T +""" + +def test_parse_usv_gps(): + from extract.parse_usv_gps import parse_nav_log, write_usv_gps_group + rows = parse_nav_log(io.StringIO(SAMPLE_CSV)) + assert len(rows) == 2 + assert abs(rows[0]["easting"] - 318234.5) < 0.01 + assert abs(rows[0]["northing"] - 4823456.7) < 0.01 + assert rows[0]["utm_zone"] == "32T" + assert rows[0]["rtk_status"] == 0 + + with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as tmp: + path = tmp.name + try: + write_usv_gps_group(path, rows) + with h5py.File(path, "r") as f: + assert "usv_gps" in f + assert len(f["usv_gps/easting"][:]) == 2 + assert len(f["usv_gps/northing"][:]) == 2 + assert len(f["usv_gps/t_ns"][:]) == 2 + finally: + os.unlink(path)