Compare commits

...

18 Commits

Author SHA1 Message Date
70608de221 fix(pipeline): docker-compose env GDRIVE_REMOTE=cosma + rclone.conf mount + NAS sorties bind 2026-04-27 21:33:56 +00:00
Flagabat
0a0dac7fda feat: viewer — sortie selector + USV/AUV panels HTML + CSS
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 23:28:05 +02:00
Flagabat
62091a09b7 feat: pipeline-runner — FastAPI endpoints + SSE 2026-04-27 23:24:44 +02:00
Flagabat
3e1da53cc7 feat: pipeline-runner — runner rclone + orchestration 2026-04-27 23:23:13 +02:00
Flagabat
24f9394c75 feat: pipeline-runner — processor LTTB + signal extraction 2026-04-27 23:22:41 +02:00
Flagabat
c9dca1d071 feat: pipeline-runner — config 2026-04-27 23:21:44 +02:00
Flagabat
682050ef14 feat: pipeline-runner — scaffold docker + deps 2026-04-27 23:18:21 +02:00
Flagabat
4aec9d6295 fix: extract_mcap_signals — flatten signals output, unify time keys
- Merge new signals (pitch/roll/yaw/altitude/battery_v/obstacle_dist) at
  top level of output dict via **unpacking, not nested under 'signals' key
- Replace t_ms -> t in all new signal appends to match depth/state format
- Fix all_t computation to use unified 't' key across old and new signals

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 23:17:02 +02:00
Flagabat
af2bb6581f feat: extract_mcap_signals — pitch/roll/yaw, altitude, obstacle, battery
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 23:15:09 +02:00
Flagabat
bd3a2359d9 docs: plan implémentation pipeline GDrive → replay USV/AUV (12 tâches) 2026-04-27 22:29:41 +02:00
Flagabat
02e357b874 docs: spec pipeline GDrive → replay USV/AUV dans viewer 2026-04-27 22:23:18 +02:00
Flag
34e709b7c8 feat(viewer): bloc pipeline Mermaid + toggle overlay #btn-pipeline 2026-04-27 14:18:08 +00:00
Flag
07df61cbc4 feat(viewer): v6 - date-picker fonctionnel + Plotly charts depuis API 8766
- Date picker onChange charge toutes sessions du jour
- Mini-graphs Plotly: depth AUV, motors AUV, motors USV, USBL distance
- Slider 24h + cursor line Plotly synchronisé
- Map v5 intacte (Leaflet USV arrow + AUV USBL + panel)
- API: /api/missions -> /dives -> /sessions -> track/series/usbl_track
2026-04-27 14:14:57 +00:00
Poulpe
8a5ed6174c feat(viewer): v5 grid 2x2 + trail length + headless screenshot 2026-04-25 22:29:39 +00:00
Poulpe
103bf1cedd feat(viewer): v4 time-series graphs (depth, PWM USV/AUV, status) 2026-04-25 22:15:43 +00:00
Poulpe
3198164aff feat(viewer): v3 AUV track + USBL vector overlay 2026-04-25 21:40:05 +00:00
Poulpe
be2cd1d156 feat: Kogger USBL decoder + nav merge
- tools/parse_kogger_usbl.py: decode SBP protocol (ID=0x65 USBL_SOLUTION)
  from raw *_usbl.csv files, output combined_usbl.csv with Dist/Az/Elev/SNR
- tools/merge_nav_usbl.py: merge USBL data with navigation_log.csv,
  interpolate USV lat/lon/heading, compute AUV absolute position
  (azimuth relative to USV heading convention)
- vendor/Kogger-Protocol: SBP spec reference (submodule)
- 69-sttropez: 13986 USBL records decoded, avg USV-AUV dist 39m
2026-04-25 21:24:00 +00:00
Poulpe
b46f136b76 feat: v2 multi-session parser + timeline range viewer 2026-04-25 20:31:17 +00:00
22 changed files with 4165 additions and 150 deletions

3
.gitmodules vendored Normal file
View File

@@ -0,0 +1,3 @@
[submodule "vendor/Kogger-Protocol"]
path = vendor/Kogger-Protocol
url = https://github.com/koggertech/Kogger-Protocol.git

17
docker-compose.yml Normal file
View File

@@ -0,0 +1,17 @@
version: "3.9"
services:
pipeline-runner:
build:
context: .
dockerfile: pipeline_runner/Dockerfile
container_name: cosma-pipeline-runner
ports:
- "8767:8767"
volumes:
- /mnt/nas-cosma/cosma/sorties:/data/sorties
- /home/floppyrj45/.config/rclone/rclone.conf:/root/.config/rclone/rclone.conf:ro
environment:
- GDRIVE_REMOTE=cosma:06-Operations/06 - Sorties
- OUTPUT_DIR=/data/sorties
- TOOLS_DIR=/app/tools
restart: unless-stopped

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,184 @@
# Design : GDrive Pipeline Replay
**Date :** 2026-04-27
**Repo :** cosma-nav-tools
**Branche :** feat/flag-local
---
## Objectif
Bouton dans le viewer 8765 qui déclenche un sync GDrive → pipeline Python → affichage replay des signaux USV et AUV sur la même page, synchronisés avec le slider 24h existant.
---
## Architecture globale
```
GDrive (G:)
└─ 06-Operations/06-Sorties/#XX-lieu/YYYYMMDD-lieu/raw_data/
├─ SHIP/*.csv (USV nav + USBL + full)
└─ SUB/*.mcap (AUV ROS2) + bin/
↓ rclone sync (déclenché par bouton viewer)
.83 /data/sorties/#XX/raw/
├─ SHIP/
└─ SUB/
↓ pipeline Python (scripts existants cosma-nav-tools/tools/)
.83 /data/sorties/#XX/processed/
├─ usv.json.gz ← signaux USV downsamplés LTTB
├─ auv_AUV010.json.gz ← signaux AUV010 downsamplés LTTB
├─ auv_AUVxxx.json.gz ← un fichier par AUV détecté
└─ tracks.geojson ← USV + AUV tracks (Leaflet)
↓ servi par
port 8767 pipeline-runner FastAPI (cosma-nav-tools/pipeline_runner/)
port 8765 viewer/index.html ← page unique, fetch 8766 + 8767
```
---
## Section 1 : Pipeline-runner (port 8767)
### Structure
```
cosma-nav-tools/
pipeline_runner/
main.py ← FastAPI app + endpoints
runner.py ← logique rclone + orchestration scripts
config.py ← chemins GDrive, output dir, rclone remote
docker-compose.yml ← service pipeline-runner port 8767
data/sorties/ ← gitignored
```
### Endpoints
| Méthode | Route | Description |
|---------|-------|-------------|
| `GET` | `/sorties` | Liste sorties détectées sur GDrive (scan dossier) |
| `POST` | `/run/{sortie_id}` | Lance rclone pull + pipeline complet |
| `GET` | `/events/{sortie_id}` | SSE stream progress (étape + %) |
| `GET` | `/sorties/{id}/usv` | Données USV processed (JSON gzip) |
| `GET` | `/sorties/{id}/auvs` | Liste AUVs disponibles pour cette sortie |
| `GET` | `/sorties/{id}/auv/{auv_id}` | Données AUV processed (JSON gzip) |
| `GET` | `/sorties/{id}/tracks` | GeoJSON tracks USV + AUV |
### Flow interne POST /run/{sortie_id}
```
1. rclone sync GDrive/#id → /data/sorties/#id/raw/ SSE: "sync" 0→50%
2. parse_usv_nav.py + extract_usv_pwm.py SSE: "usv_parse" 50→65%
3. parse_kogger_usbl.py + merge_nav_usbl.py SSE: "usbl_merge" 65→80%
4. extract_mcap_signals.py (par AUV détecté) SSE: "auv_parse" 80→90%
5. usbl_to_json.py SSE: "usbl_json" 90→95%
6. downsample LTTB + écriture .json.gz SSE: "write" 95→100%
```
---
## Section 2 : Format données
### Signaux (usv.json.gz, auv_*.json.gz)
```json
{
"meta": {
"sortie": "#71-golrest",
"date": "2026-04-16",
"vehicle": "USV001",
"t_start": 1713268477,
"t_end": 1713282877
},
"signals": {
"yaw": [{ "t": 1713268477, "v": 142.3 }, ...],
"heading": [...],
"gps_status": [{ "t": ..., "v": "3D_FIX" }, ...],
"battery_v": [...],
"usbl_dist": [...],
"usbl_angle": [...],
"motor_1": [...],
"motor_2": [...],
"auv_status": [{ "t": ..., "v": "MISSION" }, ...]
}
}
```
- `t` = epoch UNIX secondes
- Max **4000 points par signal** via LTTB (Plotly optimal)
- Valeurs discrètes (status, mode) = strings → step-plot Plotly
- Endpoint optionnel `/range?from=&to=` pour zoom fin sur brutes
### Tracks
- **GeoJSON** FeatureCollection : une Feature LineString par véhicule
- Poids estimé : ~300KB gzip pour 4h à 1Hz
---
## Section 3 : Viewer 8765 — extensions
### Layout (page unique)
```
datebar ← inchangé
header ← + dropdown sortie + [Sync & Process] + progress bar SSE
map Leaflet ← inchangé
slider 24h ← shared X axis, curseur vertical sur tous les graphs
─────────────────────────────────────────────
USV PANEL
[Yaw] [Heading]
[GPS status] [Battery voltage]
[USBL dist] [USBL angle]
[Motor 1] [Motor 2]
[AUV status: disarm/mission/goto — step-plot]
─────────────────────────────────────────────
AUV PANEL ← tabs [AUV010] [AUV011] … si multi-AUV
[Pitch/Roll/Yaw — 3 traces] [Depth]
[Altitude] [Obstacle dist]
[USBL dist] [USBL angle]
[Battery voltage] [Arm/Disarm/Mode — step-plot]
[Motors ×6 PWM — 1 graph multi-trace M1…M6]
```
### Synchronisation slider
- Slider 24h existant → événement → `updateCursor(t)` sur tous les graphs Plotly via `Plotly.relayout` shapes
- Données chargées une fois en mémoire au click de sortie, pas de re-fetch au scrub
### Bouton Sync & Process
- `POST /run/{sortie_id}` → ouvre SSE `/events/{sortie_id}`
- Progress bar inline dans le header (ex: `rclone 34% → usv_parse…`)
- À 100% → fetch automatique USV + AUV + tracks → render graphs
---
## Section 4 : Déploiement .83
```yaml
# docker-compose.yml (ajout)
pipeline-runner:
build: ./pipeline_runner
ports:
- "8767:8767"
volumes:
- /data/sorties:/data/sorties
- /mnt/gdrive:/mnt/gdrive # rclone mount ou rclone exec
environment:
- GDRIVE_REMOTE=gdrive:Cosma - Internal/06-Operations/06 - Sorties
- OUTPUT_DIR=/data/sorties
```
---
## Hors scope (v1)
- Authentification / accès multi-user
- HDF5 archivage (peut venir en v2)
- Replay animé temps-réel (curseur qui avance automatiquement)
- Tests unitaires pipeline

