Compare commits

..

18 Commits

Author SHA1 Message Date
70608de221 fix(pipeline): docker-compose env GDRIVE_REMOTE=cosma + rclone.conf mount + NAS sorties bind 2026-04-27 21:33:56 +00:00
Flagabat
0a0dac7fda feat: viewer — sortie selector + USV/AUV panels HTML + CSS
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 23:28:05 +02:00
Flagabat
62091a09b7 feat: pipeline-runner — FastAPI endpoints + SSE 2026-04-27 23:24:44 +02:00
Flagabat
3e1da53cc7 feat: pipeline-runner — runner rclone + orchestration 2026-04-27 23:23:13 +02:00
Flagabat
24f9394c75 feat: pipeline-runner — processor LTTB + signal extraction 2026-04-27 23:22:41 +02:00
Flagabat
c9dca1d071 feat: pipeline-runner — config 2026-04-27 23:21:44 +02:00
Flagabat
682050ef14 feat: pipeline-runner — scaffold docker + deps 2026-04-27 23:18:21 +02:00
Flagabat
4aec9d6295 fix: extract_mcap_signals — flatten signals output, unify time keys
- Merge new signals (pitch/roll/yaw/altitude/battery_v/obstacle_dist) at
  top level of output dict via **unpacking, not nested under 'signals' key
- Replace t_ms -> t in all new signal appends to match depth/state format
- Fix all_t computation to use unified 't' key across old and new signals

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 23:17:02 +02:00
Flagabat
af2bb6581f feat: extract_mcap_signals — pitch/roll/yaw, altitude, obstacle, battery
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 23:15:09 +02:00
Flagabat
bd3a2359d9 docs: plan implémentation pipeline GDrive → replay USV/AUV (12 tâches) 2026-04-27 22:29:41 +02:00
Flagabat
02e357b874 docs: spec pipeline GDrive → replay USV/AUV dans viewer 2026-04-27 22:23:18 +02:00
Flag
34e709b7c8 feat(viewer): bloc pipeline Mermaid + toggle overlay #btn-pipeline 2026-04-27 14:18:08 +00:00
Flag
07df61cbc4 feat(viewer): v6 - date-picker fonctionnel + Plotly charts depuis API 8766
- Date picker onChange charge toutes sessions du jour
- Mini-graphs Plotly: depth AUV, motors AUV, motors USV, USBL distance
- Slider 24h + cursor line Plotly synchronisé
- Map v5 intacte (Leaflet USV arrow + AUV USBL + panel)
- API: /api/missions -> /dives -> /sessions -> track/series/usbl_track
2026-04-27 14:14:57 +00:00
Poulpe
8a5ed6174c feat(viewer): v5 grid 2x2 + trail length + headless screenshot 2026-04-25 22:29:39 +00:00
Poulpe
103bf1cedd feat(viewer): v4 time-series graphs (depth, PWM USV/AUV, status) 2026-04-25 22:15:43 +00:00
Poulpe
3198164aff feat(viewer): v3 AUV track + USBL vector overlay 2026-04-25 21:40:05 +00:00
Poulpe
be2cd1d156 feat: Kogger USBL decoder + nav merge
- tools/parse_kogger_usbl.py: decode SBP protocol (ID=0x65 USBL_SOLUTION)
  from raw *_usbl.csv files, output combined_usbl.csv with Dist/Az/Elev/SNR
- tools/merge_nav_usbl.py: merge USBL data with navigation_log.csv,
  interpolate USV lat/lon/heading, compute AUV absolute position
  (azimuth relative to USV heading convention)
- vendor/Kogger-Protocol: SBP spec reference (submodule)
- 69-sttropez: 13986 USBL records decoded, avg USV-AUV dist 39m
2026-04-25 21:24:00 +00:00
Poulpe
b46f136b76 feat: v2 multi-session parser + timeline range viewer 2026-04-25 20:31:17 +00:00
19 changed files with 2628 additions and 239 deletions

5
.gitignore vendored
View File

@@ -2,8 +2,3 @@ data/
output/
__pycache__/
*.pyc
.venv/
data/
viewer/data/
viewer/screenshots/
screenshots/

17
docker-compose.yml Normal file
View File

