Files
cosma-nav-tools/docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md

1220 lines
39 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# GDrive Pipeline Replay — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Bouton dans viewer 8765 → rclone sync GDrive → pipeline Python existant → affichage panels USV/AUV synchronisés avec slider 24h.
**Architecture:** Micro-service FastAPI `pipeline-runner` (port 8767) dans cosma-nav-tools orchestre rclone + scripts tools/ existants + écriture JSON.gz downsamplé LTTB. Le viewer 8765 ajoute sélecteur sortie, bouton Sync, progress SSE, et deux panels Plotly (USV 9 graphs + AUV 10 graphs + tabs multi-AUV), tous synchronisés sur le slider 24h existant.
**Tech Stack:** FastAPI, uvicorn, aiofiles, numpy (LTTB), rclone (pré-installé sur .83), Plotly.js (déjà dans viewer), noUiSlider (déjà dans viewer).
**Prérequis sur .83:** `rclone` configuré avec remote `gdrive` pointant sur Google Drive COSMA. Vérifier: `rclone lsd gdrive:"Cosma - Internal/06-Operations/06 - Sorties"`.
---
## Fichiers créés / modifiés
| Fichier | Action | Rôle |
|---------|--------|------|
| `requirements.txt` | Créer | Dépendances Python |
| `docker-compose.yml` | Créer | Service pipeline-runner port 8767 |
| `tools/extract_mcap_signals.py` | Modifier | Ajouter topics pitch/roll/yaw, altitude, obstacle, battery |
| `pipeline_runner/__init__.py` | Créer | Package marker |
| `pipeline_runner/config.py` | Créer | Chemins GDrive, output dir |
| `pipeline_runner/processor.py` | Créer | LTTB + extraction signaux → JSON.gz |
| `pipeline_runner/runner.py` | Créer | rclone sync + orchestration scripts tools/ |
| `pipeline_runner/main.py` | Créer | FastAPI endpoints |
| `viewer/index.html` | Modifier | Sortie selector + bouton + SSE + panels USV/AUV |
---
## Task 0: Étendre extract_mcap_signals.py — topics AUV manquants
**Files:**
- Modify: `tools/extract_mcap_signals.py`
Le script actuel extrait uniquement depth/PWM/state. Il faut ajouter pitch/roll/yaw (IMU), altitude, obstacle, batterie.
- [ ] **Step 1: Ouvrir tools/extract_mcap_signals.py et localiser TOPICS**
Trouver la ligne:
```python
TOPICS = ['/mavros/imu/static_pressure', '/mavros/rc/out', '/mavros/state']
```
Remplacer par:
```python
TOPICS = [
'/mavros/imu/static_pressure', # depth (pression)
'/mavros/rc/out', # PWM moteurs
'/mavros/state', # arm/mode
'/mavros/imu/data', # orientation quaternion → pitch/roll/yaw
'/mavros/altitude', # altitude relative
'/mavros/battery', # tension batterie
'/mavros/distance_sensor/hrlv_ez4_pub', # obstacle avoidance
]
```
- [ ] **Step 2: Ajouter les handlers dans la boucle iter_decoded_messages**
Trouver le bloc `elif topic == '/mavros/state':` et ajouter après:
```python
elif topic == '/mavros/imu/data':
import math
q = ros_msg.orientation
sinr = 2*(q.w*q.x + q.y*q.z)
cosr = 1 - 2*(q.x*q.x + q.y*q.y)
roll = math.degrees(math.atan2(sinr, cosr))
sinp = 2*(q.w*q.y - q.z*q.x)
pitch = math.degrees(math.asin(max(-1, min(1, sinp))))
siny = 2*(q.w*q.z + q.x*q.y)
cosy = 1 - 2*(q.y*q.y + q.z*q.z)
yaw = math.degrees(math.atan2(siny, cosy))
signals.setdefault('pitch', []).append({'t_ms': t_ms, 'v': pitch})
signals.setdefault('roll', []).append({'t_ms': t_ms, 'v': roll})
signals.setdefault('yaw', []).append({'t_ms': t_ms, 'v': yaw})
elif topic == '/mavros/altitude':
signals.setdefault('altitude', []).append(
{'t_ms': t_ms, 'v': ros_msg.relative})
elif topic == '/mavros/battery':
signals.setdefault('battery_v', []).append(
{'t_ms': t_ms, 'v': ros_msg.voltage})
elif topic == '/mavros/distance_sensor/hrlv_ez4_pub':
signals.setdefault('obstacle_dist', []).append(
{'t_ms': t_ms, 'v': ros_msg.range})
```
> **Note:** Si `/mavros/distance_sensor/hrlv_ez4_pub` n'existe pas dans les MCAP, remplacer par le topic réel. Vérifier avec `mcap info <file>.mcap | grep distance`.
- [ ] **Step 3: Vérifier que les nouvelles clés sont dans le JSON output**
La variable `signals` est déjà écrite en JSON à la fin du script. Les nouvelles clés seront automatiquement incluses.
- [ ] **Step 4: Commit**
```bash
git add tools/extract_mcap_signals.py
git commit -m "feat: extract_mcap_signals — pitch/roll/yaw, altitude, obstacle, battery"
```
---
## Task 1: requirements.txt + docker-compose.yml
**Files:**
- Create: `requirements.txt`
- Create: `docker-compose.yml`
- [ ] **Step 1: Créer requirements.txt**
```
fastapi==0.115.0
uvicorn[standard]==0.30.6
aiofiles==24.1.0
numpy==2.1.1
```
- [ ] **Step 2: Créer docker-compose.yml**
```yaml
version: "3.9"
services:
pipeline-runner:
build:
context: .
dockerfile: pipeline_runner/Dockerfile
ports:
- "8767:8767"
volumes:
- /data/sorties:/data/sorties
environment:
- GDRIVE_REMOTE=gdrive:Cosma - Internal/06-Operations/06 - Sorties
- OUTPUT_DIR=/data/sorties
- TOOLS_DIR=/app/tools
restart: unless-stopped
```
- [ ] **Step 3: Créer pipeline_runner/Dockerfile**
```dockerfile
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y rclone && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY tools/ ./tools/
COPY vendor/ ./vendor/
COPY pipeline_runner/ ./pipeline_runner/
CMD ["uvicorn", "pipeline_runner.main:app", "--host", "0.0.0.0", "--port", "8767"]
```
- [ ] **Step 4: Créer pipeline_runner/__init__.py** (vide)
```python
```
- [ ] **Step 5: Commit**
```bash
git add requirements.txt docker-compose.yml pipeline_runner/
git commit -m "feat: pipeline-runner — scaffold docker + deps"
```
---
## Task 2: config.py
**Files:**
- Create: `pipeline_runner/config.py`
- [ ] **Step 1: Créer config.py**
```python
import os
from pathlib import Path
GDRIVE_REMOTE = os.getenv("GDRIVE_REMOTE", "gdrive:Cosma - Internal/06-Operations/06 - Sorties")
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/data/sorties"))
TOOLS_DIR = Path(os.getenv("TOOLS_DIR", Path(__file__).parent.parent / "tools"))
LTTB_MAX_PTS = 4000
```
- [ ] **Step 2: Commit**
```bash
git add pipeline_runner/config.py
git commit -m "feat: pipeline-runner — config"
```
---
## Task 3: processor.py — LTTB + extraction signaux
**Files:**
- Create: `pipeline_runner/processor.py`
Le processor lit les CSV/JSON produits par les scripts tools/ et écrit `usv.json.gz` et `auv_AUVxxx.json.gz`.
**Champs USV extraits depuis `_navigation_log.csv`** (format: `timestamp,data,value` long):
- `Yaw`, `Heading`, `Roll`, `Pitch`
- `BattVoltage`
- `gps_fix` (valeur string: ex "3D_FIX")
- `Armed` (0/1)
- `Mode` (string: ex "MANUAL", "AUTO")
- `M1`, `M2` (commandes moteur USV)
**USBL depuis `combined_usbl.csv`** produit par `parse_kogger_usbl.py`:
- colonnes: `Timestamp, Dist, Azimuth, Elev, SNR, FrameID, ...`
**Champs AUV depuis `mcap_signals.json`** produit par `extract_mcap_signals.py`:
- `depth`, `pwm_auv` (dict M1-M6), `state` (arm/mode)
- [ ] **Step 1: Créer processor.py**
```python
import csv
import gzip
import json
import math
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import numpy as np
from .config import LTTB_MAX_PTS
def _lttb(points: list[dict], max_pts: int) -> list[dict]:
"""Largest Triangle Three Buckets downsampling."""
n = len(points)
if n <= max_pts:
return points
ts = np.array([p["t"] for p in points], dtype=float)
vs = np.array([p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points], dtype=float)
bucket_size = (n - 2) / (max_pts - 2)
out = [points[0]]
a = 0
for i in range(max_pts - 2):
avg_start = int((i + 1) * bucket_size) + 1
avg_end = min(int((i + 2) * bucket_size) + 1, n)
avg_t = np.mean(ts[avg_start:avg_end])
avg_v = np.mean(vs[avg_start:avg_end])
rng_start = int(i * bucket_size) + 1
rng_end = min(int((i + 1) * bucket_size) + 1, n)
max_area = -1.0
best = rng_start
at, av = ts[a], vs[a]
for j in range(rng_start, rng_end):
area = abs((at - avg_t) * (vs[j] - av) - (ts[j] - at) * (avg_v - av)) * 0.5
if area > max_area:
max_area = area
best = j
out.append(points[best])
a = best
out.append(points[-1])
return out
def _ts_to_epoch(ts_str: str) -> float:
"""Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds."""
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc)
return dt.timestamp()
except ValueError:
continue
return 0.0
def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]:
"""Read long-format navigation_log.csv → dict of signal arrays."""
signals: dict[str, list] = {}
wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix",
"Armed", "Mode", "M1", "M2"}
with open(nav_log_path, newline="", encoding="utf-8") as f:
reader = csv.reader(f)
next(reader, None) # skip header
for row in reader:
if len(row) < 3:
continue
ts_str, field, val = row[0], row[1], row[2]
if field not in wanted:
continue
t = _ts_to_epoch(ts_str)
try:
v: Any = float(val)
except ValueError:
v = val.strip()
signals.setdefault(field, []).append({"t": t, "v": v})
return signals
def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]:
"""Read combined_usbl.csv → Dist and Azimuth signal arrays."""
dist_pts, az_pts = [], []
with open(usbl_csv_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
try:
t = _ts_to_epoch(row["Timestamp"])
d = float(row["Dist"]) if row["Dist"] else None
az = float(row["Azimuth"]) if row["Azimuth"] else None
except (KeyError, ValueError):
continue
if d is not None and not math.isnan(d):
dist_pts.append({"t": t, "v": d})
if az is not None and not math.isnan(az):
az_pts.append({"t": t, "v": az})
return {"usbl_dist": dist_pts, "usbl_angle": az_pts}
def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]:
"""Read mcap_signals.json → depth, motors M1-M6, state signals."""
with open(mcap_json_path) as f:
data = json.load(f)
signals: dict[str, list[dict]] = {}
def _unpack(key: str, series: list):
pts = []
for item in series:
t = item.get("t_ms", item.get("t", 0))
if isinstance(t, (int, float)) and t > 1e9:
t = t / 1000.0 # ms → s
pts.append({"t": float(t), "v": item.get("v", item.get("value", 0))})
signals[key] = pts
if "depth" in data:
_unpack("depth", data["depth"])
if "pwm_auv" in data:
for m_key, series in data["pwm_auv"].items():
_unpack(m_key.lower(), series)
if "state" in data:
_unpack("arm_status", data["state"])
return signals
def write_usv_json(
nav_log_path: Path,
usbl_csv_path: Path | None,
output_path: Path,
sortie_id: str,
date: str,
) -> None:
signals = _read_nav_log(nav_log_path)
if usbl_csv_path and usbl_csv_path.exists():
signals.update(_read_usbl_csv(usbl_csv_path))
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
meta = {
"sortie": sortie_id,
"date": date,
"vehicle": "USV",
"t_start": min(t_all) if t_all else 0,
"t_end": max(t_all) if t_all else 0,
}
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
output_path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(output_path, "wb") as f:
f.write(payload)
def write_auv_json(
mcap_json_path: Path,
output_path: Path,
auv_id: str,
sortie_id: str,
date: str,
) -> None:
signals = _read_mcap_signals(mcap_json_path)
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
meta = {
"sortie": sortie_id,
"date": date,
"vehicle": auv_id,
"t_start": min(t_all) if t_all else 0,
"t_end": max(t_all) if t_all else 0,
}
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
output_path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(output_path, "wb") as f:
f.write(payload)
```
- [ ] **Step 2: Commit**
```bash
git add pipeline_runner/processor.py
git commit -m "feat: pipeline-runner — processor LTTB + signal extraction"
```
---
## Task 4: runner.py — rclone + orchestration
**Files:**
- Create: `pipeline_runner/runner.py`
- [ ] **Step 1: Créer runner.py**
```python
import asyncio
import re
import subprocess
from pathlib import Path
from typing import AsyncGenerator
from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
from .processor import write_auv_json, write_usv_json
async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
await queue.put({"step": step, "pct": pct, "msg": msg})
def _find_nav_log(ship_dir: Path) -> Path | None:
for p in ship_dir.glob("*_navigation_log.csv"):
return p
return None
def _find_usbl_csv(ship_dir: Path) -> Path | None:
for p in ship_dir.glob("*_usbl.csv"):
return p
return None
def _detect_auvs(sub_dir: Path) -> list[str]:
"""Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010''AUV010')."""
auvs = []
for d in sub_dir.iterdir():
if d.is_dir():
m = re.search(r"(AUV\d+)", d.name, re.IGNORECASE)
if m:
auvs.append(m.group(1).upper())
return sorted(set(auvs))
def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
for d in sub_dir.iterdir():
if d.is_dir() and auv_id.upper() in d.name.upper():
return d
return None
def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
cmd = ["python3", str(TOOLS_DIR / script_name)] + args
return subprocess.run(cmd, capture_output=True, text=True)
async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
"""Full pipeline: rclone sync → parse → process → write JSON.gz"""
raw_dir = OUTPUT_DIR / sortie_id / "raw"
proc_dir = OUTPUT_DIR / sortie_id / "processed"
proc_dir.mkdir(parents=True, exist_ok=True)
# Step 1: rclone sync
await _emit(queue, "sync", 0, "rclone sync en cours…")
gdrive_path = f'{GDRIVE_REMOTE}/{sortie_id}'
result = subprocess.run(
["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
capture_output=True, text=True
)
if result.returncode != 0:
await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
return
await _emit(queue, "sync", 50, "rclone terminé")
# Detect session dir (YYYYMMDD_HHMMSS_AUVxxx) inside raw/
# Sorties have structure: raw/raw_data/logs/SHIP + SUB
ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
sub_dir = next(raw_dir.rglob("logs/SUB"), None)
if not ship_dir or not ship_dir.exists():
await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
return
# Step 2: USV parsing
await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
nav_log = _find_nav_log(ship_dir)
usbl_csv_raw = _find_usbl_csv(ship_dir)
usbl_parsed_path = proc_dir / "combined_usbl.csv"
if usbl_csv_raw:
_run_script("parse_kogger_usbl.py", [str(usbl_csv_raw), "-o", str(usbl_parsed_path)])
date_str = sortie_id.split("/")[-1][:10] if "/" in sortie_id else sortie_id[:10]
if nav_log:
write_usv_json(
nav_log_path=nav_log,
usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None,
output_path=proc_dir / "usv.json.gz",
sortie_id=sortie_id,
date=date_str,
)
await _emit(queue, "usv_parse", 70, "USV OK")
# Step 3: AUV parsing
if sub_dir and sub_dir.exists():
auv_ids = _detect_auvs(sub_dir)
total = len(auv_ids) or 1
for i, auv_id in enumerate(auv_ids):
await _emit(queue, "auv_parse", 70 + int(20 * i / total), f"AUV {auv_id}")
session_dir = _detect_session_dir(sub_dir, auv_id)
if not session_dir:
continue
mcap_out = proc_dir / f"mcap_{auv_id}.json"
_run_script("extract_mcap_signals.py", ["--session-dir", str(session_dir), "--max-pts", "0"])
# extract_mcap_signals writes to output/ by default; look for it
default_out = Path("output") / "mcap_signals.json"
if not default_out.exists():
default_out = session_dir / "mcap_signals.json"
if default_out.exists():
import shutil
shutil.copy(default_out, mcap_out)
if mcap_out.exists():
write_auv_json(
mcap_json_path=mcap_out,
output_path=proc_dir / f"auv_{auv_id}.json.gz",
auv_id=auv_id,
sortie_id=sortie_id,
date=date_str,
)
await _emit(queue, "write", 100, "Pipeline terminé")
async def scan_sorties() -> list[dict]:
"""List available sorties on GDrive via rclone lsd."""
result = subprocess.run(
["rclone", "lsd", GDRIVE_REMOTE],
capture_output=True, text=True
)
sorties = []
for line in result.stdout.splitlines():
parts = line.strip().split(None, 4)
if len(parts) == 5:
name = parts[4].strip()
processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists()
sorties.append({"id": name, "processed": processed})
return sorties
```
- [ ] **Step 2: Commit**
```bash
git add pipeline_runner/runner.py
git commit -m "feat: pipeline-runner — runner rclone + orchestration"
```
---
## Task 5: main.py — FastAPI endpoints
**Files:**
- Create: `pipeline_runner/main.py`
- [ ] **Step 1: Créer main.py**
```python
import asyncio
import gzip
import json
from pathlib import Path
from typing import AsyncGenerator
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from .config import OUTPUT_DIR
from .runner import run_pipeline, scan_sorties
app = FastAPI(title="COSMA Pipeline Runner")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Active pipeline jobs: sortie_id → asyncio.Queue
_jobs: dict[str, asyncio.Queue] = {}
@app.get("/sorties")
async def list_sorties():
return await scan_sorties()
@app.post("/run/{sortie_id:path}")
async def run_sortie(sortie_id: str):
if sortie_id in _jobs:
return {"status": "already_running"}
queue: asyncio.Queue = asyncio.Queue()
_jobs[sortie_id] = queue
asyncio.create_task(_run_and_cleanup(sortie_id, queue))
return {"status": "started"}
async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue):
try:
await run_pipeline(sortie_id, queue)
finally:
await asyncio.sleep(30)
_jobs.pop(sortie_id, None)
@app.get("/events/{sortie_id:path}")
async def sse_events(sortie_id: str):
if sortie_id not in _jobs:
raise HTTPException(404, "No active job for this sortie")
async def generate() -> AsyncGenerator[str, None]:
queue = _jobs[sortie_id]
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=60)
yield f"data: {json.dumps(event)}\n\n"
if event.get("step") in ("error", "write") and event.get("pct") in (0, 100):
break
except asyncio.TimeoutError:
yield "data: {\"step\":\"ping\"}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
def _read_gz(path: Path) -> dict:
with gzip.open(path) as f:
return json.loads(f.read())
@app.get("/sorties/{sortie_id:path}/usv")
async def get_usv(sortie_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz"
if not p.exists():
raise HTTPException(404, "USV data not found — run pipeline first")
return JSONResponse(_read_gz(p))
@app.get("/sorties/{sortie_id:path}/auvs")
async def list_auvs(sortie_id: str):
proc = OUTPUT_DIR / sortie_id / "processed"
auvs = [p.stem.removeprefix("auv_").removesuffix(".json")
for p in proc.glob("auv_*.json.gz")]
return sorted(auvs)
@app.get("/sorties/{sortie_id:path}/auv/{auv_id}")
async def get_auv(sortie_id: str, auv_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz"
if not p.exists():
raise HTTPException(404, f"AUV {auv_id} data not found")
return JSONResponse(_read_gz(p))
@app.get("/sorties/{sortie_id:path}/tracks")
async def get_tracks(sortie_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson"
if not p.exists():
raise HTTPException(404, "tracks.geojson not found")
with open(p) as f:
return JSONResponse(json.load(f))
```
- [ ] **Step 2: Commit**
```bash
git add pipeline_runner/main.py
git commit -m "feat: pipeline-runner — FastAPI endpoints + SSE"
```
---
## Task 6: Test pipeline local sur sortie #71
Avant de modifier le viewer, valider que le service tourne et répond.
- [ ] **Step 1: Installer deps localement**
```bash
pip install fastapi uvicorn[standard] aiofiles numpy
```
- [ ] **Step 2: Lancer le service en local**
```bash
cd /c/Users/flopp/cosma-nav-tools
OUTPUT_DIR=/tmp/cosma-test \
GDRIVE_REMOTE="gdrive:Cosma - Internal/06-Operations/06 - Sorties" \
TOOLS_DIR=tools \
uvicorn pipeline_runner.main:app --port 8767 --reload
```
- [ ] **Step 3: Tester /sorties**
```bash
curl http://localhost:8767/sorties | python3 -m json.tool | head -20
```
Attendu: liste de dossiers dont `#71-golrest`.
- [ ] **Step 4: Lancer pipeline sur #71**
```bash
curl -X POST "http://localhost:8767/run/%2371-golrest"
# puis stream les events
curl "http://localhost:8767/events/%2371-golrest"
```
Attendu: stream SSE avec étapes `sync → usv_parse → auv_parse → write 100%`.
- [ ] **Step 5: Vérifier output USV**
```bash
curl "http://localhost:8767/sorties/%2371-golrest/usv" | python3 -m json.tool | head -30
```
Attendu: `{"meta": {...}, "signals": {"Yaw": [...], "Heading": [...], ...}}`.
- [ ] **Step 6: Commit si OK**
```bash
git commit --allow-empty -m "test: pipeline #71 validé en local"
```
---
## Task 7: viewer — sélecteur sortie + bouton + SSE progress
**Files:**
- Modify: `viewer/index.html`
La constante `API2 = 'http://192.168.0.83:8767'` est ajoutée. Le header reçoit un dropdown sortie et le bouton Sync.
- [ ] **Step 1: Ajouter `API2` et les styles dans `<head>` (après la ligne `const API = ...`)**
Trouver dans le JS de index.html:
```js
const API = 'http://192.168.0.83:8766';
```
Ajouter juste après:
```js
const API2 = 'http://192.168.0.83:8767';
```
- [ ] **Step 2: Ajouter styles CSS pour les nouveaux éléments (dans le bloc `<style>`)**
Ajouter avant `</style>`:
```css
#sortie-select {
background: #0f3460; border: 1px solid #e94560; color: #e0e0e0;
font-family: monospace; font-size: 11px; padding: 2px 6px; border-radius: 2px;
cursor: pointer; max-width: 160px;
}
#btn-sync {
background: #0f3460; border: 1px solid #e94560; color: #e94560;
padding: 2px 9px; cursor: pointer; font-family: monospace; font-size: 11px;
border-radius: 2px;
}
#btn-sync:hover { background: #e94560; color: #1a1a2e; }
#btn-sync:disabled { opacity: 0.4; cursor: not-allowed; }
#sync-progress {
font-size: 10px; color: #06d6a0; flex: 1;
white-space: nowrap; overflow: hidden; text-overflow: ellipsis;
}
.panel-header {
background: #0d0d20; border-top: 1px solid #0f3460; border-bottom: 1px solid #0f3460;
padding: 4px 14px; font-size: 11px; font-weight: bold; color: #e94560;
display: flex; align-items: center; gap: 8px;
}
.auv-tab {
font-family: monospace; font-size: 10px; padding: 2px 8px; cursor: pointer;
border: 1px solid #0f3460; background: transparent; color: #a0c4ff; border-radius: 2px;
}
.auv-tab.active { background: #0f3460; color: #e0e0e0; }
.graphs-grid {
display: grid; grid-template-columns: repeat(2, 1fr); gap: 4px;
padding: 4px 8px; background: #12122a;
}
.graph-cell { height: 130px; background: #1a1a2e; }
.graph-cell.wide { grid-column: span 2; height: 130px; }
```
- [ ] **Step 3: Ajouter HTML dans le `#header` (sortie dropdown + bouton + progress)**
Dans le `<div id="header">`, ajouter après `#layer-toggles`:
```html
<select id="sortie-select"><option value="">— Sortie —</option></select>
<button id="btn-sync" disabled>Sync &amp; Process</button>
<span id="sync-progress"></span>
```
- [ ] **Step 4: Ajouter HTML panels USV + AUV (avant `</body>`)**
```html
<div class="panel-header">USV</div>
<div class="graphs-grid" id="usv-graphs">
<div class="graph-cell" id="usv-yaw"></div>
<div class="graph-cell" id="usv-heading"></div>
<div class="graph-cell" id="usv-batt"></div>
<div class="graph-cell" id="usv-gps"></div>
<div class="graph-cell" id="usv-usbl-dist"></div>
<div class="graph-cell" id="usv-usbl-angle"></div>
<div class="graph-cell" id="usv-m1"></div>
<div class="graph-cell" id="usv-m2"></div>
<div class="graph-cell wide" id="usv-status"></div>
</div>
<div class="panel-header" id="auv-panel-header">
AUV
<span id="auv-tabs"></span>
</div>
<div class="graphs-grid" id="auv-graphs">
<div class="graph-cell" id="auv-pry"></div>
<div class="graph-cell" id="auv-depth"></div>
<div class="graph-cell" id="auv-alt"></div>
<div class="graph-cell" id="auv-obs"></div>
<div class="graph-cell" id="auv-usbl-dist"></div>
<div class="graph-cell" id="auv-usbl-angle"></div>
<div class="graph-cell" id="auv-batt"></div>
<div class="graph-cell" id="auv-status"></div>
<div class="graph-cell wide" id="auv-motors"></div>
</div>
```
- [ ] **Step 5: Ajouter JS — load sorties au démarrage**
Dans le bloc `<script>`, ajouter la fonction:
```js
async function loadSorties() {
try {
const resp = await fetch(`${API2}/sorties`);
const sorties = await resp.json();
const sel = document.getElementById('sortie-select');
sorties.forEach(s => {
const opt = document.createElement('option');
opt.value = s.id;
opt.textContent = s.id + (s.processed ? ' ✓' : '');
sel.appendChild(opt);
});
sel.addEventListener('change', () => {
document.getElementById('btn-sync').disabled = !sel.value;
});
} catch(e) { console.warn('pipeline-runner unavailable', e); }
}
loadSorties();
```
- [ ] **Step 6: Ajouter JS — bouton Sync & SSE progress**
```js
document.getElementById('btn-sync').addEventListener('click', async () => {
const sortieId = document.getElementById('sortie-select').value;
if (!sortieId) return;
const btn = document.getElementById('btn-sync');
const prog = document.getElementById('sync-progress');
btn.disabled = true;
prog.textContent = 'Démarrage…';
const encoded = encodeURIComponent(sortieId);
await fetch(`${API2}/run/${encoded}`, { method: 'POST' });
const es = new EventSource(`${API2}/events/${encoded}`);
es.onmessage = async (e) => {
const evt = JSON.parse(e.data);
if (evt.step === 'ping') return;
prog.textContent = `[${evt.step}] ${evt.pct}% ${evt.msg}`;
if (evt.step === 'write' && evt.pct === 100) {
es.close();
btn.disabled = false;
prog.textContent = 'Terminé — chargement des données…';
await loadSortieData(sortieId);
}
if (evt.step === 'error') {
es.close();
btn.disabled = false;
prog.textContent = `Erreur: ${evt.msg}`;
}
};
});
```
- [ ] **Step 7: Commit**
```bash
git add viewer/index.html
git commit -m "feat: viewer — sortie selector + sync button + SSE progress"
```
---
## Task 8: viewer — panels USV (9 graphs Plotly)
**Files:**
- Modify: `viewer/index.html`
Ajouter la fonction `renderUSV(signals)` et l'appel dans `loadSortieData`.
- [ ] **Step 1: Ajouter fonction renderUSV dans le JS**
```js
function _pts(sig) {
if (!sig) return [[], []];
return [sig.map(p => new Date(p.t * 1000)), sig.map(p => p.v)];
}
const PLOTLY_LAYOUT_BASE = {
margin: {l:40, r:8, t:20, b:30},
paper_bgcolor: '#1a1a2e', plot_bgcolor: '#1a1a2e',
font: {color: '#e0e0e0', size: 9, family: 'monospace'},
xaxis: {color: '#555', gridcolor: '#1e1e3a', type: 'date'},
yaxis: {color: '#555', gridcolor: '#1e1e3a'},
showlegend: false,
};
function _layout(title, yLabel='') {
return Object.assign({}, PLOTLY_LAYOUT_BASE, {
title: {text: title, font: {size: 9, color: '#888'}},
yaxis: Object.assign({}, PLOTLY_LAYOUT_BASE.yaxis, {title: {text: yLabel, font: {size: 8}}})
});
}
function renderUSV(signals) {
const cfg = {responsive: true, displayModeBar: false};
const [xt, xv] = _pts(signals.Yaw);
Plotly.react('usv-yaw',
[{x: xt, y: xv, type:'scatter', mode:'lines', line:{color:'#00b4d8', width:1}, name:'Yaw'}],
_layout('Yaw', '°'), cfg);
const [ht, hv] = _pts(signals.Heading);
Plotly.react('usv-heading',
[{x: ht, y: hv, type:'scatter', mode:'lines', line:{color:'#06d6a0', width:1}, name:'Heading'}],
_layout('Heading', '°'), cfg);
const [bt, bv] = _pts(signals.BattVoltage);
Plotly.react('usv-batt',
[{x: bt, y: bv, type:'scatter', mode:'lines', line:{color:'#ffd166', width:1}, name:'Batt'}],
_layout('Battery', 'V'), cfg);
const [gt, gv] = _pts(signals.gps_fix);
Plotly.react('usv-gps',
[{x: gt, y: gv, type:'scatter', mode:'lines+markers', line:{color:'#06d6a0', width:1, shape:'hv'}, name:'GPS'}],
_layout('GPS fix'), cfg);
const [dt, dv] = _pts(signals.usbl_dist);
Plotly.react('usv-usbl-dist',
[{x: dt, y: dv, type:'scatter', mode:'lines', line:{color:'#a0c4ff', width:1}}],
_layout('USBL dist', 'm'), cfg);
const [at, av] = _pts(signals.usbl_angle);
Plotly.react('usv-usbl-angle',
[{x: at, y: av, type:'scatter', mode:'lines', line:{color:'#c77dff', width:1}}],
_layout('USBL angle', '°'), cfg);
const [m1t, m1v] = _pts(signals.M1);
Plotly.react('usv-m1',
[{x: m1t, y: m1v, type:'scatter', mode:'lines', line:{color:'#ef476f', width:1}}],
_layout('Motor 1', 'cmd'), cfg);
const [m2t, m2v] = _pts(signals.M2);
Plotly.react('usv-m2',
[{x: m2t, y: m2v, type:'scatter', mode:'lines', line:{color:'#ff6b6b', width:1}}],
_layout('Motor 2', 'cmd'), cfg);
const [st, sv] = _pts(signals.Armed || signals.Mode);
Plotly.react('usv-status',
[{x: st, y: sv, type:'scatter', mode:'lines', line:{color:'#ffd166', width:1, shape:'hv'}, name:'Status'}],
Object.assign(_layout('USV status'), {showlegend: false}), cfg);
}
```
- [ ] **Step 2: Commit**
```bash
git add viewer/index.html
git commit -m "feat: viewer — USV panel 9 graphs Plotly"
```
---
## Task 9: viewer — panel AUV avec tabs multi-AUV
**Files:**
- Modify: `viewer/index.html`
- [ ] **Step 1: Ajouter renderAUV + tabs dans le JS**
```js
let _currentAuvId = null;
async function loadAuvTabs(sortieId) {
const resp = await fetch(`${API2}/sorties/${encodeURIComponent(sortieId)}/auvs`);
const auvs = await resp.json();
const tabsEl = document.getElementById('auv-tabs');
tabsEl.innerHTML = '';
auvs.forEach((auv, i) => {
const btn = document.createElement('button');
btn.className = 'auv-tab' + (i === 0 ? ' active' : '');
btn.textContent = auv;
btn.onclick = async () => {
document.querySelectorAll('.auv-tab').forEach(b => b.classList.remove('active'));
btn.classList.add('active');
_currentAuvId = auv;
const r = await fetch(`${API2}/sorties/${encodeURIComponent(sortieId)}/auv/${auv}`);
const data = await r.json();
renderAUV(data.signals);
};
tabsEl.appendChild(btn);
if (i === 0) { _currentAuvId = auv; }
});
if (auvs.length > 0) {
const r = await fetch(`${API2}/sorties/${encodeURIComponent(sortieId)}/auv/${auvs[0]}`);
const data = await r.json();
renderAUV(data.signals);
}
}
function renderAUV(signals) {
const cfg = {responsive: true, displayModeBar: false};
Plotly.react('auv-pry', [
{x: _pts(signals.pitch)[0], y: _pts(signals.pitch)[1], name:'Pitch', line:{color:'#ef476f', width:1}},
{x: _pts(signals.roll)[0], y: _pts(signals.roll)[1], name:'Roll', line:{color:'#06d6a0', width:1}},
{x: _pts(signals.yaw)[0], y: _pts(signals.yaw)[1], name:'Yaw', line:{color:'#00b4d8', width:1}},
].map(t => Object.assign({type:'scatter', mode:'lines'}, t)),
Object.assign(_layout('Pitch/Roll/Yaw', '°'), {showlegend: true,
legend:{font:{size:8}, bgcolor:'transparent', x:0, y:1}}), cfg);
const [dpt, dpv] = _pts(signals.depth);
Plotly.react('auv-depth',
[{x: dpt, y: dpv, type:'scatter', mode:'lines', line:{color:'#a0c4ff', width:1}}],
_layout('Depth', 'm'), cfg);
const [alt, alv] = _pts(signals.altitude);
Plotly.react('auv-alt',
[{x: alt, y: alv, type:'scatter', mode:'lines', line:{color:'#ffd166', width:1}}],
_layout('Altitude', 'm'), cfg);
const [obt, obv] = _pts(signals.obstacle_dist);
Plotly.react('auv-obs',
[{x: obt, y: obv, type:'scatter', mode:'lines', line:{color:'#c77dff', width:1}}],
_layout('Obstacle', 'm'), cfg);
const [udt, udv] = _pts(signals.usbl_dist);
Plotly.react('auv-usbl-dist',
[{x: udt, y: udv, type:'scatter', mode:'lines', line:{color:'#a0c4ff', width:1}}],
_layout('USBL dist', 'm'), cfg);
const [uat, uav] = _pts(signals.usbl_angle);
Plotly.react('auv-usbl-angle',
[{x: uat, y: uav, type:'scatter', mode:'lines', line:{color:'#c77dff', width:1}}],
_layout('USBL angle', '°'), cfg);
const [bbt, bbv] = _pts(signals.battery_v);
Plotly.react('auv-batt',
[{x: bbt, y: bbv, type:'scatter', mode:'lines', line:{color:'#ffd166', width:1}}],
_layout('Battery', 'V'), cfg);
const [stt, stv] = _pts(signals.arm_status);
Plotly.react('auv-status',
[{x: stt, y: stv, type:'scatter', mode:'lines', line:{color:'#06d6a0', width:1, shape:'hv'}}],
_layout('Arm/Mode'), cfg);
const motorColors = ['#ef476f','#ffd166','#06d6a0','#00b4d8','#a0c4ff','#c77dff'];
const motorTraces = ['m1','m2','m3','m4','m5','m6'].map((mk, i) => {
const [t, v] = _pts(signals[mk]);
return {x: t, y: v, type:'scatter', mode:'lines', name:`M${i+1}`,
line:{color: motorColors[i], width:1}};
});
Plotly.react('auv-motors', motorTraces,
Object.assign(_layout('Motors ×6 PWM', 'µs'), {showlegend: true,
legend:{font:{size:8}, bgcolor:'transparent', orientation:'h', x:0, y:1}}), cfg);
}
```
- [ ] **Step 2: Commit**
```bash
git add viewer/index.html
git commit -m "feat: viewer — AUV panel 10 graphs + tabs multi-AUV"
```
---
## Task 10: viewer — curseur X synchronisé slider ↔ tous les graphs
**Files:**
- Modify: `viewer/index.html`
- [ ] **Step 1: Créer la liste de tous les graph IDs et la fonction updateCursor**
```js
const ALL_GRAPH_IDS = [
'chart-depth', 'chart-pwm-auv', 'chart-pwm-usv', 'chart-usbl',
'usv-yaw', 'usv-heading', 'usv-batt', 'usv-gps',
'usv-usbl-dist', 'usv-usbl-angle', 'usv-m1', 'usv-m2', 'usv-status',
'auv-pry', 'auv-depth', 'auv-alt', 'auv-obs',
'auv-usbl-dist', 'auv-usbl-angle', 'auv-batt', 'auv-status', 'auv-motors',
];
function updateCursor(epochSec) {
const ts = new Date(epochSec * 1000).toISOString();
const shape = {
type: 'line', x0: ts, x1: ts, y0: 0, y1: 1,
yref: 'paper', line: {color: '#e94560', width: 1, dash: 'dot'},
};
ALL_GRAPH_IDS.forEach(id => {
const el = document.getElementById(id);
if (el && el._fullLayout) {
Plotly.relayout(id, {'shapes': [shape]});
}
});
}
```
- [ ] **Step 2: Brancher sur l'événement slider existant**
Trouver dans le JS existant l'event handler du slider noUiSlider (chercher `slider.noUiSlider.on` ou `noUiSlider`). Ajouter dans le callback `update` ou `set`:
```js
// Dans le callback du slider, ajouter:
const [lo] = slider.noUiSlider.get(true); // valeur basse en epoch ms
updateCursor(lo / 1000);
```
- [ ] **Step 3: Commit**
```bash
git add viewer/index.html
git commit -m "feat: viewer — curseur X synchronisé slider + tous les graphs"
```
---
## Task 11: loadSortieData — câblage final
**Files:**
- Modify: `viewer/index.html`
- [ ] **Step 1: Ajouter la fonction loadSortieData**
```js
async function loadSortieData(sortieId) {
const prog = document.getElementById('sync-progress');
try {
prog.textContent = 'Chargement USV…';
const usvResp = await fetch(`${API2}/sorties/${encodeURIComponent(sortieId)}/usv`);
if (usvResp.ok) {
const usvData = await usvResp.json();
renderUSV(usvData.signals);
}
prog.textContent = 'Chargement AUV…';
await loadAuvTabs(sortieId);
prog.textContent = `${sortieId} chargé`;
} catch(e) {
prog.textContent = `Erreur chargement: ${e.message}`;
}
}
```
- [ ] **Step 2: Brancher sur le changement de sortie (si déjà processé)**
Dans l'event listener du `sortie-select` (Task 7 Step 5), ajouter après `btn.disabled = !sel.value`:
```js
if (sel.value) {
// Charger si déjà processé
const opt = sel.options[sel.selectedIndex];
if (opt.textContent.includes('✓')) {
loadSortieData(sel.value);
}
}
```
- [ ] **Step 3: Push sur feat/flag-local**
```bash
git add viewer/index.html
git commit -m "feat: viewer — loadSortieData câblage final"
git push origin feat/flag-local
```
---
## Task 12: Déploiement sur .83
- [ ] **Step 1: Sur .83, cloner la branche feat/flag-local**
```bash
ssh user@192.168.0.83
cd /opt
git clone https://floppyrj45:TOKEN@gitea.nowyouknow.fr/floppyrj45/cosma-nav-tools.git
cd cosma-nav-tools
git checkout feat/flag-local
```
- [ ] **Step 2: Vérifier rclone configuré**
```bash
rclone lsd "gdrive:Cosma - Internal/06-Operations/06 - Sorties" | head -5
```
- [ ] **Step 3: Créer le dossier data**
```bash
mkdir -p /data/sorties
```
- [ ] **Step 4: Build + démarrer Docker**
```bash
docker compose up -d --build pipeline-runner
docker compose logs -f pipeline-runner
```
Attendu: `Uvicorn running on http://0.0.0.0:8767`.
- [ ] **Step 5: Test smoke**
```bash
curl http://192.168.0.83:8767/sorties | python3 -m json.tool | head -10
```
- [ ] **Step 6: Ouvrir le viewer 8765 et tester le bouton Sync sur #71**
Naviguer sur `http://192.168.0.83:8765`, sélectionner `#71-golrest`, cliquer `Sync & Process`, observer la progress bar, vérifier les graphs USV/AUV.