View File

@@ -0,0 +1,9 @@
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y rclone && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY tools/ ./tools/
COPY vendor/ ./vendor/
COPY pipeline_runner/ ./pipeline_runner/
CMD ["uvicorn", "pipeline_runner.main:app", "--host", "0.0.0.0", "--port", "8767"]

View File

View File

@@ -0,0 +1,7 @@
import os
from pathlib import Path
GDRIVE_REMOTE = os.getenv("GDRIVE_REMOTE", "gdrive:Cosma - Internal/06-Operations/06 - Sorties")
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/data/sorties"))
TOOLS_DIR = Path(os.getenv("TOOLS_DIR", Path(__file__).parent.parent / "tools"))
LTTB_MAX_PTS = 4000

98
pipeline_runner/main.py Normal file
View File

@@ -0,0 +1,98 @@
import asyncio
import gzip
import json
from pathlib import Path
from typing import AsyncGenerator
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from .config import OUTPUT_DIR
from .runner import run_pipeline, scan_sorties
app = FastAPI(title="COSMA Pipeline Runner")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Active pipeline jobs: sortie_id → asyncio.Queue
_jobs: dict[str, asyncio.Queue] = {}
@app.get("/sorties")
async def list_sorties():
return await scan_sorties()
@app.post("/run/{sortie_id:path}")
async def run_sortie(sortie_id: str):
if sortie_id in _jobs:
return {"status": "already_running"}
queue: asyncio.Queue = asyncio.Queue()
_jobs[sortie_id] = queue
asyncio.create_task(_run_and_cleanup(sortie_id, queue))
return {"status": "started"}
async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue):
try:
await run_pipeline(sortie_id, queue)
finally:
await asyncio.sleep(30)
_jobs.pop(sortie_id, None)
@app.get("/events/{sortie_id:path}")
async def sse_events(sortie_id: str):
if sortie_id not in _jobs:
raise HTTPException(404, "No active job for this sortie")
async def generate() -> AsyncGenerator[str, None]:
queue = _jobs[sortie_id]
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=60)
yield f"data: {json.dumps(event)}\n\n"
if event.get("step") in ("error", "write") and event.get("pct") in (0, 100):
break
except asyncio.TimeoutError:
yield "data: {\"step\":\"ping\"}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
def _read_gz(path: Path) -> dict:
with gzip.open(path) as f:
return json.loads(f.read())
@app.get("/sorties/{sortie_id:path}/usv")
async def get_usv(sortie_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz"
if not p.exists():
raise HTTPException(404, "USV data not found — run pipeline first")
return JSONResponse(_read_gz(p))
@app.get("/sorties/{sortie_id:path}/auvs")
async def list_auvs(sortie_id: str):
proc = OUTPUT_DIR / sortie_id / "processed"
auvs = [p.name.removesuffix(".json.gz").removeprefix("auv_")
for p in proc.glob("auv_*.json.gz")]
return sorted(auvs)
@app.get("/sorties/{sortie_id:path}/auv/{auv_id}")
async def get_auv(sortie_id: str, auv_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz"
if not p.exists():
raise HTTPException(404, f"AUV {auv_id} data not found")
return JSONResponse(_read_gz(p))
@app.get("/sorties/{sortie_id:path}/tracks")
async def get_tracks(sortie_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson"
if not p.exists():
raise HTTPException(404, "tracks.geojson not found")
with open(p) as f:
return JSONResponse(json.load(f))

View File

@@ -0,0 +1,174 @@
import csv
import gzip
import json
import math
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import numpy as np
from .config import LTTB_MAX_PTS
def _lttb(points: list[dict], max_pts: int) -> list[dict]:
"""Largest Triangle Three Buckets downsampling."""
n = len(points)
if n <= max_pts:
return points
ts = np.array([p["t"] for p in points], dtype=float)
vs = np.array([p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points], dtype=float)
bucket_size = (n - 2) / (max_pts - 2)
out = [points[0]]
a = 0
for i in range(max_pts - 2):
avg_start = int((i + 1) * bucket_size) + 1
avg_end = min(int((i + 2) * bucket_size) + 1, n)
avg_t = np.mean(ts[avg_start:avg_end])
avg_v = np.mean(vs[avg_start:avg_end])
rng_start = int(i * bucket_size) + 1
rng_end = min(int((i + 1) * bucket_size) + 1, n)
max_area = -1.0
best = rng_start
at, av = ts[a], vs[a]
for j in range(rng_start, rng_end):
area = abs((at - avg_t) * (vs[j] - av) - (ts[j] - at) * (avg_v - av)) * 0.5
if area > max_area:
max_area = area
best = j
out.append(points[best])
a = best
out.append(points[-1])
return out
def _ts_to_epoch(ts_str: str) -> float:
"""Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds."""
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc)
return dt.timestamp()
except ValueError:
continue
return 0.0
def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]:
"""Read long-format navigation_log.csv → dict of signal arrays."""
signals: dict[str, list] = {}
wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix",
"Armed", "Mode", "M1", "M2"}
with open(nav_log_path, newline="", encoding="utf-8") as f:
reader = csv.reader(f)
next(reader, None) # skip header
for row in reader:
if len(row) < 3:
continue
ts_str, field, val = row[0], row[1], row[2]
if field not in wanted:
continue
t = _ts_to_epoch(ts_str)
try:
v: Any = float(val)
except ValueError:
v = val.strip()
signals.setdefault(field, []).append({"t": t, "v": v})
return signals
def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]:
"""Read combined_usbl.csv → Dist and Azimuth signal arrays."""
dist_pts, az_pts = [], []
with open(usbl_csv_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
try:
t = _ts_to_epoch(row["Timestamp"])
d = float(row["Dist"]) if row["Dist"] else None
az = float(row["Azimuth"]) if row["Azimuth"] else None
except (KeyError, ValueError):
continue
if d is not None and not math.isnan(d):
dist_pts.append({"t": t, "v": d})
if az is not None and not math.isnan(az):
az_pts.append({"t": t, "v": az})
return {"usbl_dist": dist_pts, "usbl_angle": az_pts}
def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]:
"""Read mcap_signals.json → depth, motors M1-M6, state signals."""
with open(mcap_json_path) as f:
data = json.load(f)
signals: dict[str, list[dict]] = {}
def _unpack(key: str, series: list):
pts = []
for item in series:
t = item.get("t_ms", item.get("t", 0))
if isinstance(t, (int, float)) and t > 1e9:
t = t / 1000.0 # ms → s
pts.append({"t": float(t), "v": item.get("v", item.get("value", 0))})
signals[key] = pts
if "depth" in data:
_unpack("depth", data["depth"])
if "pwm_auv" in data:
for m_key, series in data["pwm_auv"].items():
_unpack(m_key.lower(), series)
if "state" in data:
_unpack("arm_status", data["state"])
# New signals added by extended extract_mcap_signals.py
for key in ("pitch", "roll", "yaw", "altitude", "battery_v", "obstacle_dist"):
if key in data:
_unpack(key, data[key])
return signals
def write_usv_json(
nav_log_path: Path,
usbl_csv_path: Path | None,
output_path: Path,
sortie_id: str,
date: str,
) -> None:
signals = _read_nav_log(nav_log_path)
if usbl_csv_path and usbl_csv_path.exists():
signals.update(_read_usbl_csv(usbl_csv_path))
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
meta = {
"sortie": sortie_id,
"date": date,
"vehicle": "USV",
"t_start": min(t_all) if t_all else 0,
"t_end": max(t_all) if t_all else 0,
}
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
output_path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(output_path, "wb") as f:
f.write(payload)
def write_auv_json(
mcap_json_path: Path,
output_path: Path,
auv_id: str,
sortie_id: str,
date: str,
) -> None:
signals = _read_mcap_signals(mcap_json_path)
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
meta = {
"sortie": sortie_id,
"date": date,
"vehicle": auv_id,
"t_start": min(t_all) if t_all else 0,
"t_end": max(t_all) if t_all else 0,
}
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
output_path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(output_path, "wb") as f:
f.write(payload)