@@ -0,0 +1,17 @@
version: "3.9"
services:
pipeline-runner:
build:
context: .
dockerfile: pipeline_runner/Dockerfile
container_name: cosma-pipeline-runner
ports:
- "8767:8767"
volumes:
- /mnt/nas-cosma/cosma/sorties:/data/sorties
- /home/floppyrj45/.config/rclone/rclone.conf:/root/.config/rclone/rclone.conf:ro
environment:
- GDRIVE_REMOTE=cosma:06-Operations/06 - Sorties
- OUTPUT_DIR=/data/sorties
- TOOLS_DIR=/app/tools
restart: unless-stopped

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,184 @@
# Design : GDrive Pipeline Replay
**Date :** 2026-04-27
**Repo :** cosma-nav-tools
**Branche :** feat/flag-local
---
## Objectif
Bouton dans le viewer 8765 qui déclenche un sync GDrive → pipeline Python → affichage replay des signaux USV et AUV sur la même page, synchronisés avec le slider 24h existant.
---
## Architecture globale
```
GDrive (G:)
└─ 06-Operations/06-Sorties/#XX-lieu/YYYYMMDD-lieu/raw_data/
├─ SHIP/*.csv (USV nav + USBL + full)
└─ SUB/*.mcap (AUV ROS2) + bin/
↓ rclone sync (déclenché par bouton viewer)
.83 /data/sorties/#XX/raw/
├─ SHIP/
└─ SUB/
↓ pipeline Python (scripts existants cosma-nav-tools/tools/)
.83 /data/sorties/#XX/processed/
├─ usv.json.gz ← signaux USV downsamplés LTTB
├─ auv_AUV010.json.gz ← signaux AUV010 downsamplés LTTB
├─ auv_AUVxxx.json.gz ← un fichier par AUV détecté
└─ tracks.geojson ← USV + AUV tracks (Leaflet)
↓ servi par
port 8767 pipeline-runner FastAPI (cosma-nav-tools/pipeline_runner/)
port 8765 viewer/index.html ← page unique, fetch 8766 + 8767
```
---
## Section 1 : Pipeline-runner (port 8767)
### Structure
```
cosma-nav-tools/
pipeline_runner/
main.py ← FastAPI app + endpoints
runner.py ← logique rclone + orchestration scripts
config.py ← chemins GDrive, output dir, rclone remote
docker-compose.yml ← service pipeline-runner port 8767
data/sorties/ ← gitignored
```
### Endpoints
| Méthode | Route | Description |
|---------|-------|-------------|
| `GET` | `/sorties` | Liste sorties détectées sur GDrive (scan dossier) |
| `POST` | `/run/{sortie_id}` | Lance rclone pull + pipeline complet |
| `GET` | `/events/{sortie_id}` | SSE stream progress (étape + %) |
| `GET` | `/sorties/{id}/usv` | Données USV processed (JSON gzip) |
| `GET` | `/sorties/{id}/auvs` | Liste AUVs disponibles pour cette sortie |
| `GET` | `/sorties/{id}/auv/{auv_id}` | Données AUV processed (JSON gzip) |
| `GET` | `/sorties/{id}/tracks` | GeoJSON tracks USV + AUV |
### Flow interne POST /run/{sortie_id}
```
1. rclone sync GDrive/#id → /data/sorties/#id/raw/ SSE: "sync" 0→50%
2. parse_usv_nav.py + extract_usv_pwm.py SSE: "usv_parse" 50→65%
3. parse_kogger_usbl.py + merge_nav_usbl.py SSE: "usbl_merge" 65→80%
4. extract_mcap_signals.py (par AUV détecté) SSE: "auv_parse" 80→90%
5. usbl_to_json.py SSE: "usbl_json" 90→95%
6. downsample LTTB + écriture .json.gz SSE: "write" 95→100%
```
---
## Section 2 : Format données
### Signaux (usv.json.gz, auv_*.json.gz)
```json
{
"meta": {
"sortie": "#71-golrest",
"date": "2026-04-16",
"vehicle": "USV001",
"t_start": 1713268477,
"t_end": 1713282877
},
"signals": {
"yaw": [{ "t": 1713268477, "v": 142.3 }, ...],
"heading": [...],
"gps_status": [{ "t": ..., "v": "3D_FIX" }, ...],
"battery_v": [...],
"usbl_dist": [...],
"usbl_angle": [...],
"motor_1": [...],
"motor_2": [...],
"auv_status": [{ "t": ..., "v": "MISSION" }, ...]
}
}
```
- `t` = epoch UNIX secondes
- Max **4000 points par signal** via LTTB (Plotly optimal)
- Valeurs discrètes (status, mode) = strings → step-plot Plotly
- Endpoint optionnel `/range?from=&to=` pour zoom fin sur brutes
### Tracks
- **GeoJSON** FeatureCollection : une Feature LineString par véhicule
- Poids estimé : ~300KB gzip pour 4h à 1Hz
---
## Section 3 : Viewer 8765 — extensions
### Layout (page unique)
```
datebar ← inchangé
header ← + dropdown sortie + [Sync & Process] + progress bar SSE
map Leaflet ← inchangé
slider 24h ← shared X axis, curseur vertical sur tous les graphs
─────────────────────────────────────────────
USV PANEL
[Yaw] [Heading]
[GPS status] [Battery voltage]
[USBL dist] [USBL angle]
[Motor 1] [Motor 2]
[AUV status: disarm/mission/goto — step-plot]
─────────────────────────────────────────────
AUV PANEL ← tabs [AUV010] [AUV011] … si multi-AUV
[Pitch/Roll/Yaw — 3 traces] [Depth]
[Altitude] [Obstacle dist]
[USBL dist] [USBL angle]
[Battery voltage] [Arm/Disarm/Mode — step-plot]
[Motors ×6 PWM — 1 graph multi-trace M1…M6]
```
### Synchronisation slider
- Slider 24h existant → événement → `updateCursor(t)` sur tous les graphs Plotly via `Plotly.relayout` shapes
- Données chargées une fois en mémoire au click de sortie, pas de re-fetch au scrub
### Bouton Sync & Process
- `POST /run/{sortie_id}` → ouvre SSE `/events/{sortie_id}`
- Progress bar inline dans le header (ex: `rclone 34% → usv_parse…`)
- À 100% → fetch automatique USV + AUV + tracks → render graphs
---
## Section 4 : Déploiement .83
```yaml
# docker-compose.yml (ajout)
pipeline-runner:
build: ./pipeline_runner
ports:
- "8767:8767"
volumes:
- /data/sorties:/data/sorties
- /mnt/gdrive:/mnt/gdrive # rclone mount ou rclone exec
environment:
- GDRIVE_REMOTE=gdrive:Cosma - Internal/06-Operations/06 - Sorties
- OUTPUT_DIR=/data/sorties
```
---
## Hors scope (v1)
- Authentification / accès multi-user
- HDF5 archivage (peut venir en v2)
- Replay animé temps-réel (curseur qui avance automatiquement)
- Tests unitaires pipeline

