feat(cosma-nav-tools): v1 parser + Leaflet viewer USV La Ciotat 2026-04-08

This commit is contained in:
Poulpe
2026-04-25 17:18:27 +00:00
commit 4ceec113b6
4 changed files with 318 additions and 0 deletions

162
tools/parse_usv_nav.py Normal file
View File

@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""Parse USV long-format CSV → track.geojson + points.json"""
import argparse
import csv
import json
import os
import sys
from collections import defaultdict
MAX_SLIDER_POINTS = 5000
def parse_args():
p = argparse.ArgumentParser(description="Parse USV nav CSV")
p.add_argument("--input", required=True, help="CSV navigation log")
p.add_argument("--output", required=True, help="Output directory")
return p.parse_args()
def load_csv(path):
"""Load long-format CSV into {timestamp: {field: value}}"""
rows_by_ts = defaultdict(dict)
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
ts = row["timestamp"]
field = row["data"]
val = row["value"]
rows_by_ts[ts][field] = val
return rows_by_ts
def get_float(d, *keys):
for k in keys:
v = d.get(k)
if v is not None:
try:
return float(v)
except ValueError:
pass
return None
def build_points(rows_by_ts):
"""Build sorted list of {t, lat, lon, heading} where lat/lon valid."""
# We need to track last known lat/lon/heading per timestamp cluster.
# Strategy: walk timestamps in order, emit a point each time we see a Lat or Lon update.
# Accumulate state across timestamps.
timestamps = sorted(rows_by_ts.keys())
state = {}
points = []
for ts in timestamps:
updates = rows_by_ts[ts]
state.update(updates)
# Only emit point if we have both Lat and Lon from this or earlier ts
lat = get_float(state, "Lat", "RAW_Lat")
lon = get_float(state, "Lon", "RAW_Lon")
heading = get_float(state, "Heading", "Yaw")
if lat is None or lon is None:
continue
if lat == 0.0 and lon == 0.0:
continue
# GPS_RAW_INT fallback (1e-7 degrees)
if abs(lat) < 1 and abs(lon) < 1:
raw_lat = get_float(state, "GPS_RAW_INT_lat")
raw_lon = get_float(state, "GPS_RAW_INT_lon")
if raw_lat and raw_lon:
lat = raw_lat / 1e7
lon = raw_lon / 1e7
else:
continue
# Only emit if Lat or Lon just updated (reduce duplicate consecutive points)
if "Lat" in updates or "Lon" in updates or "RAW_Lat" in updates or "RAW_Lon" in updates:
points.append({
"t": ts,
"lat": round(lat, 8),
"lon": round(lon, 8),
"heading": round(heading, 2) if heading is not None else None,
})
return points
def sample_points(points, max_n):
if len(points) <= max_n:
return points
step = len(points) / max_n
return [points[int(i * step)] for i in range(max_n)]
def write_geojson(points, path):
coords = [[p["lon"], p["lat"]] for p in points]
geojson = {
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": coords},
"properties": {
"start": points[0]["t"] if points else None,
"end": points[-1]["t"] if points else None,
"n_points": len(points),
}
}]
}
with open(path, "w") as f:
json.dump(geojson, f)
print(f" track.geojson: {len(coords)} coords → {path}")
def write_points_json(points, path):
with open(path, "w") as f:
json.dump(points, f)
print(f" points.json: {len(points)} points → {path}")
def print_stats(points):
if not points:
print("No valid points found!")
return
lats = [p["lat"] for p in points]
lons = [p["lon"] for p in points]
print(f"\n=== Stats ===")
print(f" N points (full): {len(points)}")
print(f" First ts: {points[0]['t']}")
print(f" Last ts: {points[-1]['t']}")
print(f" Bbox lat: {min(lats):.6f}{max(lats):.6f}")
print(f" Bbox lon: {min(lons):.6f}{max(lons):.6f}")
headings = [p["heading"] for p in points if p["heading"] is not None]
print(f" Heading data: {'yes' if headings else 'no'} ({len(headings)} values)")
def main():
args = parse_args()
os.makedirs(args.output, exist_ok=True)
print(f"Loading {args.input} ...")
rows = load_csv(args.input)
print(f" {len(rows)} unique timestamps")
points = build_points(rows)
print_stats(points)
if not points:
sys.exit(1)
write_geojson(points, os.path.join(args.output, "track.geojson"))
sampled = sample_points(points, MAX_SLIDER_POINTS)
if len(sampled) < len(points):
print(f" Sampled {len(sampled)} points for slider (from {len(points)})")
write_points_json(sampled, os.path.join(args.output, "points.json"))
print("\nDone.")
if __name__ == "__main__":
main()