feat(viewer): v3 AUV track + USBL vector overlay
This commit is contained in:
125
tools/usbl_to_json.py
Normal file
125
tools/usbl_to_json.py
Normal file
@@ -0,0 +1,125 @@
|
||||
#!/usr/bin/env python3
|
||||
"""usbl_to_json.py - Convert combined_nav_usbl.csv to usbl.json + auv_track.geojson"""
|
||||
import csv, json, math, argparse, statistics
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
DEFAULT_INPUT = ROOT / "output" / "combined_nav_usbl.csv"
|
||||
OUTPUT_DIR = ROOT / "output"
|
||||
|
||||
def parse_ts(s):
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
|
||||
try:
|
||||
dt = datetime.strptime(s.strip(), fmt).replace(tzinfo=timezone.utc)
|
||||
return int(dt.timestamp() * 1000)
|
||||
except ValueError:
|
||||
pass
|
||||
return None
|
||||
|
||||
def haversine_m(lat1, lon1, lat2, lon2):
|
||||
R = 6371000.0
|
||||
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
||||
dphi = math.radians(lat2 - lat1)
|
||||
dlam = math.radians(lon2 - lon1)
|
||||
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2
|
||||
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--input", default=str(DEFAULT_INPUT))
|
||||
ap.add_argument("--max-points", type=int, default=10000)
|
||||
args = ap.parse_args()
|
||||
|
||||
rows = []
|
||||
with open(args.input, newline="") as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
try:
|
||||
dist = float(row["Dist"])
|
||||
auv_lat = float(row["auv_lat"])
|
||||
auv_lon = float(row["auv_lon"])
|
||||
except (ValueError, KeyError):
|
||||
continue
|
||||
if dist <= 0 or auv_lat == 0 or auv_lon == 0:
|
||||
continue
|
||||
t_ms = parse_ts(row["Timestamp"])
|
||||
if t_ms is None:
|
||||
continue
|
||||
rows.append({
|
||||
"t": row["Timestamp"].strip(),
|
||||
"t_ms": t_ms,
|
||||
"usv_lat": float(row["lat"]),
|
||||
"usv_lon": float(row["lon"]),
|
||||
"heading": float(row["Heading"]),
|
||||
"dist": dist,
|
||||
"az": float(row["Azimuth"]),
|
||||
"elev": float(row["Elev"]),
|
||||
"snr": float(row["SNR"]),
|
||||
"auv_lat": auv_lat,
|
||||
"auv_lon": auv_lon,
|
||||
})
|
||||
|
||||
rows.sort(key=lambda r: r["t_ms"])
|
||||
n_raw = len(rows)
|
||||
|
||||
# Sample if > max-points (preserve begin/end)
|
||||
if n_raw > args.max_points:
|
||||
step = n_raw / args.max_points
|
||||
indices = set()
|
||||
indices.add(0)
|
||||
indices.add(n_raw - 1)
|
||||
for i in range(1, args.max_points - 1):
|
||||
indices.add(int(i * step))
|
||||
rows = [rows[i] for i in sorted(indices)]
|
||||
|
||||
n = len(rows)
|
||||
dists = [r["dist"] for r in rows]
|
||||
snrs = [r["snr"] for r in rows]
|
||||
auv_lats = [r["auv_lat"] for r in rows]
|
||||
auv_lons = [r["auv_lon"] for r in rows]
|
||||
|
||||
auv_bbox = [min(auv_lons), min(auv_lats), max(auv_lons), max(auv_lats)]
|
||||
|
||||
out = {
|
||||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"n_points": n,
|
||||
"n_raw": n_raw,
|
||||
"auv_bbox": auv_bbox,
|
||||
"stats": {
|
||||
"dist_min": round(min(dists), 3),
|
||||
"dist_max": round(max(dists), 3),
|
||||
"dist_median": round(statistics.median(dists), 3),
|
||||
"snr_min": round(min(snrs), 4),
|
||||
"snr_max": round(max(snrs), 4),
|
||||
},
|
||||
"points": rows,
|
||||
}
|
||||
|
||||
usbl_out = OUTPUT_DIR / "usbl.json"
|
||||
with open(usbl_out, "w") as f:
|
||||
json.dump(out, f, separators=(",", ":"))
|
||||
print(f"usbl.json: {n} points (raw={n_raw}) -> {usbl_out}")
|
||||
|
||||
# AUV track GeoJSON
|
||||
coords = [[r["auv_lon"], r["auv_lat"]] for r in rows]
|
||||
geojson = {
|
||||
"type": "FeatureCollection",
|
||||
"features": [{
|
||||
"type": "Feature",
|
||||
"geometry": {"type": "LineString", "coordinates": coords},
|
||||
"properties": {"name": "AUV track (USBL projection)", "color": "#ff8800"},
|
||||
}]
|
||||
}
|
||||
track_out = OUTPUT_DIR / "auv_track.geojson"
|
||||
with open(track_out, "w") as f:
|
||||
json.dump(geojson, f, separators=(",", ":"))
|
||||
print(f"auv_track.geojson: {len(coords)} coords -> {track_out}")
|
||||
|
||||
# Sanity check: first point haversine vs USBL dist
|
||||
r0 = rows[0]
|
||||
hav = haversine_m(r0["usv_lat"], r0["usv_lon"], r0["auv_lat"], r0["auv_lon"])
|
||||
print(f"Sanity [0]: USV=({r0['usv_lat']:.6f},{r0['usv_lon']:.6f}) AUV=({r0['auv_lat']:.6f},{r0['auv_lon']:.6f}) hav={hav:.2f}m USBL_dist={r0['dist']:.2f}m diff={abs(hav-r0['dist']):.3f}m")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user