137
pipeline_runner/runner.py Normal file
View File

@@ -0,0 +1,137 @@
import asyncio
import re
import shutil
import subprocess
from pathlib import Path
from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
from .processor import write_auv_json, write_usv_json
async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
await queue.put({"step": step, "pct": pct, "msg": msg})
def _find_nav_log(ship_dir: Path) -> Path | None:
for p in ship_dir.glob("*_navigation_log.csv"):
return p
return None
def _find_usbl_csv(ship_dir: Path) -> Path | None:
for p in ship_dir.glob("*_usbl.csv"):
return p
return None
def _detect_auvs(sub_dir: Path) -> list[str]:
"""Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010''AUV010')."""
auvs = []
for d in sub_dir.iterdir():
if d.is_dir():
m = re.search(r"(AUV\d+)", d.name, re.IGNORECASE)
if m:
auvs.append(m.group(1).upper())
return sorted(set(auvs))
def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
for d in sub_dir.iterdir():
if d.is_dir() and auv_id.upper() in d.name.upper():
return d
return None
def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
cmd = ["python3", str(TOOLS_DIR / script_name)] + args
return subprocess.run(cmd, capture_output=True, text=True)
async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
"""Full pipeline: rclone sync → parse → process → write JSON.gz"""
raw_dir = OUTPUT_DIR / sortie_id / "raw"
proc_dir = OUTPUT_DIR / sortie_id / "processed"
proc_dir.mkdir(parents=True, exist_ok=True)
# Step 1: rclone sync
await _emit(queue, "sync", 0, "rclone sync en cours…")
gdrive_path = f'{GDRIVE_REMOTE}/{sortie_id}'
result = subprocess.run(
["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
capture_output=True, text=True
)
if result.returncode != 0:
await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
return
await _emit(queue, "sync", 50, "rclone terminé")
# Detect SHIP/SUB dirs inside raw/
ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
sub_dir = next(raw_dir.rglob("logs/SUB"), None)
if not ship_dir or not ship_dir.exists():
await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
return
# Step 2: USV parsing
await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
nav_log = _find_nav_log(ship_dir)
usbl_csv_raw = _find_usbl_csv(ship_dir)
usbl_parsed_path = proc_dir / "combined_usbl.csv"
if usbl_csv_raw:
_run_script("parse_kogger_usbl.py", [str(usbl_csv_raw), "-o", str(usbl_parsed_path)])
date_str = sortie_id.split("/")[-1][:10] if "/" in sortie_id else sortie_id[:10]
if nav_log:
write_usv_json(
nav_log_path=nav_log,
usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None,
output_path=proc_dir / "usv.json.gz",
sortie_id=sortie_id,
date=date_str,
)
await _emit(queue, "usv_parse", 70, "USV OK")
# Step 3: AUV parsing
if sub_dir and sub_dir.exists():
auv_ids = _detect_auvs(sub_dir)
total = len(auv_ids) or 1
for i, auv_id in enumerate(auv_ids):
await _emit(queue, "auv_parse", 70 + int(20 * i / total), f"AUV {auv_id}")
session_dir = _detect_session_dir(sub_dir, auv_id)
if not session_dir:
continue
mcap_out = proc_dir / f"mcap_{auv_id}.json"
_run_script("extract_mcap_signals.py", ["--session-dir", str(session_dir), "--max-pts", "0"])
# extract_mcap_signals writes to tools/../output/mcap_signals.json (hardcoded)
default_out = TOOLS_DIR.parent / "output" / "mcap_signals.json"
if not default_out.exists():
default_out = session_dir / "mcap_signals.json"
if default_out.exists():
shutil.copy(default_out, mcap_out)
if mcap_out.exists():
write_auv_json(
mcap_json_path=mcap_out,
output_path=proc_dir / f"auv_{auv_id}.json.gz",
auv_id=auv_id,
sortie_id=sortie_id,
date=date_str,
)
await _emit(queue, "write", 100, "Pipeline terminé")
async def scan_sorties() -> list[dict]:
"""List available sorties on GDrive via rclone lsd."""
result = subprocess.run(
["rclone", "lsd", GDRIVE_REMOTE],
capture_output=True, text=True
)
sorties = []
for line in result.stdout.splitlines():
parts = line.strip().split(None, 4)
if len(parts) == 5:
name = parts[4].strip()
processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists()
sorties.append({"id": name, "processed": processed})
return sorties

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
fastapi==0.115.0
uvicorn[standard]==0.30.6
aiofiles==24.1.0
numpy==2.1.1

Binary file not shown.

After

Width:  |  Height:  |  Size: 116 KiB

BIN
screenshots/viewer-v5.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

71
tools/check_sync.py Normal file
View File

@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""Check temporal alignment between MCAP AUV, USV PWM, and USBL data."""
import json, os, sys
from datetime import datetime, timezone
def fmt(ms):
if ms == 0: return 'N/A'
return datetime.fromtimestamp(ms/1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
def load(path):
with open(path) as f:
return json.load(f)
base = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
sources = {}
# MCAP signals
mcap_path = os.path.join(base, 'mcap_signals.json')
if os.path.exists(mcap_path):
d = load(mcap_path)
n = len(d.get('depth',[])) + len(d.get('pwm_auv',{}).get('samples',[])) + len(d.get('state',[]))
sources['MCAP AUV'] = {'t_min': d['t_min_utc_ms'], 't_max': d['t_max_utc_ms'], 'n': n}
else:
print(f"MISSING: {mcap_path}")
# USV PWM
usv_path = os.path.join(base, 'usv_pwm.json')
if os.path.exists(usv_path):
d = load(usv_path)
n = sum(len(v) for v in d.get('M',{}).values()) + sum(len(v) for v in d.get('RC',{}).values())
sources['USV PWM'] = {'t_min': d['t_min_utc_ms'], 't_max': d['t_max_utc_ms'], 'n': n}
else:
print(f"MISSING: {usv_path}")
# USBL
usbl_path = os.path.join(base, 'usbl.json')
if os.path.exists(usbl_path):
d = load(usbl_path)
pts = d.get('points', [])
if pts:
t_vals = [p['t_ms'] for p in pts]
sources['USBL'] = {'t_min': min(t_vals), 't_max': max(t_vals), 'n': len(pts)}
else:
sources['USBL'] = {'t_min': 0, 't_max': 0, 'n': 0}
else:
print(f"MISSING: {usbl_path}")
print(f"\n{'Source':<12} | {'t_min UTC':<20} | {'t_max UTC':<20} | {'n_pts':>6}")
print('-' * 68)
for name, s in sources.items():
print(f"{name:<12} | {fmt(s['t_min']):<20} | {fmt(s['t_max']):<20} | {s['n']:>6}")
# Overlap MCAP vs USV
if 'MCAP AUV' in sources and 'USV PWM' in sources:
mcap = sources['MCAP AUV']
usv = sources['USV PWM']
overlap_ms = min(mcap['t_max'], usv['t_max']) - max(mcap['t_min'], usv['t_min'])
print(f"\nMCAP t_min: {fmt(mcap['t_min'])} UTC")
print(f"USV t_min: {fmt(usv['t_min'])} UTC")
diff_min = (mcap['t_min'] - usv['t_min']) / 60000
print(f"t_min diff: {diff_min:+.1f} min (MCAP vs USV)")
if overlap_ms > 60000:
print(f"OK - overlap: {overlap_ms//1000} s")
elif overlap_ms < 0:
print(f"WARNING: no overlap! gap = {-overlap_ms//1000} s")
else:
print(f"SUSPECT: overlap <60s: {overlap_ms//1000} s")
if __name__ == '__main__':
pass

View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""Extract AUV signals from MCAP files: depth, PWM, state."""
import argparse, glob, json, math, os, sys
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--session-dir', required=True)
parser.add_argument('--max-pts', type=int, default=5000)
args = parser.parse_args()
session_name = os.path.basename(args.session_dir.rstrip('/'))
pattern = os.path.join(args.session_dir, '*.mcap')
mcap_files = sorted(glob.glob(pattern))
if not mcap_files:
print(f"No MCAP files in {args.session_dir}", file=sys.stderr)
sys.exit(1)
print(f"Found {len(mcap_files)} MCAP files")
try:
from mcap.reader import make_reader
from mcap_ros2.decoder import DecoderFactory
except ImportError as e:
print(f"Import error: {e}", file=sys.stderr)
sys.exit(1)
depth_raw = []
pwm_raw = []
state_raw = []
signals = {}
TOPICS = [
'/mavros/imu/static_pressure',
'/mavros/rc/out',
'/mavros/state',
'/mavros/imu/data',
'/mavros/altitude',
'/mavros/battery',
'/mavros/distance_sensor/hrlv_ez4_pub',
]
for mcap_file in mcap_files:
try:
with open(mcap_file, 'rb') as f:
reader = make_reader(f, decoder_factories=[DecoderFactory()])
for schema, channel, message, ros_msg in reader.iter_decoded_messages(topics=TOPICS):
t_ms = message.publish_time // 1_000_000
topic = channel.topic
if topic == '/mavros/imu/static_pressure':
try:
p = float(ros_msg.fluid_pressure)
depth_m = (p - 101325.0) / (1025.0 * 9.80665)
depth_raw.append({'t': t_ms, 'v': round(depth_m, 4)})
except Exception:
pass
elif topic == '/mavros/rc/out':
try:
ch = list(ros_msg.channels)
pwm_raw.append({'t': t_ms, 'v': ch})
except Exception:
pass
elif topic == '/mavros/state':
try:
state_raw.append({'t': t_ms, 'mode': str(ros_msg.mode), 'armed': bool(ros_msg.armed)})
except Exception:
pass
elif topic == '/mavros/imu/data':
try:
q = ros_msg.orientation
sinr = 2*(q.w*q.x + q.y*q.z)
cosr = 1 - 2*(q.x*q.x + q.y*q.y)
roll = math.degrees(math.atan2(sinr, cosr))
sinp = 2*(q.w*q.y - q.z*q.x)
pitch = math.degrees(math.asin(max(-1, min(1, sinp))))
siny = 2*(q.w*q.z + q.x*q.y)
cosy = 1 - 2*(q.y*q.y + q.z*q.z)
yaw = math.degrees(math.atan2(siny, cosy))
signals.setdefault('pitch', []).append({'t': t_ms, 'v': pitch})
signals.setdefault('roll', []).append({'t': t_ms, 'v': roll})
signals.setdefault('yaw', []).append({'t': t_ms, 'v': yaw})
except Exception:
pass
elif topic == '/mavros/altitude':
try:
signals.setdefault('altitude', []).append(
{'t': t_ms, 'v': ros_msg.relative})
except Exception:
pass
elif topic == '/mavros/battery':
try:
signals.setdefault('battery_v', []).append(
{'t': t_ms, 'v': ros_msg.voltage})
except Exception:
pass
elif topic == '/mavros/distance_sensor/hrlv_ez4_pub':
try:
signals.setdefault('obstacle_dist', []).append(
{'t': t_ms, 'v': ros_msg.range})
except Exception:
pass
except Exception as e:
print(f" Skip {os.path.basename(mcap_file)}: {e}")
def sample(lst, max_pts):
if len(lst) <= max_pts:
return lst
stride = len(lst) // max_pts
sampled = lst[::stride]
if sampled[-1] is not lst[-1]:
sampled.append(lst[-1])
return sampled
depth = sample(depth_raw, args.max_pts)
pwm_samples = sample(pwm_raw, args.max_pts)
state = state_raw # events, keep all
signals_flat = [pt for pts in signals.values() for pt in pts]
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw + signals_flat]
t_min = min(all_t) if all_t else 0
t_max = max(all_t) if all_t else 0
n_ch = max((len(s['v']) for s in pwm_raw), default=0)
channels = list(range(n_ch))
from datetime import datetime, timezone
fmt = lambda ms: datetime.fromtimestamp(ms/1000, tz=timezone.utc).isoformat()
print(f"depth: {len(depth)} pts (raw {len(depth_raw)})")
if depth:
dvals = [p['v'] for p in depth]
print(f" depth range: {min(dvals):.3f} .. {max(dvals):.3f} m")
print(f"pwm_auv: {len(pwm_samples)} samples (raw {len(pwm_raw)}), {n_ch} channels")
print(f"state: {len(state)} events")
print(f"t_min: {fmt(t_min)}")
print(f"t_max: {fmt(t_max)}")
out = {
'session': session_name,
't_min_utc_ms': t_min,
't_max_utc_ms': t_max,
'depth': depth,
'pwm_auv': {'channels': channels, 'samples': pwm_samples},
'state': state,
**{k: sample(v, args.max_pts) for k, v in signals.items()},
}
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
os.makedirs(outdir, exist_ok=True)
outpath = os.path.join(outdir, 'mcap_signals.json')
with open(outpath, 'w') as f:
json.dump(out, f)
print(f"Written: {outpath}")
if __name__ == '__main__':
main()

