#!/usr/bin/env python3
"""Parse USV long-format CSV → track.geojson + points.json"""

import argparse
import csv
import json
import math
import os
import sys
from collections import defaultdict

MAX_SLIDER_POINTS = 5000

# A track point is emitted only at timestamps where one of these fields changed.
_POSITION_FIELDS = ("Lat", "Lon", "RAW_Lat", "RAW_Lon")


def parse_args(argv=None):
    """Parse command-line options.

    Args:
        argv: Optional list of argument strings; ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Returns:
        Namespace with ``input`` (CSV path) and ``output`` (directory).
    """
    p = argparse.ArgumentParser(description="Parse USV nav CSV")
    p.add_argument("--input", required=True, help="CSV navigation log")
    p.add_argument("--output", required=True, help="Output directory")
    return p.parse_args(argv)


def load_csv(path):
    """Load long-format CSV into {timestamp: {field: value}}.

    The file must have ``timestamp``, ``data`` and ``value`` columns; a
    missing column raises ``KeyError``. All values are kept as strings.
    When the same field appears twice for one timestamp the last row wins.
    """
    rows_by_ts = defaultdict(dict)
    # utf-8-sig strips a leading BOM if present (otherwise the first header
    # would read "\ufefftimestamp") and reads BOM-less UTF-8 unchanged.
    with open(path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            rows_by_ts[row["timestamp"]][row["data"]] = row["value"]
    return rows_by_ts


def get_float(d, *keys):
    """Return the first finite float found in *d* under *keys*, else None.

    Keys are tried in order. A key that is missing, maps to ``None``, fails
    to parse, or parses to NaN/±inf is skipped so the next key can act as a
    fallback.
    """
    for k in keys:
        v = d.get(k)
        if v is None:
            continue
        try:
            number = float(v)
        except (TypeError, ValueError):
            continue
        # float() accepts "nan"/"inf"; such values are not usable as
        # coordinates and json.dump would serialise them as invalid JSON.
        if math.isfinite(number):
            return number
    return None


def _timestamp_sort_key(ts):
    """Sort key ordering numeric timestamps by value, others as text.

    Plain string sorting puts "10" before "9". Timestamps that parse as a
    finite number sort numerically (and before non-numeric ones); anything
    else, e.g. ISO-8601 strings, keeps its lexicographic order.
    """
    text = str(ts)
    try:
        number = float(ts)
    except (TypeError, ValueError):
        return (1, 0.0, text)
    if not math.isfinite(number):
        return (1, 0.0, text)
    return (0, number, text)


def build_points(rows_by_ts):
    """Build sorted list of {t, lat, lon, heading} where lat/lon valid.

    Timestamps are walked in order while the last known value of every
    field is accumulated, so a row that updates only one coordinate is
    combined with the most recent value of the other. A point is emitted
    only at timestamps where a position field was updated, which avoids
    runs of duplicate consecutive points.
    """
    state = {}
    points = []
    for ts in sorted(rows_by_ts, key=_timestamp_sort_key):
        updates = rows_by_ts[ts]
        state.update(updates)

        lat = get_float(state, "Lat", "RAW_Lat")
        lon = get_float(state, "Lon", "RAW_Lon")
        if lat is None or lon is None:
            continue
        if lat == 0.0 and lon == 0.0:
            continue

        # GPS_RAW_INT fallback (1e-7 degrees): a position within one degree
        # of (0, 0) is treated as bogus and replaced by the raw integer fix.
        if abs(lat) < 1 and abs(lon) < 1:
            raw_lat = get_float(state, "GPS_RAW_INT_lat")
            raw_lon = get_float(state, "GPS_RAW_INT_lon")
            # Truthiness on purpose: a missing or zero raw value is unusable.
            if not raw_lat or not raw_lon:
                continue
            lat = raw_lat / 1e7
            lon = raw_lon / 1e7

        if not any(field in updates for field in _POSITION_FIELDS):
            continue

        heading = get_float(state, "Heading", "Yaw")
        points.append({
            "t": ts,
            "lat": round(lat, 8),
            "lon": round(lon, 8),
            "heading": round(heading, 2) if heading is not None else None,
        })
    return points


def sample_points(points, max_n):
    """Return at most *max_n* evenly spaced items from *points*.

    The input list itself is returned when it is already short enough.
    Otherwise the sample always contains the first and the last item, so
    the final track position is not lost. ``max_n <= 0`` yields ``[]``.
    """
    n = len(points)
    if n <= max_n:
        return points
    if max_n <= 0:
        return []
    if max_n == 1:
        return [points[0]]
    last = n - 1
    # Integer arithmetic: index 0 for i == 0, index n-1 for i == max_n-1,
    # strictly increasing in between because n > max_n.
    return [points[i * last // (max_n - 1)] for i in range(max_n)]


def write_geojson(points, path):
    """Write *points* to *path* as a GeoJSON FeatureCollection.

    The collection holds a single LineString feature with [lon, lat]
    coordinates; its properties carry the first/last timestamp and the
    point count.
    """
    coords = [[p["lon"], p["lat"]] for p in points]
    geojson = {
        "type": "FeatureCollection",
        "features": [{
            "type": "Feature",
            "geometry": {"type": "LineString", "coordinates": coords},
            "properties": {
                "start": points[0]["t"] if points else None,
                "end": points[-1]["t"] if points else None,
                "n_points": len(points),
            },
        }],
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(geojson, f)
    print(f" track.geojson: {len(coords)} coords → {path}")


def write_points_json(points, path):
    """Dump *points* to *path* as a JSON array."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(points, f)
    print(f" points.json: {len(points)} points → {path}")


def print_stats(points):
    """Print count, time span, bounding box and heading availability."""
    if not points:
        print("No valid points found!")
        return
    lats = [p["lat"] for p in points]
    lons = [p["lon"] for p in points]
    print("\n=== Stats ===")
    print(f" N points (full): {len(points)}")
    print(f" First ts: {points[0]['t']}")
    print(f" Last ts: {points[-1]['t']}")
    print(f" Bbox lat: {min(lats):.6f} → {max(lats):.6f}")
    print(f" Bbox lon: {min(lons):.6f} → {max(lons):.6f}")
    headings = [p["heading"] for p in points if p["heading"] is not None]
    print(f" Heading data: {'yes' if headings else 'no'} ({len(headings)} values)")


def main(argv=None):
    """Convert the input CSV into track.geojson and points.json.

    Exits with status 1 when the log contains no valid position.
    """
    args = parse_args(argv)
    os.makedirs(args.output, exist_ok=True)

    print(f"Loading {args.input} ...")
    rows = load_csv(args.input)
    print(f" {len(rows)} unique timestamps")

    points = build_points(rows)
    print_stats(points)
    if not points:
        sys.exit(1)

    write_geojson(points, os.path.join(args.output, "track.geojson"))

    sampled = sample_points(points, MAX_SLIDER_POINTS)
    if len(sampled) < len(points):
        print(f" Sampled {len(sampled)} points for slider (from {len(points)})")
    write_points_json(sampled, os.path.join(args.output, "points.json"))
    print("\nDone.")


if __name__ == "__main__":
    main()