Compare commits
25 Commits
07df61cbc4
...
main
| Author | SHA1 | Date |
|---|---|---|
| | f66bb52fec | |
| | dd6f0cf435 | |
| | f5788a01f4 | |
| | f5debc8afc | |
| | 63270beeff | |
| | 4164f32694 | |
| | b45a5368ee | |
| | 7a5d442fbd | |
| | b21e306a86 | |
| | a79f63e59e | |
| | 31b5a221b8 | |
| | b962997008 | |
| | 6978b36650 | |
| | 70608de221 | |
| | 0a0dac7fda | |
| | 62091a09b7 | |
| | 3e1da53cc7 | |
| | 24f9394c75 | |
| | c9dca1d071 | |
| | 682050ef14 | |
| | 4aec9d6295 | |
| | af2bb6581f | |
| | bd3a2359d9 | |
| | 02e357b874 | |
| | 34e709b7c8 | |
17
docker-compose.yml
Normal file
@@ -0,0 +1,17 @@
|
||||
version: "3.9"
services:
  pipeline-runner:
    build:
      context: .
      dockerfile: pipeline_runner/Dockerfile
    container_name: cosma-pipeline-runner
    ports:
      - "8767:8767"
    volumes:
      - /mnt/nas-cosma/cosma/sorties:/data/sorties
      - /home/floppyrj45/.config/rclone/rclone.conf:/root/.config/rclone/rclone.conf:ro
    environment:
      - GDRIVE_REMOTE=cosma:06-Operations/06 - Sorties
      - OUTPUT_DIR=/data/sorties
      - TOOLS_DIR=/app/tools
    restart: unless-stopped
|
||||
1219
docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md
Normal file
File diff suppressed because it is too large
@@ -0,0 +1,184 @@
|
||||
# Design: GDrive Pipeline Replay

**Date:** 2026-04-27
**Repo:** cosma-nav-tools
**Branch:** feat/flag-local

---

## Goal

A button in the viewer (port 8765) triggers a GDrive sync, then the Python pipeline, then displays a replay of the USV and AUV signals on the same page, synchronized with the existing 24h slider.
|
||||
|
||||
---
|
||||
|
||||
## Overall architecture

```
GDrive (G:)
  └─ 06-Operations/06-Sorties/#XX-lieu/YYYYMMDD-lieu/raw_data/
       ├─ SHIP/*.csv   (USV nav + USBL + full)
       └─ SUB/*.mcap   (AUV ROS2) + bin/

        ↓  rclone sync (triggered by the viewer button)

.83  /data/sorties/#XX/raw/
       ├─ SHIP/
       └─ SUB/

        ↓  Python pipeline (existing scripts in cosma-nav-tools/tools/)

.83  /data/sorties/#XX/processed/
       ├─ usv.json.gz          ← USV signals, LTTB-downsampled
       ├─ auv_AUV010.json.gz   ← AUV010 signals, LTTB-downsampled
       ├─ auv_AUVxxx.json.gz   ← one file per detected AUV
       └─ tracks.geojson       ← USV + AUV tracks (Leaflet)

        ↓  served by

port 8767  pipeline-runner FastAPI  (cosma-nav-tools/pipeline_runner/)
port 8765  viewer/index.html        ← single page, fetches 8766 + 8767
```
|
||||
|
||||
---
|
||||
|
||||
## Section 1: Pipeline-runner (port 8767)

### Structure

```
cosma-nav-tools/
  pipeline_runner/
    main.py          ← FastAPI app + endpoints
    runner.py        ← rclone logic + script orchestration
    config.py        ← GDrive paths, output dir, rclone remote
  docker-compose.yml ← pipeline-runner service, port 8767
  data/sorties/      ← gitignored
```
|
||||
|
||||
### Endpoints

| Method | Route | Description |
|--------|-------|-------------|
| `GET` | `/sorties` | List the sorties detected on GDrive (folder scan) |
| `POST` | `/run/{sortie_id}` | Start the rclone pull + full pipeline |
| `GET` | `/events/{sortie_id}` | SSE progress stream (step + %) |
| `GET` | `/sorties/{id}/usv` | Processed USV data (gzipped JSON) |
| `GET` | `/sorties/{id}/auvs` | List of the AUVs available for this sortie |
| `GET` | `/sorties/{id}/auv/{auv_id}` | Processed AUV data (gzipped JSON) |
| `GET` | `/sorties/{id}/tracks` | GeoJSON tracks, USV + AUV |
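
Sortie identifiers such as `#71-golrest` contain `#`, which URL parsers treat as the start of a fragment, so a client most likely has to percent-encode the id before calling these routes. A minimal sketch, assuming the service is reachable at `http://localhost:8767` (the host is an assumption, only the port comes from this design) and using a hypothetical helper `endpoint_url` that is not part of the repo:

```python
from urllib.parse import quote

BASE_URL = "http://localhost:8767"  # assumed host; only the port is from the design


def endpoint_url(sortie_id: str, resource: str, base: str = BASE_URL) -> str:
    """Build a data URL for one sortie, percent-encoding the id.

    `resource` is one of the route suffixes from the table: "usv", "auvs",
    "tracks" or "auv/<auv_id>".
    """
    # safe="/" keeps path separators intact, in case an id contains slashes
    encoded = quote(sortie_id, safe="/")
    return f"{base}/sorties/{encoded}/{resource}"


print(endpoint_url("#71-golrest", "usv"))
```

Without the encoding, everything after `#` would be dropped from the request path by most HTTP clients.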
|
||||
|
||||
### Internal flow of POST /run/{sortie_id}

```
1. rclone sync GDrive/#id → /data/sorties/#id/raw/     SSE: "sync"        0→50%
2. parse_usv_nav.py + extract_usv_pwm.py               SSE: "usv_parse"   50→65%
3. parse_kogger_usbl.py + merge_nav_usbl.py            SSE: "usbl_merge"  65→80%
4. extract_mcap_signals.py (per detected AUV)          SSE: "auv_parse"   80→90%
5. usbl_to_json.py                                     SSE: "usbl_json"   90→95%
6. LTTB downsampling + writing .json.gz                SSE: "write"       95→100%
```
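
The percentage ranges above can be read as a lookup table: each step owns a slice of the global progress bar, and a step-local fraction is mapped into that slice. The sketch below illustrates this under the assumption that events keep the `{"step", "pct", "msg"}` shape; `STEP_RANGES`, `overall_pct` and `sse_frame` are hypothetical names, not functions from the repo.

```python
import json

# Step ranges copied from the flow above: step name -> (start %, end %)
STEP_RANGES = {
    "sync": (0, 50),
    "usv_parse": (50, 65),
    "usbl_merge": (65, 80),
    "auv_parse": (80, 90),
    "usbl_json": (90, 95),
    "write": (95, 100),
}


def overall_pct(step: str, fraction: float) -> int:
    """Map a step-local fraction in [0, 1] to the global percentage."""
    start, end = STEP_RANGES[step]
    fraction = min(max(fraction, 0.0), 1.0)  # clamp out-of-range input
    return round(start + (end - start) * fraction)


def sse_frame(step: str, fraction: float, msg: str = "") -> str:
    """Serialize one progress event as a Server-Sent Events data frame."""
    event = {"step": step, "pct": overall_pct(step, fraction), "msg": msg}
    return f"data: {json.dumps(event)}\n\n"


print(sse_frame("auv_parse", 0.5, "AUV010"), end="")
```

Keeping the ranges in one table means the progress bar stays monotonic even if a step reports its own fraction coarsely.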
|
||||
|
||||
---
|
||||
|
||||
## Section 2: Data format

### Signals (usv.json.gz, auv_*.json.gz)

```json
{
  "meta": {
    "sortie": "#71-golrest",
    "date": "2026-04-16",
    "vehicle": "USV001",
    "t_start": 1776340477,
    "t_end": 1776354877
  },
  "signals": {
    "yaw": [{ "t": 1776340477, "v": 142.3 }, ...],
    "heading": [...],
    "gps_status": [{ "t": ..., "v": "3D_FIX" }, ...],
    "battery_v": [...],
    "usbl_dist": [...],
    "usbl_angle": [...],
    "motor_1": [...],
    "motor_2": [...],
    "auv_status": [{ "t": ..., "v": "MISSION" }, ...]
  }
}
```

- `t` = UNIX epoch, in seconds
- At most **4000 points per signal** via LTTB (optimal for Plotly)
- Discrete values (status, mode) are strings → Plotly step-plot
- Optional `/range?from=&to=` endpoint for fine zoom on the raw data
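
To make the format concrete, the sketch below round-trips a small payload through gzip and decides, per signal, whether it should be drawn as a line or as a step-plot. The rule "any string value means discrete" is an assumption derived from the bullets above; `load_signals` and `is_discrete` are hypothetical helpers and the sample values are made up.

```python
import gzip
import json


def load_signals(blob: bytes) -> dict:
    """Decompress a *.json.gz payload and return the decoded document."""
    return json.loads(gzip.decompress(blob))


def is_discrete(points: list[dict]) -> bool:
    """Treat a signal as discrete when any of its values is a string."""
    return any(isinstance(p["v"], str) for p in points)


# Tiny hand-made payload in the documented shape (values are illustrative)
doc = {
    "meta": {"sortie": "#71-golrest", "vehicle": "USV001"},
    "signals": {
        "yaw": [{"t": 0, "v": 142.3}, {"t": 1, "v": 143.0}],
        "gps_status": [{"t": 0, "v": "3D_FIX"}],
    },
}
blob = gzip.compress(json.dumps(doc).encode())

for name, pts in load_signals(blob)["signals"].items():
    kind = "step-plot" if is_discrete(pts) else "line"
    print(f"{name}: {len(pts)} points, {kind}")
```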
|
||||
|
||||
### Tracks

- **GeoJSON** FeatureCollection: one LineString Feature per vehicle
- Estimated size: ~300 KB gzipped for 4 h at 1 Hz
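
A possible shape for `tracks.geojson` is sketched below. The `vehicle` property name and the `tracks_to_geojson` helper are assumptions for illustration; the only parts taken from the design are the FeatureCollection wrapper and one LineString per vehicle. GeoJSON stores positions as `[lon, lat]`, so the pairs are swapped on the way in.

```python
def tracks_to_geojson(tracks: dict[str, list[tuple[float, float]]]) -> dict:
    """Build a FeatureCollection with one LineString Feature per vehicle.

    `tracks` maps a vehicle id to (lat, lon) pairs.
    """
    features = []
    for vehicle, points in tracks.items():
        if len(points) < 2:
            continue  # a LineString needs at least two positions
        features.append({
            "type": "Feature",
            "properties": {"vehicle": vehicle},  # assumed property name
            "geometry": {
                "type": "LineString",
                "coordinates": [[lon, lat] for lat, lon in points],
            },
        })
    return {"type": "FeatureCollection", "features": features}


# Illustrative coordinates, not taken from a real sortie
fc = tracks_to_geojson({"USV001": [(43.10, 5.90), (43.11, 5.91)]})
print(len(fc["features"]), "feature(s)")
```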
|
||||
|
||||
---
|
||||
|
||||
## Section 3: Viewer 8765 extensions

### Layout (single page)

```
datebar          ← unchanged
header           ← + sortie dropdown + [Sync & Process] + SSE progress bar
map Leaflet      ← unchanged
slider 24h       ← shared X axis, vertical cursor on all graphs
─────────────────────────────────────────────
USV PANEL
  [Yaw]                    [Heading]
  [GPS status]             [Battery voltage]
  [USBL dist]              [USBL angle]
  [Motor 1]                [Motor 2]
  [AUV status: disarm/mission/goto (step-plot)]
─────────────────────────────────────────────
AUV PANEL        ← tabs [AUV010] [AUV011] … if multi-AUV
  [Pitch/Roll/Yaw, 3 traces]   [Depth]
  [Altitude]                   [Obstacle dist]
  [USBL dist]                  [USBL angle]
  [Battery voltage]            [Arm/Disarm/Mode (step-plot)]
  [Motors ×6 PWM, 1 multi-trace graph M1…M6]
```
|
||||
|
||||
### Slider synchronization

- Existing 24h slider → event → `updateCursor(t)` on all Plotly graphs via `Plotly.relayout` shapes
- Data is loaded into memory once when a sortie is clicked; scrubbing triggers no re-fetch
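
Because the data stays in memory, moving the cursor reduces to a lookup in a time-sorted array. The viewer itself runs in the browser; the Python sketch below only illustrates the lookup logic, with a hypothetical `value_at` helper that holds the previous value, which is the behavior a step-plot would show.

```python
from bisect import bisect_right


def value_at(points: list[dict], t: float):
    """Return the last sample value at or before time `t` (None if none).

    `points` must be sorted by "t". Holding the previous value matches a
    step-plot; a line plot could interpolate instead.
    """
    times = [p["t"] for p in points]
    i = bisect_right(times, t)
    return points[i - 1]["v"] if i else None


status = [{"t": 10, "v": "DISARM"}, {"t": 20, "v": "MISSION"}]
print(value_at(status, 15))
```

With at most 4000 points per signal, a binary search per graph is cheap enough to run on every slider event.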
|
||||
|
||||
### Sync & Process button

- `POST /run/{sortie_id}` → opens the SSE stream `/events/{sortie_id}`
- Inline progress bar in the header (e.g. `rclone 34% → usv_parse…`)
- At 100% → automatic fetch of USV + AUV + tracks → render graphs
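
On the client side, the stream is a sequence of `data: {...}` frames separated by blank lines. The sketch below parses such a stream and applies an assumed stop rule (any `error` step, or the `write` step at 100%); `parse_sse` and `is_terminal` are hypothetical helpers, and the sample frames are hand-written rather than captured from the service.

```python
import json


def parse_sse(stream: str) -> list[dict]:
    """Split an SSE text stream into decoded `data:` events."""
    events = []
    for frame in stream.split("\n\n"):
        for line in frame.splitlines():
            if line.startswith("data:"):
                events.append(json.loads(line[len("data:"):].strip()))
    return events


def is_terminal(event: dict) -> bool:
    """Assumed stop rule: any error, or the final write step at 100%."""
    return event.get("step") == "error" or (
        event.get("step") == "write" and event.get("pct") == 100
    )


raw = (
    'data: {"step": "sync", "pct": 34, "msg": ""}\n\n'
    'data: {"step": "ping"}\n\n'
    'data: {"step": "write", "pct": 100, "msg": ""}\n\n'
)
for ev in parse_sse(raw):
    if ev["step"] == "ping":
        continue  # keep-alive frame, nothing to display
    print(ev["step"], ev["pct"], "done" if is_terminal(ev) else "")
```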
|
||||
|
||||
---
|
||||
|
||||
## Section 4: Deployment on .83

```yaml
# docker-compose.yml (addition)
pipeline-runner:
  build: ./pipeline_runner
  ports:
    - "8767:8767"
  volumes:
    - /data/sorties:/data/sorties
    - /mnt/gdrive:/mnt/gdrive   # rclone mount or rclone exec
  environment:
    - GDRIVE_REMOTE=gdrive:Cosma - Internal/06-Operations/06 - Sorties
    - OUTPUT_DIR=/data/sorties
```
|
||||
|
||||
---
|
||||
|
||||
## Out of scope (v1)

- Authentication / multi-user access
- HDF5 archiving (may come in v2)
- Animated real-time replay (cursor advancing automatically)
- Pipeline unit tests
|
||||
9
pipeline_runner/Dockerfile
Normal file
@@ -0,0 +1,9 @@
|
||||
FROM python:3.12-slim
|
||||
WORKDIR /app
|
||||
RUN apt-get update && apt-get install -y rclone && rm -rf /var/lib/apt/lists/*
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
COPY tools/ ./tools/
|
||||
COPY vendor/ ./vendor/
|
||||
COPY pipeline_runner/ ./pipeline_runner/
|
||||
CMD ["uvicorn", "pipeline_runner.main:app", "--host", "0.0.0.0", "--port", "8767"]
|
||||
0
pipeline_runner/__init__.py
Normal file
7
pipeline_runner/config.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
GDRIVE_REMOTE = os.getenv("GDRIVE_REMOTE", "gdrive:Cosma - Internal/06-Operations/06 - Sorties")
|
||||
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/data/sorties"))
|
||||
TOOLS_DIR = Path(os.getenv("TOOLS_DIR", Path(__file__).parent.parent / "tools"))
|
||||
LTTB_MAX_PTS = 4000
|
||||
221
pipeline_runner/main.py
Normal file
@@ -0,0 +1,221 @@
|
||||
import asyncio
|
||||
import csv
|
||||
import gzip
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
|
||||
from .config import OUTPUT_DIR
|
||||
from .runner import run_pipeline, scan_sorties, scan_sorties_local
|
||||
|
||||
app = FastAPI(title="COSMA Pipeline Runner")
|
||||
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
||||
|
||||
# Active pipeline jobs: sortie_id → asyncio.Queue
|
||||
_jobs: dict[str, asyncio.Queue] = {}
|
||||
|
||||
# Sorties cache with a 10 min TTL
|
||||
_sorties_cache: list | None = None
|
||||
_sorties_cache_ts: float = 0.0
|
||||
_SORTIES_TTL = 600.0 # 10 minutes
|
||||
_sorties_refresh_lock: asyncio.Lock | None = None
|
||||
|
||||
|
||||
def _get_lock() -> asyncio.Lock:
|
||||
global _sorties_refresh_lock
|
||||
if _sorties_refresh_lock is None:
|
||||
_sorties_refresh_lock = asyncio.Lock()
|
||||
return _sorties_refresh_lock
|
||||
|
||||
|
||||
async def _refresh_sorties_cache() -> None:
|
||||
"""Refresh cache in background (holds lock to avoid parallel rclone calls)."""
|
||||
global _sorties_cache, _sorties_cache_ts
|
||||
lock = _get_lock()
|
||||
async with lock:
|
||||
# Double-check after acquiring lock
|
||||
if time.monotonic() - _sorties_cache_ts < _SORTIES_TTL:
|
||||
return
|
||||
result = await asyncio.to_thread(scan_sorties)
|
||||
_sorties_cache = result
|
||||
_sorties_cache_ts = time.monotonic()
|
||||
|
||||
|
||||
@app.get("/sorties")
|
||||
async def list_sorties():
|
||||
global _sorties_cache, _sorties_cache_ts
|
||||
now = time.monotonic()
|
||||
if _sorties_cache is None:
|
||||
# First call: blocking (the cache is empty)
|
||||
await _refresh_sorties_cache()
|
||||
elif now - _sorties_cache_ts >= _SORTIES_TTL:
|
||||
# Stale cache: return the cached value and refresh in the background
|
||||
asyncio.create_task(_refresh_sorties_cache())
|
||||
return _sorties_cache or []
|
||||
|
||||
|
||||
@app.get("/sorties/local")
|
||||
async def list_sorties_local():
"""Scan the local /data/sorties directory (NAS, instantaneous) without rclone."""
|
||||
sorties = await asyncio.to_thread(scan_sorties_local)
|
||||
return sorties
|
||||
|
||||
|
||||
@app.post("/run/{sortie_id:path}")
|
||||
async def run_sortie(sortie_id: str):
|
||||
if sortie_id in _jobs:
|
||||
return {"status": "already_running"}
|
||||
queue: asyncio.Queue = asyncio.Queue()
|
||||
_jobs[sortie_id] = queue
|
||||
asyncio.create_task(_run_and_cleanup(sortie_id, queue))
|
||||
return {"status": "started"}
|
||||
|
||||
|
||||
async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue):
|
||||
try:
|
||||
await run_pipeline(sortie_id, queue)
|
||||
finally:
|
||||
await asyncio.sleep(30)
|
||||
_jobs.pop(sortie_id, None)
|
||||
|
||||
|
||||
@app.get("/events/{sortie_id:path}")
async def sse_events(sortie_id: str):
    if sortie_id not in _jobs:
        raise HTTPException(404, "No active job for this sortie")

    async def generate() -> AsyncGenerator[str, None]:
        queue = _jobs[sortie_id]
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=60)
                yield f"data: {json.dumps(event)}\n\n"
                # Stop on any error (errors may carry a non-zero pct) or on the final write event
                step, pct = event.get("step"), event.get("pct")
                if step == "error" or (step == "write" and pct == 100):
                    break
            except asyncio.TimeoutError:
                # Keep-alive frame so the connection stays open during long steps
                yield "data: {\"step\":\"ping\"}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")
|
||||
|
||||
|
||||
def _read_gz(path: Path) -> dict:
|
||||
with gzip.open(path) as f:
|
||||
return json.loads(f.read())
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/usv")
|
||||
async def get_usv(sortie_id: str):
|
||||
p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz"
|
||||
if not p.exists():
|
||||
raise HTTPException(404, "USV data not found — run pipeline first")
|
||||
return JSONResponse(_read_gz(p))
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/auvs")
|
||||
async def list_auvs(sortie_id: str):
|
||||
proc = OUTPUT_DIR / sortie_id / "processed"
|
||||
auvs = [p.name.removesuffix(".json.gz").removeprefix("auv_")
|
||||
for p in proc.glob("auv_*.json.gz")]
|
||||
return sorted(auvs)
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/auv/{auv_id}")
|
||||
async def get_auv(sortie_id: str, auv_id: str):
|
||||
p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz"
|
||||
if not p.exists():
|
||||
raise HTTPException(404, f"AUV {auv_id} data not found")
|
||||
return JSONResponse(_read_gz(p))
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/tracks")
|
||||
async def get_tracks(sortie_id: str):
|
||||
p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson"
|
||||
if not p.exists():
|
||||
raise HTTPException(404, "tracks.geojson not found")
|
||||
with open(p) as f:
|
||||
return JSONResponse(json.load(f))
|
||||
|
||||
|
||||
def _ts_nav(ts_str: str) -> float:
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
|
||||
try:
|
||||
return datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc).timestamp()
|
||||
except ValueError:
|
||||
continue
|
||||
return 0.0
|
||||
|
||||
|
||||
def _read_usv_track(nav_log_path: Path, max_pts: int = 2000) -> list[dict]:
|
||||
"""Read navigation_log.csv → [{t_ms, lat, lon, heading, source}] downsampled."""
|
||||
pts: list[dict] = []
|
||||
lat_map: dict[float, float] = {}
|
||||
lon_map: dict[float, float] = {}
|
||||
heading_map: dict[float, float] = {}
|
||||
with open(nav_log_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.reader(f)
|
||||
next(reader, None) # skip header
|
||||
for row in reader:
|
||||
if len(row) < 3:
|
||||
continue
|
||||
ts_str, field, val = row[0], row[1], row[2]
|
||||
if field not in ("Lat", "Lon", "Heading"):
|
||||
continue
|
||||
t = _ts_nav(ts_str)
|
||||
try:
|
||||
v = float(val)
|
||||
except ValueError:
|
||||
continue
|
||||
if field == "Lat":
|
||||
lat_map[t] = v
|
||||
elif field == "Lon":
|
||||
lon_map[t] = v
|
||||
else:
|
||||
heading_map[t] = v
|
||||
# Join on lat timestamps (master)
|
||||
source = nav_log_path.parent.name
|
||||
for t, lat in sorted(lat_map.items()):
|
||||
lon = lon_map.get(t)
|
||||
if lon is None:
|
||||
# nearest lon within 1s
|
||||
near = min(lon_map.keys(), key=lambda x: abs(x - t), default=None)
|
||||
if near is None or abs(near - t) > 1.0:
|
||||
continue
|
||||
lon = lon_map[near]
|
||||
pts.append({
|
||||
"t_ms": int(t * 1000),
|
||||
"lat": lat,
|
||||
"lon": lon,
|
||||
"heading": heading_map.get(t),
|
||||
"source": source,
|
||||
})
|
||||
# Simple stride downsampling
|
||||
if len(pts) > max_pts:
|
||||
step = len(pts) // max_pts
|
||||
pts = pts[::step]
|
||||
return pts
|
||||
|
||||
|
||||
_track_cache: dict[str, list[dict]] = {}
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/usv_track")
|
||||
async def get_usv_track(sortie_id: str):
|
||||
"""Return USV GPS track [{t_ms, lat, lon, heading, source}] for map polylines."""
|
||||
if sortie_id in _track_cache:
|
||||
return JSONResponse(_track_cache[sortie_id])
|
||||
raw_dir = OUTPUT_DIR / sortie_id / "raw"
|
||||
nav_logs = list(raw_dir.rglob("*_navigation_log.csv")) if raw_dir.exists() else []
|
||||
if not nav_logs:
|
||||
raise HTTPException(404, "No navigation_log.csv found — run pipeline first")
|
||||
pts: list[dict] = []
|
||||
for nav_log in nav_logs:
|
||||
pts.extend(await asyncio.to_thread(_read_usv_track, nav_log))
|
||||
pts.sort(key=lambda p: p["t_ms"])
|
||||
_track_cache[sortie_id] = pts
|
||||
return JSONResponse(pts)
|
||||
174
pipeline_runner/processor.py
Normal file
@@ -0,0 +1,174 @@
|
||||
import csv
|
||||
import gzip
|
||||
import json
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
from .config import LTTB_MAX_PTS
|
||||
|
||||
|
||||
def _lttb(points: list[dict], max_pts: int) -> list[dict]:
    """Largest Triangle Three Buckets downsampling."""
    n = len(points)
    if n <= max_pts:
        return points
    ts = np.array([p["t"] for p in points], dtype=float)
    # Non-numeric values (status strings) count as 0.0 in the area test
    vs = np.array([p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points], dtype=float)
    bucket_size = (n - 2) / (max_pts - 2)
    out = [points[0]]
    a = 0  # index of the previously selected point
    for i in range(max_pts - 2):
        # Average of the next bucket: third vertex of the triangle
        avg_start = int((i + 1) * bucket_size) + 1
        avg_end = min(int((i + 2) * bucket_size) + 1, n)
        avg_t = np.mean(ts[avg_start:avg_end])
        avg_v = np.mean(vs[avg_start:avg_end])
        # Candidate points in the current bucket
        rng_start = int(i * bucket_size) + 1
        rng_end = min(int((i + 1) * bucket_size) + 1, n)
        max_area = -1.0
        best = rng_start
        at, av = ts[a], vs[a]
        for j in range(rng_start, rng_end):
            # Area of the triangle (previous point, candidate, next-bucket average)
            area = abs((at - avg_t) * (vs[j] - av) - (at - ts[j]) * (avg_v - av)) * 0.5
            if area > max_area:
                max_area = area
                best = j
        out.append(points[best])
        a = best
    out.append(points[-1])
    return out
|
||||
|
||||
|
||||
def _ts_to_epoch(ts_str: str) -> float:
|
||||
"""Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds."""
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
|
||||
try:
|
||||
dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc)
|
||||
return dt.timestamp()
|
||||
except ValueError:
|
||||
continue
|
||||
return 0.0
|
||||
|
||||
|
||||
def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]:
|
||||
"""Read long-format navigation_log.csv → dict of signal arrays."""
|
||||
signals: dict[str, list] = {}
|
||||
wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix",
|
||||
"Armed", "Mode", "M1", "M2"}
|
||||
with open(nav_log_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.reader(f)
|
||||
next(reader, None) # skip header
|
||||
for row in reader:
|
||||
if len(row) < 3:
|
||||
continue
|
||||
ts_str, field, val = row[0], row[1], row[2]
|
||||
if field not in wanted:
|
||||
continue
|
||||
t = _ts_to_epoch(ts_str)
|
||||
try:
|
||||
v: Any = float(val)
|
||||
except ValueError:
|
||||
v = val.strip()
|
||||
signals.setdefault(field, []).append({"t": t, "v": v})
|
||||
return signals
|
||||
|
||||
|
||||
def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]:
|
||||
"""Read combined_usbl.csv → Dist and Azimuth signal arrays."""
|
||||
dist_pts, az_pts = [], []
|
||||
with open(usbl_csv_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
try:
|
||||
t = _ts_to_epoch(row["Timestamp"])
|
||||
d = float(row["Dist"]) if row["Dist"] else None
|
||||
az = float(row["Azimuth"]) if row["Azimuth"] else None
|
||||
except (KeyError, ValueError):
|
||||
continue
|
||||
if d is not None and not math.isnan(d):
|
||||
dist_pts.append({"t": t, "v": d})
|
||||
if az is not None and not math.isnan(az):
|
||||
az_pts.append({"t": t, "v": az})
|
||||
return {"usbl_dist": dist_pts, "usbl_angle": az_pts}
|
||||
|
||||
|
||||
def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]:
|
||||
"""Read mcap_signals.json → depth, motors M1-M6, state signals."""
|
||||
with open(mcap_json_path) as f:
|
||||
data = json.load(f)
|
||||
signals: dict[str, list[dict]] = {}
|
||||
|
||||
def _unpack(key: str, series: list):
|
||||
pts = []
|
||||
for item in series:
|
||||
t = item.get("t_ms", item.get("t", 0))
|
||||
if isinstance(t, (int, float)) and t > 1e11:  # epoch ms (~1e12); epoch s (~1e9) is left as-is
|
||||
t = t / 1000.0 # ms → s
|
||||
pts.append({"t": float(t), "v": item.get("v", item.get("value", 0))})
|
||||
signals[key] = pts
|
||||
|
||||
if "depth" in data:
|
||||
_unpack("depth", data["depth"])
|
||||
if "pwm_auv" in data:
|
||||
for m_key, series in data["pwm_auv"].items():
|
||||
_unpack(m_key.lower(), series)
|
||||
if "state" in data:
|
||||
_unpack("arm_status", data["state"])
|
||||
# New signals added by extended extract_mcap_signals.py
|
||||
for key in ("pitch", "roll", "yaw", "altitude", "battery_v", "obstacle_dist"):
|
||||
if key in data:
|
||||
_unpack(key, data[key])
|
||||
return signals
|
||||
|
||||
|
||||
def write_usv_json(
|
||||
nav_log_path: Path,
|
||||
usbl_csv_path: Path | None,
|
||||
output_path: Path,
|
||||
sortie_id: str,
|
||||
date: str,
|
||||
) -> None:
|
||||
signals = _read_nav_log(nav_log_path)
|
||||
if usbl_csv_path and usbl_csv_path.exists():
|
||||
signals.update(_read_usbl_csv(usbl_csv_path))
|
||||
|
||||
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
|
||||
meta = {
|
||||
"sortie": sortie_id,
|
||||
"date": date,
|
||||
"vehicle": "USV",
|
||||
"t_start": min(t_all) if t_all else 0,
|
||||
"t_end": max(t_all) if t_all else 0,
|
||||
}
|
||||
|
||||
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
|
||||
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with gzip.open(output_path, "wb") as f:
|
||||
f.write(payload)
|
||||
|
||||
|
||||
def write_auv_json(
|
||||
mcap_json_path: Path,
|
||||
output_path: Path,
|
||||
auv_id: str,
|
||||
sortie_id: str,
|
||||
date: str,
|
||||
) -> None:
|
||||
signals = _read_mcap_signals(mcap_json_path)
|
||||
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
|
||||
meta = {
|
||||
"sortie": sortie_id,
|
||||
"date": date,
|
||||
"vehicle": auv_id,
|
||||
"t_start": min(t_all) if t_all else 0,
|
||||
"t_end": max(t_all) if t_all else 0,
|
||||
}
|
||||
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
|
||||
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with gzip.open(output_path, "wb") as f:
|
||||
f.write(payload)
|
||||
149
pipeline_runner/runner.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import asyncio
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
|
||||
from .processor import write_auv_json, write_usv_json
|
||||
|
||||
|
||||
async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
|
||||
await queue.put({"step": step, "pct": pct, "msg": msg})
|
||||
|
||||
|
||||
def _find_nav_log(ship_dir: Path) -> Path | None:
|
||||
for p in ship_dir.glob("*_navigation_log.csv"):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def _find_usbl_csv(ship_dir: Path) -> Path | None:
|
||||
for p in ship_dir.glob("*_usbl.csv"):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def _detect_auvs(sub_dir: Path) -> list[str]:
|
||||
"""Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010' → 'AUV010')."""
|
||||
auvs = []
|
||||
for d in sub_dir.iterdir():
|
||||
if d.is_dir():
|
||||
m = re.search(r"(AUV\d+)", d.name, re.IGNORECASE)
|
||||
if m:
|
||||
auvs.append(m.group(1).upper())
|
||||
return sorted(set(auvs))
|
||||
|
||||
|
||||
def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
|
||||
for d in sub_dir.iterdir():
|
||||
if d.is_dir() and auv_id.upper() in d.name.upper():
|
||||
return d
|
||||
return None
|
||||
|
||||
|
||||
def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
|
||||
cmd = ["python3", str(TOOLS_DIR / script_name)] + args
|
||||
return subprocess.run(cmd, capture_output=True, text=True)
|
||||
|
||||
|
||||
async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
|
||||
"""Full pipeline: rclone sync → parse → process → write JSON.gz"""
|
||||
raw_dir = OUTPUT_DIR / sortie_id / "raw"
|
||||
proc_dir = OUTPUT_DIR / sortie_id / "processed"
|
||||
proc_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Step 1: rclone sync
|
||||
await _emit(queue, "sync", 0, "rclone sync en cours…")
|
||||
gdrive_path = f'{GDRIVE_REMOTE}/{sortie_id}'
|
||||
# Run rclone in a worker thread so the event loop can keep serving SSE clients
result = await asyncio.to_thread(
subprocess.run,
["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
capture_output=True, text=True,
)
|
||||
if result.returncode != 0:
|
||||
await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
|
||||
return
|
||||
await _emit(queue, "sync", 50, "rclone terminé")
|
||||
|
||||
# Detect SHIP/SUB dirs inside raw/
|
||||
ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
|
||||
sub_dir = next(raw_dir.rglob("logs/SUB"), None)
|
||||
|
||||
if not ship_dir or not ship_dir.exists():
|
||||
await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
|
||||
return
|
||||
|
||||
# Step 2: USV parsing
|
||||
await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
|
||||
nav_log = _find_nav_log(ship_dir)
|
||||
usbl_csv_raw = _find_usbl_csv(ship_dir)
|
||||
usbl_parsed_path = proc_dir / "combined_usbl.csv"
|
||||
|
||||
if usbl_csv_raw:
|
||||
_run_script("parse_kogger_usbl.py", [str(usbl_csv_raw), "-o", str(usbl_parsed_path)])
|
||||
|
||||
date_str = sortie_id.split("/")[-1][:10] if "/" in sortie_id else sortie_id[:10]
|
||||
if nav_log:
|
||||
write_usv_json(
|
||||
nav_log_path=nav_log,
|
||||
usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None,
|
||||
output_path=proc_dir / "usv.json.gz",
|
||||
sortie_id=sortie_id,
|
||||
date=date_str,
|
||||
)
|
||||
await _emit(queue, "usv_parse", 70, "USV OK")
|
||||
|
||||
# Step 3: AUV parsing
|
||||
if sub_dir and sub_dir.exists():
|
||||
auv_ids = _detect_auvs(sub_dir)
|
||||
total = len(auv_ids) or 1
|
||||
for i, auv_id in enumerate(auv_ids):
|
||||
await _emit(queue, "auv_parse", 70 + int(20 * i / total), f"AUV {auv_id}…")
|
||||
session_dir = _detect_session_dir(sub_dir, auv_id)
|
||||
if not session_dir:
|
||||
continue
|
||||
mcap_out = proc_dir / f"mcap_{auv_id}.json"
|
||||
_run_script("extract_mcap_signals.py", ["--session-dir", str(session_dir), "--max-pts", "0"])
|
||||
# extract_mcap_signals writes to tools/../output/mcap_signals.json (hardcoded)
|
||||
default_out = TOOLS_DIR.parent / "output" / "mcap_signals.json"
|
||||
if not default_out.exists():
|
||||
default_out = session_dir / "mcap_signals.json"
|
||||
if default_out.exists():
|
||||
shutil.copy(default_out, mcap_out)
|
||||
if mcap_out.exists():
|
||||
write_auv_json(
|
||||
mcap_json_path=mcap_out,
|
||||
output_path=proc_dir / f"auv_{auv_id}.json.gz",
|
||||
auv_id=auv_id,
|
||||
sortie_id=sortie_id,
|
||||
date=date_str,
|
||||
)
|
||||
await _emit(queue, "write", 100, "Pipeline terminé")
|
||||
|
||||
|
||||
def scan_sorties() -> list[dict]:
"""List available sorties on GDrive via rclone lsd (slow, ~30s)."""
|
||||
result = subprocess.run(
|
||||
["rclone", "lsd", GDRIVE_REMOTE],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
sorties = []
|
||||
for line in result.stdout.splitlines():
|
||||
parts = line.strip().split(None, 4)
|
||||
if len(parts) == 5:
|
||||
name = parts[4].strip()
|
||||
processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists()
|
||||
sorties.append({"id": name, "processed": processed})
|
||||
return sorties
|
||||
|
||||
|
||||
def scan_sorties_local() -> list[dict]:
"""List sorties already synced locally in OUTPUT_DIR (instantaneous, no rclone)."""
|
||||
if not OUTPUT_DIR.exists():
|
||||
return []
|
||||
sorties = []
|
||||
for d in sorted(OUTPUT_DIR.iterdir()):
|
||||
if d.is_dir():
|
||||
processed = (d / "processed" / "usv.json.gz").exists()
|
||||
sorties.append({"id": d.name, "processed": processed})
|
||||
return sorties
|
||||
4
requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
fastapi==0.115.0
|
||||
uvicorn[standard]==0.30.6
|
||||
aiofiles==24.1.0
|
||||
numpy==2.1.1
|
||||
@@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Extract AUV signals from MCAP files: depth, PWM, state."""
|
||||
import argparse, glob, json, os, sys
|
||||
import argparse, glob, json, math, os, sys
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
@@ -26,7 +26,16 @@ def main():
|
||||
depth_raw = []
|
||||
pwm_raw = []
|
||||
state_raw = []
|
||||
TOPICS = ['/mavros/imu/static_pressure', '/mavros/rc/out', '/mavros/state']
|
||||
signals = {}
|
||||
TOPICS = [
|
||||
'/mavros/imu/static_pressure',
|
||||
'/mavros/rc/out',
|
||||
'/mavros/state',
|
||||
'/mavros/imu/data',
|
||||
'/mavros/altitude',
|
||||
'/mavros/battery',
|
||||
'/mavros/distance_sensor/hrlv_ez4_pub',
|
||||
]
|
||||
|
||||
for mcap_file in mcap_files:
|
||||
try:
|
||||
@@ -53,6 +62,40 @@ def main():
|
||||
state_raw.append({'t': t_ms, 'mode': str(ros_msg.mode), 'armed': bool(ros_msg.armed)})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/imu/data':
|
||||
try:
|
||||
q = ros_msg.orientation
|
||||
sinr = 2*(q.w*q.x + q.y*q.z)
|
||||
cosr = 1 - 2*(q.x*q.x + q.y*q.y)
|
||||
roll = math.degrees(math.atan2(sinr, cosr))
|
||||
sinp = 2*(q.w*q.y - q.z*q.x)
|
||||
pitch = math.degrees(math.asin(max(-1, min(1, sinp))))
|
||||
siny = 2*(q.w*q.z + q.x*q.y)
|
||||
cosy = 1 - 2*(q.y*q.y + q.z*q.z)
|
||||
yaw = math.degrees(math.atan2(siny, cosy))
|
||||
signals.setdefault('pitch', []).append({'t': t_ms, 'v': pitch})
|
||||
signals.setdefault('roll', []).append({'t': t_ms, 'v': roll})
|
||||
signals.setdefault('yaw', []).append({'t': t_ms, 'v': yaw})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/altitude':
|
||||
try:
|
||||
signals.setdefault('altitude', []).append(
|
||||
{'t': t_ms, 'v': ros_msg.relative})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/battery':
|
||||
try:
|
||||
signals.setdefault('battery_v', []).append(
|
||||
{'t': t_ms, 'v': ros_msg.voltage})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/distance_sensor/hrlv_ez4_pub':
|
||||
try:
|
||||
signals.setdefault('obstacle_dist', []).append(
|
||||
{'t': t_ms, 'v': ros_msg.range})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f" Skip {os.path.basename(mcap_file)}: {e}")
|
||||
|
||||
@@ -69,7 +112,8 @@ def main():
|
||||
pwm_samples = sample(pwm_raw, args.max_pts)
|
||||
state = state_raw # events, keep all
|
||||
|
||||
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw]
|
||||
signals_flat = [pt for pts in signals.values() for pt in pts]
|
||||
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw + signals_flat]
|
||||
t_min = min(all_t) if all_t else 0
|
||||
t_max = max(all_t) if all_t else 0
|
||||
|
||||
@@ -94,6 +138,7 @@ def main():
|
||||
'depth': depth,
|
||||
'pwm_auv': {'channels': channels, 'samples': pwm_samples},
|
||||
'state': state,
|
||||
**{k: sample(v, args.max_pts) for k, v in signals.items()},
|
||||
}
|
||||
|
||||
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
|
||||
|
||||
File diff suppressed because it is too large