126 lines
4.3 KiB
Python
126 lines
4.3 KiB
Python
#!/usr/bin/env python3
|
|
"""usbl_to_json.py - Convert combined_nav_usbl.csv to usbl.json + auv_track.geojson"""
|
|
import csv, json, math, argparse, statistics
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Project root: two directory levels above this script's resolved location.
ROOT = Path(__file__).resolve().parent.parent

# Directory that holds both the input CSV and the generated artefacts.
OUTPUT_DIR = ROOT / "output"

# CSV read when --input is not given on the command line.
DEFAULT_INPUT = OUTPUT_DIR / "combined_nav_usbl.csv"
|
|
|
|
def parse_ts(s):
    """Parse a ``YYYY-MM-DD HH:MM:SS[.ffffff]`` timestamp into epoch milliseconds.

    The text is stripped of surrounding whitespace and interpreted as UTC.

    Args:
        s: Timestamp text, with or without a fractional-seconds part.

    Returns:
        Whole milliseconds since 1970-01-01T00:00:00Z as an ``int`` (sub-
        millisecond digits are floored away), or ``None`` when ``s`` is not a
        string or matches neither supported format.
    """
    # csv.DictReader fills missing cells with None; treat that as "unparseable"
    # instead of raising AttributeError on .strip().
    if not isinstance(s, str):
        return None

    text = s.strip()
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
        try:
            dt = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
        # Integer arithmetic on the timedelta: going through the float
        # dt.timestamp() * 1000 and truncating can land one millisecond low.
        delta = dt - epoch
        total_us = (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds
        return total_us // 1000
    return None
|
|
|
|
def haversine_m(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in metres between two points.

    Uses the haversine formula on a sphere of radius 6,371,000 m.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        The distance along the sphere's surface, in metres, as a ``float``.
    """
    R = 6371000.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1] (e.g. near-antipodal
    # points), which would make sqrt() raise ValueError; clamp it first.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
def _read_fixes(csv_path):
    """Read the combined nav/USBL CSV and return one dict per usable row.

    A row is skipped (never fatal) when any required column is missing, empty
    or non-numeric, when any value is NaN/Inf, when ``Dist`` is not positive,
    when ``auv_lat`` or ``auv_lon`` is exactly 0, or when the timestamp cannot
    be parsed.
    """
    # (output key, CSV column) in the order the keys appear in each point.
    columns = (
        ("usv_lat", "lat"),
        ("usv_lon", "lon"),
        ("heading", "Heading"),
        ("dist", "Dist"),
        ("az", "Azimuth"),
        ("elev", "Elev"),
        ("snr", "SNR"),
        ("auv_lat", "auv_lat"),
        ("auv_lon", "auv_lon"),
    )
    fixes = []
    # utf-8-sig reads plain UTF-8 and also strips a BOM, which would otherwise
    # be glued onto the first header name.
    with open(csv_path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            try:
                values = {key: float(row[col]) for key, col in columns}
            except (ValueError, KeyError, TypeError):
                # TypeError: DictReader pads short rows with None.
                continue
            # NaN/Inf cannot be represented in strict JSON and would also
            # corrupt the min/max statistics.
            if not all(math.isfinite(v) for v in values.values()):
                continue
            if values["dist"] <= 0 or values["auv_lat"] == 0 or values["auv_lon"] == 0:
                continue
            raw_ts = row.get("Timestamp")
            if raw_ts is None:
                continue
            t_ms = parse_ts(raw_ts)
            if t_ms is None:
                continue
            fixes.append({"t": raw_ts.strip(), "t_ms": t_ms, **values})
    return fixes


def _downsample(rows, max_points):
    """Thin ``rows`` to evenly spaced entries, always keeping first and last."""
    # The two end points are always kept, so the effective budget is >= 2;
    # clamping also avoids a division by zero for --max-points 0.
    budget = max(2, max_points)
    total = len(rows)
    if total <= budget:
        return rows
    step = total / budget
    keep = {0, total - 1}
    keep.update(int(i * step) for i in range(1, budget - 1))
    return [rows[i] for i in sorted(keep)]


def _write_json(path, payload):
    """Serialise ``payload`` to ``path`` as compact JSON."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(payload, f, separators=(",", ":"))


def main(argv=None):
    """Convert the combined nav/USBL CSV to ``usbl.json`` and ``auv_track.geojson``.

    Steps: read and filter the CSV rows, sort them by timestamp, thin them to
    at most ``--max-points`` entries, write both files into ``OUTPUT_DIR``,
    then print a sanity line comparing the haversine USV-to-AUV distance of the
    first point with its ``Dist`` value.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            lets argparse read them from ``sys.argv``.

    Raises:
        SystemExit: If the input contains no usable rows.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", default=str(DEFAULT_INPUT))
    ap.add_argument("--max-points", type=int, default=10000)
    args = ap.parse_args(argv)

    rows = _read_fixes(args.input)
    if not rows:
        # min()/max() and rows[0] below would otherwise fail with an opaque
        # ValueError/IndexError.
        raise SystemExit(f"No usable rows found in {args.input}; nothing written.")

    rows.sort(key=lambda r: r["t_ms"])
    n_raw = len(rows)

    # Sample if > max-points (preserve begin/end)
    rows = _downsample(rows, args.max_points)

    n = len(rows)
    dists = [r["dist"] for r in rows]
    snrs = [r["snr"] for r in rows]
    auv_lats = [r["auv_lat"] for r in rows]
    auv_lons = [r["auv_lon"] for r in rows]

    # [min_lon, min_lat, max_lon, max_lat]
    auv_bbox = [min(auv_lons), min(auv_lats), max(auv_lons), max(auv_lats)]

    out = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "n_points": n,
        "n_raw": n_raw,
        "auv_bbox": auv_bbox,
        "stats": {
            "dist_min": round(min(dists), 3),
            "dist_max": round(max(dists), 3),
            "dist_median": round(statistics.median(dists), 3),
            "snr_min": round(min(snrs), 4),
            "snr_max": round(max(snrs), 4),
        },
        "points": rows,
    }

    # A custom --input may point outside OUTPUT_DIR, so it may not exist yet.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    usbl_out = OUTPUT_DIR / "usbl.json"
    _write_json(usbl_out, out)
    print(f"usbl.json: {n} points (raw={n_raw}) -> {usbl_out}")

    # AUV track GeoJSON (coordinates are [lon, lat])
    coords = [[r["auv_lon"], r["auv_lat"]] for r in rows]
    geojson = {
        "type": "FeatureCollection",
        "features": [{
            "type": "Feature",
            "geometry": {"type": "LineString", "coordinates": coords},
            "properties": {"name": "AUV track (USBL projection)", "color": "#ff8800"},
        }]
    }
    track_out = OUTPUT_DIR / "auv_track.geojson"
    _write_json(track_out, geojson)
    print(f"auv_track.geojson: {len(coords)} coords -> {track_out}")

    # Sanity check: first point haversine vs USBL dist
    r0 = rows[0]
    hav = haversine_m(r0["usv_lat"], r0["usv_lon"], r0["auv_lat"], r0["auv_lon"])
    print(f"Sanity [0]: USV=({r0['usv_lat']:.6f},{r0['usv_lon']:.6f}) AUV=({r0['auv_lat']:.6f},{r0['auv_lon']:.6f}) hav={hav:.2f}m USBL_dist={r0['dist']:.2f}m diff={abs(hav-r0['dist']):.3f}m")
|
|
|
|
# Run the conversion only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|