#!/usr/bin/env python3 """Parse USV long-format CSV → track.geojson + points.json + manifest.json v2: multi-session support via --input-dir, retro-compat with --input (single file) """ import argparse import csv import glob import json import os import sys from collections import defaultdict from datetime import datetime, timezone MAX_SLIDER_POINTS = 10000 def parse_args(): p = argparse.ArgumentParser(description="Parse USV nav CSV v2") g = p.add_mutually_exclusive_group(required=True) g.add_argument("--input", help="Single CSV navigation log (v1 compat)") g.add_argument("--input-dir", help="Directory: glob *navigation_log*.csv") p.add_argument("--output", required=True, help="Output directory") return p.parse_args() def find_csvs(input_dir): pattern = os.path.join(input_dir, "*navigation_log*.csv") files = sorted(glob.glob(pattern)) if not files: print(f"No navigation_log CSVs found in {input_dir}", file=sys.stderr) sys.exit(1) return files def load_csv(path): """Load long-format CSV → {timestamp: {field: value}}""" rows_by_ts = defaultdict(dict) with open(path, newline="", encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: ts = row["timestamp"] field = row["data"] val = row["value"] rows_by_ts[ts][field] = val return rows_by_ts def get_float(d, *keys): for k in keys: v = d.get(k) if v is not None: try: return float(v) except ValueError: pass return None def ts_to_ms(ts_str): """Convert ISO-like timestamp string to epoch ms (UTC).""" # Try formats: '2026-03-24 09:04:07.123456' or '2026-03-24T09:04:07.123456' for fmt in ( "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S", ): try: dt = datetime.strptime(ts_str, fmt).replace(tzinfo=timezone.utc) return int(dt.timestamp() * 1000) except ValueError: continue return None def build_points(rows_by_ts, source_name): """Build sorted list of {t, t_ms, lat, lon, heading, source}.""" timestamps = sorted(rows_by_ts.keys()) state = {} points = [] for ts in timestamps: updates = rows_by_ts[ts] state.update(updates) lat = get_float(state, "Lat", "RAW_Lat") lon = get_float(state, "Lon", "RAW_Lon") heading = get_float(state, "Heading", "Yaw") if lat is None or lon is None: continue if lat == 0.0 and lon == 0.0: continue if abs(lat) < 1 and abs(lon) < 1: raw_lat = get_float(state, "GPS_RAW_INT_lat") raw_lon = get_float(state, "GPS_RAW_INT_lon") if raw_lat and raw_lon: lat = raw_lat / 1e7 lon = raw_lon / 1e7 else: continue if "Lat" in updates or "Lon" in updates or "RAW_Lat" in updates or "RAW_Lon" in updates: t_ms = ts_to_ms(ts) points.append({ "t": ts, "t_ms": t_ms, "lat": round(lat, 8), "lon": round(lon, 8), "heading": round(heading, 2) if heading is not None else None, "source": source_name, }) return points def sample_points_session(points, max_total, n_sessions): """Sample per session, always keeping first+last point of each session.""" if not points: return points quota = max(10, max_total // max(n_sessions, 1)) if len(points) <= quota: return points step = (len(points) - 2) / max(quota - 2, 1) sampled = [points[0]] for i in range(1, quota - 1): sampled.append(points[min(int(1 + i * step), len(points) - 2)]) sampled.append(points[-1]) return sampled def session_bbox(points): lats = [p["lat"] for p in points] lons = [p["lon"] for p in points] return [min(lons), min(lats), max(lons), max(lats)] def write_outputs(all_sessions, output_dir): """Write track.geojson, points.json, manifest.json.""" os.makedirs(output_dir, exist_ok=True) # Colors for multi-track COLORS = ["#00b4d8", "#e94560", "#06d6a0", "#ffd166", "#a855f7", "#f97316"] # ── track.geojson (MultiLineString per session) ── features = [] for i, sess in enumerate(all_sessions): coords = [[p["lon"], p["lat"]] for p in sess["points"]] features.append({ "type": "Feature", "geometry": {"type": "LineString", "coordinates": coords}, "properties": { "source_file": sess["source_file"], "source_name": sess["source_name"], "start_iso": sess["t_start"], "end_iso": sess["t_end"], "n_points": len(coords), "color": COLORS[i % len(COLORS)], "session_index": i, } }) geojson = {"type": "FeatureCollection", "features": features} geo_path = os.path.join(output_dir, "track.geojson") with open(geo_path, "w") as f: json.dump(geojson, f) print(f" track.geojson: {len(features)} sessions → {geo_path}") # ── points.json (all sampled, sorted by t_ms) ── all_points = [] n_sessions = len(all_sessions) for sess in all_sessions: sampled = sample_points_session(sess["points"], MAX_SLIDER_POINTS, n_sessions) all_points.extend(sampled) # Sort by t_ms (sessions may overlap in time) all_points.sort(key=lambda p: (p["t_ms"] or 0)) pts_path = os.path.join(output_dir, "points.json") with open(pts_path, "w") as f: json.dump(all_points, f) print(f" points.json: {len(all_points)} points (sampled) → {pts_path}") # ── manifest.json ── all_lats = [p["lat"] for s in all_sessions for p in s["points"]] all_lons = [p["lon"] for s in all_sessions for p in s["points"]] global_bbox = [min(all_lons), min(all_lats), max(all_lons), max(all_lats)] all_t_ms = [p["t_ms"] for s in all_sessions for p in s["points"] if p["t_ms"]] t_min_ms = min(all_t_ms) if all_t_ms else None t_max_ms = max(all_t_ms) if all_t_ms else None sessions_meta = [] for sess in all_sessions: sessions_meta.append({ "file": sess["source_file"], "source_name": sess["source_name"], "n_points": sess["n_points_raw"], "t_start": sess["t_start"], "t_end": sess["t_end"], "t_start_ms": sess["t_start_ms"], "t_end_ms": sess["t_end_ms"], "bbox": sess["bbox"], }) manifest = { "generated_at": datetime.now(timezone.utc).isoformat(), "n_sessions": len(all_sessions), "sessions": sessions_meta, "global_bbox": global_bbox, "t_min": all_sessions[0]["t_start"] if all_sessions else None, "t_max": all_sessions[-1]["t_end"] if all_sessions else None, "t_min_ms": t_min_ms, "t_max_ms": t_max_ms, "n_points_total_raw": sum(s["n_points_raw"] for s in all_sessions), "n_points_sampled": len(all_points), } mf_path = os.path.join(output_dir, "manifest.json") with open(mf_path, "w") as f: json.dump(manifest, f, indent=2) print(f" manifest.json → {mf_path}") return manifest def print_global_stats(manifest, all_sessions): print(f"\n=== Stats globales ===") print(f" Sessions: {manifest['n_sessions']}") print(f" Points bruts: {manifest['n_points_total_raw']}") print(f" Points sampled: {manifest['n_points_sampled']}") print(f" t_min: {manifest['t_min']}") print(f" t_max: {manifest['t_max']}") bb = manifest["global_bbox"] print(f" Bbox: lon [{bb[0]:.5f}, {bb[2]:.5f}] lat [{bb[1]:.5f}, {bb[3]:.5f}]") if manifest["t_min_ms"] and manifest["t_max_ms"]: dur_s = (manifest["t_max_ms"] - manifest["t_min_ms"]) / 1000 h, rem = divmod(int(dur_s), 3600) m, s = divmod(rem, 60) print(f" Durée totale: {h}h{m:02d}m{s:02d}s") for i, sess in enumerate(all_sessions): print(f" Session {i+1}: {sess['source_name']} {sess['n_points_raw']} pts {sess['t_start']} → {sess['t_end']}") def process_file(path): source_name = os.path.basename(path) print(f"\nChargement {source_name} ...") rows = load_csv(path) print(f" {len(rows)} timestamps uniques") points = build_points(rows, source_name) if not points: print(f" WARNING: aucun point GPS valide dans {source_name}") return None # Filter points without t_ms points = [p for p in points if p["t_ms"] is not None] lats = [p["lat"] for p in points] lons = [p["lon"] for p in points] return { "source_file": path, "source_name": source_name, "points": points, "n_points_raw": len(points), "t_start": points[0]["t"], "t_end": points[-1]["t"], "t_start_ms": points[0]["t_ms"], "t_end_ms": points[-1]["t_ms"], "bbox": [min(lons), min(lats), max(lons), max(lats)], } def main(): args = parse_args() if args.input: csv_files = [args.input] else: csv_files = find_csvs(args.input_dir) print(f"Fichiers trouvés: {len(csv_files)}") for f in csv_files: print(f" {os.path.basename(f)}") all_sessions = [] for path in csv_files: sess = process_file(path) if sess: all_sessions.append(sess) if not all_sessions: print("Aucune session valide.", file=sys.stderr) sys.exit(1) manifest = write_outputs(all_sessions, args.output) print_global_stats(manifest, all_sessions) print("\nDone.") if __name__ == "__main__": main()