#!/usr/bin/env python3
"""usbl_to_json.py - Convert combined_nav_usbl.csv to usbl.json + auv_track.geojson"""
import argparse
import csv
import json
import math
import statistics
from datetime import datetime, timedelta, timezone
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
DEFAULT_INPUT = ROOT / "output" / "combined_nav_usbl.csv"
OUTPUT_DIR = ROOT / "output"

# Timestamp layouts accepted by parse_ts, tried in order.
_TS_FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
_ONE_MS = timedelta(milliseconds=1)

# (output key, CSV column) for every numeric field, in output order.
_FLOAT_COLUMNS = (
    ("usv_lat", "lat"),
    ("usv_lon", "lon"),
    ("heading", "Heading"),
    ("dist", "Dist"),
    ("az", "Azimuth"),
    ("elev", "Elev"),
    ("snr", "SNR"),
    ("auv_lat", "auv_lat"),
    ("auv_lon", "auv_lon"),
)


def parse_ts(s):
    """Convert a timestamp string to integer milliseconds since the Unix epoch.

    The string is stripped and matched against ``YYYY-MM-DD HH:MM:SS[.ffffff]``;
    the parsed time is treated as UTC.

    Returns:
        The epoch time in whole milliseconds, or ``None`` when the string
        matches neither accepted format.
    """
    text = s.strip()
    for fmt in _TS_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        # Integer timedelta arithmetic: going through float seconds and
        # truncating can land one millisecond low (e.g. ".001" -> 1000.999...).
        return (parsed.replace(tzinfo=timezone.utc) - _EPOCH) // _ONE_MS
    return None


def haversine_m(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in metres between two lat/lon points.

    Inputs are in degrees; a spherical Earth of radius 6371000 m is used.
    """
    R = 6371000.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    )
    # Rounding can push `a` a hair above 1.0 for near-antipodal points, which
    # would make sqrt(1 - a) raise; clamp it into the valid range.
    a = min(1.0, a)
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def _parse_row(row):
    """Turn one CSV record into an output point dict, or None if unusable."""
    try:
        values = {key: float(row[col]) for key, col in _FLOAT_COLUMNS}
        raw_ts = row["Timestamp"]
    except (KeyError, TypeError, ValueError):
        # Missing column, short row (DictReader fills with None) or bad number.
        return None
    if raw_ts is None:
        return None
    # NaN/Inf would be serialised as non-standard JSON tokens.
    if not all(math.isfinite(v) for v in values.values()):
        return None
    if values["dist"] <= 0 or values["auv_lat"] == 0 or values["auv_lon"] == 0:
        return None
    t_ms = parse_ts(raw_ts)
    if t_ms is None:
        return None
    return {"t": raw_ts.strip(), "t_ms": t_ms, **values}


def _load_rows(path):
    """Read the CSV at *path* and return its valid points sorted by time."""
    # utf-8-sig also accepts plain UTF-8 and drops a leading BOM if present.
    with open(path, newline="", encoding="utf-8-sig") as f:
        points = [p for p in map(_parse_row, csv.DictReader(f)) if p is not None]
    points.sort(key=lambda r: r["t_ms"])
    return points


def _downsample(rows, max_points):
    """Return at most *max_points* rows, always keeping the first and last."""
    total = len(rows)
    if total <= max_points:
        return rows
    step = total / max_points
    indices = {0, total - 1}
    indices.update(int(i * step) for i in range(1, max_points - 1))
    return [rows[i] for i in sorted(indices)]


def _build_summary(rows, n_raw):
    """Assemble the usbl.json payload for the (non-empty) sampled *rows*."""
    dists = [r["dist"] for r in rows]
    snrs = [r["snr"] for r in rows]
    auv_lats = [r["auv_lat"] for r in rows]
    auv_lons = [r["auv_lon"] for r in rows]
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "n_points": len(rows),
        "n_raw": n_raw,
        "auv_bbox": [min(auv_lons), min(auv_lats), max(auv_lons), max(auv_lats)],
        "stats": {
            "dist_min": round(min(dists), 3),
            "dist_max": round(max(dists), 3),
            "dist_median": round(statistics.median(dists), 3),
            "snr_min": round(min(snrs), 4),
            "snr_max": round(max(snrs), 4),
        },
        "points": rows,
    }


def _build_track(rows):
    """Assemble the AUV track GeoJSON FeatureCollection for *rows*."""
    coords = [[r["auv_lon"], r["auv_lat"]] for r in rows]
    return {
        "type": "FeatureCollection",
        "features": [{
            "type": "Feature",
            "geometry": {"type": "LineString", "coordinates": coords},
            "properties": {"name": "AUV track (USBL projection)", "color": "#ff8800"},
        }],
    }


def _write_json(obj, path):
    """Write *obj* to *path* as compact UTF-8 JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, separators=(",", ":"))


def main(argv=None):
    """Command-line entry point: read the CSV, write usbl.json and auv_track.geojson.

    Args:
        argv: Optional argument list; ``None`` (the default) makes argparse
            read ``sys.argv``.

    Raises:
        SystemExit: on invalid arguments, or when the input contains no
            usable rows.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", default=str(DEFAULT_INPUT))
    ap.add_argument("--max-points", type=int, default=10000)
    ap.add_argument("--output-dir", default=str(OUTPUT_DIR))
    args = ap.parse_args(argv)
    if args.max_points < 2:
        # First and last samples are always kept, so fewer than two is unsatisfiable.
        ap.error("--max-points must be at least 2")

    rows = _load_rows(args.input)
    n_raw = len(rows)
    if n_raw == 0:
        raise SystemExit(f"No valid USBL rows found in {args.input}")

    # Sample if > max-points (preserve begin/end)
    rows = _downsample(rows, args.max_points)
    n = len(rows)

    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    usbl_out = out_dir / "usbl.json"
    _write_json(_build_summary(rows, n_raw), usbl_out)
    print(f"usbl.json: {n} points (raw={n_raw}) -> {usbl_out}")

    # AUV track GeoJSON
    geojson = _build_track(rows)
    coords = geojson["features"][0]["geometry"]["coordinates"]
    track_out = out_dir / "auv_track.geojson"
    _write_json(geojson, track_out)
    print(f"auv_track.geojson: {len(coords)} coords -> {track_out}")

    # Sanity check: first point haversine vs USBL dist
    r0 = rows[0]
    hav = haversine_m(r0["usv_lat"], r0["usv_lon"], r0["auv_lat"], r0["auv_lon"])
    print(f"Sanity [0]: USV=({r0['usv_lat']:.6f},{r0['usv_lon']:.6f}) AUV=({r0['auv_lat']:.6f},{r0['auv_lon']:.6f}) hav={hav:.2f}m USBL_dist={r0['dist']:.2f}m diff={abs(hav-r0['dist']):.3f}m")


if __name__ == "__main__":
    main()