Compare commits
18 Commits
6f2f6d2d72
...
feat/flag-
| Author | SHA1 | Date | |
|---|---|---|---|
| 70608de221 | |||
|
|
0a0dac7fda | ||
|
|
62091a09b7 | ||
|
|
3e1da53cc7 | ||
|
|
24f9394c75 | ||
|
|
c9dca1d071 | ||
|
|
682050ef14 | ||
|
|
4aec9d6295 | ||
|
|
af2bb6581f | ||
|
|
bd3a2359d9 | ||
|
|
02e357b874 | ||
|
|
34e709b7c8 | ||
|
|
07df61cbc4 | ||
|
|
8a5ed6174c | ||
|
|
103bf1cedd | ||
|
|
3198164aff | ||
|
|
be2cd1d156 | ||
|
|
b46f136b76 |
3
.gitmodules
vendored
Normal file
3
.gitmodules
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
[submodule "vendor/Kogger-Protocol"]
|
||||
path = vendor/Kogger-Protocol
|
||||
url = https://github.com/koggertech/Kogger-Protocol.git
|
||||
17
docker-compose.yml
Normal file
17
docker-compose.yml
Normal file
@@ -0,0 +1,17 @@
|
||||
version: "3.9"
|
||||
services:
|
||||
pipeline-runner:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: pipeline_runner/Dockerfile
|
||||
container_name: cosma-pipeline-runner
|
||||
ports:
|
||||
- "8767:8767"
|
||||
volumes:
|
||||
- /mnt/nas-cosma/cosma/sorties:/data/sorties
|
||||
- /home/floppyrj45/.config/rclone/rclone.conf:/root/.config/rclone/rclone.conf:ro
|
||||
environment:
|
||||
- GDRIVE_REMOTE=cosma:06-Operations/06 - Sorties
|
||||
- OUTPUT_DIR=/data/sorties
|
||||
- TOOLS_DIR=/app/tools
|
||||
restart: unless-stopped
|
||||
1219
docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md
Normal file
1219
docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,184 @@
|
||||
# Design : GDrive Pipeline Replay
|
||||
|
||||
**Date :** 2026-04-27
|
||||
**Repo :** cosma-nav-tools
|
||||
**Branche :** feat/flag-local
|
||||
|
||||
---
|
||||
|
||||
## Objectif
|
||||
|
||||
Bouton dans le viewer 8765 qui déclenche un sync GDrive → pipeline Python → affichage replay des signaux USV et AUV sur la même page, synchronisés avec le slider 24h existant.
|
||||
|
||||
---
|
||||
|
||||
## Architecture globale
|
||||
|
||||
```
|
||||
GDrive (G:)
|
||||
└─ 06-Operations/06-Sorties/#XX-lieu/YYYYMMDD-lieu/raw_data/
|
||||
├─ SHIP/*.csv (USV nav + USBL + full)
|
||||
└─ SUB/*.mcap (AUV ROS2) + bin/
|
||||
|
||||
↓ rclone sync (déclenché par bouton viewer)
|
||||
|
||||
.83 /data/sorties/#XX/raw/
|
||||
├─ SHIP/
|
||||
└─ SUB/
|
||||
|
||||
↓ pipeline Python (scripts existants cosma-nav-tools/tools/)
|
||||
|
||||
.83 /data/sorties/#XX/processed/
|
||||
├─ usv.json.gz ← signaux USV downsamplés LTTB
|
||||
├─ auv_AUV010.json.gz ← signaux AUV010 downsamplés LTTB
|
||||
├─ auv_AUVxxx.json.gz ← un fichier par AUV détecté
|
||||
└─ tracks.geojson ← USV + AUV tracks (Leaflet)
|
||||
|
||||
↓ servi par
|
||||
|
||||
port 8767 pipeline-runner FastAPI (cosma-nav-tools/pipeline_runner/)
|
||||
port 8765 viewer/index.html ← page unique, fetch 8766 + 8767
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Section 1 : Pipeline-runner (port 8767)
|
||||
|
||||
### Structure
|
||||
|
||||
```
|
||||
cosma-nav-tools/
|
||||
pipeline_runner/
|
||||
main.py ← FastAPI app + endpoints
|
||||
runner.py ← logique rclone + orchestration scripts
|
||||
config.py ← chemins GDrive, output dir, rclone remote
|
||||
docker-compose.yml ← service pipeline-runner port 8767
|
||||
data/sorties/ ← gitignored
|
||||
```
|
||||
|
||||
### Endpoints
|
||||
|
||||
| Méthode | Route | Description |
|
||||
|---------|-------|-------------|
|
||||
| `GET` | `/sorties` | Liste sorties détectées sur GDrive (scan dossier) |
|
||||
| `POST` | `/run/{sortie_id}` | Lance rclone pull + pipeline complet |
|
||||
| `GET` | `/events/{sortie_id}` | SSE stream progress (étape + %) |
|
||||
| `GET` | `/sorties/{id}/usv` | Données USV processed (JSON gzip) |
|
||||
| `GET` | `/sorties/{id}/auvs` | Liste AUVs disponibles pour cette sortie |
|
||||
| `GET` | `/sorties/{id}/auv/{auv_id}` | Données AUV processed (JSON gzip) |
|
||||
| `GET` | `/sorties/{id}/tracks` | GeoJSON tracks USV + AUV |
|
||||
|
||||
### Flow interne POST /run/{sortie_id}
|
||||
|
||||
```
|
||||
1. rclone sync GDrive/#id → /data/sorties/#id/raw/ SSE: "sync" 0→50%
|
||||
2. parse_usv_nav.py + extract_usv_pwm.py SSE: "usv_parse" 50→65%
|
||||
3. parse_kogger_usbl.py + merge_nav_usbl.py SSE: "usbl_merge" 65→80%
|
||||
4. extract_mcap_signals.py (par AUV détecté) SSE: "auv_parse" 80→90%
|
||||
5. usbl_to_json.py SSE: "usbl_json" 90→95%
|
||||
6. downsample LTTB + écriture .json.gz SSE: "write" 95→100%
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Section 2 : Format données
|
||||
|
||||
### Signaux (usv.json.gz, auv_*.json.gz)
|
||||
|
||||
```json
|
||||
{
|
||||
"meta": {
|
||||
"sortie": "#71-golrest",
|
||||
"date": "2026-04-16",
|
||||
"vehicle": "USV001",
|
||||
"t_start": 1713268477,
|
||||
"t_end": 1713282877
|
||||
},
|
||||
"signals": {
|
||||
"yaw": [{ "t": 1713268477, "v": 142.3 }, ...],
|
||||
"heading": [...],
|
||||
"gps_status": [{ "t": ..., "v": "3D_FIX" }, ...],
|
||||
"battery_v": [...],
|
||||
"usbl_dist": [...],
|
||||
"usbl_angle": [...],
|
||||
"motor_1": [...],
|
||||
"motor_2": [...],
|
||||
"auv_status": [{ "t": ..., "v": "MISSION" }, ...]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- `t` = epoch UNIX secondes
|
||||
- Max **4000 points par signal** via LTTB (Plotly optimal)
|
||||
- Valeurs discrètes (status, mode) = strings → step-plot Plotly
|
||||
- Endpoint optionnel `/range?from=&to=` pour zoom fin sur brutes
|
||||
|
||||
### Tracks
|
||||
|
||||
- **GeoJSON** FeatureCollection : une Feature LineString par véhicule
|
||||
- Poids estimé : ~300KB gzip pour 4h à 1Hz
|
||||
|
||||
---
|
||||
|
||||
## Section 3 : Viewer 8765 — extensions
|
||||
|
||||
### Layout (page unique)
|
||||
|
||||
```
|
||||
datebar ← inchangé
|
||||
header ← + dropdown sortie + [Sync & Process] + progress bar SSE
|
||||
map Leaflet ← inchangé
|
||||
slider 24h ← shared X axis, curseur vertical sur tous les graphs
|
||||
─────────────────────────────────────────────
|
||||
USV PANEL
|
||||
[Yaw] [Heading]
|
||||
[GPS status] [Battery voltage]
|
||||
[USBL dist] [USBL angle]
|
||||
[Motor 1] [Motor 2]
|
||||
[AUV status: disarm/mission/goto — step-plot]
|
||||
─────────────────────────────────────────────
|
||||
AUV PANEL ← tabs [AUV010] [AUV011] … si multi-AUV
|
||||
[Pitch/Roll/Yaw — 3 traces] [Depth]
|
||||
[Altitude] [Obstacle dist]
|
||||
[USBL dist] [USBL angle]
|
||||
[Battery voltage] [Arm/Disarm/Mode — step-plot]
|
||||
[Motors ×6 PWM — 1 graph multi-trace M1…M6]
|
||||
```
|
||||
|
||||
### Synchronisation slider
|
||||
|
||||
- Slider 24h existant → événement → `updateCursor(t)` sur tous les graphs Plotly via `Plotly.relayout` shapes
|
||||
- Données chargées une fois en mémoire au click de sortie, pas de re-fetch au scrub
|
||||
|
||||
### Bouton Sync & Process
|
||||
|
||||
- `POST /run/{sortie_id}` → ouvre SSE `/events/{sortie_id}`
|
||||
- Progress bar inline dans le header (ex: `rclone 34% → usv_parse…`)
|
||||
- À 100% → fetch automatique USV + AUV + tracks → render graphs
|
||||
|
||||
---
|
||||
|
||||
## Section 4 : Déploiement .83
|
||||
|
||||
```yaml
|
||||
# docker-compose.yml (ajout)
|
||||
pipeline-runner:
|
||||
build: ./pipeline_runner
|
||||
ports:
|
||||
- "8767:8767"
|
||||
volumes:
|
||||
- /data/sorties:/data/sorties
|
||||
- /mnt/gdrive:/mnt/gdrive # rclone mount ou rclone exec
|
||||
environment:
|
||||
- GDRIVE_REMOTE=gdrive:Cosma - Internal/06-Operations/06 - Sorties
|
||||
- OUTPUT_DIR=/data/sorties
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Hors scope (v1)
|
||||
|
||||
- Authentification / accès multi-user
|
||||
- HDF5 archivage (peut venir en v2)
|
||||
- Replay animé temps-réel (curseur qui avance automatiquement)
|
||||
- Tests unitaires pipeline
|
||||
9
pipeline_runner/Dockerfile
Normal file
9
pipeline_runner/Dockerfile
Normal file
@@ -0,0 +1,9 @@
|
||||
FROM python:3.12-slim
|
||||
WORKDIR /app
|
||||
RUN apt-get update && apt-get install -y rclone && rm -rf /var/lib/apt/lists/*
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
COPY tools/ ./tools/
|
||||
COPY vendor/ ./vendor/
|
||||
COPY pipeline_runner/ ./pipeline_runner/
|
||||
CMD ["uvicorn", "pipeline_runner.main:app", "--host", "0.0.0.0", "--port", "8767"]
|
||||
0
pipeline_runner/__init__.py
Normal file
0
pipeline_runner/__init__.py
Normal file
7
pipeline_runner/config.py
Normal file
7
pipeline_runner/config.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
GDRIVE_REMOTE = os.getenv("GDRIVE_REMOTE", "gdrive:Cosma - Internal/06-Operations/06 - Sorties")
|
||||
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/data/sorties"))
|
||||
TOOLS_DIR = Path(os.getenv("TOOLS_DIR", Path(__file__).parent.parent / "tools"))
|
||||
LTTB_MAX_PTS = 4000
|
||||
98
pipeline_runner/main.py
Normal file
98
pipeline_runner/main.py
Normal file
@@ -0,0 +1,98 @@
|
||||
import asyncio
|
||||
import gzip
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
|
||||
from .config import OUTPUT_DIR
|
||||
from .runner import run_pipeline, scan_sorties
|
||||
|
||||
app = FastAPI(title="COSMA Pipeline Runner")
|
||||
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
||||
|
||||
# Active pipeline jobs: sortie_id → asyncio.Queue
|
||||
_jobs: dict[str, asyncio.Queue] = {}
|
||||
|
||||
|
||||
@app.get("/sorties")
|
||||
async def list_sorties():
|
||||
return await scan_sorties()
|
||||
|
||||
|
||||
@app.post("/run/{sortie_id:path}")
|
||||
async def run_sortie(sortie_id: str):
|
||||
if sortie_id in _jobs:
|
||||
return {"status": "already_running"}
|
||||
queue: asyncio.Queue = asyncio.Queue()
|
||||
_jobs[sortie_id] = queue
|
||||
asyncio.create_task(_run_and_cleanup(sortie_id, queue))
|
||||
return {"status": "started"}
|
||||
|
||||
|
||||
async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue):
|
||||
try:
|
||||
await run_pipeline(sortie_id, queue)
|
||||
finally:
|
||||
await asyncio.sleep(30)
|
||||
_jobs.pop(sortie_id, None)
|
||||
|
||||
|
||||
@app.get("/events/{sortie_id:path}")
|
||||
async def sse_events(sortie_id: str):
|
||||
if sortie_id not in _jobs:
|
||||
raise HTTPException(404, "No active job for this sortie")
|
||||
|
||||
async def generate() -> AsyncGenerator[str, None]:
|
||||
queue = _jobs[sortie_id]
|
||||
while True:
|
||||
try:
|
||||
event = await asyncio.wait_for(queue.get(), timeout=60)
|
||||
yield f"data: {json.dumps(event)}\n\n"
|
||||
if event.get("step") in ("error", "write") and event.get("pct") in (0, 100):
|
||||
break
|
||||
except asyncio.TimeoutError:
|
||||
yield "data: {\"step\":\"ping\"}\n\n"
|
||||
|
||||
return StreamingResponse(generate(), media_type="text/event-stream")
|
||||
|
||||
|
||||
def _read_gz(path: Path) -> dict:
|
||||
with gzip.open(path) as f:
|
||||
return json.loads(f.read())
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/usv")
|
||||
async def get_usv(sortie_id: str):
|
||||
p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz"
|
||||
if not p.exists():
|
||||
raise HTTPException(404, "USV data not found — run pipeline first")
|
||||
return JSONResponse(_read_gz(p))
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/auvs")
|
||||
async def list_auvs(sortie_id: str):
|
||||
proc = OUTPUT_DIR / sortie_id / "processed"
|
||||
auvs = [p.name.removesuffix(".json.gz").removeprefix("auv_")
|
||||
for p in proc.glob("auv_*.json.gz")]
|
||||
return sorted(auvs)
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/auv/{auv_id}")
|
||||
async def get_auv(sortie_id: str, auv_id: str):
|
||||
p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz"
|
||||
if not p.exists():
|
||||
raise HTTPException(404, f"AUV {auv_id} data not found")
|
||||
return JSONResponse(_read_gz(p))
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/tracks")
|
||||
async def get_tracks(sortie_id: str):
|
||||
p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson"
|
||||
if not p.exists():
|
||||
raise HTTPException(404, "tracks.geojson not found")
|
||||
with open(p) as f:
|
||||
return JSONResponse(json.load(f))
|
||||
174
pipeline_runner/processor.py
Normal file
174
pipeline_runner/processor.py
Normal file
@@ -0,0 +1,174 @@
|
||||
import csv
|
||||
import gzip
|
||||
import json
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
from .config import LTTB_MAX_PTS
|
||||
|
||||
|
||||
def _lttb(points: list[dict], max_pts: int) -> list[dict]:
|
||||
"""Largest Triangle Three Buckets downsampling."""
|
||||
n = len(points)
|
||||
if n <= max_pts:
|
||||
return points
|
||||
ts = np.array([p["t"] for p in points], dtype=float)
|
||||
vs = np.array([p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points], dtype=float)
|
||||
bucket_size = (n - 2) / (max_pts - 2)
|
||||
out = [points[0]]
|
||||
a = 0
|
||||
for i in range(max_pts - 2):
|
||||
avg_start = int((i + 1) * bucket_size) + 1
|
||||
avg_end = min(int((i + 2) * bucket_size) + 1, n)
|
||||
avg_t = np.mean(ts[avg_start:avg_end])
|
||||
avg_v = np.mean(vs[avg_start:avg_end])
|
||||
rng_start = int(i * bucket_size) + 1
|
||||
rng_end = min(int((i + 1) * bucket_size) + 1, n)
|
||||
max_area = -1.0
|
||||
best = rng_start
|
||||
at, av = ts[a], vs[a]
|
||||
for j in range(rng_start, rng_end):
|
||||
area = abs((at - avg_t) * (vs[j] - av) - (ts[j] - at) * (avg_v - av)) * 0.5
|
||||
if area > max_area:
|
||||
max_area = area
|
||||
best = j
|
||||
out.append(points[best])
|
||||
a = best
|
||||
out.append(points[-1])
|
||||
return out
|
||||
|
||||
|
||||
def _ts_to_epoch(ts_str: str) -> float:
|
||||
"""Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds."""
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
|
||||
try:
|
||||
dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc)
|
||||
return dt.timestamp()
|
||||
except ValueError:
|
||||
continue
|
||||
return 0.0
|
||||
|
||||
|
||||
def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]:
|
||||
"""Read long-format navigation_log.csv → dict of signal arrays."""
|
||||
signals: dict[str, list] = {}
|
||||
wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix",
|
||||
"Armed", "Mode", "M1", "M2"}
|
||||
with open(nav_log_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.reader(f)
|
||||
next(reader, None) # skip header
|
||||
for row in reader:
|
||||
if len(row) < 3:
|
||||
continue
|
||||
ts_str, field, val = row[0], row[1], row[2]
|
||||
if field not in wanted:
|
||||
continue
|
||||
t = _ts_to_epoch(ts_str)
|
||||
try:
|
||||
v: Any = float(val)
|
||||
except ValueError:
|
||||
v = val.strip()
|
||||
signals.setdefault(field, []).append({"t": t, "v": v})
|
||||
return signals
|
||||
|
||||
|
||||
def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]:
|
||||
"""Read combined_usbl.csv → Dist and Azimuth signal arrays."""
|
||||
dist_pts, az_pts = [], []
|
||||
with open(usbl_csv_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
try:
|
||||
t = _ts_to_epoch(row["Timestamp"])
|
||||
d = float(row["Dist"]) if row["Dist"] else None
|
||||
az = float(row["Azimuth"]) if row["Azimuth"] else None
|
||||
except (KeyError, ValueError):
|
||||
continue
|
||||
if d is not None and not math.isnan(d):
|
||||
dist_pts.append({"t": t, "v": d})
|
||||
if az is not None and not math.isnan(az):
|
||||
az_pts.append({"t": t, "v": az})
|
||||
return {"usbl_dist": dist_pts, "usbl_angle": az_pts}
|
||||
|
||||
|
||||
def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]:
|
||||
"""Read mcap_signals.json → depth, motors M1-M6, state signals."""
|
||||
with open(mcap_json_path) as f:
|
||||
data = json.load(f)
|
||||
signals: dict[str, list[dict]] = {}
|
||||
|
||||
def _unpack(key: str, series: list):
|
||||
pts = []
|
||||
for item in series:
|
||||
t = item.get("t_ms", item.get("t", 0))
|
||||
if isinstance(t, (int, float)) and t > 1e9:
|
||||
t = t / 1000.0 # ms → s
|
||||
pts.append({"t": float(t), "v": item.get("v", item.get("value", 0))})
|
||||
signals[key] = pts
|
||||
|
||||
if "depth" in data:
|
||||
_unpack("depth", data["depth"])
|
||||
if "pwm_auv" in data:
|
||||
for m_key, series in data["pwm_auv"].items():
|
||||
_unpack(m_key.lower(), series)
|
||||
if "state" in data:
|
||||
_unpack("arm_status", data["state"])
|
||||
# New signals added by extended extract_mcap_signals.py
|
||||
for key in ("pitch", "roll", "yaw", "altitude", "battery_v", "obstacle_dist"):
|
||||
if key in data:
|
||||
_unpack(key, data[key])
|
||||
return signals
|
||||
|
||||
|
||||
def write_usv_json(
|
||||
nav_log_path: Path,
|
||||
usbl_csv_path: Path | None,
|
||||
output_path: Path,
|
||||
sortie_id: str,
|
||||
date: str,
|
||||
) -> None:
|
||||
signals = _read_nav_log(nav_log_path)
|
||||
if usbl_csv_path and usbl_csv_path.exists():
|
||||
signals.update(_read_usbl_csv(usbl_csv_path))
|
||||
|
||||
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
|
||||
meta = {
|
||||
"sortie": sortie_id,
|
||||
"date": date,
|
||||
"vehicle": "USV",
|
||||
"t_start": min(t_all) if t_all else 0,
|
||||
"t_end": max(t_all) if t_all else 0,
|
||||
}
|
||||
|
||||
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
|
||||
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with gzip.open(output_path, "wb") as f:
|
||||
f.write(payload)
|
||||
|
||||
|
||||
def write_auv_json(
|
||||
mcap_json_path: Path,
|
||||
output_path: Path,
|
||||
auv_id: str,
|
||||
sortie_id: str,
|
||||
date: str,
|
||||
) -> None:
|
||||
signals = _read_mcap_signals(mcap_json_path)
|
||||
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
|
||||
meta = {
|
||||
"sortie": sortie_id,
|
||||
"date": date,
|
||||
"vehicle": auv_id,
|
||||
"t_start": min(t_all) if t_all else 0,
|
||||
"t_end": max(t_all) if t_all else 0,
|
||||
}
|
||||
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
|
||||
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with gzip.open(output_path, "wb") as f:
|
||||
f.write(payload)
|
||||
137
pipeline_runner/runner.py
Normal file
137
pipeline_runner/runner.py
Normal file
@@ -0,0 +1,137 @@
|
||||
import asyncio
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
|
||||
from .processor import write_auv_json, write_usv_json
|
||||
|
||||
|
||||
async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
|
||||
await queue.put({"step": step, "pct": pct, "msg": msg})
|
||||
|
||||
|
||||
def _find_nav_log(ship_dir: Path) -> Path | None:
|
||||
for p in ship_dir.glob("*_navigation_log.csv"):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def _find_usbl_csv(ship_dir: Path) -> Path | None:
|
||||
for p in ship_dir.glob("*_usbl.csv"):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def _detect_auvs(sub_dir: Path) -> list[str]:
|
||||
"""Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010' → 'AUV010')."""
|
||||
auvs = []
|
||||
for d in sub_dir.iterdir():
|
||||
if d.is_dir():
|
||||
m = re.search(r"(AUV\d+)", d.name, re.IGNORECASE)
|
||||
if m:
|
||||
auvs.append(m.group(1).upper())
|
||||
return sorted(set(auvs))
|
||||
|
||||
|
||||
def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
|
||||
for d in sub_dir.iterdir():
|
||||
if d.is_dir() and auv_id.upper() in d.name.upper():
|
||||
return d
|
||||
return None
|
||||
|
||||
|
||||
def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
|
||||
cmd = ["python3", str(TOOLS_DIR / script_name)] + args
|
||||
return subprocess.run(cmd, capture_output=True, text=True)
|
||||
|
||||
|
||||
async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
|
||||
"""Full pipeline: rclone sync → parse → process → write JSON.gz"""
|
||||
raw_dir = OUTPUT_DIR / sortie_id / "raw"
|
||||
proc_dir = OUTPUT_DIR / sortie_id / "processed"
|
||||
proc_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Step 1: rclone sync
|
||||
await _emit(queue, "sync", 0, "rclone sync en cours…")
|
||||
gdrive_path = f'{GDRIVE_REMOTE}/{sortie_id}'
|
||||
result = subprocess.run(
|
||||
["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
if result.returncode != 0:
|
||||
await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
|
||||
return
|
||||
await _emit(queue, "sync", 50, "rclone terminé")
|
||||
|
||||
# Detect SHIP/SUB dirs inside raw/
|
||||
ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
|
||||
sub_dir = next(raw_dir.rglob("logs/SUB"), None)
|
||||
|
||||
if not ship_dir or not ship_dir.exists():
|
||||
await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
|
||||
return
|
||||
|
||||
# Step 2: USV parsing
|
||||
await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
|
||||
nav_log = _find_nav_log(ship_dir)
|
||||
usbl_csv_raw = _find_usbl_csv(ship_dir)
|
||||
usbl_parsed_path = proc_dir / "combined_usbl.csv"
|
||||
|
||||
if usbl_csv_raw:
|
||||
_run_script("parse_kogger_usbl.py", [str(usbl_csv_raw), "-o", str(usbl_parsed_path)])
|
||||
|
||||
date_str = sortie_id.split("/")[-1][:10] if "/" in sortie_id else sortie_id[:10]
|
||||
if nav_log:
|
||||
write_usv_json(
|
||||
nav_log_path=nav_log,
|
||||
usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None,
|
||||
output_path=proc_dir / "usv.json.gz",
|
||||
sortie_id=sortie_id,
|
||||
date=date_str,
|
||||
)
|
||||
await _emit(queue, "usv_parse", 70, "USV OK")
|
||||
|
||||
# Step 3: AUV parsing
|
||||
if sub_dir and sub_dir.exists():
|
||||
auv_ids = _detect_auvs(sub_dir)
|
||||
total = len(auv_ids) or 1
|
||||
for i, auv_id in enumerate(auv_ids):
|
||||
await _emit(queue, "auv_parse", 70 + int(20 * i / total), f"AUV {auv_id}…")
|
||||
session_dir = _detect_session_dir(sub_dir, auv_id)
|
||||
if not session_dir:
|
||||
continue
|
||||
mcap_out = proc_dir / f"mcap_{auv_id}.json"
|
||||
_run_script("extract_mcap_signals.py", ["--session-dir", str(session_dir), "--max-pts", "0"])
|
||||
# extract_mcap_signals writes to tools/../output/mcap_signals.json (hardcoded)
|
||||
default_out = TOOLS_DIR.parent / "output" / "mcap_signals.json"
|
||||
if not default_out.exists():
|
||||
default_out = session_dir / "mcap_signals.json"
|
||||
if default_out.exists():
|
||||
shutil.copy(default_out, mcap_out)
|
||||
if mcap_out.exists():
|
||||
write_auv_json(
|
||||
mcap_json_path=mcap_out,
|
||||
output_path=proc_dir / f"auv_{auv_id}.json.gz",
|
||||
auv_id=auv_id,
|
||||
sortie_id=sortie_id,
|
||||
date=date_str,
|
||||
)
|
||||
await _emit(queue, "write", 100, "Pipeline terminé")
|
||||
|
||||
|
||||
async def scan_sorties() -> list[dict]:
|
||||
"""List available sorties on GDrive via rclone lsd."""
|
||||
result = subprocess.run(
|
||||
["rclone", "lsd", GDRIVE_REMOTE],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
sorties = []
|
||||
for line in result.stdout.splitlines():
|
||||
parts = line.strip().split(None, 4)
|
||||
if len(parts) == 5:
|
||||
name = parts[4].strip()
|
||||
processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists()
|
||||
sorties.append({"id": name, "processed": processed})
|
||||
return sorties
|
||||
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
fastapi==0.115.0
|
||||
uvicorn[standard]==0.30.6
|
||||
aiofiles==24.1.0
|
||||
numpy==2.1.1
|
||||
BIN
screenshots/viewer-v5-wait.png
Normal file
BIN
screenshots/viewer-v5-wait.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 116 KiB |
BIN
screenshots/viewer-v5.png
Normal file
BIN
screenshots/viewer-v5.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 37 KiB |
71
tools/check_sync.py
Normal file
71
tools/check_sync.py
Normal file
@@ -0,0 +1,71 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Check temporal alignment between MCAP AUV, USV PWM, and USBL data."""
|
||||
import json, os, sys
|
||||
from datetime import datetime, timezone
|
||||
|
||||
def fmt(ms):
|
||||
if ms == 0: return 'N/A'
|
||||
return datetime.fromtimestamp(ms/1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
def load(path):
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
|
||||
base = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
|
||||
|
||||
sources = {}
|
||||
|
||||
# MCAP signals
|
||||
mcap_path = os.path.join(base, 'mcap_signals.json')
|
||||
if os.path.exists(mcap_path):
|
||||
d = load(mcap_path)
|
||||
n = len(d.get('depth',[])) + len(d.get('pwm_auv',{}).get('samples',[])) + len(d.get('state',[]))
|
||||
sources['MCAP AUV'] = {'t_min': d['t_min_utc_ms'], 't_max': d['t_max_utc_ms'], 'n': n}
|
||||
else:
|
||||
print(f"MISSING: {mcap_path}")
|
||||
|
||||
# USV PWM
|
||||
usv_path = os.path.join(base, 'usv_pwm.json')
|
||||
if os.path.exists(usv_path):
|
||||
d = load(usv_path)
|
||||
n = sum(len(v) for v in d.get('M',{}).values()) + sum(len(v) for v in d.get('RC',{}).values())
|
||||
sources['USV PWM'] = {'t_min': d['t_min_utc_ms'], 't_max': d['t_max_utc_ms'], 'n': n}
|
||||
else:
|
||||
print(f"MISSING: {usv_path}")
|
||||
|
||||
# USBL
|
||||
usbl_path = os.path.join(base, 'usbl.json')
|
||||
if os.path.exists(usbl_path):
|
||||
d = load(usbl_path)
|
||||
pts = d.get('points', [])
|
||||
if pts:
|
||||
t_vals = [p['t_ms'] for p in pts]
|
||||
sources['USBL'] = {'t_min': min(t_vals), 't_max': max(t_vals), 'n': len(pts)}
|
||||
else:
|
||||
sources['USBL'] = {'t_min': 0, 't_max': 0, 'n': 0}
|
||||
else:
|
||||
print(f"MISSING: {usbl_path}")
|
||||
|
||||
print(f"\n{'Source':<12} | {'t_min UTC':<20} | {'t_max UTC':<20} | {'n_pts':>6}")
|
||||
print('-' * 68)
|
||||
for name, s in sources.items():
|
||||
print(f"{name:<12} | {fmt(s['t_min']):<20} | {fmt(s['t_max']):<20} | {s['n']:>6}")
|
||||
|
||||
# Overlap MCAP vs USV
|
||||
if 'MCAP AUV' in sources and 'USV PWM' in sources:
|
||||
mcap = sources['MCAP AUV']
|
||||
usv = sources['USV PWM']
|
||||
overlap_ms = min(mcap['t_max'], usv['t_max']) - max(mcap['t_min'], usv['t_min'])
|
||||
print(f"\nMCAP t_min: {fmt(mcap['t_min'])} UTC")
|
||||
print(f"USV t_min: {fmt(usv['t_min'])} UTC")
|
||||
diff_min = (mcap['t_min'] - usv['t_min']) / 60000
|
||||
print(f"t_min diff: {diff_min:+.1f} min (MCAP vs USV)")
|
||||
if overlap_ms > 60000:
|
||||
print(f"OK - overlap: {overlap_ms//1000} s")
|
||||
elif overlap_ms < 0:
|
||||
print(f"WARNING: no overlap! gap = {-overlap_ms//1000} s")
|
||||
else:
|
||||
print(f"SUSPECT: overlap <60s: {overlap_ms//1000} s")
|
||||
|
||||
if __name__ == '__main__':
|
||||
pass
|
||||
152
tools/extract_mcap_signals.py
Normal file
152
tools/extract_mcap_signals.py
Normal file
@@ -0,0 +1,152 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Extract AUV signals from MCAP files: depth, PWM, state."""
|
||||
import argparse, glob, json, math, os, sys
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--session-dir', required=True)
|
||||
parser.add_argument('--max-pts', type=int, default=5000)
|
||||
args = parser.parse_args()
|
||||
|
||||
session_name = os.path.basename(args.session_dir.rstrip('/'))
|
||||
pattern = os.path.join(args.session_dir, '*.mcap')
|
||||
mcap_files = sorted(glob.glob(pattern))
|
||||
if not mcap_files:
|
||||
print(f"No MCAP files in {args.session_dir}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
print(f"Found {len(mcap_files)} MCAP files")
|
||||
|
||||
try:
|
||||
from mcap.reader import make_reader
|
||||
from mcap_ros2.decoder import DecoderFactory
|
||||
except ImportError as e:
|
||||
print(f"Import error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
depth_raw = []
|
||||
pwm_raw = []
|
||||
state_raw = []
|
||||
signals = {}
|
||||
TOPICS = [
|
||||
'/mavros/imu/static_pressure',
|
||||
'/mavros/rc/out',
|
||||
'/mavros/state',
|
||||
'/mavros/imu/data',
|
||||
'/mavros/altitude',
|
||||
'/mavros/battery',
|
||||
'/mavros/distance_sensor/hrlv_ez4_pub',
|
||||
]
|
||||
|
||||
for mcap_file in mcap_files:
|
||||
try:
|
||||
with open(mcap_file, 'rb') as f:
|
||||
reader = make_reader(f, decoder_factories=[DecoderFactory()])
|
||||
for schema, channel, message, ros_msg in reader.iter_decoded_messages(topics=TOPICS):
|
||||
t_ms = message.publish_time // 1_000_000
|
||||
topic = channel.topic
|
||||
if topic == '/mavros/imu/static_pressure':
|
||||
try:
|
||||
p = float(ros_msg.fluid_pressure)
|
||||
depth_m = (p - 101325.0) / (1025.0 * 9.80665)
|
||||
depth_raw.append({'t': t_ms, 'v': round(depth_m, 4)})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/rc/out':
|
||||
try:
|
||||
ch = list(ros_msg.channels)
|
||||
pwm_raw.append({'t': t_ms, 'v': ch})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/state':
|
||||
try:
|
||||
state_raw.append({'t': t_ms, 'mode': str(ros_msg.mode), 'armed': bool(ros_msg.armed)})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/imu/data':
|
||||
try:
|
||||
q = ros_msg.orientation
|
||||
sinr = 2*(q.w*q.x + q.y*q.z)
|
||||
cosr = 1 - 2*(q.x*q.x + q.y*q.y)
|
||||
roll = math.degrees(math.atan2(sinr, cosr))
|
||||
sinp = 2*(q.w*q.y - q.z*q.x)
|
||||
pitch = math.degrees(math.asin(max(-1, min(1, sinp))))
|
||||
siny = 2*(q.w*q.z + q.x*q.y)
|
||||
cosy = 1 - 2*(q.y*q.y + q.z*q.z)
|
||||
yaw = math.degrees(math.atan2(siny, cosy))
|
||||
signals.setdefault('pitch', []).append({'t': t_ms, 'v': pitch})
|
||||
signals.setdefault('roll', []).append({'t': t_ms, 'v': roll})
|
||||
signals.setdefault('yaw', []).append({'t': t_ms, 'v': yaw})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/altitude':
|
||||
try:
|
||||
signals.setdefault('altitude', []).append(
|
||||
{'t': t_ms, 'v': ros_msg.relative})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/battery':
|
||||
try:
|
||||
signals.setdefault('battery_v', []).append(
|
||||
{'t': t_ms, 'v': ros_msg.voltage})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/distance_sensor/hrlv_ez4_pub':
|
||||
try:
|
||||
signals.setdefault('obstacle_dist', []).append(
|
||||
{'t': t_ms, 'v': ros_msg.range})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f" Skip {os.path.basename(mcap_file)}: {e}")
|
||||
|
||||
def sample(lst, max_pts):
|
||||
if len(lst) <= max_pts:
|
||||
return lst
|
||||
stride = len(lst) // max_pts
|
||||
sampled = lst[::stride]
|
||||
if sampled[-1] is not lst[-1]:
|
||||
sampled.append(lst[-1])
|
||||
return sampled
|
||||
|
||||
depth = sample(depth_raw, args.max_pts)
|
||||
pwm_samples = sample(pwm_raw, args.max_pts)
|
||||
state = state_raw # events, keep all
|
||||
|
||||
signals_flat = [pt for pts in signals.values() for pt in pts]
|
||||
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw + signals_flat]
|
||||
t_min = min(all_t) if all_t else 0
|
||||
t_max = max(all_t) if all_t else 0
|
||||
|
||||
n_ch = max((len(s['v']) for s in pwm_raw), default=0)
|
||||
channels = list(range(n_ch))
|
||||
|
||||
from datetime import datetime, timezone
|
||||
fmt = lambda ms: datetime.fromtimestamp(ms/1000, tz=timezone.utc).isoformat()
|
||||
print(f"depth: {len(depth)} pts (raw {len(depth_raw)})")
|
||||
if depth:
|
||||
dvals = [p['v'] for p in depth]
|
||||
print(f" depth range: {min(dvals):.3f} .. {max(dvals):.3f} m")
|
||||
print(f"pwm_auv: {len(pwm_samples)} samples (raw {len(pwm_raw)}), {n_ch} channels")
|
||||
print(f"state: {len(state)} events")
|
||||
print(f"t_min: {fmt(t_min)}")
|
||||
print(f"t_max: {fmt(t_max)}")
|
||||
|
||||
out = {
|
||||
'session': session_name,
|
||||
't_min_utc_ms': t_min,
|
||||
't_max_utc_ms': t_max,
|
||||
'depth': depth,
|
||||
'pwm_auv': {'channels': channels, 'samples': pwm_samples},
|
||||
'state': state,
|
||||
**{k: sample(v, args.max_pts) for k, v in signals.items()},
|
||||
}
|
||||
|
||||
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
|
||||
os.makedirs(outdir, exist_ok=True)
|
||||
outpath = os.path.join(outdir, 'mcap_signals.json')
|
||||
with open(outpath, 'w') as f:
|
||||
json.dump(out, f)
|
||||
print(f"Written: {outpath}")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
89
tools/extract_usv_pwm.py
Normal file
89
tools/extract_usv_pwm.py
Normal file
@@ -0,0 +1,89 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Extract USV PWM signals from navigation log CSVs."""
|
||||
import argparse, csv, glob, json, os, re, sys
|
||||
from datetime import datetime, timezone
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--nav-dir', required=True)
|
||||
args = parser.parse_args()
|
||||
|
||||
pattern = os.path.join(args.nav_dir, '*_navigation_log.csv')
|
||||
csv_files = sorted(glob.glob(pattern))
|
||||
if not csv_files:
|
||||
print(f"No navigation_log.csv in {args.nav_dir}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
print(f"Found {len(csv_files)} nav CSV files")
|
||||
|
||||
M_data = {}
|
||||
RC_data = {}
|
||||
|
||||
for csv_file in csv_files:
|
||||
print(f" Parsing {os.path.basename(csv_file)}")
|
||||
try:
|
||||
with open(csv_file, 'r') as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
ts_str = row.get('timestamp', '').strip()
|
||||
data = row.get('data', '').strip()
|
||||
val_str = row.get('value', '').strip()
|
||||
if not ts_str or not data or not val_str:
|
||||
continue
|
||||
is_M = re.match(r'^M\d+$', data)
|
||||
is_RC = re.match(r'^RC\d+$', data)
|
||||
if not is_M and not is_RC:
|
||||
continue
|
||||
try:
|
||||
val = float(val_str)
|
||||
except ValueError:
|
||||
continue
|
||||
try:
|
||||
dt = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S.%f')
|
||||
except ValueError:
|
||||
try:
|
||||
dt = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S')
|
||||
except ValueError:
|
||||
continue
|
||||
# CET -> UTC: subtract 3600s
|
||||
t_ms = int(dt.timestamp() * 1000) - 3600 * 1000
|
||||
pt = {'t': t_ms, 'v': val}
|
||||
if is_M:
|
||||
M_data.setdefault(data, []).append(pt)
|
||||
else:
|
||||
RC_data.setdefault(data, []).append(pt)
|
||||
except Exception as e:
|
||||
print(f" Error {csv_file}: {e}")
|
||||
|
||||
all_t = []
|
||||
for pts in list(M_data.values()) + list(RC_data.values()):
|
||||
all_t.extend(p['t'] for p in pts)
|
||||
t_min = min(all_t) if all_t else 0
|
||||
t_max = max(all_t) if all_t else 0
|
||||
|
||||
for k in sorted(M_data):
|
||||
print(f" {k}: {len(M_data[k])} pts")
|
||||
for k in sorted(RC_data):
|
||||
print(f" {k}: {len(RC_data[k])} pts")
|
||||
|
||||
fmt = lambda ms: datetime.fromtimestamp(ms/1000, tz=timezone.utc).isoformat()
|
||||
print(f"t_min UTC: {fmt(t_min)}")
|
||||
print(f"t_max UTC: {fmt(t_max)}")
|
||||
|
||||
out = {
|
||||
'tz_assumed': 'CET (UTC+1)',
|
||||
'tz_converted_to': 'UTC',
|
||||
't_min_utc_ms': t_min,
|
||||
't_max_utc_ms': t_max,
|
||||
'M': M_data,
|
||||
'RC': RC_data,
|
||||
}
|
||||
|
||||
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
|
||||
os.makedirs(outdir, exist_ok=True)
|
||||
outpath = os.path.join(outdir, 'usv_pwm.json')
|
||||
with open(outpath, 'w') as f:
|
||||
json.dump(out, f)
|
||||
print(f"Written: {outpath}")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
319
tools/merge_nav_usbl.py
Normal file
319
tools/merge_nav_usbl.py
Normal file
@@ -0,0 +1,319 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
merge_nav_usbl.py — Merge USBL decoded data with USV navigation log
|
||||
|
||||
Inputs:
|
||||
--usbl : combined_usbl.csv (output of parse_kogger_usbl.py)
|
||||
--nav-dir: directory containing *_navigation_log.csv files
|
||||
--output : output CSV (default: combined_nav_usbl.csv)
|
||||
|
||||
Nav log format: timestamp,data,value (long format)
|
||||
data=Lat → latitude_deg
|
||||
data=Lon → longitude_deg
|
||||
data=Heading → heading_deg
|
||||
|
||||
Interpolation: for each USBL timestamp, find nearest nav point within 1s window.
|
||||
If multiple sessions, use session matching by timestamp overlap.
|
||||
|
||||
AUV position calculation:
|
||||
azimuth_deg from USBL is RELATIVE to USV heading (yaw) based on USBL hardware mounting.
|
||||
AUV bearing from North = (USV heading + azimuth_deg) mod 360
|
||||
Horizontal dist = dist_m * cos(elevation_deg * pi/180)
|
||||
|
||||
Note: if azimuth is already absolute (referenced to North), do NOT add heading.
|
||||
Check: usbl_yaw in payload should match nav Heading if relative azimuth.
|
||||
We use RELATIVE convention (add USV heading) — documented here.
|
||||
|
||||
Geodetic forward (haversine):
|
||||
Using flat-earth approximation valid for distances < 500m:
|
||||
dlat = horiz_dist * cos(bearing) / R_earth
|
||||
dlon = horiz_dist * sin(bearing) / (R_earth * cos(lat))
|
||||
|
||||
R_earth = 6371000 m
|
||||
"""
|
||||
|
||||
import csv
|
||||
import io
|
||||
import sys
|
||||
import math
|
||||
import os
|
||||
|
||||
R_EARTH = 6371000.0 # meters
|
||||
|
||||
|
||||
def parse_nav_log(nav_file):
|
||||
"""
|
||||
Parse navigation_log.csv into:
|
||||
- nav_points: sorted list of (timestamp_str, lat, lon) from Lat+Lon entries
|
||||
- heading_series: sorted list of (timestamp_str, heading_deg) from Heading entries
|
||||
|
||||
Lat/Lon and Heading have different timestamps so must be interpolated separately.
|
||||
Returns (nav_points, heading_series).
|
||||
"""
|
||||
lat_by_ts = {}
|
||||
lon_by_ts = {}
|
||||
heading_series_raw = []
|
||||
|
||||
with open(nav_file) as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
data = row.get('data', '')
|
||||
ts = row.get('timestamp', '')
|
||||
try:
|
||||
val_raw = row.get('value', '') or ''
|
||||
if not val_raw:
|
||||
continue
|
||||
val = float(val_raw)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
if data == 'Lat':
|
||||
lat_by_ts[ts] = val
|
||||
elif data == 'Lon':
|
||||
lon_by_ts[ts] = val
|
||||
elif data == 'Heading':
|
||||
heading_series_raw.append((ts, val))
|
||||
|
||||
# Build nav_points: timestamps where both Lat and Lon appear (same ts)
|
||||
nav_points = []
|
||||
for ts in sorted(set(lat_by_ts.keys()) & set(lon_by_ts.keys())):
|
||||
nav_points.append((ts, lat_by_ts[ts], lon_by_ts[ts]))
|
||||
|
||||
# heading_series sorted by timestamp
|
||||
heading_series = sorted(heading_series_raw, key=lambda x: x[0])
|
||||
|
||||
return nav_points, heading_series
|
||||
|
||||
|
||||
def ts_to_seconds(ts_str):
|
||||
"""Convert '2026-03-24 09:29:05.230392' to float seconds since epoch (approx)."""
|
||||
# Simple: parse date+time, compute offset
|
||||
try:
|
||||
date_part, time_part = ts_str.strip().split(' ', 1)
|
||||
y, mo, d = date_part.split('-')
|
||||
parts = time_part.split(':')
|
||||
h, m = int(parts[0]), int(parts[1])
|
||||
s_str = parts[2]
|
||||
s = float(s_str)
|
||||
# Days since fixed epoch (don't need absolute, just relative diffs)
|
||||
total = (int(y)*365 + int(mo)*30 + int(d)) * 86400 + h*3600 + m*60 + s
|
||||
return total
|
||||
except Exception:
|
||||
return 0.0
|
||||
|
||||
|
||||
def find_nearest(ts_sec, series_sec, series, max_gap=1.0):
|
||||
"""Find nearest entry in sorted series within max_gap seconds."""
|
||||
best_idx = -1
|
||||
best_dt = float('inf')
|
||||
for i, (s_sec, entry) in enumerate(zip(series_sec, series)):
|
||||
dt = abs(s_sec - ts_sec)
|
||||
if dt < best_dt:
|
||||
best_dt = dt
|
||||
best_idx = i
|
||||
elif s_sec > ts_sec + max_gap:
|
||||
break
|
||||
if best_idx >= 0 and best_dt <= max_gap:
|
||||
return series[best_idx], best_dt
|
||||
return None, None
|
||||
|
||||
|
||||
def geodetic_forward(lat_deg, lon_deg, bearing_deg, dist_m):
|
||||
"""
|
||||
Compute destination point given start lat/lon, bearing (deg from North), distance (m).
|
||||
Flat-earth approximation valid for dist < 500m.
|
||||
"""
|
||||
bearing_rad = math.radians(bearing_deg)
|
||||
lat_rad = math.radians(lat_deg)
|
||||
dlat = dist_m * math.cos(bearing_rad) / R_EARTH
|
||||
dlon = dist_m * math.sin(bearing_rad) / (R_EARTH * math.cos(lat_rad))
|
||||
auv_lat = lat_deg + math.degrees(dlat)
|
||||
auv_lon = lon_deg + math.degrees(dlon)
|
||||
return auv_lat, auv_lon
|
||||
|
||||
|
||||
def haversine_dist(lat1, lon1, lat2, lon2):
|
||||
"""Distance in meters between two lat/lon points."""
|
||||
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
||||
dphi = math.radians(lat2 - lat1)
|
||||
dlam = math.radians(lon2 - lon1)
|
||||
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2
|
||||
return 2 * R_EARTH * math.asin(math.sqrt(a))
|
||||
|
||||
|
||||
def find_nav_file_for_session(usbl_file, nav_dir):
|
||||
"""Match nav file by common timestamp prefix."""
|
||||
base = os.path.basename(usbl_file)
|
||||
# e.g. 2026-03-24_09-28-44_USV003_usbl.csv -> 2026-03-24_09-28-44_USV003
|
||||
prefix = base.replace('_usbl.csv', '')
|
||||
nav_candidate = os.path.join(nav_dir, prefix + '_navigation_log.csv')
|
||||
if os.path.exists(nav_candidate):
|
||||
return nav_candidate
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--usbl', required=True, help='combined_usbl.csv')
|
||||
parser.add_argument('--nav-dir', required=True, help='Directory with *_navigation_log.csv')
|
||||
parser.add_argument('--output', default='combined_nav_usbl.csv')
|
||||
parser.add_argument('--max-gap', type=float, default=1.0, help='Max timestamp gap in seconds')
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load USBL records
|
||||
usbl_records = []
|
||||
with open(args.usbl) as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
usbl_records.append(row)
|
||||
|
||||
print("USBL records loaded: %d" % len(usbl_records))
|
||||
|
||||
# Group by source_file
|
||||
from collections import defaultdict
|
||||
by_source = defaultdict(list)
|
||||
for rec in usbl_records:
|
||||
by_source[rec.get('source_file', '')].append(rec)
|
||||
|
||||
# Load nav files
|
||||
nav_data = {} # source_file -> (nav_points, nav_points_sec, heading_series, heading_sec)
|
||||
nav_dir = args.nav_dir
|
||||
|
||||
for source_file in by_source.keys():
|
||||
# Try to match nav file
|
||||
prefix = source_file.replace('_usbl.csv', '')
|
||||
nav_file = os.path.join(nav_dir, prefix + '_navigation_log.csv')
|
||||
if not os.path.exists(nav_file):
|
||||
# Try to find by scanning directory
|
||||
matched = None
|
||||
for fn in os.listdir(nav_dir):
|
||||
if prefix in fn and 'navigation_log' in fn:
|
||||
matched = os.path.join(nav_dir, fn)
|
||||
break
|
||||
if matched is None:
|
||||
print("WARNING: no nav file found for %s" % source_file)
|
||||
nav_data[source_file] = ([], [], [], [])
|
||||
continue
|
||||
nav_file = matched
|
||||
nav_points, heading_series = parse_nav_log(nav_file)
|
||||
nav_points_sec = [ts_to_seconds(pt[0]) for pt in nav_points]
|
||||
heading_sec = [ts_to_seconds(h[0]) for h in heading_series]
|
||||
nav_data[source_file] = (nav_points, nav_points_sec, heading_series, heading_sec)
|
||||
print("Nav loaded for %s: %d pos points, %d heading points" % (
|
||||
source_file, len(nav_points), len(heading_series)))
|
||||
|
||||
# Process and write output
|
||||
output_rows = []
|
||||
stats_match = 0
|
||||
stats_nomatch = 0
|
||||
|
||||
for source_file, records in by_source.items():
|
||||
nav_points, nav_points_sec, heading_series, heading_sec = nav_data.get(
|
||||
source_file, ([], [], [], []))
|
||||
|
||||
for rec in records:
|
||||
ts_str = rec.get('Timestamp', '')
|
||||
ts_sec = ts_to_seconds(ts_str)
|
||||
|
||||
dist_str = rec.get('Dist', '')
|
||||
azimuth_str = rec.get('Azimuth', '')
|
||||
elev_str = rec.get('Elev', '')
|
||||
snr_str = rec.get('SNR', '')
|
||||
|
||||
try:
|
||||
dist = float(dist_str) if dist_str else float('nan')
|
||||
azimuth = float(azimuth_str) if azimuth_str else float('nan')
|
||||
elev = float(elev_str) if elev_str else float('nan')
|
||||
snr = float(snr_str) if snr_str else float('nan')
|
||||
except ValueError:
|
||||
dist, azimuth, elev, snr = float('nan'), float('nan'), float('nan'), float('nan')
|
||||
|
||||
nav_pt, dt = find_nearest(ts_sec, nav_points_sec, nav_points, args.max_gap)
|
||||
hdg_pt, _ = find_nearest(ts_sec, heading_sec, heading_series, args.max_gap)
|
||||
|
||||
if nav_pt is None:
|
||||
stats_nomatch += 1
|
||||
lat_usv, lon_usv, heading_usv = float('nan'), float('nan'), float('nan')
|
||||
auv_lat, auv_lon = float('nan'), float('nan')
|
||||
else:
|
||||
stats_match += 1
|
||||
_, lat_usv, lon_usv = nav_pt
|
||||
heading_usv = hdg_pt[1] if hdg_pt is not None else float('nan')
|
||||
|
||||
# Calculate AUV absolute position
|
||||
# Azimuth from USBL is relative to USV heading (yaw convention)
|
||||
# AUV bearing from North = (USV Heading + azimuth_deg) mod 360
|
||||
if not (math.isnan(dist) or math.isnan(azimuth) or
|
||||
math.isnan(lat_usv) or math.isnan(heading_usv)):
|
||||
horiz_dist = dist * math.cos(math.radians(elev)) if not math.isnan(elev) else dist
|
||||
abs_bearing = (heading_usv + azimuth) % 360
|
||||
auv_lat, auv_lon = geodetic_forward(lat_usv, lon_usv, abs_bearing, horiz_dist)
|
||||
else:
|
||||
auv_lat, auv_lon = float('nan'), float('nan')
|
||||
|
||||
output_rows.append({
|
||||
'Timestamp': ts_str,
|
||||
'lat': '%.7f' % lat_usv if not math.isnan(lat_usv) else '',
|
||||
'lon': '%.7f' % lon_usv if not math.isnan(lon_usv) else '',
|
||||
'Heading': '%.2f' % heading_usv if not math.isnan(heading_usv) else '',
|
||||
'Dist': '%.4f' % dist if not math.isnan(dist) else '',
|
||||
'Azimuth': '%.4f' % azimuth if not math.isnan(azimuth) else '',
|
||||
'Elev': '%.4f' % elev if not math.isnan(elev) else '',
|
||||
'SNR': '%.4f' % snr if not math.isnan(snr) else '',
|
||||
'auv_lat': '%.7f' % auv_lat if not math.isnan(auv_lat) else '',
|
||||
'auv_lon': '%.7f' % auv_lon if not math.isnan(auv_lon) else '',
|
||||
'nav_dt_s': '%.3f' % dt if dt is not None else '',
|
||||
})
|
||||
|
||||
print("\n=== Merge stats ===")
|
||||
print("Matched: %d No nav match: %d" % (stats_match, stats_nomatch))
|
||||
|
||||
with open(args.output, 'w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(['Timestamp', 'lat', 'lon', 'Heading', 'Dist', 'Azimuth', 'Elev', 'SNR',
|
||||
'auv_lat', 'auv_lon', 'nav_dt_s'])
|
||||
for row in output_rows:
|
||||
writer.writerow([
|
||||
row['Timestamp'], row['lat'], row['lon'], row['Heading'],
|
||||
row['Dist'], row['Azimuth'], row['Elev'], row['SNR'],
|
||||
row['auv_lat'], row['auv_lon'], row['nav_dt_s']
|
||||
])
|
||||
|
||||
print("Output: %s (%d rows)" % (args.output, len(output_rows)))
|
||||
|
||||
# Sample output
|
||||
if output_rows:
|
||||
print("\n=== Sample (first 5 rows) ===")
|
||||
print("Timestamp,lat,lon,Heading,Dist,Azimuth,Elev,SNR,auv_lat,auv_lon")
|
||||
for row in output_rows[:5]:
|
||||
print("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s" % (
|
||||
row['Timestamp'], row['lat'], row['lon'], row['Heading'],
|
||||
row['Dist'], row['Azimuth'], row['Elev'], row['SNR'],
|
||||
row['auv_lat'], row['auv_lon']))
|
||||
|
||||
# AUV position stats
|
||||
auv_lats = [float(r['auv_lat']) for r in output_rows if r['auv_lat']]
|
||||
auv_lons = [float(r['auv_lon']) for r in output_rows if r['auv_lon']]
|
||||
usv_lats = [float(r['lat']) for r in output_rows if r['lat']]
|
||||
usv_lons = [float(r['lon']) for r in output_rows if r['lon']]
|
||||
|
||||
if auv_lats and usv_lats:
|
||||
print("\n=== Position stats ===")
|
||||
print("AUV lat range: %.6f - %.6f" % (min(auv_lats), max(auv_lats)))
|
||||
print("AUV lon range: %.6f - %.6f" % (min(auv_lons), max(auv_lons)))
|
||||
print("USV lat range: %.6f - %.6f" % (min(usv_lats), max(usv_lats)))
|
||||
|
||||
# Average USV-AUV distance
|
||||
dists = []
|
||||
for r in output_rows:
|
||||
if r['lat'] and r['auv_lat']:
|
||||
d = haversine_dist(float(r['lat']), float(r['lon']),
|
||||
float(r['auv_lat']), float(r['auv_lon']))
|
||||
dists.append(d)
|
||||
if dists:
|
||||
print("Avg USV-AUV dist: %.2f m (min=%.2f max=%.2f)" % (
|
||||
sum(dists)/len(dists), min(dists), max(dists)))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
242
tools/parse_kogger_usbl.py
Normal file
242
tools/parse_kogger_usbl.py
Normal file
@@ -0,0 +1,242 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
parse_kogger_usbl.py — Decode Kogger USBL raw CSV files (SBP protocol)
|
||||
|
||||
Protocol spec:
|
||||
Frame: BB 55 | ROUTE | MODE | ID | LENGTH | PAYLOAD[LENGTH] | CHKSUM1 | CHKSUM2
|
||||
Checksum: Fletcher-16 over (ROUTE + MODE + ID + LENGTH + PAYLOAD)
|
||||
|
||||
Frame ID 0x65 = ID_USBL_SOLUTION
|
||||
Struct (packed, little-endian):
|
||||
id(U1) role(U1) watermark(U2)
|
||||
timestamp_us(S8) ping_counter(U4) carrier_counter(S8)
|
||||
distance_m(F4) distance_unc(F4)
|
||||
azimuth_deg(F4) azimuth_unc(F4)
|
||||
elevation_deg(F4) elevation_unc(F4)
|
||||
snr(F4)
|
||||
x_m(F4) y_m(F4) latitude_deg(D8) longitude_deg(D8) depth_m(F4)
|
||||
usbl_yaw(F4) usbl_pitch(F4) usbl_roll(F4)
|
||||
usbl_latitude(D8) usbl_longitude(D8) last_iTOW(U4)
|
||||
beacon_n(F4) beacon_e(F4)
|
||||
[+ 32 bytes extra NaN padding observed in firmware v2]
|
||||
|
||||
Timestamp assignment: timestamp from the last RECEIVED packet before the frame sync byte.
|
||||
|
||||
Usage:
|
||||
python3 parse_kogger_usbl.py FILE1.csv [FILE2.csv ...] -o combined_usbl.csv
|
||||
"""
|
||||
|
||||
import ast
|
||||
import csv
|
||||
import io
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
import collections
|
||||
import math
|
||||
|
||||
SYNC = b"\xbb\x55"
|
||||
ID_USBL_SOLUTION = 0x65
|
||||
|
||||
USBL_FMT = '<BBHqIq' + 'f'*7 + 'ff' + 'dd' + 'f' + 'fff' + 'dd' + 'I' + 'ff'
|
||||
USBL_FMT_SIZE = struct.calcsize(USBL_FMT)
|
||||
|
||||
|
||||
def parse_bytes_field(field):
|
||||
"""Parse b'...' Python literal from CSV field."""
|
||||
field = field.strip()
|
||||
if not (field.startswith("b'") or field.startswith('b"')):
|
||||
return b""
|
||||
try:
|
||||
result = ast.literal_eval(field)
|
||||
if isinstance(result, str):
|
||||
result = result.encode('latin-1')
|
||||
return result
|
||||
except Exception:
|
||||
return b""
|
||||
|
||||
|
||||
def fletcher16(data):
|
||||
c1, c2 = 0, 0
|
||||
for byte in data:
|
||||
c1 = (c1 + byte) & 0xFF
|
||||
c2 = (c2 + c1) & 0xFF
|
||||
return c1, c2
|
||||
|
||||
|
||||
def parse_usbl_csv(csv_file):
|
||||
"""
|
||||
Parse a raw USBL CSV file, reconstruct byte stream, decode SBP frames.
|
||||
Returns list of dicts with decoded USBL_SOLUTION data.
|
||||
"""
|
||||
with open(csv_file) as f:
|
||||
content = f.read()
|
||||
|
||||
buf = b""
|
||||
ts_offsets = [] # (byte_offset_in_buf, timestamp_str)
|
||||
|
||||
reader = csv.reader(io.StringIO(content))
|
||||
for row in reader:
|
||||
if len(row) < 3:
|
||||
continue
|
||||
ts, direction, raw = row[0], row[1], row[2]
|
||||
if direction != "RECEIVED":
|
||||
continue
|
||||
b = parse_bytes_field(raw)
|
||||
if b:
|
||||
off = len(buf)
|
||||
buf += b
|
||||
ts_offsets.append((off, ts))
|
||||
|
||||
# Find all BB55 sync positions
|
||||
positions = []
|
||||
i = 0
|
||||
while True:
|
||||
pos = buf.find(SYNC, i)
|
||||
if pos == -1:
|
||||
break
|
||||
positions.append(pos)
|
||||
i = pos + 1
|
||||
|
||||
frame_id_counter = collections.Counter()
|
||||
usbl_records = []
|
||||
valid_total = 0
|
||||
|
||||
for pos in positions:
|
||||
if pos + 6 > len(buf):
|
||||
continue
|
||||
route = buf[pos+2]
|
||||
mode = buf[pos+3]
|
||||
frame_id = buf[pos+4]
|
||||
length = buf[pos+5]
|
||||
if pos + 6 + length + 2 > len(buf):
|
||||
continue
|
||||
payload = buf[pos+6:pos+6+length]
|
||||
chk1_a = buf[pos+6+length]
|
||||
chk2_a = buf[pos+6+length+1]
|
||||
|
||||
c1, c2 = fletcher16(buf[pos+2:pos+6+length])
|
||||
if c1 != chk1_a or c2 != chk2_a:
|
||||
continue
|
||||
|
||||
valid_total += 1
|
||||
frame_id_counter[frame_id] += 1
|
||||
|
||||
if frame_id != ID_USBL_SOLUTION:
|
||||
continue
|
||||
|
||||
# Get timestamp: last ts_offset entry before this position
|
||||
ts = ts_offsets[0][1] if ts_offsets else ""
|
||||
for off, t in ts_offsets:
|
||||
if off <= pos:
|
||||
ts = t
|
||||
else:
|
||||
break
|
||||
|
||||
if len(payload) < USBL_FMT_SIZE:
|
||||
continue
|
||||
|
||||
fields = struct.unpack_from(USBL_FMT, payload)
|
||||
rec = {
|
||||
'Timestamp': ts,
|
||||
'usbl_id': fields[0],
|
||||
'usbl_role': fields[1],
|
||||
'usbl_timestamp_us': fields[3],
|
||||
'ping_counter': fields[4],
|
||||
'Dist': fields[6],
|
||||
'dist_unc': fields[7],
|
||||
'Azimuth': fields[8],
|
||||
'azimuth_unc': fields[9],
|
||||
'Elev': fields[10],
|
||||
'elev_unc': fields[11],
|
||||
'SNR': fields[12],
|
||||
'x_m': fields[13],
|
||||
'y_m': fields[14],
|
||||
'usbl_lat_computed': fields[15],
|
||||
'usbl_lon_computed': fields[16],
|
||||
'depth_m': fields[17],
|
||||
'usbl_yaw': fields[18],
|
||||
'usbl_pitch': fields[19],
|
||||
'usbl_roll': fields[20],
|
||||
'source_file': os.path.basename(csv_file),
|
||||
}
|
||||
usbl_records.append(rec)
|
||||
|
||||
return usbl_records, frame_id_counter, valid_total, len(positions)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description='Decode Kogger USBL raw CSV files')
|
||||
parser.add_argument('files', nargs='+', help='Input *_usbl.csv files')
|
||||
parser.add_argument('-o', '--output', default='combined_usbl.csv', help='Output CSV')
|
||||
args = parser.parse_args()
|
||||
|
||||
all_records = []
|
||||
total_sync = 0
|
||||
total_valid = 0
|
||||
global_id_counter = collections.Counter()
|
||||
|
||||
for csv_file in args.files:
|
||||
print("Processing: %s" % csv_file)
|
||||
records, id_counter, valid, n_sync = parse_usbl_csv(csv_file)
|
||||
all_records.extend(records)
|
||||
total_sync += n_sync
|
||||
total_valid += valid
|
||||
global_id_counter.update(id_counter)
|
||||
print(" Sync markers: %d Valid frames: %d USBL records: %d" % (n_sync, valid, len(records)))
|
||||
|
||||
print("\n=== Summary ===")
|
||||
print("Total sync markers (BB55): %d" % total_sync)
|
||||
print("Total valid frames: %d" % total_valid)
|
||||
print("Total USBL_SOLUTION records: %d" % len(all_records))
|
||||
print("\nFrame ID histogram:")
|
||||
for fid, cnt in sorted(global_id_counter.items(), key=lambda x: -x[1]):
|
||||
name = "USBL_SOLUTION" if fid == 0x65 else ("USBL_CONTROL" if fid == 0x68 else "UNKNOWN")
|
||||
print(" ID=0x%02x(%3d) %-15s : %d frames" % (fid, fid, name, cnt))
|
||||
|
||||
if all_records:
|
||||
dists = [r['Dist'] for r in all_records if not math.isnan(r['Dist'])]
|
||||
azs = [r['Azimuth'] for r in all_records if not math.isnan(r['Azimuth'])]
|
||||
snrs = [r['SNR'] for r in all_records if not math.isnan(r['SNR'])]
|
||||
if dists:
|
||||
dists_sorted = sorted(dists)
|
||||
n = len(dists_sorted)
|
||||
median = dists_sorted[n//2]
|
||||
print("\nDist (m): min=%.2f median=%.2f max=%.2f" % (min(dists), median, max(dists)))
|
||||
if azs:
|
||||
print("Azimuth : min=%.2f max=%.2f" % (min(azs), max(azs)))
|
||||
if snrs:
|
||||
print("SNR : min=%.2f max=%.2f" % (min(snrs), max(snrs)))
|
||||
|
||||
# Write output CSV
|
||||
with open(args.output, 'w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(['Timestamp', 'Dist', 'Azimuth', 'Elev', 'SNR', 'FrameID',
|
||||
'x_m', 'y_m', 'depth_m', 'dist_unc', 'azimuth_unc', 'elev_unc',
|
||||
'usbl_yaw', 'usbl_pitch', 'usbl_roll', 'source_file'])
|
||||
for r in all_records:
|
||||
writer.writerow([
|
||||
r['Timestamp'],
|
||||
'' if math.isnan(r['Dist']) else '%.4f' % r['Dist'],
|
||||
'' if math.isnan(r['Azimuth']) else '%.4f' % r['Azimuth'],
|
||||
'' if math.isnan(r['Elev']) else '%.4f' % r['Elev'],
|
||||
'' if math.isnan(r['SNR']) else '%.4f' % r['SNR'],
|
||||
'0x65',
|
||||
'' if math.isnan(r['x_m']) else '%.4f' % r['x_m'],
|
||||
'' if math.isnan(r['y_m']) else '%.4f' % r['y_m'],
|
||||
'' if math.isnan(r['depth_m']) else '%.4f' % r['depth_m'],
|
||||
'%.4f' % r['dist_unc'],
|
||||
'%.4f' % r['azimuth_unc'],
|
||||
'%.4f' % r['elev_unc'],
|
||||
'%.4f' % r['usbl_yaw'],
|
||||
'%.4f' % r['usbl_pitch'],
|
||||
'%.4f' % r['usbl_roll'],
|
||||
r['source_file'],
|
||||
])
|
||||
|
||||
print("\nOutput: %s (%d records)" % (args.output, len(all_records)))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -1,24 +1,39 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Parse USV long-format CSV → track.geojson + points.json"""
|
||||
"""Parse USV long-format CSV → track.geojson + points.json + manifest.json
|
||||
v2: multi-session support via --input-dir, retro-compat with --input (single file)
|
||||
"""
|
||||
import argparse
|
||||
import csv
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
|
||||
MAX_SLIDER_POINTS = 5000
|
||||
MAX_SLIDER_POINTS = 10000
|
||||
|
||||
|
||||
def parse_args():
|
||||
p = argparse.ArgumentParser(description="Parse USV nav CSV")
|
||||
p.add_argument("--input", required=True, help="CSV navigation log")
|
||||
p = argparse.ArgumentParser(description="Parse USV nav CSV v2")
|
||||
g = p.add_mutually_exclusive_group(required=True)
|
||||
g.add_argument("--input", help="Single CSV navigation log (v1 compat)")
|
||||
g.add_argument("--input-dir", help="Directory: glob *navigation_log*.csv")
|
||||
p.add_argument("--output", required=True, help="Output directory")
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def find_csvs(input_dir):
|
||||
pattern = os.path.join(input_dir, "*navigation_log*.csv")
|
||||
files = sorted(glob.glob(pattern))
|
||||
if not files:
|
||||
print(f"No navigation_log CSVs found in {input_dir}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return files
|
||||
|
||||
|
||||
def load_csv(path):
|
||||
"""Load long-format CSV into {timestamp: {field: value}}"""
|
||||
"""Load long-format CSV → {timestamp: {field: value}}"""
|
||||
rows_by_ts = defaultdict(dict)
|
||||
with open(path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f)
|
||||
@@ -41,13 +56,26 @@ def get_float(d, *keys):
|
||||
return None
|
||||
|
||||
|
||||
def build_points(rows_by_ts):
|
||||
"""Build sorted list of {t, lat, lon, heading} where lat/lon valid."""
|
||||
# We need to track last known lat/lon/heading per timestamp cluster.
|
||||
# Strategy: walk timestamps in order, emit a point each time we see a Lat or Lon update.
|
||||
# Accumulate state across timestamps.
|
||||
timestamps = sorted(rows_by_ts.keys())
|
||||
def ts_to_ms(ts_str):
|
||||
"""Convert ISO-like timestamp string to epoch ms (UTC)."""
|
||||
# Try formats: '2026-03-24 09:04:07.123456' or '2026-03-24T09:04:07.123456'
|
||||
for fmt in (
|
||||
"%Y-%m-%dT%H:%M:%S.%f",
|
||||
"%Y-%m-%dT%H:%M:%S",
|
||||
"%Y-%m-%d %H:%M:%S.%f",
|
||||
"%Y-%m-%d %H:%M:%S",
|
||||
):
|
||||
try:
|
||||
dt = datetime.strptime(ts_str, fmt).replace(tzinfo=timezone.utc)
|
||||
return int(dt.timestamp() * 1000)
|
||||
except ValueError:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
def build_points(rows_by_ts, source_name):
|
||||
"""Build sorted list of {t, t_ms, lat, lon, heading, source}."""
|
||||
timestamps = sorted(rows_by_ts.keys())
|
||||
state = {}
|
||||
points = []
|
||||
|
||||
@@ -55,7 +83,6 @@ def build_points(rows_by_ts):
|
||||
updates = rows_by_ts[ts]
|
||||
state.update(updates)
|
||||
|
||||
# Only emit point if we have both Lat and Lon from this or earlier ts
|
||||
lat = get_float(state, "Lat", "RAW_Lat")
|
||||
lon = get_float(state, "Lon", "RAW_Lon")
|
||||
heading = get_float(state, "Heading", "Yaw")
|
||||
@@ -64,7 +91,6 @@ def build_points(rows_by_ts):
|
||||
continue
|
||||
if lat == 0.0 and lon == 0.0:
|
||||
continue
|
||||
# GPS_RAW_INT fallback (1e-7 degrees)
|
||||
if abs(lat) < 1 and abs(lon) < 1:
|
||||
raw_lat = get_float(state, "GPS_RAW_INT_lat")
|
||||
raw_lon = get_float(state, "GPS_RAW_INT_lon")
|
||||
@@ -74,87 +100,200 @@ def build_points(rows_by_ts):
|
||||
else:
|
||||
continue
|
||||
|
||||
# Only emit if Lat or Lon just updated (reduce duplicate consecutive points)
|
||||
if "Lat" in updates or "Lon" in updates or "RAW_Lat" in updates or "RAW_Lon" in updates:
|
||||
t_ms = ts_to_ms(ts)
|
||||
points.append({
|
||||
"t": ts,
|
||||
"t_ms": t_ms,
|
||||
"lat": round(lat, 8),
|
||||
"lon": round(lon, 8),
|
||||
"heading": round(heading, 2) if heading is not None else None,
|
||||
"source": source_name,
|
||||
})
|
||||
|
||||
return points
|
||||
|
||||
|
||||
def sample_points(points, max_n):
|
||||
if len(points) <= max_n:
|
||||
def sample_points_session(points, max_total, n_sessions):
|
||||
"""Sample per session, always keeping first+last point of each session."""
|
||||
if not points:
|
||||
return points
|
||||
step = len(points) / max_n
|
||||
return [points[int(i * step)] for i in range(max_n)]
|
||||
quota = max(10, max_total // max(n_sessions, 1))
|
||||
if len(points) <= quota:
|
||||
return points
|
||||
|
||||
step = (len(points) - 2) / max(quota - 2, 1)
|
||||
sampled = [points[0]]
|
||||
for i in range(1, quota - 1):
|
||||
sampled.append(points[min(int(1 + i * step), len(points) - 2)])
|
||||
sampled.append(points[-1])
|
||||
return sampled
|
||||
|
||||
|
||||
def write_geojson(points, path):
|
||||
coords = [[p["lon"], p["lat"]] for p in points]
|
||||
geojson = {
|
||||
"type": "FeatureCollection",
|
||||
"features": [{
|
||||
def session_bbox(points):
|
||||
lats = [p["lat"] for p in points]
|
||||
lons = [p["lon"] for p in points]
|
||||
return [min(lons), min(lats), max(lons), max(lats)]
|
||||
|
||||
|
||||
def write_outputs(all_sessions, output_dir):
|
||||
"""Write track.geojson, points.json, manifest.json."""
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# Colors for multi-track
|
||||
COLORS = ["#00b4d8", "#e94560", "#06d6a0", "#ffd166", "#a855f7", "#f97316"]
|
||||
|
||||
# ── track.geojson (MultiLineString per session) ──
|
||||
features = []
|
||||
for i, sess in enumerate(all_sessions):
|
||||
coords = [[p["lon"], p["lat"]] for p in sess["points"]]
|
||||
features.append({
|
||||
"type": "Feature",
|
||||
"geometry": {"type": "LineString", "coordinates": coords},
|
||||
"properties": {
|
||||
"start": points[0]["t"] if points else None,
|
||||
"end": points[-1]["t"] if points else None,
|
||||
"n_points": len(points),
|
||||
"source_file": sess["source_file"],
|
||||
"source_name": sess["source_name"],
|
||||
"start_iso": sess["t_start"],
|
||||
"end_iso": sess["t_end"],
|
||||
"n_points": len(coords),
|
||||
"color": COLORS[i % len(COLORS)],
|
||||
"session_index": i,
|
||||
}
|
||||
}]
|
||||
}
|
||||
with open(path, "w") as f:
|
||||
})
|
||||
|
||||
geojson = {"type": "FeatureCollection", "features": features}
|
||||
geo_path = os.path.join(output_dir, "track.geojson")
|
||||
with open(geo_path, "w") as f:
|
||||
json.dump(geojson, f)
|
||||
print(f" track.geojson: {len(coords)} coords → {path}")
|
||||
print(f" track.geojson: {len(features)} sessions → {geo_path}")
|
||||
|
||||
# ── points.json (all sampled, sorted by t_ms) ──
|
||||
all_points = []
|
||||
n_sessions = len(all_sessions)
|
||||
for sess in all_sessions:
|
||||
sampled = sample_points_session(sess["points"], MAX_SLIDER_POINTS, n_sessions)
|
||||
all_points.extend(sampled)
|
||||
|
||||
# Sort by t_ms (sessions may overlap in time)
|
||||
all_points.sort(key=lambda p: (p["t_ms"] or 0))
|
||||
|
||||
pts_path = os.path.join(output_dir, "points.json")
|
||||
with open(pts_path, "w") as f:
|
||||
json.dump(all_points, f)
|
||||
print(f" points.json: {len(all_points)} points (sampled) → {pts_path}")
|
||||
|
||||
# ── manifest.json ──
|
||||
all_lats = [p["lat"] for s in all_sessions for p in s["points"]]
|
||||
all_lons = [p["lon"] for s in all_sessions for p in s["points"]]
|
||||
global_bbox = [min(all_lons), min(all_lats), max(all_lons), max(all_lats)]
|
||||
|
||||
all_t_ms = [p["t_ms"] for s in all_sessions for p in s["points"] if p["t_ms"]]
|
||||
t_min_ms = min(all_t_ms) if all_t_ms else None
|
||||
t_max_ms = max(all_t_ms) if all_t_ms else None
|
||||
|
||||
sessions_meta = []
|
||||
for sess in all_sessions:
|
||||
sessions_meta.append({
|
||||
"file": sess["source_file"],
|
||||
"source_name": sess["source_name"],
|
||||
"n_points": sess["n_points_raw"],
|
||||
"t_start": sess["t_start"],
|
||||
"t_end": sess["t_end"],
|
||||
"t_start_ms": sess["t_start_ms"],
|
||||
"t_end_ms": sess["t_end_ms"],
|
||||
"bbox": sess["bbox"],
|
||||
})
|
||||
|
||||
manifest = {
|
||||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"n_sessions": len(all_sessions),
|
||||
"sessions": sessions_meta,
|
||||
"global_bbox": global_bbox,
|
||||
"t_min": all_sessions[0]["t_start"] if all_sessions else None,
|
||||
"t_max": all_sessions[-1]["t_end"] if all_sessions else None,
|
||||
"t_min_ms": t_min_ms,
|
||||
"t_max_ms": t_max_ms,
|
||||
"n_points_total_raw": sum(s["n_points_raw"] for s in all_sessions),
|
||||
"n_points_sampled": len(all_points),
|
||||
}
|
||||
|
||||
mf_path = os.path.join(output_dir, "manifest.json")
|
||||
with open(mf_path, "w") as f:
|
||||
json.dump(manifest, f, indent=2)
|
||||
print(f" manifest.json → {mf_path}")
|
||||
|
||||
return manifest
|
||||
|
||||
|
||||
def write_points_json(points, path):
|
||||
with open(path, "w") as f:
|
||||
json.dump(points, f)
|
||||
print(f" points.json: {len(points)} points → {path}")
|
||||
def print_global_stats(manifest, all_sessions):
|
||||
print(f"\n=== Stats globales ===")
|
||||
print(f" Sessions: {manifest['n_sessions']}")
|
||||
print(f" Points bruts: {manifest['n_points_total_raw']}")
|
||||
print(f" Points sampled: {manifest['n_points_sampled']}")
|
||||
print(f" t_min: {manifest['t_min']}")
|
||||
print(f" t_max: {manifest['t_max']}")
|
||||
bb = manifest["global_bbox"]
|
||||
print(f" Bbox: lon [{bb[0]:.5f}, {bb[2]:.5f}] lat [{bb[1]:.5f}, {bb[3]:.5f}]")
|
||||
if manifest["t_min_ms"] and manifest["t_max_ms"]:
|
||||
dur_s = (manifest["t_max_ms"] - manifest["t_min_ms"]) / 1000
|
||||
h, rem = divmod(int(dur_s), 3600)
|
||||
m, s = divmod(rem, 60)
|
||||
print(f" Durée totale: {h}h{m:02d}m{s:02d}s")
|
||||
for i, sess in enumerate(all_sessions):
|
||||
print(f" Session {i+1}: {sess['source_name']} {sess['n_points_raw']} pts {sess['t_start']} → {sess['t_end']}")
|
||||
|
||||
|
||||
def print_stats(points):
|
||||
def process_file(path):
|
||||
source_name = os.path.basename(path)
|
||||
print(f"\nChargement {source_name} ...")
|
||||
rows = load_csv(path)
|
||||
print(f" {len(rows)} timestamps uniques")
|
||||
points = build_points(rows, source_name)
|
||||
if not points:
|
||||
print("No valid points found!")
|
||||
return
|
||||
print(f" WARNING: aucun point GPS valide dans {source_name}")
|
||||
return None
|
||||
|
||||
# Filter points without t_ms
|
||||
points = [p for p in points if p["t_ms"] is not None]
|
||||
lats = [p["lat"] for p in points]
|
||||
lons = [p["lon"] for p in points]
|
||||
print(f"\n=== Stats ===")
|
||||
print(f" N points (full): {len(points)}")
|
||||
print(f" First ts: {points[0]['t']}")
|
||||
print(f" Last ts: {points[-1]['t']}")
|
||||
print(f" Bbox lat: {min(lats):.6f} → {max(lats):.6f}")
|
||||
print(f" Bbox lon: {min(lons):.6f} → {max(lons):.6f}")
|
||||
headings = [p["heading"] for p in points if p["heading"] is not None]
|
||||
print(f" Heading data: {'yes' if headings else 'no'} ({len(headings)} values)")
|
||||
return {
|
||||
"source_file": path,
|
||||
"source_name": source_name,
|
||||
"points": points,
|
||||
"n_points_raw": len(points),
|
||||
"t_start": points[0]["t"],
|
||||
"t_end": points[-1]["t"],
|
||||
"t_start_ms": points[0]["t_ms"],
|
||||
"t_end_ms": points[-1]["t_ms"],
|
||||
"bbox": [min(lons), min(lats), max(lons), max(lats)],
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
os.makedirs(args.output, exist_ok=True)
|
||||
|
||||
print(f"Loading {args.input} ...")
|
||||
rows = load_csv(args.input)
|
||||
print(f" {len(rows)} unique timestamps")
|
||||
if args.input:
|
||||
csv_files = [args.input]
|
||||
else:
|
||||
csv_files = find_csvs(args.input_dir)
|
||||
|
||||
points = build_points(rows)
|
||||
print_stats(points)
|
||||
print(f"Fichiers trouvés: {len(csv_files)}")
|
||||
for f in csv_files:
|
||||
print(f" {os.path.basename(f)}")
|
||||
|
||||
if not points:
|
||||
all_sessions = []
|
||||
for path in csv_files:
|
||||
sess = process_file(path)
|
||||
if sess:
|
||||
all_sessions.append(sess)
|
||||
|
||||
if not all_sessions:
|
||||
print("Aucune session valide.", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
write_geojson(points, os.path.join(args.output, "track.geojson"))
|
||||
|
||||
sampled = sample_points(points, MAX_SLIDER_POINTS)
|
||||
if len(sampled) < len(points):
|
||||
print(f" Sampled {len(sampled)} points for slider (from {len(points)})")
|
||||
write_points_json(sampled, os.path.join(args.output, "points.json"))
|
||||
|
||||
manifest = write_outputs(all_sessions, args.output)
|
||||
print_global_stats(manifest, all_sessions)
|
||||
print("\nDone.")
|
||||
|
||||
|
||||
|
||||
125
tools/usbl_to_json.py
Normal file
125
tools/usbl_to_json.py
Normal file
@@ -0,0 +1,125 @@
|
||||
#!/usr/bin/env python3
|
||||
"""usbl_to_json.py - Convert combined_nav_usbl.csv to usbl.json + auv_track.geojson"""
|
||||
import csv, json, math, argparse, statistics
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
DEFAULT_INPUT = ROOT / "output" / "combined_nav_usbl.csv"
|
||||
OUTPUT_DIR = ROOT / "output"
|
||||
|
||||
def parse_ts(s):
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
|
||||
try:
|
||||
dt = datetime.strptime(s.strip(), fmt).replace(tzinfo=timezone.utc)
|
||||
return int(dt.timestamp() * 1000)
|
||||
except ValueError:
|
||||
pass
|
||||
return None
|
||||
|
||||
def haversine_m(lat1, lon1, lat2, lon2):
|
||||
R = 6371000.0
|
||||
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
||||
dphi = math.radians(lat2 - lat1)
|
||||
dlam = math.radians(lon2 - lon1)
|
||||
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2
|
||||
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--input", default=str(DEFAULT_INPUT))
|
||||
ap.add_argument("--max-points", type=int, default=10000)
|
||||
args = ap.parse_args()
|
||||
|
||||
rows = []
|
||||
with open(args.input, newline="") as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
try:
|
||||
dist = float(row["Dist"])
|
||||
auv_lat = float(row["auv_lat"])
|
||||
auv_lon = float(row["auv_lon"])
|
||||
except (ValueError, KeyError):
|
||||
continue
|
||||
if dist <= 0 or auv_lat == 0 or auv_lon == 0:
|
||||
continue
|
||||
t_ms = parse_ts(row["Timestamp"])
|
||||
if t_ms is None:
|
||||
continue
|
||||
rows.append({
|
||||
"t": row["Timestamp"].strip(),
|
||||
"t_ms": t_ms,
|
||||
"usv_lat": float(row["lat"]),
|
||||
"usv_lon": float(row["lon"]),
|
||||
"heading": float(row["Heading"]),
|
||||
"dist": dist,
|
||||
"az": float(row["Azimuth"]),
|
||||
"elev": float(row["Elev"]),
|
||||
"snr": float(row["SNR"]),
|
||||
"auv_lat": auv_lat,
|
||||
"auv_lon": auv_lon,
|
||||
})
|
||||
|
||||
rows.sort(key=lambda r: r["t_ms"])
|
||||
n_raw = len(rows)
|
||||
|
||||
# Sample if > max-points (preserve begin/end)
|
||||
if n_raw > args.max_points:
|
||||
step = n_raw / args.max_points
|
||||
indices = set()
|
||||
indices.add(0)
|
||||
indices.add(n_raw - 1)
|
||||
for i in range(1, args.max_points - 1):
|
||||
indices.add(int(i * step))
|
||||
rows = [rows[i] for i in sorted(indices)]
|
||||
|
||||
n = len(rows)
|
||||
dists = [r["dist"] for r in rows]
|
||||
snrs = [r["snr"] for r in rows]
|
||||
auv_lats = [r["auv_lat"] for r in rows]
|
||||
auv_lons = [r["auv_lon"] for r in rows]
|
||||
|
||||
auv_bbox = [min(auv_lons), min(auv_lats), max(auv_lons), max(auv_lats)]
|
||||
|
||||
out = {
|
||||
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"n_points": n,
|
||||
"n_raw": n_raw,
|
||||
"auv_bbox": auv_bbox,
|
||||
"stats": {
|
||||
"dist_min": round(min(dists), 3),
|
||||
"dist_max": round(max(dists), 3),
|
||||
"dist_median": round(statistics.median(dists), 3),
|
||||
"snr_min": round(min(snrs), 4),
|
||||
"snr_max": round(max(snrs), 4),
|
||||
},
|
||||
"points": rows,
|
||||
}
|
||||
|
||||
usbl_out = OUTPUT_DIR / "usbl.json"
|
||||
with open(usbl_out, "w") as f:
|
||||
json.dump(out, f, separators=(",", ":"))
|
||||
print(f"usbl.json: {n} points (raw={n_raw}) -> {usbl_out}")
|
||||
|
||||
# AUV track GeoJSON
|
||||
coords = [[r["auv_lon"], r["auv_lat"]] for r in rows]
|
||||
geojson = {
|
||||
"type": "FeatureCollection",
|
||||
"features": [{
|
||||
"type": "Feature",
|
||||
"geometry": {"type": "LineString", "coordinates": coords},
|
||||
"properties": {"name": "AUV track (USBL projection)", "color": "#ff8800"},
|
||||
}]
|
||||
}
|
||||
track_out = OUTPUT_DIR / "auv_track.geojson"
|
||||
with open(track_out, "w") as f:
|
||||
json.dump(geojson, f, separators=(",", ":"))
|
||||
print(f"auv_track.geojson: {len(coords)} coords -> {track_out}")
|
||||
|
||||
# Sanity check: first point haversine vs USBL dist
|
||||
r0 = rows[0]
|
||||
hav = haversine_m(r0["usv_lat"], r0["usv_lon"], r0["auv_lat"], r0["auv_lon"])
|
||||
print(f"Sanity [0]: USV=({r0['usv_lat']:.6f},{r0['usv_lon']:.6f}) AUV=({r0['auv_lat']:.6f},{r0['auv_lon']:.6f}) hav={hav:.2f}m USBL_dist={r0['dist']:.2f}m diff={abs(hav-r0['dist']):.3f}m")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
1
vendor/Kogger-Protocol
vendored
Submodule
1
vendor/Kogger-Protocol
vendored
Submodule
Submodule vendor/Kogger-Protocol added at d62576fee5
1205
viewer/index.html
1205
viewer/index.html
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user