View File

@@ -0,0 +1,9 @@
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y rclone && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY tools/ ./tools/
COPY vendor/ ./vendor/
COPY pipeline_runner/ ./pipeline_runner/
CMD ["uvicorn", "pipeline_runner.main:app", "--host", "0.0.0.0", "--port", "8767"]

View File

View File

@@ -0,0 +1,7 @@
import os
from pathlib import Path
GDRIVE_REMOTE = os.getenv("GDRIVE_REMOTE", "gdrive:Cosma - Internal/06-Operations/06 - Sorties")
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/data/sorties"))
TOOLS_DIR = Path(os.getenv("TOOLS_DIR", Path(__file__).parent.parent / "tools"))
LTTB_MAX_PTS = 4000

98
pipeline_runner/main.py Normal file
View File

@@ -0,0 +1,98 @@
import asyncio
import gzip
import json
from pathlib import Path
from typing import AsyncGenerator
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from .config import OUTPUT_DIR
from .runner import run_pipeline, scan_sorties
app = FastAPI(title="COSMA Pipeline Runner")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Active pipeline jobs: sortie_id → asyncio.Queue
_jobs: dict[str, asyncio.Queue] = {}
@app.get("/sorties")
async def list_sorties():
return await scan_sorties()
@app.post("/run/{sortie_id:path}")
async def run_sortie(sortie_id: str):
if sortie_id in _jobs:
return {"status": "already_running"}
queue: asyncio.Queue = asyncio.Queue()
_jobs[sortie_id] = queue
asyncio.create_task(_run_and_cleanup(sortie_id, queue))
return {"status": "started"}
async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue):
try:
await run_pipeline(sortie_id, queue)
finally:
await asyncio.sleep(30)
_jobs.pop(sortie_id, None)
@app.get("/events/{sortie_id:path}")
async def sse_events(sortie_id: str):
if sortie_id not in _jobs:
raise HTTPException(404, "No active job for this sortie")
async def generate() -> AsyncGenerator[str, None]:
queue = _jobs[sortie_id]
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=60)
yield f"data: {json.dumps(event)}\n\n"
if event.get("step") in ("error", "write") and event.get("pct") in (0, 100):
break
except asyncio.TimeoutError:
yield "data: {\"step\":\"ping\"}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
def _read_gz(path: Path) -> dict:
with gzip.open(path) as f:
return json.loads(f.read())
@app.get("/sorties/{sortie_id:path}/usv")
async def get_usv(sortie_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz"
if not p.exists():
raise HTTPException(404, "USV data not found — run pipeline first")
return JSONResponse(_read_gz(p))
@app.get("/sorties/{sortie_id:path}/auvs")
async def list_auvs(sortie_id: str):
proc = OUTPUT_DIR / sortie_id / "processed"
auvs = [p.name.removesuffix(".json.gz").removeprefix("auv_")
for p in proc.glob("auv_*.json.gz")]
return sorted(auvs)
@app.get("/sorties/{sortie_id:path}/auv/{auv_id}")
async def get_auv(sortie_id: str, auv_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz"
if not p.exists():
raise HTTPException(404, f"AUV {auv_id} data not found")
return JSONResponse(_read_gz(p))
@app.get("/sorties/{sortie_id:path}/tracks")
async def get_tracks(sortie_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson"
if not p.exists():
raise HTTPException(404, "tracks.geojson not found")
with open(p) as f:
return JSONResponse(json.load(f))

View File

@@ -0,0 +1,174 @@
import csv
import gzip
import json
import math
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import numpy as np
from .config import LTTB_MAX_PTS
def _lttb(points: list[dict], max_pts: int) -> list[dict]:
"""Largest Triangle Three Buckets downsampling."""
n = len(points)
if n <= max_pts:
return points
ts = np.array([p["t"] for p in points], dtype=float)
vs = np.array([p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points], dtype=float)
bucket_size = (n - 2) / (max_pts - 2)
out = [points[0]]
a = 0
for i in range(max_pts - 2):
avg_start = int((i + 1) * bucket_size) + 1
avg_end = min(int((i + 2) * bucket_size) + 1, n)
avg_t = np.mean(ts[avg_start:avg_end])
avg_v = np.mean(vs[avg_start:avg_end])
rng_start = int(i * bucket_size) + 1
rng_end = min(int((i + 1) * bucket_size) + 1, n)
max_area = -1.0
best = rng_start
at, av = ts[a], vs[a]
for j in range(rng_start, rng_end):
area = abs((at - avg_t) * (vs[j] - av) - (ts[j] - at) * (avg_v - av)) * 0.5
if area > max_area:
max_area = area
best = j
out.append(points[best])
a = best
out.append(points[-1])
return out
def _ts_to_epoch(ts_str: str) -> float:
"""Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds."""
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc)
return dt.timestamp()
except ValueError:
continue
return 0.0
def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]:
"""Read long-format navigation_log.csv → dict of signal arrays."""
signals: dict[str, list] = {}
wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix",
"Armed", "Mode", "M1", "M2"}
with open(nav_log_path, newline="", encoding="utf-8") as f:
reader = csv.reader(f)
next(reader, None) # skip header
for row in reader:
if len(row) < 3:
continue
ts_str, field, val = row[0], row[1], row[2]
if field not in wanted:
continue
t = _ts_to_epoch(ts_str)
try:
v: Any = float(val)
except ValueError:
v = val.strip()
signals.setdefault(field, []).append({"t": t, "v": v})
return signals
def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]:
"""Read combined_usbl.csv → Dist and Azimuth signal arrays."""
dist_pts, az_pts = [], []
with open(usbl_csv_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
try:
t = _ts_to_epoch(row["Timestamp"])
d = float(row["Dist"]) if row["Dist"] else None
az = float(row["Azimuth"]) if row["Azimuth"] else None
except (KeyError, ValueError):
continue
if d is not None and not math.isnan(d):
dist_pts.append({"t": t, "v": d})
if az is not None and not math.isnan(az):
az_pts.append({"t": t, "v": az})
return {"usbl_dist": dist_pts, "usbl_angle": az_pts}
def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]:
"""Read mcap_signals.json → depth, motors M1-M6, state signals."""
with open(mcap_json_path) as f:
data = json.load(f)
signals: dict[str, list[dict]] = {}
def _unpack(key: str, series: list):
pts = []
for item in series:
t = item.get("t_ms", item.get("t", 0))
if isinstance(t, (int, float)) and t > 1e9:
t = t / 1000.0 # ms → s
pts.append({"t": float(t), "v": item.get("v", item.get("value", 0))})
signals[key] = pts
if "depth" in data:
_unpack("depth", data["depth"])
if "pwm_auv" in data:
for m_key, series in data["pwm_auv"].items():
_unpack(m_key.lower(), series)
if "state" in data:
_unpack("arm_status", data["state"])
# New signals added by extended extract_mcap_signals.py
for key in ("pitch", "roll", "yaw", "altitude", "battery_v", "obstacle_dist"):
if key in data:
_unpack(key, data[key])
return signals
def write_usv_json(
nav_log_path: Path,
usbl_csv_path: Path | None,
output_path: Path,
sortie_id: str,
date: str,
) -> None:
signals = _read_nav_log(nav_log_path)
if usbl_csv_path and usbl_csv_path.exists():
signals.update(_read_usbl_csv(usbl_csv_path))
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
meta = {
"sortie": sortie_id,
"date": date,
"vehicle": "USV",
"t_start": min(t_all) if t_all else 0,
"t_end": max(t_all) if t_all else 0,
}
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
output_path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(output_path, "wb") as f:
f.write(payload)
def write_auv_json(
mcap_json_path: Path,
output_path: Path,
auv_id: str,
sortie_id: str,
date: str,
) -> None:
signals = _read_mcap_signals(mcap_json_path)
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
meta = {
"sortie": sortie_id,
"date": date,
"vehicle": auv_id,
"t_start": min(t_all) if t_all else 0,
"t_end": max(t_all) if t_all else 0,
}
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
output_path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(output_path, "wb") as f:
f.write(payload)