89
tools/extract_usv_pwm.py Normal file
View File

@@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""Extract USV PWM signals from navigation log CSVs."""
import argparse, csv, glob, json, os, re, sys
from datetime import datetime, timezone
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--nav-dir', required=True)
args = parser.parse_args()
pattern = os.path.join(args.nav_dir, '*_navigation_log.csv')
csv_files = sorted(glob.glob(pattern))
if not csv_files:
print(f"No navigation_log.csv in {args.nav_dir}", file=sys.stderr)
sys.exit(1)
print(f"Found {len(csv_files)} nav CSV files")
M_data = {}
RC_data = {}
for csv_file in csv_files:
print(f" Parsing {os.path.basename(csv_file)}")
try:
with open(csv_file, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
ts_str = row.get('timestamp', '').strip()
data = row.get('data', '').strip()
val_str = row.get('value', '').strip()
if not ts_str or not data or not val_str:
continue
is_M = re.match(r'^M\d+$', data)
is_RC = re.match(r'^RC\d+$', data)
if not is_M and not is_RC:
continue
try:
val = float(val_str)
except ValueError:
continue
try:
dt = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S.%f')
except ValueError:
try:
dt = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S')
except ValueError:
continue
# CET -> UTC: subtract 3600s
t_ms = int(dt.timestamp() * 1000) - 3600 * 1000
pt = {'t': t_ms, 'v': val}
if is_M:
M_data.setdefault(data, []).append(pt)
else:
RC_data.setdefault(data, []).append(pt)
except Exception as e:
print(f" Error {csv_file}: {e}")
all_t = []
for pts in list(M_data.values()) + list(RC_data.values()):
all_t.extend(p['t'] for p in pts)
t_min = min(all_t) if all_t else 0
t_max = max(all_t) if all_t else 0
for k in sorted(M_data):
print(f" {k}: {len(M_data[k])} pts")
for k in sorted(RC_data):
print(f" {k}: {len(RC_data[k])} pts")
fmt = lambda ms: datetime.fromtimestamp(ms/1000, tz=timezone.utc).isoformat()
print(f"t_min UTC: {fmt(t_min)}")
print(f"t_max UTC: {fmt(t_max)}")
out = {
'tz_assumed': 'CET (UTC+1)',
'tz_converted_to': 'UTC',
't_min_utc_ms': t_min,
't_max_utc_ms': t_max,
'M': M_data,
'RC': RC_data,
}
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
os.makedirs(outdir, exist_ok=True)
outpath = os.path.join(outdir, 'usv_pwm.json')
with open(outpath, 'w') as f:
json.dump(out, f)
print(f"Written: {outpath}")
if __name__ == '__main__':
main()

319
tools/merge_nav_usbl.py Normal file
View File

@@ -0,0 +1,319 @@
#!/usr/bin/env python3
"""
merge_nav_usbl.py — Merge USBL decoded data with USV navigation log
Inputs:
--usbl : combined_usbl.csv (output of parse_kogger_usbl.py)
--nav-dir: directory containing *_navigation_log.csv files
--output : output CSV (default: combined_nav_usbl.csv)
Nav log format: timestamp,data,value (long format)
data=Lat → latitude_deg
data=Lon → longitude_deg
data=Heading → heading_deg
Interpolation: for each USBL timestamp, find nearest nav point within 1s window.
If multiple sessions, use session matching by timestamp overlap.
AUV position calculation:
azimuth_deg from USBL is RELATIVE to USV heading (yaw) based on USBL hardware mounting.
AUV bearing from North = (USV heading + azimuth_deg) mod 360
Horizontal dist = dist_m * cos(elevation_deg * pi/180)
Note: if azimuth is already absolute (referenced to North), do NOT add heading.
Check: usbl_yaw in payload should match nav Heading if relative azimuth.
We use RELATIVE convention (add USV heading) — documented here.
Geodetic forward (haversine):
Using flat-earth approximation valid for distances < 500m:
dlat = horiz_dist * cos(bearing) / R_earth
dlon = horiz_dist * sin(bearing) / (R_earth * cos(lat))
R_earth = 6371000 m
"""
import csv
import io
import sys
import math
import os
R_EARTH = 6371000.0 # meters
def parse_nav_log(nav_file):
"""
Parse navigation_log.csv into:
- nav_points: sorted list of (timestamp_str, lat, lon) from Lat+Lon entries
- heading_series: sorted list of (timestamp_str, heading_deg) from Heading entries
Lat/Lon and Heading have different timestamps so must be interpolated separately.
Returns (nav_points, heading_series).
"""
lat_by_ts = {}
lon_by_ts = {}
heading_series_raw = []
with open(nav_file) as f:
reader = csv.DictReader(f)
for row in reader:
data = row.get('data', '')
ts = row.get('timestamp', '')
try:
val_raw = row.get('value', '') or ''
if not val_raw:
continue
val = float(val_raw)
except (ValueError, TypeError):
continue
if data == 'Lat':
lat_by_ts[ts] = val
elif data == 'Lon':
lon_by_ts[ts] = val
elif data == 'Heading':
heading_series_raw.append((ts, val))
# Build nav_points: timestamps where both Lat and Lon appear (same ts)
nav_points = []
for ts in sorted(set(lat_by_ts.keys()) & set(lon_by_ts.keys())):
nav_points.append((ts, lat_by_ts[ts], lon_by_ts[ts]))
# heading_series sorted by timestamp
heading_series = sorted(heading_series_raw, key=lambda x: x[0])
return nav_points, heading_series
def ts_to_seconds(ts_str):
"""Convert '2026-03-24 09:29:05.230392' to float seconds since epoch (approx)."""
# Simple: parse date+time, compute offset
try:
date_part, time_part = ts_str.strip().split(' ', 1)
y, mo, d = date_part.split('-')
parts = time_part.split(':')
h, m = int(parts[0]), int(parts[1])
s_str = parts[2]
s = float(s_str)
# Days since fixed epoch (don't need absolute, just relative diffs)
total = (int(y)*365 + int(mo)*30 + int(d)) * 86400 + h*3600 + m*60 + s
return total
except Exception:
return 0.0
def find_nearest(ts_sec, series_sec, series, max_gap=1.0):
"""Find nearest entry in sorted series within max_gap seconds."""
best_idx = -1
best_dt = float('inf')
for i, (s_sec, entry) in enumerate(zip(series_sec, series)):
dt = abs(s_sec - ts_sec)
if dt < best_dt:
best_dt = dt
best_idx = i
elif s_sec > ts_sec + max_gap:
break
if best_idx >= 0 and best_dt <= max_gap:
return series[best_idx], best_dt
return None, None
def geodetic_forward(lat_deg, lon_deg, bearing_deg, dist_m):
"""
Compute destination point given start lat/lon, bearing (deg from North), distance (m).
Flat-earth approximation valid for dist < 500m.
"""
bearing_rad = math.radians(bearing_deg)
lat_rad = math.radians(lat_deg)
dlat = dist_m * math.cos(bearing_rad) / R_EARTH
dlon = dist_m * math.sin(bearing_rad) / (R_EARTH * math.cos(lat_rad))
auv_lat = lat_deg + math.degrees(dlat)
auv_lon = lon_deg + math.degrees(dlon)
return auv_lat, auv_lon
def haversine_dist(lat1, lon1, lat2, lon2):
"""Distance in meters between two lat/lon points."""
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2
return 2 * R_EARTH * math.asin(math.sqrt(a))
def find_nav_file_for_session(usbl_file, nav_dir):
"""Match nav file by common timestamp prefix."""
base = os.path.basename(usbl_file)
# e.g. 2026-03-24_09-28-44_USV003_usbl.csv -> 2026-03-24_09-28-44_USV003
prefix = base.replace('_usbl.csv', '')
nav_candidate = os.path.join(nav_dir, prefix + '_navigation_log.csv')
if os.path.exists(nav_candidate):
return nav_candidate
return None
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--usbl', required=True, help='combined_usbl.csv')
parser.add_argument('--nav-dir', required=True, help='Directory with *_navigation_log.csv')
parser.add_argument('--output', default='combined_nav_usbl.csv')
parser.add_argument('--max-gap', type=float, default=1.0, help='Max timestamp gap in seconds')
args = parser.parse_args()
# Load USBL records
usbl_records = []
with open(args.usbl) as f:
reader = csv.DictReader(f)
for row in reader:
usbl_records.append(row)
print("USBL records loaded: %d" % len(usbl_records))
# Group by source_file
from collections import defaultdict
by_source = defaultdict(list)
for rec in usbl_records:
by_source[rec.get('source_file', '')].append(rec)
# Load nav files
nav_data = {} # source_file -> (nav_points, nav_points_sec, heading_series, heading_sec)
nav_dir = args.nav_dir
for source_file in by_source.keys():
# Try to match nav file
prefix = source_file.replace('_usbl.csv', '')
nav_file = os.path.join(nav_dir, prefix + '_navigation_log.csv')
if not os.path.exists(nav_file):
# Try to find by scanning directory
matched = None
for fn in os.listdir(nav_dir):
if prefix in fn and 'navigation_log' in fn:
matched = os.path.join(nav_dir, fn)
break
if matched is None:
print("WARNING: no nav file found for %s" % source_file)
nav_data[source_file] = ([], [], [], [])
continue
nav_file = matched
nav_points, heading_series = parse_nav_log(nav_file)
nav_points_sec = [ts_to_seconds(pt[0]) for pt in nav_points]
heading_sec = [ts_to_seconds(h[0]) for h in heading_series]
nav_data[source_file] = (nav_points, nav_points_sec, heading_series, heading_sec)
print("Nav loaded for %s: %d pos points, %d heading points" % (
source_file, len(nav_points), len(heading_series)))
# Process and write output
output_rows = []
stats_match = 0
stats_nomatch = 0
for source_file, records in by_source.items():
nav_points, nav_points_sec, heading_series, heading_sec = nav_data.get(
source_file, ([], [], [], []))
for rec in records:
ts_str = rec.get('Timestamp', '')
ts_sec = ts_to_seconds(ts_str)
dist_str = rec.get('Dist', '')
azimuth_str = rec.get('Azimuth', '')
elev_str = rec.get('Elev', '')
snr_str = rec.get('SNR', '')
try:
dist = float(dist_str) if dist_str else float('nan')
azimuth = float(azimuth_str) if azimuth_str else float('nan')
elev = float(elev_str) if elev_str else float('nan')
snr = float(snr_str) if snr_str else float('nan')
except ValueError:
dist, azimuth, elev, snr = float('nan'), float('nan'), float('nan'), float('nan')
nav_pt, dt = find_nearest(ts_sec, nav_points_sec, nav_points, args.max_gap)
hdg_pt, _ = find_nearest(ts_sec, heading_sec, heading_series, args.max_gap)
if nav_pt is None:
stats_nomatch += 1
lat_usv, lon_usv, heading_usv = float('nan'), float('nan'), float('nan')
auv_lat, auv_lon = float('nan'), float('nan')
else:
stats_match += 1
_, lat_usv, lon_usv = nav_pt
heading_usv = hdg_pt[1] if hdg_pt is not None else float('nan')
# Calculate AUV absolute position
# Azimuth from USBL is relative to USV heading (yaw convention)
# AUV bearing from North = (USV Heading + azimuth_deg) mod 360
if not (math.isnan(dist) or math.isnan(azimuth) or
math.isnan(lat_usv) or math.isnan(heading_usv)):
horiz_dist = dist * math.cos(math.radians(elev)) if not math.isnan(elev) else dist
abs_bearing = (heading_usv + azimuth) % 360
auv_lat, auv_lon = geodetic_forward(lat_usv, lon_usv, abs_bearing, horiz_dist)
else:
auv_lat, auv_lon = float('nan'), float('nan')
output_rows.append({
'Timestamp': ts_str,
'lat': '%.7f' % lat_usv if not math.isnan(lat_usv) else '',
'lon': '%.7f' % lon_usv if not math.isnan(lon_usv) else '',
'Heading': '%.2f' % heading_usv if not math.isnan(heading_usv) else '',
'Dist': '%.4f' % dist if not math.isnan(dist) else '',
'Azimuth': '%.4f' % azimuth if not math.isnan(azimuth) else '',
'Elev': '%.4f' % elev if not math.isnan(elev) else '',
'SNR': '%.4f' % snr if not math.isnan(snr) else '',
'auv_lat': '%.7f' % auv_lat if not math.isnan(auv_lat) else '',
'auv_lon': '%.7f' % auv_lon if not math.isnan(auv_lon) else '',
'nav_dt_s': '%.3f' % dt if dt is not None else '',
})
print("\n=== Merge stats ===")
print("Matched: %d No nav match: %d" % (stats_match, stats_nomatch))
with open(args.output, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['Timestamp', 'lat', 'lon', 'Heading', 'Dist', 'Azimuth', 'Elev', 'SNR',
'auv_lat', 'auv_lon', 'nav_dt_s'])
for row in output_rows:
writer.writerow([
row['Timestamp'], row['lat'], row['lon'], row['Heading'],
row['Dist'], row['Azimuth'], row['Elev'], row['SNR'],
row['auv_lat'], row['auv_lon'], row['nav_dt_s']
])
print("Output: %s (%d rows)" % (args.output, len(output_rows)))
# Sample output
if output_rows:
print("\n=== Sample (first 5 rows) ===")
print("Timestamp,lat,lon,Heading,Dist,Azimuth,Elev,SNR,auv_lat,auv_lon")
for row in output_rows[:5]:
print("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s" % (
row['Timestamp'], row['lat'], row['lon'], row['Heading'],
row['Dist'], row['Azimuth'], row['Elev'], row['SNR'],
row['auv_lat'], row['auv_lon']))
# AUV position stats
auv_lats = [float(r['auv_lat']) for r in output_rows if r['auv_lat']]
auv_lons = [float(r['auv_lon']) for r in output_rows if r['auv_lon']]
usv_lats = [float(r['lat']) for r in output_rows if r['lat']]
usv_lons = [float(r['lon']) for r in output_rows if r['lon']]
if auv_lats and usv_lats:
print("\n=== Position stats ===")
print("AUV lat range: %.6f - %.6f" % (min(auv_lats), max(auv_lats)))
print("AUV lon range: %.6f - %.6f" % (min(auv_lons), max(auv_lons)))
print("USV lat range: %.6f - %.6f" % (min(usv_lats), max(usv_lats)))
# Average USV-AUV distance
dists = []
for r in output_rows:
if r['lat'] and r['auv_lat']:
d = haversine_dist(float(r['lat']), float(r['lon']),
float(r['auv_lat']), float(r['auv_lon']))
dists.append(d)
if dists:
print("Avg USV-AUV dist: %.2f m (min=%.2f max=%.2f)" % (
sum(dists)/len(dists), min(dists), max(dists)))
if __name__ == '__main__':
main()

242
tools/parse_kogger_usbl.py Normal file
View File

@@ -0,0 +1,242 @@
#!/usr/bin/env python3
"""
parse_kogger_usbl.py — Decode Kogger USBL raw CSV files (SBP protocol)
Protocol spec:
Frame: BB 55 | ROUTE | MODE | ID | LENGTH | PAYLOAD[LENGTH] | CHKSUM1 | CHKSUM2
Checksum: Fletcher-16 over (ROUTE + MODE + ID + LENGTH + PAYLOAD)
Frame ID 0x65 = ID_USBL_SOLUTION
Struct (packed, little-endian):
id(U1) role(U1) watermark(U2)
timestamp_us(S8) ping_counter(U4) carrier_counter(S8)
distance_m(F4) distance_unc(F4)
azimuth_deg(F4) azimuth_unc(F4)
elevation_deg(F4) elevation_unc(F4)
snr(F4)
x_m(F4) y_m(F4) latitude_deg(D8) longitude_deg(D8) depth_m(F4)
usbl_yaw(F4) usbl_pitch(F4) usbl_roll(F4)
usbl_latitude(D8) usbl_longitude(D8) last_iTOW(U4)
beacon_n(F4) beacon_e(F4)
[+ 32 bytes extra NaN padding observed in firmware v2]
Timestamp assignment: timestamp from the last RECEIVED packet before the frame sync byte.
Usage:
python3 parse_kogger_usbl.py FILE1.csv [FILE2.csv ...] -o combined_usbl.csv
"""
import ast
import csv
import io
import os
import struct
import sys
import collections
import math
SYNC = b"\xbb\x55"
ID_USBL_SOLUTION = 0x65
USBL_FMT = '<BBHqIq' + 'f'*7 + 'ff' + 'dd' + 'f' + 'fff' + 'dd' + 'I' + 'ff'
USBL_FMT_SIZE = struct.calcsize(USBL_FMT)
def parse_bytes_field(field):
"""Parse b'...' Python literal from CSV field."""
field = field.strip()
if not (field.startswith("b'") or field.startswith('b"')):
return b""
try:
result = ast.literal_eval(field)
if isinstance(result, str):
result = result.encode('latin-1')
return result
except Exception:
return b""
def fletcher16(data):
c1, c2 = 0, 0
for byte in data:
c1 = (c1 + byte) & 0xFF
c2 = (c2 + c1) & 0xFF
return c1, c2
def parse_usbl_csv(csv_file):
"""
Parse a raw USBL CSV file, reconstruct byte stream, decode SBP frames.
Returns list of dicts with decoded USBL_SOLUTION data.
"""
with open(csv_file) as f:
content = f.read()
buf = b""
ts_offsets = [] # (byte_offset_in_buf, timestamp_str)
reader = csv.reader(io.StringIO(content))
for row in reader:
if len(row) < 3:
continue
ts, direction, raw = row[0], row[1], row[2]
if direction != "RECEIVED":
continue
b = parse_bytes_field(raw)
if b:
off = len(buf)
buf += b
ts_offsets.append((off, ts))
# Find all BB55 sync positions
positions = []
i = 0
while True:
pos = buf.find(SYNC, i)
if pos == -1:
break
positions.append(pos)
i = pos + 1
frame_id_counter = collections.Counter()
usbl_records = []
valid_total = 0
for pos in positions:
if pos + 6 > len(buf):
continue
route = buf[pos+2]
mode = buf[pos+3]
frame_id = buf[pos+4]
length = buf[pos+5]
if pos + 6 + length + 2 > len(buf):
continue
payload = buf[pos+6:pos+6+length]
chk1_a = buf[pos+6+length]
chk2_a = buf[pos+6+length+1]
c1, c2 = fletcher16(buf[pos+2:pos+6+length])
if c1 != chk1_a or c2 != chk2_a:
continue
valid_total += 1
frame_id_counter[frame_id] += 1
if frame_id != ID_USBL_SOLUTION:
continue
# Get timestamp: last ts_offset entry before this position
ts = ts_offsets[0][1] if ts_offsets else ""
for off, t in ts_offsets:
if off <= pos:
ts = t
else:
break
if len(payload) < USBL_FMT_SIZE:
continue
fields = struct.unpack_from(USBL_FMT, payload)
rec = {
'Timestamp': ts,
'usbl_id': fields[0],
'usbl_role': fields[1],
'usbl_timestamp_us': fields[3],
'ping_counter': fields[4],
'Dist': fields[6],
'dist_unc': fields[7],
'Azimuth': fields[8],
'azimuth_unc': fields[9],
'Elev': fields[10],
'elev_unc': fields[11],
'SNR': fields[12],
'x_m': fields[13],
'y_m': fields[14],
'usbl_lat_computed': fields[15],
'usbl_lon_computed': fields[16],
'depth_m': fields[17],
'usbl_yaw': fields[18],
'usbl_pitch': fields[19],
'usbl_roll': fields[20],
'source_file': os.path.basename(csv_file),
}
usbl_records.append(rec)
return usbl_records, frame_id_counter, valid_total, len(positions)
def main():
import argparse
parser = argparse.ArgumentParser(description='Decode Kogger USBL raw CSV files')
parser.add_argument('files', nargs='+', help='Input *_usbl.csv files')
parser.add_argument('-o', '--output', default='combined_usbl.csv', help='Output CSV')
args = parser.parse_args()
all_records = []
total_sync = 0
total_valid = 0
global_id_counter = collections.Counter()
for csv_file in args.files:
print("Processing: %s" % csv_file)
records, id_counter, valid, n_sync = parse_usbl_csv(csv_file)
all_records.extend(records)
total_sync += n_sync
total_valid += valid
global_id_counter.update(id_counter)
print(" Sync markers: %d Valid frames: %d USBL records: %d" % (n_sync, valid, len(records)))
print("\n=== Summary ===")
print("Total sync markers (BB55): %d" % total_sync)
print("Total valid frames: %d" % total_valid)
print("Total USBL_SOLUTION records: %d" % len(all_records))
print("\nFrame ID histogram:")
for fid, cnt in sorted(global_id_counter.items(), key=lambda x: -x[1]):
name = "USBL_SOLUTION" if fid == 0x65 else ("USBL_CONTROL" if fid == 0x68 else "UNKNOWN")
print(" ID=0x%02x(%3d) %-15s : %d frames" % (fid, fid, name, cnt))
if all_records:
dists = [r['Dist'] for r in all_records if not math.isnan(r['Dist'])]
azs = [r['Azimuth'] for r in all_records if not math.isnan(r['Azimuth'])]
snrs = [r['SNR'] for r in all_records if not math.isnan(r['SNR'])]
if dists:
dists_sorted = sorted(dists)
n = len(dists_sorted)
median = dists_sorted[n//2]
print("\nDist (m): min=%.2f median=%.2f max=%.2f" % (min(dists), median, max(dists)))
if azs:
print("Azimuth : min=%.2f max=%.2f" % (min(azs), max(azs)))
if snrs:
print("SNR : min=%.2f max=%.2f" % (min(snrs), max(snrs)))
# Write output CSV
with open(args.output, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['Timestamp', 'Dist', 'Azimuth', 'Elev', 'SNR', 'FrameID',
'x_m', 'y_m', 'depth_m', 'dist_unc', 'azimuth_unc', 'elev_unc',
'usbl_yaw', 'usbl_pitch', 'usbl_roll', 'source_file'])
for r in all_records:
writer.writerow([
r['Timestamp'],
'' if math.isnan(r['Dist']) else '%.4f' % r['Dist'],
'' if math.isnan(r['Azimuth']) else '%.4f' % r['Azimuth'],
'' if math.isnan(r['Elev']) else '%.4f' % r['Elev'],
'' if math.isnan(r['SNR']) else '%.4f' % r['SNR'],
'0x65',
'' if math.isnan(r['x_m']) else '%.4f' % r['x_m'],
'' if math.isnan(r['y_m']) else '%.4f' % r['y_m'],
'' if math.isnan(r['depth_m']) else '%.4f' % r['depth_m'],
'%.4f' % r['dist_unc'],
'%.4f' % r['azimuth_unc'],
'%.4f' % r['elev_unc'],
'%.4f' % r['usbl_yaw'],
'%.4f' % r['usbl_pitch'],
'%.4f' % r['usbl_roll'],
r['source_file'],
])
print("\nOutput: %s (%d records)" % (args.output, len(all_records)))
if __name__ == '__main__':
main()

View File

@@ -1,24 +1,39 @@
#!/usr/bin/env python3
"""Parse USV long-format CSV → track.geojson + points.json"""
"""Parse USV long-format CSV → track.geojson + points.json + manifest.json
v2: multi-session support via --input-dir, retro-compat with --input (single file)
"""
import argparse
import csv
import glob
import json
import os
import sys
from collections import defaultdict
from datetime import datetime, timezone
MAX_SLIDER_POINTS = 5000
MAX_SLIDER_POINTS = 10000
def parse_args():
p = argparse.ArgumentParser(description="Parse USV nav CSV")
p.add_argument("--input", required=True, help="CSV navigation log")
p = argparse.ArgumentParser(description="Parse USV nav CSV v2")
g = p.add_mutually_exclusive_group(required=True)
g.add_argument("--input", help="Single CSV navigation log (v1 compat)")
g.add_argument("--input-dir", help="Directory: glob *navigation_log*.csv")
p.add_argument("--output", required=True, help="Output directory")
return p.parse_args()
def find_csvs(input_dir):
pattern = os.path.join(input_dir, "*navigation_log*.csv")
files = sorted(glob.glob(pattern))
if not files:
print(f"No navigation_log CSVs found in {input_dir}", file=sys.stderr)
sys.exit(1)
return files
def load_csv(path):
"""Load long-format CSV into {timestamp: {field: value}}"""
"""Load long-format CSV {timestamp: {field: value}}"""
rows_by_ts = defaultdict(dict)
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
@@ -41,13 +56,26 @@ def get_float(d, *keys):
return None
def build_points(rows_by_ts):
"""Build sorted list of {t, lat, lon, heading} where lat/lon valid."""
# We need to track last known lat/lon/heading per timestamp cluster.
# Strategy: walk timestamps in order, emit a point each time we see a Lat or Lon update.
# Accumulate state across timestamps.
timestamps = sorted(rows_by_ts.keys())
def ts_to_ms(ts_str):
"""Convert ISO-like timestamp string to epoch ms (UTC)."""
# Try formats: '2026-03-24 09:04:07.123456' or '2026-03-24T09:04:07.123456'
for fmt in (
"%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S.%f",
"%Y-%m-%d %H:%M:%S",
):
try:
dt = datetime.strptime(ts_str, fmt).replace(tzinfo=timezone.utc)
return int(dt.timestamp() * 1000)
except ValueError:
continue
return None
def build_points(rows_by_ts, source_name):
"""Build sorted list of {t, t_ms, lat, lon, heading, source}."""
timestamps = sorted(rows_by_ts.keys())
state = {}
points = []
@@ -55,7 +83,6 @@ def build_points(rows_by_ts):
updates = rows_by_ts[ts]
state.update(updates)
# Only emit point if we have both Lat and Lon from this or earlier ts
lat = get_float(state, "Lat", "RAW_Lat")
lon = get_float(state, "Lon", "RAW_Lon")
heading = get_float(state, "Heading", "Yaw")
@@ -64,7 +91,6 @@ def build_points(rows_by_ts):
continue
if lat == 0.0 and lon == 0.0:
continue
# GPS_RAW_INT fallback (1e-7 degrees)
if abs(lat) < 1 and abs(lon) < 1:
raw_lat = get_float(state, "GPS_RAW_INT_lat")
raw_lon = get_float(state, "GPS_RAW_INT_lon")
@@ -74,87 +100,200 @@ def build_points(rows_by_ts):
else:
continue
# Only emit if Lat or Lon just updated (reduce duplicate consecutive points)
if "Lat" in updates or "Lon" in updates or "RAW_Lat" in updates or "RAW_Lon" in updates:
t_ms = ts_to_ms(ts)
points.append({
"t": ts,
"t_ms": t_ms,
"lat": round(lat, 8),
"lon": round(lon, 8),
"heading": round(heading, 2) if heading is not None else None,
"source": source_name,
})
return points
def sample_points(points, max_n):
if len(points) <= max_n:
def sample_points_session(points, max_total, n_sessions):
"""Sample per session, always keeping first+last point of each session."""
if not points:
return points
step = len(points) / max_n
return [points[int(i * step)] for i in range(max_n)]
quota = max(10, max_total // max(n_sessions, 1))
if len(points) <= quota:
return points
step = (len(points) - 2) / max(quota - 2, 1)
sampled = [points[0]]
for i in range(1, quota - 1):
sampled.append(points[min(int(1 + i * step), len(points) - 2)])
sampled.append(points[-1])
return sampled
def write_geojson(points, path):
coords = [[p["lon"], p["lat"]] for p in points]
geojson = {
"type": "FeatureCollection",
"features": [{
def session_bbox(points):
lats = [p["lat"] for p in points]
lons = [p["lon"] for p in points]
return [min(lons), min(lats), max(lons), max(lats)]
def write_outputs(all_sessions, output_dir):
"""Write track.geojson, points.json, manifest.json."""
os.makedirs(output_dir, exist_ok=True)
# Colors for multi-track
COLORS = ["#00b4d8", "#e94560", "#06d6a0", "#ffd166", "#a855f7", "#f97316"]
# ── track.geojson (MultiLineString per session) ──
features = []
for i, sess in enumerate(all_sessions):
coords = [[p["lon"], p["lat"]] for p in sess["points"]]
features.append({
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": coords},
"properties": {
"start": points[0]["t"] if points else None,
"end": points[-1]["t"] if points else None,
"n_points": len(points),
"source_file": sess["source_file"],
"source_name": sess["source_name"],
"start_iso": sess["t_start"],
"end_iso": sess["t_end"],
"n_points": len(coords),
"color": COLORS[i % len(COLORS)],
"session_index": i,
}
}]
}
with open(path, "w") as f:
})
geojson = {"type": "FeatureCollection", "features": features}
geo_path = os.path.join(output_dir, "track.geojson")
with open(geo_path, "w") as f:
json.dump(geojson, f)
print(f" track.geojson: {len(coords)} coords → {path}")
print(f" track.geojson: {len(features)} sessions → {geo_path}")
# ── points.json (all sampled, sorted by t_ms) ──
all_points = []
n_sessions = len(all_sessions)
for sess in all_sessions:
sampled = sample_points_session(sess["points"], MAX_SLIDER_POINTS, n_sessions)
all_points.extend(sampled)
# Sort by t_ms (sessions may overlap in time)
all_points.sort(key=lambda p: (p["t_ms"] or 0))
pts_path = os.path.join(output_dir, "points.json")
with open(pts_path, "w") as f:
json.dump(all_points, f)
print(f" points.json: {len(all_points)} points (sampled) → {pts_path}")
# ── manifest.json ──
all_lats = [p["lat"] for s in all_sessions for p in s["points"]]
all_lons = [p["lon"] for s in all_sessions for p in s["points"]]
global_bbox = [min(all_lons), min(all_lats), max(all_lons), max(all_lats)]
all_t_ms = [p["t_ms"] for s in all_sessions for p in s["points"] if p["t_ms"]]
t_min_ms = min(all_t_ms) if all_t_ms else None
t_max_ms = max(all_t_ms) if all_t_ms else None
sessions_meta = []
for sess in all_sessions:
sessions_meta.append({
"file": sess["source_file"],
"source_name": sess["source_name"],
"n_points": sess["n_points_raw"],
"t_start": sess["t_start"],
"t_end": sess["t_end"],
"t_start_ms": sess["t_start_ms"],
"t_end_ms": sess["t_end_ms"],
"bbox": sess["bbox"],
})
manifest = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"n_sessions": len(all_sessions),
"sessions": sessions_meta,
"global_bbox": global_bbox,
"t_min": all_sessions[0]["t_start"] if all_sessions else None,
"t_max": all_sessions[-1]["t_end"] if all_sessions else None,
"t_min_ms": t_min_ms,
"t_max_ms": t_max_ms,
"n_points_total_raw": sum(s["n_points_raw"] for s in all_sessions),
"n_points_sampled": len(all_points),
}
mf_path = os.path.join(output_dir, "manifest.json")
with open(mf_path, "w") as f:
json.dump(manifest, f, indent=2)
print(f" manifest.json → {mf_path}")
return manifest
def write_points_json(points, path):
with open(path, "w") as f:
json.dump(points, f)
print(f" points.json: {len(points)} points → {path}")
def print_global_stats(manifest, all_sessions):
print(f"\n=== Stats globales ===")
print(f" Sessions: {manifest['n_sessions']}")
print(f" Points bruts: {manifest['n_points_total_raw']}")
print(f" Points sampled: {manifest['n_points_sampled']}")
print(f" t_min: {manifest['t_min']}")
print(f" t_max: {manifest['t_max']}")
bb = manifest["global_bbox"]
print(f" Bbox: lon [{bb[0]:.5f}, {bb[2]:.5f}] lat [{bb[1]:.5f}, {bb[3]:.5f}]")
if manifest["t_min_ms"] and manifest["t_max_ms"]:
dur_s = (manifest["t_max_ms"] - manifest["t_min_ms"]) / 1000
h, rem = divmod(int(dur_s), 3600)
m, s = divmod(rem, 60)
print(f" Durée totale: {h}h{m:02d}m{s:02d}s")
for i, sess in enumerate(all_sessions):
print(f" Session {i+1}: {sess['source_name']} {sess['n_points_raw']} pts {sess['t_start']}{sess['t_end']}")
def print_stats(points):
def process_file(path):
source_name = os.path.basename(path)
print(f"\nChargement {source_name} ...")
rows = load_csv(path)
print(f" {len(rows)} timestamps uniques")
points = build_points(rows, source_name)
if not points:
print("No valid points found!")
return
print(f" WARNING: aucun point GPS valide dans {source_name}")
return None
# Filter points without t_ms
points = [p for p in points if p["t_ms"] is not None]
lats = [p["lat"] for p in points]
lons = [p["lon"] for p in points]
print(f"\n=== Stats ===")
print(f" N points (full): {len(points)}")
print(f" First ts: {points[0]['t']}")
print(f" Last ts: {points[-1]['t']}")
print(f" Bbox lat: {min(lats):.6f}{max(lats):.6f}")
print(f" Bbox lon: {min(lons):.6f}{max(lons):.6f}")
headings = [p["heading"] for p in points if p["heading"] is not None]
print(f" Heading data: {'yes' if headings else 'no'} ({len(headings)} values)")
return {
"source_file": path,
"source_name": source_name,
"points": points,
"n_points_raw": len(points),
"t_start": points[0]["t"],
"t_end": points[-1]["t"],
"t_start_ms": points[0]["t_ms"],
"t_end_ms": points[-1]["t_ms"],
"bbox": [min(lons), min(lats), max(lons), max(lats)],
}
def main():
args = parse_args()
os.makedirs(args.output, exist_ok=True)
print(f"Loading {args.input} ...")
rows = load_csv(args.input)
print(f" {len(rows)} unique timestamps")
if args.input:
csv_files = [args.input]
else:
csv_files = find_csvs(args.input_dir)
points = build_points(rows)
print_stats(points)
print(f"Fichiers trouvés: {len(csv_files)}")
for f in csv_files:
print(f" {os.path.basename(f)}")
if not points:
all_sessions = []
for path in csv_files:
sess = process_file(path)
if sess:
all_sessions.append(sess)
if not all_sessions:
print("Aucune session valide.", file=sys.stderr)
sys.exit(1)
write_geojson(points, os.path.join(args.output, "track.geojson"))
sampled = sample_points(points, MAX_SLIDER_POINTS)
if len(sampled) < len(points):
print(f" Sampled {len(sampled)} points for slider (from {len(points)})")
write_points_json(sampled, os.path.join(args.output, "points.json"))
manifest = write_outputs(all_sessions, args.output)
print_global_stats(manifest, all_sessions)
print("\nDone.")

125
tools/usbl_to_json.py Normal file
View File

@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""usbl_to_json.py - Convert combined_nav_usbl.csv to usbl.json + auv_track.geojson"""
import csv, json, math, argparse, statistics
from datetime import datetime, timezone
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
DEFAULT_INPUT = ROOT / "output" / "combined_nav_usbl.csv"
OUTPUT_DIR = ROOT / "output"
def parse_ts(s):
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(s.strip(), fmt).replace(tzinfo=timezone.utc)
return int(dt.timestamp() * 1000)
except ValueError:
pass
return None
def haversine_m(lat1, lon1, lat2, lon2):
R = 6371000.0
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--input", default=str(DEFAULT_INPUT))
ap.add_argument("--max-points", type=int, default=10000)
args = ap.parse_args()
rows = []
with open(args.input, newline="") as f:
reader = csv.DictReader(f)
for row in reader:
try:
dist = float(row["Dist"])
auv_lat = float(row["auv_lat"])
auv_lon = float(row["auv_lon"])
except (ValueError, KeyError):
continue
if dist <= 0 or auv_lat == 0 or auv_lon == 0:
continue
t_ms = parse_ts(row["Timestamp"])
if t_ms is None:
continue
rows.append({
"t": row["Timestamp"].strip(),
"t_ms": t_ms,
"usv_lat": float(row["lat"]),
"usv_lon": float(row["lon"]),
"heading": float(row["Heading"]),
"dist": dist,
"az": float(row["Azimuth"]),
"elev": float(row["Elev"]),
"snr": float(row["SNR"]),
"auv_lat": auv_lat,
"auv_lon": auv_lon,
})
rows.sort(key=lambda r: r["t_ms"])
n_raw = len(rows)
# Sample if > max-points (preserve begin/end)
if n_raw > args.max_points:
step = n_raw / args.max_points
indices = set()
indices.add(0)
indices.add(n_raw - 1)
for i in range(1, args.max_points - 1):
indices.add(int(i * step))
rows = [rows[i] for i in sorted(indices)]
n = len(rows)
dists = [r["dist"] for r in rows]
snrs = [r["snr"] for r in rows]
auv_lats = [r["auv_lat"] for r in rows]
auv_lons = [r["auv_lon"] for r in rows]
auv_bbox = [min(auv_lons), min(auv_lats), max(auv_lons), max(auv_lats)]
out = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"n_points": n,
"n_raw": n_raw,
"auv_bbox": auv_bbox,
"stats": {
"dist_min": round(min(dists), 3),
"dist_max": round(max(dists), 3),
"dist_median": round(statistics.median(dists), 3),
"snr_min": round(min(snrs), 4),
"snr_max": round(max(snrs), 4),
},
"points": rows,
}
usbl_out = OUTPUT_DIR / "usbl.json"
with open(usbl_out, "w") as f:
json.dump(out, f, separators=(",", ":"))
print(f"usbl.json: {n} points (raw={n_raw}) -> {usbl_out}")
# AUV track GeoJSON
coords = [[r["auv_lon"], r["auv_lat"]] for r in rows]
geojson = {
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": coords},
"properties": {"name": "AUV track (USBL projection)", "color": "#ff8800"},
}]
}
track_out = OUTPUT_DIR / "auv_track.geojson"
with open(track_out, "w") as f:
json.dump(geojson, f, separators=(",", ":"))
print(f"auv_track.geojson: {len(coords)} coords -> {track_out}")
# Sanity check: first point haversine vs USBL dist
r0 = rows[0]
hav = haversine_m(r0["usv_lat"], r0["usv_lon"], r0["auv_lat"], r0["auv_lon"])
print(f"Sanity [0]: USV=({r0['usv_lat']:.6f},{r0['usv_lon']:.6f}) AUV=({r0['auv_lat']:.6f},{r0['auv_lon']:.6f}) hav={hav:.2f}m USBL_dist={r0['dist']:.2f}m diff={abs(hav-r0['dist']):.3f}m")
if __name__ == "__main__":
main()

1
vendor/Kogger-Protocol vendored Submodule

Submodule vendor/Kogger-Protocol added at d62576fee5

File diff suppressed because it is too large Load Diff