feat: parse_usv_gps — navigation_log CSV UTM -> HDF5 /usv_gps
This commit is contained in:
85
extract/parse_usv_gps.py
Normal file
85
extract/parse_usv_gps.py
Normal file
@@ -0,0 +1,85 @@
|
||||
# extract/parse_usv_gps.py
|
||||
import csv, sys, glob
|
||||
from datetime import datetime, timezone
|
||||
from typing import Union, IO
|
||||
import numpy as np
|
||||
import h5py
|
||||
|
||||
|
||||
def _ts_to_ns(ts_str: str) -> int:
|
||||
"""Convert '2026-04-08 09:31:10.123456' to nanoseconds since epoch."""
|
||||
dt = datetime.strptime(ts_str.strip(), "%Y-%m-%d %H:%M:%S.%f")
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return int(dt.timestamp() * 1e9)
|
||||
|
||||
|
||||
def parse_nav_log(source: Union[str, IO]) -> list[dict]:
|
||||
if isinstance(source, str):
|
||||
f = open(source, newline="", encoding="utf-8", errors="replace")
|
||||
close = True
|
||||
else:
|
||||
f = source
|
||||
close = False
|
||||
try:
|
||||
reader = csv.DictReader(f)
|
||||
buf: dict[str, dict] = {} # key = timestamp string
|
||||
for row in reader:
|
||||
data = row.get("data", "").strip()
|
||||
ts = row.get("timestamp", "").strip()
|
||||
val = row.get("value", "").strip()
|
||||
if ts not in buf:
|
||||
buf[ts] = {"ts_str": ts, "easting": None, "northing": None,
|
||||
"utm_num": None, "utm_let": None, "rtk_status": 0}
|
||||
if data == "Easting":
|
||||
buf[ts]["easting"] = float(val)
|
||||
elif data == "Northing":
|
||||
buf[ts]["northing"] = float(val)
|
||||
elif data == "UTM_number":
|
||||
buf[ts]["utm_num"] = str(int(float(val)))
|
||||
elif data == "UTM_letter":
|
||||
buf[ts]["utm_let"] = val
|
||||
rows = []
|
||||
for r in buf.values():
|
||||
if r["easting"] is None or r["northing"] is None:
|
||||
continue
|
||||
zone = (r["utm_num"] or "") + (r["utm_let"] or "")
|
||||
rows.append({
|
||||
"t_ns": _ts_to_ns(r["ts_str"]),
|
||||
"easting": r["easting"],
|
||||
"northing": r["northing"],
|
||||
"utm_zone": zone,
|
||||
"rtk_status": r["rtk_status"],
|
||||
})
|
||||
return sorted(rows, key=lambda r: r["t_ns"])
|
||||
finally:
|
||||
if close:
|
||||
f.close()
|
||||
|
||||
|
||||
def write_usv_gps_group(h5_path: str, rows: list[dict]) -> None:
|
||||
t = np.array([r["t_ns"] for r in rows], dtype=np.int64)
|
||||
e = np.array([r["easting"] for r in rows], dtype=np.float64)
|
||||
n = np.array([r["northing"] for r in rows], dtype=np.float64)
|
||||
rtk = np.array([r["rtk_status"] for r in rows], dtype=np.int8)
|
||||
with h5py.File(h5_path, "a") as f:
|
||||
if "usv_gps" in f:
|
||||
del f["usv_gps"]
|
||||
grp = f.create_group("usv_gps")
|
||||
grp.create_dataset("t_ns", data=t, compression="gzip")
|
||||
grp.create_dataset("easting", data=e, compression="gzip")
|
||||
grp.create_dataset("northing", data=n, compression="gzip")
|
||||
grp.create_dataset("rtk_status", data=rtk, compression="gzip")
|
||||
grp.attrs["utm_zone"] = rows[0]["utm_zone"] if rows else ""
|
||||
|
||||
|
||||
def extract(csv_dir: str, out_h5: str) -> None:
|
||||
nav_files = glob.glob(f"{csv_dir}/*_navigation_log.csv")
|
||||
if not nav_files:
|
||||
raise FileNotFoundError(f"No navigation_log.csv in {csv_dir}")
|
||||
rows = parse_nav_log(nav_files[0])
|
||||
write_usv_gps_group(out_h5, rows)
|
||||
print(f"USV GPS: {len(rows)} fixes (UTM) -> {out_h5} [/usv_gps]")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
extract(sys.argv[1], sys.argv[2])
|
||||
35
tests/test_parse_usv_gps.py
Normal file
35
tests/test_parse_usv_gps.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import tempfile, os, io
|
||||
import numpy as np
|
||||
import h5py
|
||||
|
||||
SAMPLE_CSV = """timestamp,data,value
|
||||
2026-04-08 09:31:10.100000,Easting,318234.5
|
||||
2026-04-08 09:31:10.100000,Northing,4823456.7
|
||||
2026-04-08 09:31:10.100000,UTM_number,32
|
||||
2026-04-08 09:31:10.100000,UTM_letter,T
|
||||
2026-04-08 09:31:11.200000,Easting,318235.1
|
||||
2026-04-08 09:31:11.200000,Northing,4823457.2
|
||||
2026-04-08 09:31:11.200000,UTM_number,32
|
||||
2026-04-08 09:31:11.200000,UTM_letter,T
|
||||
"""
|
||||
|
||||
def test_parse_usv_gps():
|
||||
from extract.parse_usv_gps import parse_nav_log, write_usv_gps_group
|
||||
rows = parse_nav_log(io.StringIO(SAMPLE_CSV))
|
||||
assert len(rows) == 2
|
||||
assert abs(rows[0]["easting"] - 318234.5) < 0.01
|
||||
assert abs(rows[0]["northing"] - 4823456.7) < 0.01
|
||||
assert rows[0]["utm_zone"] == "32T"
|
||||
assert rows[0]["rtk_status"] == 0
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".h5", delete=False) as tmp:
|
||||
path = tmp.name
|
||||
try:
|
||||
write_usv_gps_group(path, rows)
|
||||
with h5py.File(path, "r") as f:
|
||||
assert "usv_gps" in f
|
||||
assert len(f["usv_gps/easting"][:]) == 2
|
||||
assert len(f["usv_gps/northing"][:]) == 2
|
||||
assert len(f["usv_gps/t_ns"][:]) == 2
|
||||
finally:
|
||||
os.unlink(path)
|
||||
Reference in New Issue
Block a user