137
pipeline_runner/runner.py Normal file
View File

@@ -0,0 +1,137 @@
import asyncio
import re
import shutil
import subprocess
from pathlib import Path
from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
from .processor import write_auv_json, write_usv_json
async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
await queue.put({"step": step, "pct": pct, "msg": msg})
def _find_nav_log(ship_dir: Path) -> Path | None:
for p in ship_dir.glob("*_navigation_log.csv"):
return p
return None
def _find_usbl_csv(ship_dir: Path) -> Path | None:
for p in ship_dir.glob("*_usbl.csv"):
return p
return None
def _detect_auvs(sub_dir: Path) -> list[str]:
"""Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010''AUV010')."""
auvs = []
for d in sub_dir.iterdir():
if d.is_dir():
m = re.search(r"(AUV\d+)", d.name, re.IGNORECASE)
if m:
auvs.append(m.group(1).upper())
return sorted(set(auvs))
def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
for d in sub_dir.iterdir():
if d.is_dir() and auv_id.upper() in d.name.upper():
return d
return None
def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
cmd = ["python3", str(TOOLS_DIR / script_name)] + args
return subprocess.run(cmd, capture_output=True, text=True)
async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
"""Full pipeline: rclone sync → parse → process → write JSON.gz"""
raw_dir = OUTPUT_DIR / sortie_id / "raw"
proc_dir = OUTPUT_DIR / sortie_id / "processed"
proc_dir.mkdir(parents=True, exist_ok=True)
# Step 1: rclone sync
await _emit(queue, "sync", 0, "rclone sync en cours…")
gdrive_path = f'{GDRIVE_REMOTE}/{sortie_id}'
result = subprocess.run(
["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
capture_output=True, text=True
)
if result.returncode != 0:
await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
return
await _emit(queue, "sync", 50, "rclone terminé")
# Detect SHIP/SUB dirs inside raw/
ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
sub_dir = next(raw_dir.rglob("logs/SUB"), None)
if not ship_dir or not ship_dir.exists():
await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
return
# Step 2: USV parsing
await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
nav_log = _find_nav_log(ship_dir)
usbl_csv_raw = _find_usbl_csv(ship_dir)
usbl_parsed_path = proc_dir / "combined_usbl.csv"
if usbl_csv_raw:
_run_script("parse_kogger_usbl.py", [str(usbl_csv_raw), "-o", str(usbl_parsed_path)])
date_str = sortie_id.split("/")[-1][:10] if "/" in sortie_id else sortie_id[:10]
if nav_log:
write_usv_json(
nav_log_path=nav_log,
usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None,
output_path=proc_dir / "usv.json.gz",
sortie_id=sortie_id,
date=date_str,
)
await _emit(queue, "usv_parse", 70, "USV OK")
# Step 3: AUV parsing
if sub_dir and sub_dir.exists():
auv_ids = _detect_auvs(sub_dir)
total = len(auv_ids) or 1
for i, auv_id in enumerate(auv_ids):
await _emit(queue, "auv_parse", 70 + int(20 * i / total), f"AUV {auv_id}")
session_dir = _detect_session_dir(sub_dir, auv_id)
if not session_dir:
continue
mcap_out = proc_dir / f"mcap_{auv_id}.json"
_run_script("extract_mcap_signals.py", ["--session-dir", str(session_dir), "--max-pts", "0"])
# extract_mcap_signals writes to tools/../output/mcap_signals.json (hardcoded)
default_out = TOOLS_DIR.parent / "output" / "mcap_signals.json"
if not default_out.exists():
default_out = session_dir / "mcap_signals.json"
if default_out.exists():
shutil.copy(default_out, mcap_out)
if mcap_out.exists():
write_auv_json(
mcap_json_path=mcap_out,
output_path=proc_dir / f"auv_{auv_id}.json.gz",
auv_id=auv_id,
sortie_id=sortie_id,
date=date_str,
)
await _emit(queue, "write", 100, "Pipeline terminé")
async def scan_sorties() -> list[dict]:
"""List available sorties on GDrive via rclone lsd."""
result = subprocess.run(
["rclone", "lsd", GDRIVE_REMOTE],
capture_output=True, text=True
)
sorties = []
for line in result.stdout.splitlines():
parts = line.strip().split(None, 4)
if len(parts) == 5:
name = parts[4].strip()
processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists()
sorties.append({"id": name, "processed": processed})
return sorties

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
fastapi==0.115.0
uvicorn[standard]==0.30.6
aiofiles==24.1.0
numpy==2.1.1

Binary file not shown.

After

Width:  |  Height:  |  Size: 116 KiB

BIN
screenshots/viewer-v5.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""Extract AUV signals from MCAP files: depth, PWM, state."""
import argparse, glob, json, os, sys
import argparse, glob, json, math, os, sys
def main():
parser = argparse.ArgumentParser()
@@ -26,7 +26,16 @@ def main():
depth_raw = []
pwm_raw = []
state_raw = []
TOPICS = ['/mavros/imu/static_pressure', '/mavros/rc/out', '/mavros/state']
signals = {}
TOPICS = [
'/mavros/imu/static_pressure',
'/mavros/rc/out',
'/mavros/state',
'/mavros/imu/data',
'/mavros/altitude',
'/mavros/battery',
'/mavros/distance_sensor/hrlv_ez4_pub',
]
for mcap_file in mcap_files:
try:
@@ -53,6 +62,40 @@ def main():
state_raw.append({'t': t_ms, 'mode': str(ros_msg.mode), 'armed': bool(ros_msg.armed)})
except Exception:
pass
elif topic == '/mavros/imu/data':
try:
q = ros_msg.orientation
sinr = 2*(q.w*q.x + q.y*q.z)
cosr = 1 - 2*(q.x*q.x + q.y*q.y)
roll = math.degrees(math.atan2(sinr, cosr))
sinp = 2*(q.w*q.y - q.z*q.x)
pitch = math.degrees(math.asin(max(-1, min(1, sinp))))
siny = 2*(q.w*q.z + q.x*q.y)
cosy = 1 - 2*(q.y*q.y + q.z*q.z)
yaw = math.degrees(math.atan2(siny, cosy))
signals.setdefault('pitch', []).append({'t': t_ms, 'v': pitch})
signals.setdefault('roll', []).append({'t': t_ms, 'v': roll})
signals.setdefault('yaw', []).append({'t': t_ms, 'v': yaw})
except Exception:
pass
elif topic == '/mavros/altitude':
try:
signals.setdefault('altitude', []).append(
{'t': t_ms, 'v': ros_msg.relative})
except Exception:
pass
elif topic == '/mavros/battery':
try:
signals.setdefault('battery_v', []).append(
{'t': t_ms, 'v': ros_msg.voltage})
except Exception:
pass
elif topic == '/mavros/distance_sensor/hrlv_ez4_pub':
try:
signals.setdefault('obstacle_dist', []).append(
{'t': t_ms, 'v': ros_msg.range})
except Exception:
pass
except Exception as e:
print(f" Skip {os.path.basename(mcap_file)}: {e}")
@@ -69,7 +112,8 @@ def main():
pwm_samples = sample(pwm_raw, args.max_pts)
state = state_raw # events, keep all
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw]
signals_flat = [pt for pts in signals.values() for pt in pts]
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw + signals_flat]
t_min = min(all_t) if all_t else 0
t_max = max(all_t) if all_t else 0
@@ -94,6 +138,7 @@ def main():
'depth': depth,
'pwm_auv': {'channels': channels, 'samples': pwm_samples},
'state': state,
**{k: sample(v, args.max_pts) for k, v in signals.items()},
}
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')

1
vendor/Kogger-Protocol vendored Submodule

Submodule vendor/Kogger-Protocol added at d62576fee5

Binary file not shown.

Binary file not shown.

View File

@@ -1,51 +0,0 @@
# Open Serial Binary Protocol (SBP) specification
## Protocol frame structure
SYNC1 | SYNC2 | ROUTE | MODE | ID | LENGTH | PAYLOAD | CHECK1 | CHECK2|
|----------|----------|----------|----------|----------|----------|----------|----------|----------|
U1 | U1 | U1 | U1 | U1 | U1 | BYTE[LENGTH] | U1 | U1 |
0xBB | 0x55 | BITFIELD | BITFIELD | 1 … 255 | 0 … 128 | BYTEARRAY | 0 … 0xFF | 0 … 0xFF |
### ROUTE
Name | Bits | Description
|----------|----------|----------|
DEV_ADDRESS | 0:3 bit | Device address. Default and broadcast address is 0x0.
RESERVED | 2 bit | Reserved
### MODE
Name | Bits | Description
|----------|----------|----------|
TYPE | 0:1 bit | 0 — Reserved, 1 — CONTENT: DEVICE → HOST, 2 — SETTING: HOST → DEVICE, 3 —GETTING: HOST → DEVICE
RESERVED | 2 bit | Reserved
VERSION | 3:5 bit | Field defines the payload data version
MARK | 6 bit | Once device is switched on, this flag is always in reset state (ZERO). It can be set to active state (ONE) by the host (see the CMD_MARK command) and the slave device keeps the flag in active state in every frame until hardware reset occurs or is reset by the host. Therefore the host monitors the device's actual settings.
RESPONSE | 7 bit | HOST → DEVICE: Set the flag to active state (ONE) in order to get the result of processing the command. The flag doesn't affect the response if one is provided by the TYPE field. DEVICE → HOST: The flag is in reset state (ZERO) by default. Payload goes according to the command specification. If flag is set, the payload contains the result of command processing (see CMD_RESP command).
## Checksum
The checksum algorithm used is the Fletcher-16.
Example source code for calculating the checksum:
```
uint8_t CHECK1 = 0;
uint8_t CHECK2 = 0;
void CheckSumUpdate(uint8_t byte) {
CHECK1 += byte;
CHECK2 += CHECK1;
}
```
## Number Formats
- Multi-byte values are ordered in Little Endian format
- Floating point values are transmitted in IEEE754 single or double precision
- bit-field in LSB format
Name | Type | Size (Bytes) | Range
|----------|----------|----------|----------|
S1 | int8_t | 1 | -128 ... 127
U1 | uint8_t | 1 | 0 … 255
S2 | int16_t | 2 | -32768 … 32767
U2 | uint16_t | 2 | 0 … 65535
S4 | int32_t | 4 | -2'147'483'648 ... 2'147'483'647
U4 | uint32_t | 4 | 0 … 4'294'967'295
F4 | float | 4 | -1*2^+127 ... 2^+127
D8 | double | 8 | -1*2^+1023 ... 2^+1023

File diff suppressed because it is too large Load Diff