Files
cosma-nav/extract/parse_usv_gps.py

120 lines
4.6 KiB
Python

# extract/parse_usv_gps.py
import csv, sys, glob
from datetime import datetime, timezone
from typing import Union, IO
import numpy as np
import h5py
def _ts_to_ns(ts_str: str) -> int:
"""Convert '2026-04-08 09:31:10.123456' to nanoseconds since epoch."""
dt = datetime.strptime(ts_str.strip(), "%Y-%m-%d %H:%M:%S.%f")
dt = dt.replace(tzinfo=timezone.utc)
return int(dt.timestamp() * 1e9)
def parse_nav_log(source: Union[str, IO]) -> list[dict]:
if isinstance(source, str):
f = open(source, newline="", encoding="utf-8", errors="replace")
close = True
else:
f = source
close = False
try:
reader = csv.DictReader(f)
buf: dict[str, dict] = {} # key = timestamp string
for row in reader:
data = row.get("data", "").strip()
ts = row.get("timestamp", "").strip()
val = row.get("value", "").strip()
if ts not in buf:
buf[ts] = {"ts_str": ts, "easting": None, "northing": None,
"utm_num": None, "utm_let": None, "rtk_status": 0,
"heading_deg": None, "fix_type_raw": None}
if data == "Easting":
buf[ts]["easting"] = float(val)
elif data == "Northing":
buf[ts]["northing"] = float(val)
elif data == "UTM_number":
buf[ts]["utm_num"] = str(int(float(val)))
elif data == "UTM_letter":
buf[ts]["utm_let"] = val
elif data.lower() in ("heading", "hdg", "cog", "course", "yaw",
"yaw_deg", "heading_deg", "course_deg"):
try:
buf[ts]["heading_deg"] = float(val)
except ValueError:
pass
elif data.lower() in ("fix_type", "gps_fix_type", "rtk_type",
"fix_status", "gps_status"):
try:
buf[ts]["fix_type_raw"] = int(float(val))
except ValueError:
pass
rows = []
for r in buf.values():
if r["easting"] is None or r["northing"] is None:
continue
zone = (r["utm_num"] or "") + (r["utm_let"] or "")
# Map ArduPilot fix_type (0-6) → our 3-level RTK status
# 0=NoGPS,1=NoFix,2=2D,3=3D,4=DGPS → 0 (3D/none)
# 5=RTK_Float → 1, 6=RTK_Fixed → 2
raw_fix = r.get("fix_type_raw")
if raw_fix is not None:
if raw_fix >= 6:
rtk = 2
elif raw_fix == 5:
rtk = 1
else:
rtk = 0
else:
rtk = r["rtk_status"] # default 0
rows.append({
"t_ns": _ts_to_ns(r["ts_str"]),
"easting": r["easting"],
"northing": r["northing"],
"utm_zone": zone,
"rtk_status": rtk,
"heading_deg": r.get("heading_deg"), # float or None
})
return sorted(rows, key=lambda r: r["t_ns"])
finally:
if close:
f.close()
def write_usv_gps_group(h5_path: str, rows: list[dict]) -> None:
t = np.array([r["t_ns"] for r in rows], dtype=np.int64)
e = np.array([r["easting"] for r in rows], dtype=np.float64)
n = np.array([r["northing"] for r in rows], dtype=np.float64)
rtk = np.array([r["rtk_status"] for r in rows], dtype=np.int8)
hdg = np.array(
[r["heading_deg"] if r["heading_deg"] is not None else np.nan
for r in rows],
dtype=np.float64,
)
with h5py.File(h5_path, "a") as f:
if "usv_gps" in f:
del f["usv_gps"]
grp = f.create_group("usv_gps")
grp.create_dataset("t_ns", data=t, compression="gzip")
grp.create_dataset("easting", data=e, compression="gzip")
grp.create_dataset("northing", data=n, compression="gzip")
grp.create_dataset("rtk_status", data=rtk, compression="gzip")
grp.create_dataset("heading_deg", data=hdg, compression="gzip")
grp.attrs["utm_zone"] = rows[0]["utm_zone"] if rows else ""
def extract(csv_dir: str, out_h5: str) -> None:
nav_files = glob.glob(f"{csv_dir}/*_navigation_log.csv")
if not nav_files:
raise FileNotFoundError(f"No navigation_log.csv in {csv_dir}")
rows = parse_nav_log(nav_files[0])
write_usv_gps_group(out_h5, rows)
print(f"USV GPS: {len(rows)} fixes (UTM) -> {out_h5} [/usv_gps]")
if __name__ == "__main__":
extract(sys.argv[1], sys.argv[2])