cosma-nav-tools/docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md


GDrive Pipeline Replay — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: A button in the viewer (port 8765) triggers an rclone sync from GDrive, runs the existing Python pipeline, and displays synchronized USV/AUV panels driven by the 24h slider.

Architecture: A FastAPI micro-service, pipeline-runner (port 8767), lives in cosma-nav-tools and orchestrates rclone, the existing tools/ scripts, and the writing of LTTB-downsampled JSON.gz files. The viewer (port 8765) gains a sortie selector, a Sync button, SSE progress, and two Plotly panels (USV 9 graphs + AUV 10 graphs + multi-AUV tabs), all synchronized with the existing 24h slider.

Tech Stack: FastAPI, uvicorn, aiofiles, numpy (LTTB), rclone (pre-installed on .83), Plotly.js (already in the viewer), noUiSlider (already in the viewer).

Prerequisite on .83: rclone configured with a gdrive remote pointing to the COSMA Google Drive. Check with: rclone lsd gdrive:"Cosma - Internal/06-Operations/06 - Sorties".
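The prerequisite check above can also be scripted. The following is a minimal sketch, assuming the usual `rclone lsd` column layout (size, date, time, object count, name); `parse_lsd` and the sample listing are hypothetical and only illustrate how a directory name containing spaces survives the split.

```python
def parse_lsd(output: str) -> list[str]:
    """Return directory names from `rclone lsd` output."""
    names = []
    for line in output.splitlines():
        # Columns: size, date, time, object count, name.
        # maxsplit=4 keeps spaces inside the directory name intact.
        parts = line.strip().split(None, 4)
        if len(parts) == 5:
            names.append(parts[4])
    return names


# Hypothetical listing, shaped like rclone lsd output (not real data).
sample = (
    "          -1 2026-04-16 12:54:18        -1 #71-golrest\n"
    "          -1 2026-04-20 09:00:00        -1 #72 sortie test\n"
)

if __name__ == "__main__":
    print(parse_lsd(sample))
```

In practice the string would come from `subprocess.run(["rclone", "lsd", remote], capture_output=True, text=True).stdout`.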


Files created / modified

| File | Action | Role |
|---|---|---|
| requirements.txt | Create | Python dependencies |
| docker-compose.yml | Create | pipeline-runner service, port 8767 |
| tools/extract_mcap_signals.py | Modify | Add pitch/roll/yaw, altitude, obstacle, battery topics |
| pipeline_runner/Dockerfile | Create | Image for the pipeline-runner service |
| pipeline_runner/__init__.py | Create | Package marker |
| pipeline_runner/config.py | Create | GDrive paths, output dir |
| pipeline_runner/processor.py | Create | LTTB + signal extraction → JSON.gz |
| pipeline_runner/runner.py | Create | rclone sync + orchestration of the tools/ scripts |
| pipeline_runner/main.py | Create | FastAPI endpoints |
| viewer/index.html | Modify | Sortie selector + button + SSE + USV/AUV panels |

Task 0: Extend extract_mcap_signals.py with the missing AUV topics

Files:

  • Modify: tools/extract_mcap_signals.py

The current script only extracts depth/PWM/state. Pitch/roll/yaw (IMU), altitude, obstacle distance, and battery voltage must be added.

  • Step 1: Open tools/extract_mcap_signals.py and locate TOPICS

Find the line:

TOPICS = ['/mavros/imu/static_pressure', '/mavros/rc/out', '/mavros/state']

Replace with:

TOPICS = [
    '/mavros/imu/static_pressure',  # depth (pressure)
    '/mavros/rc/out',               # motor PWM
    '/mavros/state',                # arm/mode
    '/mavros/imu/data',             # orientation quaternion → pitch/roll/yaw
    '/mavros/altitude',             # relative altitude
    '/mavros/battery',              # battery voltage
    '/mavros/distance_sensor/hrlv_ez4_pub',  # obstacle avoidance
]
  • Step 2: Add the handlers in the iter_decoded_messages loop

Find the elif topic == '/mavros/state': block and add after it:

                    elif topic == '/mavros/imu/data':
                        import math
                        q = ros_msg.orientation
                        sinr = 2*(q.w*q.x + q.y*q.z)
                        cosr = 1 - 2*(q.x*q.x + q.y*q.y)
                        roll = math.degrees(math.atan2(sinr, cosr))
                        sinp = 2*(q.w*q.y - q.z*q.x)
                        pitch = math.degrees(math.asin(max(-1, min(1, sinp))))
                        siny = 2*(q.w*q.z + q.x*q.y)
                        cosy = 1 - 2*(q.y*q.y + q.z*q.z)
                        yaw = math.degrees(math.atan2(siny, cosy))
                        signals.setdefault('pitch', []).append({'t_ms': t_ms, 'v': pitch})
                        signals.setdefault('roll', []).append({'t_ms': t_ms, 'v': roll})
                        signals.setdefault('yaw', []).append({'t_ms': t_ms, 'v': yaw})
                    elif topic == '/mavros/altitude':
                        signals.setdefault('altitude', []).append(
                            {'t_ms': t_ms, 'v': ros_msg.relative})
                    elif topic == '/mavros/battery':
                        signals.setdefault('battery_v', []).append(
                            {'t_ms': t_ms, 'v': ros_msg.voltage})
                    elif topic == '/mavros/distance_sensor/hrlv_ez4_pub':
                        signals.setdefault('obstacle_dist', []).append(
                            {'t_ms': t_ms, 'v': ros_msg.range})

Note: If /mavros/distance_sensor/hrlv_ez4_pub does not exist in the MCAP files, replace it with the actual topic. Check with mcap info <file>.mcap | grep distance.
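The /mavros/imu/data handler above converts the orientation quaternion to roll/pitch/yaw inline. To check the formulas outside the MCAP loop, the same arithmetic can be pulled into a standalone function; this is a sketch only, and `quat_to_euler_deg` is a hypothetical name, not something the existing script defines.

```python
import math


def quat_to_euler_deg(w: float, x: float, y: float, z: float) -> tuple[float, float, float]:
    """Convert a unit quaternion to (roll, pitch, yaw) in degrees (ZYX convention)."""
    roll = math.degrees(math.atan2(2 * (w * x + y * z), 1 - 2 * (x * x + y * y)))
    # Clamp to [-1, 1] so rounding errors near +/-90 degrees cannot break asin.
    sinp = max(-1.0, min(1.0, 2 * (w * y - z * x)))
    pitch = math.degrees(math.asin(sinp))
    yaw = math.degrees(math.atan2(2 * (w * z + x * y), 1 - 2 * (y * y + z * z)))
    return roll, pitch, yaw


if __name__ == "__main__":
    # A pure rotation of 90 degrees about the vertical axis.
    half = math.radians(90) / 2
    print(quat_to_euler_deg(math.cos(half), 0.0, 0.0, math.sin(half)))
```

The clamp on the pitch term mirrors the max(-1, min(1, sinp)) guard in the handler.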

  • Step 3: Check that the new keys appear in the JSON output

The signals variable is already written as JSON at the end of the script, so the new keys are included automatically.

  • Step 4: Commit
git add tools/extract_mcap_signals.py
git commit -m "feat: extract_mcap_signals — pitch/roll/yaw, altitude, obstacle, battery"

Task 1: requirements.txt + docker-compose.yml

Files:

  • Create: requirements.txt

  • Create: docker-compose.yml

  • Step 1: Create requirements.txt

fastapi==0.115.0
uvicorn[standard]==0.30.6
aiofiles==24.1.0
numpy==2.1.1
  • Step 2: Create docker-compose.yml
version: "3.9"
services:
  pipeline-runner:
    build:
      context: .
      dockerfile: pipeline_runner/Dockerfile
    ports:
      - "8767:8767"
    volumes:
      - /data/sorties:/data/sorties
    environment:
      - GDRIVE_REMOTE=gdrive:Cosma - Internal/06-Operations/06 - Sorties
      - OUTPUT_DIR=/data/sorties
      - TOOLS_DIR=/app/tools
    restart: unless-stopped
  • Step 3: Create pipeline_runner/Dockerfile
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y rclone && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY tools/ ./tools/
COPY vendor/ ./vendor/
COPY pipeline_runner/ ./pipeline_runner/
CMD ["uvicorn", "pipeline_runner.main:app", "--host", "0.0.0.0", "--port", "8767"]
  • Step 4: Create pipeline_runner/__init__.py (empty)
  • Step 5: Commit
git add requirements.txt docker-compose.yml pipeline_runner/
git commit -m "feat: pipeline-runner — scaffold docker + deps"

Task 2: config.py

Files:

  • Create: pipeline_runner/config.py

  • Step 1: Create config.py

import os
from pathlib import Path

GDRIVE_REMOTE = os.getenv("GDRIVE_REMOTE", "gdrive:Cosma - Internal/06-Operations/06 - Sorties")
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/data/sorties"))
TOOLS_DIR = Path(os.getenv("TOOLS_DIR", Path(__file__).parent.parent / "tools"))
LTTB_MAX_PTS = 4000
  • Step 2: Commit
git add pipeline_runner/config.py
git commit -m "feat: pipeline-runner — config"

Task 3: processor.py (LTTB + signal extraction)

Files:

  • Create: pipeline_runner/processor.py

The processor reads the CSV/JSON files produced by the tools/ scripts and writes usv.json.gz and auv_AUVxxx.json.gz.

USV fields extracted from *_navigation_log.csv (long format, columns: timestamp,data,value):

  • Yaw, Heading, Roll, Pitch
  • BattVoltage
  • gps_fix (string value, e.g. "3D_FIX")
  • Armed (0/1)
  • Mode (string, e.g. "MANUAL", "AUTO")
  • M1, M2 (USV motor commands)

USBL from combined_usbl.csv, produced by parse_kogger_usbl.py:

  • columns: Timestamp, Dist, Azimuth, Elev, SNR, FrameID, ...

AUV fields from mcap_signals.json, produced by extract_mcap_signals.py:

  • depth, pwm_auv (dict M1-M6), state (arm/mode)
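To make the long format concrete: each CSV row carries one timestamp, one field name, and one value, so numeric and string signals share the same file. The sketch below pivots a few rows into per-signal series. The sample rows are invented for illustration and `pivot_long` is a hypothetical helper, not the processor's own function.

```python
import csv
import io

# Invented rows in the timestamp,data,value layout described above.
SAMPLE = (
    "timestamp,data,value\n"
    "2026-04-16 12:54:18.000000,Yaw,181.5\n"
    "2026-04-16 12:54:18.100000,Mode,MANUAL\n"
    "2026-04-16 12:54:18.200000,Yaw,182.0\n"
    "2026-04-16 12:54:18.300000,Unused,1\n"
)


def pivot_long(f, wanted: set[str]) -> dict[str, list[tuple[str, object]]]:
    """Group long-format rows by field name, keeping only the wanted fields."""
    signals: dict[str, list[tuple[str, object]]] = {}
    reader = csv.reader(f)
    next(reader, None)  # skip the header row
    for row in reader:
        if len(row) < 3 or row[1] not in wanted:
            continue
        try:
            value: object = float(row[2])  # numeric signal
        except ValueError:
            value = row[2].strip()  # string signal such as Mode or gps_fix
        signals.setdefault(row[1], []).append((row[0], value))
    return signals


if __name__ == "__main__":
    print(pivot_long(io.StringIO(SAMPLE), {"Yaw", "Mode"}))
```

String-valued signals such as Mode and gps_fix stay as strings here, which matters later because a numeric downsampler has no meaningful value to work with for them.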

  • Step 1: Create processor.py

import csv
import gzip
import json
import math
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import numpy as np

from .config import LTTB_MAX_PTS


def _lttb(points: list[dict], max_pts: int) -> list[dict]:
    """Largest Triangle Three Buckets downsampling."""
    n = len(points)
    if n <= max_pts:
        return points
    ts = np.array([p["t"] for p in points], dtype=float)
    vs = np.array([p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points], dtype=float)
    bucket_size = (n - 2) / (max_pts - 2)
    out = [points[0]]
    a = 0
    for i in range(max_pts - 2):
        avg_start = int((i + 1) * bucket_size) + 1
        avg_end = min(int((i + 2) * bucket_size) + 1, n)
        avg_t = np.mean(ts[avg_start:avg_end])
        avg_v = np.mean(vs[avg_start:avg_end])
        rng_start = int(i * bucket_size) + 1
        rng_end = min(int((i + 1) * bucket_size) + 1, n)
        max_area = -1.0
        best = rng_start
        at, av = ts[a], vs[a]
        for j in range(rng_start, rng_end):
            # Area of the triangle (previous pick, candidate j, next-bucket average)
            area = abs((at - avg_t) * (vs[j] - av) - (at - ts[j]) * (avg_v - av)) * 0.5
            if area > max_area:
                max_area = area
                best = j
        out.append(points[best])
        a = best
    out.append(points[-1])
    return out


def _ts_to_epoch(ts_str: str) -> float:
    """Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds."""
    for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
        try:
            dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc)
            return dt.timestamp()
        except ValueError:
            continue
    return 0.0


def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]:
    """Read long-format navigation_log.csv → dict of signal arrays."""
    signals: dict[str, list] = {}
    wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix",
              "Armed", "Mode", "M1", "M2"}
    with open(nav_log_path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        for row in reader:
            if len(row) < 3:
                continue
            ts_str, field, val = row[0], row[1], row[2]
            if field not in wanted:
                continue
            t = _ts_to_epoch(ts_str)
            try:
                v: Any = float(val)
            except ValueError:
                v = val.strip()
            signals.setdefault(field, []).append({"t": t, "v": v})
    return signals


def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]:
    """Read combined_usbl.csv → Dist and Azimuth signal arrays."""
    dist_pts, az_pts = [], []
    with open(usbl_csv_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                t = _ts_to_epoch(row["Timestamp"])
                d = float(row["Dist"]) if row["Dist"] else None
                az = float(row["Azimuth"]) if row["Azimuth"] else None
            except (KeyError, ValueError):
                continue
            if d is not None and not math.isnan(d):
                dist_pts.append({"t": t, "v": d})
            if az is not None and not math.isnan(az):
                az_pts.append({"t": t, "v": az})
    return {"usbl_dist": dist_pts, "usbl_angle": az_pts}


def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]:
    """Read mcap_signals.json → depth, motors M1-M6, state, plus the Task 0 signals."""
    with open(mcap_json_path) as f:
        data = json.load(f)
    signals: dict[str, list[dict]] = {}

    def _unpack(key: str, series: list):
        pts = []
        for item in series:
            t = item.get("t_ms", item.get("t", 0))
            # Epoch milliseconds are ~1e12 while epoch seconds are ~1e9,
            # so only values above 1e11 are treated as milliseconds.
            if isinstance(t, (int, float)) and t > 1e11:
                t = t / 1000.0  # ms → s
            pts.append({"t": float(t), "v": item.get("v", item.get("value", 0))})
        signals[key] = pts

    if "depth" in data:
        _unpack("depth", data["depth"])
    if "pwm_auv" in data:
        for m_key, series in data["pwm_auv"].items():
            _unpack(m_key.lower(), series)
    if "state" in data:
        _unpack("arm_status", data["state"])
    # Signals added to extract_mcap_signals.py in Task 0 (used by the AUV panel)
    for key in ("pitch", "roll", "yaw", "altitude", "battery_v", "obstacle_dist"):
        if key in data:
            _unpack(key, data[key])
    return signals


def write_usv_json(
    nav_log_path: Path,
    usbl_csv_path: Path | None,
    output_path: Path,
    sortie_id: str,
    date: str,
) -> None:
    signals = _read_nav_log(nav_log_path)
    if usbl_csv_path and usbl_csv_path.exists():
        signals.update(_read_usbl_csv(usbl_csv_path))

    t_all = [p["t"] for pts in signals.values() for p in pts if pts]
    meta = {
        "sortie": sortie_id,
        "date": date,
        "vehicle": "USV",
        "t_start": min(t_all) if t_all else 0,
        "t_end": max(t_all) if t_all else 0,
    }

    downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
    payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with gzip.open(output_path, "wb") as f:
        f.write(payload)


def write_auv_json(
    mcap_json_path: Path,
    output_path: Path,
    auv_id: str,
    sortie_id: str,
    date: str,
) -> None:
    signals = _read_mcap_signals(mcap_json_path)
    t_all = [p["t"] for pts in signals.values() for p in pts if pts]
    meta = {
        "sortie": sortie_id,
        "date": date,
        "vehicle": auv_id,
        "t_start": min(t_all) if t_all else 0,
        "t_end": max(t_all) if t_all else 0,
    }
    downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
    payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with gzip.open(output_path, "wb") as f:
        f.write(payload)
  • Step 2: Commit
git add pipeline_runner/processor.py
git commit -m "feat: pipeline-runner — processor LTTB + signal extraction"
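Before wiring the processor into the runner, the downsampling behavior can be sanity-checked in isolation. The following is a pure-Python sketch of Largest Triangle Three Buckets on (t, v) tuples, written for illustration rather than copied from processor.py (which works on dicts with numpy). It checks the two properties the viewer relies on: the output has exactly max_pts points and an isolated spike survives.

```python
import math


def lttb(points: list[tuple[float, float]], max_pts: int) -> list[tuple[float, float]]:
    """Largest Triangle Three Buckets downsampling on (t, v) tuples."""
    n = len(points)
    if max_pts < 3 or n <= max_pts:
        return list(points)
    bucket = (n - 2) / (max_pts - 2)
    out = [points[0]]  # the first point is always kept
    a = 0  # index of the previously selected point
    for i in range(max_pts - 2):
        # Average of the next bucket acts as the third triangle vertex.
        nxt = points[int((i + 1) * bucket) + 1:min(int((i + 2) * bucket) + 1, n)]
        avg_t = sum(p[0] for p in nxt) / len(nxt)
        avg_v = sum(p[1] for p in nxt) / len(nxt)
        start = int(i * bucket) + 1
        end = min(int((i + 1) * bucket) + 1, n)
        at, av = points[a]
        best, best_area = start, -1.0
        for j in range(start, end):
            tj, vj = points[j]
            area = abs((at - avg_t) * (vj - av) - (at - tj) * (avg_v - av)) * 0.5
            if area > best_area:
                best, best_area = j, area
        out.append(points[best])
        a = best
    out.append(points[-1])  # the last point is always kept
    return out


# Synthetic signal: a slow sine wave with one spike at index 500.
series = [(float(i), math.sin(i / 50.0)) for i in range(1000)]
series[500] = (500.0, 100.0)
reduced = lttb(series, 50)

if __name__ == "__main__":
    print(len(reduced), (500.0, 100.0) in reduced)
```

Keeping extremes is the reason to prefer LTTB over plain decimation here: a short battery dip or depth excursion stays visible after reduction to LTTB_MAX_PTS points.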

Task 4: runner.py — rclone + orchestration

Files:

  • Create: pipeline_runner/runner.py

  • Step 1: Create runner.py

import asyncio
import re
import subprocess
from pathlib import Path
from typing import AsyncGenerator

from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
from .processor import write_auv_json, write_usv_json


async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
    await queue.put({"step": step, "pct": pct, "msg": msg})


def _find_nav_log(ship_dir: Path) -> Path | None:
    for p in ship_dir.glob("*_navigation_log.csv"):
        return p
    return None


def _find_usbl_csv(ship_dir: Path) -> Path | None:
    for p in ship_dir.glob("*_usbl.csv"):
        return p
    return None


def _detect_auvs(sub_dir: Path) -> list[str]:
    """Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010' → 'AUV010')."""
    auvs = []
    for d in sub_dir.iterdir():
        if d.is_dir():
            m = re.search(r"(AUV\d+)", d.name, re.IGNORECASE)
            if m:
                auvs.append(m.group(1).upper())
    return sorted(set(auvs))


def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
    for d in sub_dir.iterdir():
        if d.is_dir() and auv_id.upper() in d.name.upper():
            return d
    return None


def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
    cmd = ["python3", str(TOOLS_DIR / script_name)] + args
    return subprocess.run(cmd, capture_output=True, text=True)


async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
    """Full pipeline: rclone sync → parse → process → write JSON.gz"""
    raw_dir = OUTPUT_DIR / sortie_id / "raw"
    proc_dir = OUTPUT_DIR / sortie_id / "processed"
    proc_dir.mkdir(parents=True, exist_ok=True)

    # Step 1: rclone sync
    await _emit(queue, "sync", 0, "rclone sync en cours…")
    gdrive_path = f'{GDRIVE_REMOTE}/{sortie_id}'
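    # Run rclone in a worker thread so the event loop keeps serving SSE clients
    result = await asyncio.to_thread(
        subprocess.run,
        ["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
        capture_output=True, text=True,
    )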
    if result.returncode != 0:
        await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
        return
    await _emit(queue, "sync", 50, "rclone terminé")

    # Detect session dir (YYYYMMDD_HHMMSS_AUVxxx) inside raw/
    # Sorties have structure: raw/raw_data/logs/SHIP + SUB
    ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
    sub_dir = next(raw_dir.rglob("logs/SUB"), None)

    if not ship_dir or not ship_dir.exists():
        await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
        return

    # Step 2: USV parsing
    await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
    nav_log = _find_nav_log(ship_dir)
    usbl_csv_raw = _find_usbl_csv(ship_dir)
    usbl_parsed_path = proc_dir / "combined_usbl.csv"

    if usbl_csv_raw:
        await asyncio.to_thread(
            _run_script, "parse_kogger_usbl.py",
            [str(usbl_csv_raw), "-o", str(usbl_parsed_path)])

    date_str = sortie_id.split("/")[-1][:10] if "/" in sortie_id else sortie_id[:10]
    if nav_log:
        write_usv_json(
            nav_log_path=nav_log,
            usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None,
            output_path=proc_dir / "usv.json.gz",
            sortie_id=sortie_id,
            date=date_str,
        )
    await _emit(queue, "usv_parse", 70, "USV OK")

    # Step 3: AUV parsing
    if sub_dir and sub_dir.exists():
        auv_ids = _detect_auvs(sub_dir)
        total = len(auv_ids) or 1
        for i, auv_id in enumerate(auv_ids):
            await _emit(queue, "auv_parse", 70 + int(20 * i / total), f"AUV {auv_id}…")
            session_dir = _detect_session_dir(sub_dir, auv_id)
            if not session_dir:
                continue
            mcap_out = proc_dir / f"mcap_{auv_id}.json"
            await asyncio.to_thread(
                _run_script, "extract_mcap_signals.py",
                ["--session-dir", str(session_dir), "--max-pts", "0"])
            # extract_mcap_signals writes to output/ by default; look for it
            default_out = Path("output") / "mcap_signals.json"
            if not default_out.exists():
                default_out = session_dir / "mcap_signals.json"
            if default_out.exists():
                import shutil
                shutil.copy(default_out, mcap_out)
            if mcap_out.exists():
                write_auv_json(
                    mcap_json_path=mcap_out,
                    output_path=proc_dir / f"auv_{auv_id}.json.gz",
                    auv_id=auv_id,
                    sortie_id=sortie_id,
                    date=date_str,
                )
    await _emit(queue, "write", 100, "Pipeline terminé")


async def scan_sorties() -> list[dict]:
    """List available sorties on GDrive via rclone lsd."""
    result = await asyncio.to_thread(
        subprocess.run,
        ["rclone", "lsd", GDRIVE_REMOTE],
        capture_output=True, text=True,
    )
    sorties = []
    for line in result.stdout.splitlines():
        parts = line.strip().split(None, 4)
        if len(parts) == 5:
            name = parts[4].strip()
            processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists()
            sorties.append({"id": name, "processed": processed})
    return sorties
  • Step 2: Commit
git add pipeline_runner/runner.py
git commit -m "feat: pipeline-runner — runner rclone + orchestration"
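The runner shells out to rclone and to the tools/ scripts from inside a coroutine, while the same event loop also has to stream SSE progress. One way to launch a child process without stalling the loop is asyncio.create_subprocess_exec. The sketch below is an illustration under that assumption; `run_cmd` is a hypothetical helper and the demo command is a stand-in for rclone.

```python
import asyncio
import sys


async def run_cmd(cmd: list[str]) -> tuple[int, str, str]:
    """Run a command without blocking the event loop; return (code, stdout, stderr)."""
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()  # other tasks keep running while we wait
    return proc.returncode, out.decode(), err.decode()


async def main() -> None:
    # Stand-in for a long-running rclone call.
    code, out, _ = await run_cmd([sys.executable, "-c", "print('ok')"])
    print(code, out.strip())


if __name__ == "__main__":
    asyncio.run(main())
```

A non-zero return code can be turned into an "error" event the same way the rclone step reports result.stderr.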

Task 5: main.py — FastAPI endpoints

Files:

  • Create: pipeline_runner/main.py

  • Step 1: Create main.py

import asyncio
import gzip
import json
from pathlib import Path
from typing import AsyncGenerator

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse

from .config import OUTPUT_DIR
from .runner import run_pipeline, scan_sorties

app = FastAPI(title="COSMA Pipeline Runner")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

# Active pipeline jobs: sortie_id → asyncio.Queue
_jobs: dict[str, asyncio.Queue] = {}


@app.get("/sorties")
async def list_sorties():
    return await scan_sorties()


@app.post("/run/{sortie_id:path}")
async def run_sortie(sortie_id: str):
    if sortie_id in _jobs:
        return {"status": "already_running"}
    queue: asyncio.Queue = asyncio.Queue()
    _jobs[sortie_id] = queue
    asyncio.create_task(_run_and_cleanup(sortie_id, queue))
    return {"status": "started"}


async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue):
    try:
        await run_pipeline(sortie_id, queue)
    finally:
        await asyncio.sleep(30)
        _jobs.pop(sortie_id, None)


@app.get("/events/{sortie_id:path}")
async def sse_events(sortie_id: str):
    if sortie_id not in _jobs:
        raise HTTPException(404, "No active job for this sortie")

    async def generate() -> AsyncGenerator[str, None]:
        queue = _jobs[sortie_id]
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=60)
                yield f"data: {json.dumps(event)}\n\n"
                # Stop on any error (whatever its pct), or once "write" reaches 100%
                if event.get("step") == "error" or (
                        event.get("step") == "write" and event.get("pct") == 100):
                    break
            except asyncio.TimeoutError:
                yield "data: {\"step\":\"ping\"}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")


def _read_gz(path: Path) -> dict:
    with gzip.open(path) as f:
        return json.loads(f.read())


@app.get("/sorties/{sortie_id:path}/usv")
async def get_usv(sortie_id: str):
    p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz"
    if not p.exists():
        raise HTTPException(404, "USV data not found — run pipeline first")
    return JSONResponse(_read_gz(p))


@app.get("/sorties/{sortie_id:path}/auvs")
async def list_auvs(sortie_id: str):
    proc = OUTPUT_DIR / sortie_id / "processed"
    auvs = [p.stem.removeprefix("auv_").removesuffix(".json")
            for p in proc.glob("auv_*.json.gz")]
    return sorted(auvs)


@app.get("/sorties/{sortie_id:path}/auv/{auv_id}")
async def get_auv(sortie_id: str, auv_id: str):
    p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz"
    if not p.exists():
        raise HTTPException(404, f"AUV {auv_id} data not found")
    return JSONResponse(_read_gz(p))


@app.get("/sorties/{sortie_id:path}/tracks")
async def get_tracks(sortie_id: str):
    p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson"
    if not p.exists():
        raise HTTPException(404, "tracks.geojson not found")
    with open(p) as f:
        return JSONResponse(json.load(f))
  • Step 2: Commit
git add pipeline_runner/main.py
git commit -m "feat: pipeline-runner — FastAPI endpoints + SSE"
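The /events endpoint sends one SSE frame per pipeline event and closes the stream on a terminal event. The framing and the stop rule are small enough to test without FastAPI. This sketch assumes the event shape emitted by the runner ({"step", "pct", "msg"}); `sse_frame` and `is_terminal` are hypothetical helper names.

```python
import json


def sse_frame(event: dict) -> str:
    """Serialize one event as a Server-Sent Events frame (data line + blank line)."""
    return f"data: {json.dumps(event)}\n\n"


def is_terminal(event: dict) -> bool:
    """True when the stream should close: any error, or the final write step."""
    step = event.get("step")
    return step == "error" or (step == "write" and event.get("pct") == 100)


if __name__ == "__main__":
    for evt in (
        {"step": "sync", "pct": 0, "msg": ""},
        {"step": "error", "pct": 50, "msg": "SHIP missing"},
    ):
        print(repr(sse_frame(evt)), is_terminal(evt))
```

Treating every error as terminal matters because the runner can report an error at a pct other than 0, for example when the SHIP folder is missing after the sync.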

Task 6: Local pipeline test on sortie #71

Before modifying the viewer, validate that the service runs and responds.

  • Step 1: Install dependencies locally
pip install fastapi "uvicorn[standard]" aiofiles numpy
  • Step 2: Start the service locally
cd /c/Users/flopp/cosma-nav-tools
OUTPUT_DIR=/tmp/cosma-test \
GDRIVE_REMOTE="gdrive:Cosma - Internal/06-Operations/06 - Sorties" \
TOOLS_DIR=tools \
uvicorn pipeline_runner.main:app --port 8767 --reload
  • Step 3: Test /sorties
curl http://localhost:8767/sorties | python3 -m json.tool | head -20

Expected: a list of folders including #71-golrest.

  • Step 4: Run the pipeline on #71
curl -X POST "http://localhost:8767/run/%2371-golrest"
# then stream the events
curl "http://localhost:8767/events/%2371-golrest"

Expected: SSE stream with the steps sync → usv_parse → auv_parse → write 100%.
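The curl calls above hand-encode the # in the sortie name as %23, since an unencoded # would start a URL fragment. For a scripted check, the encoding and the SSE parsing could look like the sketch below; `sortie_url` and `iter_sse_events` are hypothetical helpers, and the sample lines only imitate the stream format.

```python
import json
from urllib.parse import quote


def sortie_url(base: str, route: str, sortie_id: str) -> str:
    """Build an endpoint URL with the sortie id percent-encoded ('#' becomes '%23')."""
    return f"{base}/{route}/{quote(sortie_id, safe='')}"


def iter_sse_events(lines):
    """Yield decoded JSON payloads from an iterable of SSE text lines."""
    for line in lines:
        line = line.strip()
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())


# Imitation of what the /events stream sends (not captured from a real run).
sample_lines = [
    'data: {"step": "sync", "pct": 0, "msg": ""}',
    "",
    'data: {"step": "ping"}',
    "",
    'data: {"step": "write", "pct": 100, "msg": ""}',
    "",
]

if __name__ == "__main__":
    print(sortie_url("http://localhost:8767", "run", "#71-golrest"))
    print([e["step"] for e in iter_sse_events(sample_lines)])
```

The same generator can consume a live response by iterating over its decoded lines.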

  • Step 5: Check the USV output
curl "http://localhost:8767/sorties/%2371-golrest/usv" | python3 -m json.tool | head -30

Expected: {"meta": {...}, "signals": {"Yaw": [...], "Heading": [...], ...}}.
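The same shape can be checked directly on the files written under processed/, without going through the API. This is a minimal sketch of the gzip + JSON round trip with a structural check. `write_payload`, `read_payload`, and `check_payload` are hypothetical helpers, and the sample payload is invented.

```python
import gzip
import json
import tempfile
from pathlib import Path


def write_payload(path: Path, meta: dict, signals: dict) -> None:
    """Write {"meta", "signals"} as gzip-compressed JSON."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with gzip.open(path, "wb") as f:
        f.write(json.dumps({"meta": meta, "signals": signals}).encode())


def read_payload(path: Path) -> dict:
    with gzip.open(path, "rb") as f:
        return json.loads(f.read())


def check_payload(payload: dict) -> list[str]:
    """Return a list of structural problems (empty when the payload looks right)."""
    problems = [f"missing key: {k}" for k in ("meta", "signals") if k not in payload]
    for name, pts in payload.get("signals", {}).items():
        if any("t" not in p or "v" not in p for p in pts):
            problems.append(f"signal {name}: point without t/v")
    return problems


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "processed" / "usv.json.gz"
        write_payload(out, {"vehicle": "USV"}, {"Yaw": [{"t": 0.0, "v": 181.5}]})
        print(check_payload(read_payload(out)))
```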

  • Step 6: Commit if OK
git commit --allow-empty -m "test: pipeline #71 validé en local"

Task 7: viewer, sortie selector + button + SSE progress

Files:

  • Modify: viewer/index.html

The constant API2 = 'http://192.168.0.83:8767' is added. The header gets a sortie dropdown and the Sync button.

  • Step 1: Add API2 in the JS (right after the const API = ... line)

Find in the JS of index.html:

const API = 'http://192.168.0.83:8766';

Add right after:

const API2 = 'http://192.168.0.83:8767';
  • Step 2: Add CSS styles for the new elements (in the <style> block)

Add before </style>:

  #sortie-select {
    background: #0f3460; border: 1px solid #e94560; color: #e0e0e0;
    font-family: monospace; font-size: 11px; padding: 2px 6px; border-radius: 2px;
    cursor: pointer; max-width: 160px;
  }
  #btn-sync {
    background: #0f3460; border: 1px solid #e94560; color: #e94560;
    padding: 2px 9px; cursor: pointer; font-family: monospace; font-size: 11px;
    border-radius: 2px;
  }
  #btn-sync:hover { background: #e94560; color: #1a1a2e; }
  #btn-sync:disabled { opacity: 0.4; cursor: not-allowed; }
  #sync-progress {
    font-size: 10px; color: #06d6a0; flex: 1;
    white-space: nowrap; overflow: hidden; text-overflow: ellipsis;
  }
  .panel-header {
    background: #0d0d20; border-top: 1px solid #0f3460; border-bottom: 1px solid #0f3460;
    padding: 4px 14px; font-size: 11px; font-weight: bold; color: #e94560;
    display: flex; align-items: center; gap: 8px;
  }
  .auv-tab {
    font-family: monospace; font-size: 10px; padding: 2px 8px; cursor: pointer;
    border: 1px solid #0f3460; background: transparent; color: #a0c4ff; border-radius: 2px;
  }
  .auv-tab.active { background: #0f3460; color: #e0e0e0; }
  .graphs-grid {
    display: grid; grid-template-columns: repeat(2, 1fr); gap: 4px;
    padding: 4px 8px; background: #12122a;
  }
  .graph-cell { height: 130px; background: #1a1a2e; }
  .graph-cell.wide { grid-column: span 2; height: 130px; }
  • Step 3: Add HTML in #header (sortie dropdown + button + progress)

In <div id="header">, add after #layer-toggles:

<select id="sortie-select"><option value="">— Sortie —</option></select>
<button id="btn-sync" disabled>Sync &amp; Process</button>
<span id="sync-progress"></span>
  • Step 4: Add the USV + AUV panel HTML (before </body>)
<div class="panel-header">USV</div>
<div class="graphs-grid" id="usv-graphs">
  <div class="graph-cell" id="usv-yaw"></div>
  <div class="graph-cell" id="usv-heading"></div>
  <div class="graph-cell" id="usv-batt"></div>
  <div class="graph-cell" id="usv-gps"></div>
  <div class="graph-cell" id="usv-usbl-dist"></div>
  <div class="graph-cell" id="usv-usbl-angle"></div>
  <div class="graph-cell" id="usv-m1"></div>
  <div class="graph-cell" id="usv-m2"></div>
  <div class="graph-cell wide" id="usv-status"></div>
</div>

<div class="panel-header" id="auv-panel-header">
  AUV
  <span id="auv-tabs"></span>
</div>
<div class="graphs-grid" id="auv-graphs">
  <div class="graph-cell" id="auv-pry"></div>
  <div class="graph-cell" id="auv-depth"></div>
  <div class="graph-cell" id="auv-alt"></div>
  <div class="graph-cell" id="auv-obs"></div>
  <div class="graph-cell" id="auv-usbl-dist"></div>
  <div class="graph-cell" id="auv-usbl-angle"></div>
  <div class="graph-cell" id="auv-batt"></div>
  <div class="graph-cell" id="auv-status"></div>
  <div class="graph-cell wide" id="auv-motors"></div>
</div>
  • Step 5: Add JS to load the sorties at startup

In the <script> block, add the function:

async function loadSorties() {
  try {
    const resp = await fetch(`${API2}/sorties`);
    const sorties = await resp.json();
    const sel = document.getElementById('sortie-select');
    sorties.forEach(s => {
      const opt = document.createElement('option');
      opt.value = s.id;
      opt.textContent = s.id + (s.processed ? ' ✓' : '');
      sel.appendChild(opt);
    });
    sel.addEventListener('change', () => {
      document.getElementById('btn-sync').disabled = !sel.value;
    });
  } catch(e) { console.warn('pipeline-runner unavailable', e); }
}
loadSorties();
  • Step 6: Add JS for the Sync button and SSE progress
document.getElementById('btn-sync').addEventListener('click', async () => {
  const sortieId = document.getElementById('sortie-select').value;
  if (!sortieId) return;
  const btn = document.getElementById('btn-sync');
  const prog = document.getElementById('sync-progress');
  btn.disabled = true;
  prog.textContent = 'Démarrage…';

  const encoded = encodeURIComponent(sortieId);
  await fetch(`${API2}/run/${encoded}`, { method: 'POST' });

  const es = new EventSource(`${API2}/events/${encoded}`);
  es.onmessage = async (e) => {
    const evt = JSON.parse(e.data);
    if (evt.step === 'ping') return;
    prog.textContent = `[${evt.step}] ${evt.pct}% ${evt.msg}`;
    if (evt.step === 'write' && evt.pct === 100) {
      es.close();
      btn.disabled = false;
      prog.textContent = 'Terminé — chargement des données…';
      await loadSortieData(sortieId);
    }
    if (evt.step === 'error') {
      es.close();
      btn.disabled = false;
      prog.textContent = `Erreur: ${evt.msg}`;
    }
  };
});
  • Step 7: Commit
git add viewer/index.html
git commit -m "feat: viewer — sortie selector + sync button + SSE progress"

Task 8: viewer, USV panel (9 Plotly graphs)

Files:

  • Modify: viewer/index.html

Add the renderUSV(signals) function and call it from loadSortieData.

  • Step 1: Add the renderUSV function in the JS
function _pts(sig) {
  if (!sig) return [[], []];
  return [sig.map(p => new Date(p.t * 1000)), sig.map(p => p.v)];
}

const PLOTLY_LAYOUT_BASE = {
  margin: {l:40, r:8, t:20, b:30},
  paper_bgcolor: '#1a1a2e', plot_bgcolor: '#1a1a2e',
  font: {color: '#e0e0e0', size: 9, family: 'monospace'},
  xaxis: {color: '#555', gridcolor: '#1e1e3a', type: 'date'},
  yaxis: {color: '#555', gridcolor: '#1e1e3a'},
  showlegend: false,
};

function _layout(title, yLabel='') {
  return Object.assign({}, PLOTLY_LAYOUT_BASE, {
    title: {text: title, font: {size: 9, color: '#888'}},
    yaxis: Object.assign({}, PLOTLY_LAYOUT_BASE.yaxis, {title: {text: yLabel, font: {size: 8}}})
  });
}

function renderUSV(signals) {
  const cfg = {responsive: true, displayModeBar: false};
  const [xt, xv] = _pts(signals.Yaw);
  Plotly.react('usv-yaw',
    [{x: xt, y: xv, type:'scatter', mode:'lines', line:{color:'#00b4d8', width:1}, name:'Yaw'}],
    _layout('Yaw', '°'), cfg);

  const [ht, hv] = _pts(signals.Heading);
  Plotly.react('usv-heading',
    [{x: ht, y: hv, type:'scatter', mode:'lines', line:{color:'#06d6a0', width:1}, name:'Heading'}],
    _layout('Heading', '°'), cfg);

  const [bt, bv] = _pts(signals.BattVoltage);
  Plotly.react('usv-batt',
    [{x: bt, y: bv, type:'scatter', mode:'lines', line:{color:'#ffd166', width:1}, name:'Batt'}],
    _layout('Battery', 'V'), cfg);

  const [gt, gv] = _pts(signals.gps_fix);
  Plotly.react('usv-gps',
    [{x: gt, y: gv, type:'scatter', mode:'lines+markers', line:{color:'#06d6a0', width:1, shape:'hv'}, name:'GPS'}],
    _layout('GPS fix'), cfg);

  const [dt, dv] = _pts(signals.usbl_dist);
  Plotly.react('usv-usbl-dist',
    [{x: dt, y: dv, type:'scatter', mode:'lines', line:{color:'#a0c4ff', width:1}}],
    _layout('USBL dist', 'm'), cfg);

  const [at, av] = _pts(signals.usbl_angle);
  Plotly.react('usv-usbl-angle',
    [{x: at, y: av, type:'scatter', mode:'lines', line:{color:'#c77dff', width:1}}],
    _layout('USBL angle', '°'), cfg);

  const [m1t, m1v] = _pts(signals.M1);
  Plotly.react('usv-m1',
    [{x: m1t, y: m1v, type:'scatter', mode:'lines', line:{color:'#ef476f', width:1}}],
    _layout('Motor 1', 'cmd'), cfg);

  const [m2t, m2v] = _pts(signals.M2);
  Plotly.react('usv-m2',
    [{x: m2t, y: m2v, type:'scatter', mode:'lines', line:{color:'#ff6b6b', width:1}}],
    _layout('Motor 2', 'cmd'), cfg);

  const [st, sv] = _pts(signals.Armed || signals.Mode);
  Plotly.react('usv-status',
    [{x: st, y: sv, type:'scatter', mode:'lines', line:{color:'#ffd166', width:1, shape:'hv'}, name:'Status'}],
    Object.assign(_layout('USV status'), {showlegend: false}), cfg);
}
  • Step 2: Commit
git add viewer/index.html
git commit -m "feat: viewer — USV panel 9 graphs Plotly"

Task 9: viewer, AUV panel with multi-AUV tabs

Files:

  • Modify: viewer/index.html

  • Step 1: Add renderAUV + tabs in the JS

let _currentAuvId = null;

// Fetch one AUV's signals and render them. The render is skipped if the
// user switched to another tab while the request was in flight.
async function loadAuv(sortieId, auv) {
  _currentAuvId = auv;
  const url = `${API2}/sorties/${encodeURIComponent(sortieId)}/auv/${encodeURIComponent(auv)}`;
  const r = await fetch(url);
  if (!r.ok) throw new Error(`AUV ${auv}: HTTP ${r.status}`);
  const data = await r.json();
  if (_currentAuvId === auv) renderAUV(data.signals);
}

async function loadAuvTabs(sortieId) {
  const resp = await fetch(`${API2}/sorties/${encodeURIComponent(sortieId)}/auvs`);
  if (!resp.ok) throw new Error(`AUV list: HTTP ${resp.status}`);
  const auvs = await resp.json();
  const tabsEl = document.getElementById('auv-tabs');
  tabsEl.innerHTML = '';
  auvs.forEach((auv, i) => {
    const btn = document.createElement('button');
    btn.className = 'auv-tab' + (i === 0 ? ' active' : '');
    btn.textContent = auv;
    btn.onclick = () => {
      document.querySelectorAll('.auv-tab').forEach(b => b.classList.remove('active'));
      btn.classList.add('active');
      loadAuv(sortieId, auv).catch(console.error);
    };
    tabsEl.appendChild(btn);
  });
  // Show the first AUV by default.
  if (auvs.length > 0) await loadAuv(sortieId, auvs[0]);
}
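Every identifier interpolated into a URL path should go through encodeURIComponent, because a reserved character such as `#` or `/` changes how the URL is parsed. A small illustration, assuming API2 points at the pipeline-runner on port 8767 and using a made-up AUV name (`auvUrl` is a hypothetical helper):

```javascript
// Hypothetical helper: builds the per-AUV endpoint requested by loadAuvTabs.
// `base` stands in for the plan's API2 constant.
function auvUrl(base, sortieId, auvId) {
  return `${base}/sorties/${encodeURIComponent(sortieId)}/auv/${encodeURIComponent(auvId)}`;
}

const base = 'http://192.168.0.83:8767'; // assumed value of API2
const unencoded = `${base}/sorties/71-golrest/auv/auv#2`;

// Without encoding, everything from '#' on is parsed as a fragment and is
// never sent to the server.
const parsed = new URL(unencoded);
console.log(parsed.pathname, parsed.hash);

// With encoding, '#' travels as %23 inside the path segment.
const encoded = auvUrl(base, '71-golrest', 'auv#2');
console.log(encoded);
```

FastAPI path parameters are URL-decoded before they reach the handler, so the server side should not need a matching change.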

function renderAUV(signals) {
  const cfg = {responsive: true, displayModeBar: false};

  // Split each attitude series once instead of calling _pts twice per trace.
  const [pt, pv] = _pts(signals.pitch);
  const [rt, rv] = _pts(signals.roll);
  const [yt, yv] = _pts(signals.yaw);
  Plotly.react('auv-pry', [
    {x: pt, y: pv, name:'Pitch', line:{color:'#ef476f', width:1}},
    {x: rt, y: rv, name:'Roll', line:{color:'#06d6a0', width:1}},
    {x: yt, y: yv, name:'Yaw', line:{color:'#00b4d8', width:1}},
  ].map(t => Object.assign({type:'scatter', mode:'lines'}, t)),
  Object.assign(_layout('Pitch/Roll/Yaw', '°'), {showlegend: true,
    legend:{font:{size:8}, bgcolor:'transparent', x:0, y:1}}), cfg);

  const [dpt, dpv] = _pts(signals.depth);
  Plotly.react('auv-depth',
    [{x: dpt, y: dpv, type:'scatter', mode:'lines', line:{color:'#a0c4ff', width:1}}],
    _layout('Depth', 'm'), cfg);

  const [alt, alv] = _pts(signals.altitude);
  Plotly.react('auv-alt',
    [{x: alt, y: alv, type:'scatter', mode:'lines', line:{color:'#ffd166', width:1}}],
    _layout('Altitude', 'm'), cfg);

  const [obt, obv] = _pts(signals.obstacle_dist);
  Plotly.react('auv-obs',
    [{x: obt, y: obv, type:'scatter', mode:'lines', line:{color:'#c77dff', width:1}}],
    _layout('Obstacle', 'm'), cfg);

  const [udt, udv] = _pts(signals.usbl_dist);
  Plotly.react('auv-usbl-dist',
    [{x: udt, y: udv, type:'scatter', mode:'lines', line:{color:'#a0c4ff', width:1}}],
    _layout('USBL dist', 'm'), cfg);

  const [uat, uav] = _pts(signals.usbl_angle);
  Plotly.react('auv-usbl-angle',
    [{x: uat, y: uav, type:'scatter', mode:'lines', line:{color:'#c77dff', width:1}}],
    _layout('USBL angle', '°'), cfg);

  const [bbt, bbv] = _pts(signals.battery_v);
  Plotly.react('auv-batt',
    [{x: bbt, y: bbv, type:'scatter', mode:'lines', line:{color:'#ffd166', width:1}}],
    _layout('Battery', 'V'), cfg);

  const [stt, stv] = _pts(signals.arm_status);
  Plotly.react('auv-status',
    [{x: stt, y: stv, type:'scatter', mode:'lines', line:{color:'#06d6a0', width:1, shape:'hv'}}],
    _layout('Arm/Mode'), cfg);

  const motorColors = ['#ef476f','#ffd166','#06d6a0','#00b4d8','#a0c4ff','#c77dff'];
  const motorTraces = ['m1','m2','m3','m4','m5','m6'].map((mk, i) => {
    const [t, v] = _pts(signals[mk]);
    return {x: t, y: v, type:'scatter', mode:'lines', name:`M${i+1}`,
            line:{color: motorColors[i], width:1}};
  });
  Plotly.react('auv-motors', motorTraces,
    Object.assign(_layout('Motors ×6 PWM', 'µs'), {showlegend: true,
      legend:{font:{size:8}, bgcolor:'transparent', orientation:'h', x:0, y:1}}), cfg);
}
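Seven of the ten AUV graphs in renderAUV follow the same single-trace pattern, differing only in element id, signal key, title, unit and color. One possible way to cut the repetition is a declarative table that a single loop turns into Plotly arguments. The sketch below is an alternative layout, not the plan's code: `buildSingleTracePlots` and `ptsStub` are hypothetical names, and `ptsStub` assumes each series is an array of [t, v] pairs, which may not match what `_pts` really consumes.

```javascript
// Ids, keys, titles, units and colors copied from renderAUV.
const AUV_SINGLE_TRACE_GRAPHS = [
  {id: 'auv-depth',      key: 'depth',         title: 'Depth',      unit: 'm', color: '#a0c4ff'},
  {id: 'auv-alt',        key: 'altitude',      title: 'Altitude',   unit: 'm', color: '#ffd166'},
  {id: 'auv-obs',        key: 'obstacle_dist', title: 'Obstacle',   unit: 'm', color: '#c77dff'},
  {id: 'auv-usbl-dist',  key: 'usbl_dist',     title: 'USBL dist',  unit: 'm', color: '#a0c4ff'},
  {id: 'auv-usbl-angle', key: 'usbl_angle',    title: 'USBL angle', unit: '°', color: '#c77dff'},
  {id: 'auv-batt',       key: 'battery_v',     title: 'Battery',    unit: 'V', color: '#ffd166'},
  {id: 'auv-status',     key: 'arm_status',    title: 'Arm/Mode',   color: '#06d6a0', step: true},
];

// Hypothetical: turn the table into {id, title, unit, traces} records.
// `pts` plays the role of the plan's _pts helper and must return [xs, ys].
function buildSingleTracePlots(signals, pts) {
  return AUV_SINGLE_TRACE_GRAPHS.map(g => {
    const [x, y] = pts(signals[g.key]);
    const line = {color: g.color, width: 1};
    if (g.step) line.shape = 'hv'; // step plot for the status signal
    return {id: g.id, title: g.title, unit: g.unit,
            traces: [{x, y, type: 'scatter', mode: 'lines', line}]};
  });
}

// Stand-in for _pts (assumed shape: [[t, v], ...]; missing series -> empty).
const ptsStub = s => [(s || []).map(p => p[0]), (s || []).map(p => p[1])];

const demo = buildSingleTracePlots({depth: [[0, 1.5], [1, 2.0]]}, ptsStub);
console.log(demo.length, demo[0].id, demo[0].traces[0].y);
```

In the viewer the loop would then be `buildSingleTracePlots(signals, _pts).forEach(p => Plotly.react(p.id, p.traces, _layout(p.title, p.unit), cfg))`, leaving only the Pitch/Roll/Yaw and motor graphs as special cases.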
  • Step 2: Commit
git add viewer/index.html
git commit -m "feat: viewer — AUV panel 10 graphs + tabs multi-AUV"

Task 10: viewer - X cursor synchronized between the slider and all graphs

Files:

  • Modify: viewer/index.html

  • Step 1: Create the list of all graph IDs and the updateCursor function

const ALL_GRAPH_IDS = [
  'chart-depth', 'chart-pwm-auv', 'chart-pwm-usv', 'chart-usbl',
  'usv-yaw', 'usv-heading', 'usv-batt', 'usv-gps',
  'usv-usbl-dist', 'usv-usbl-angle', 'usv-m1', 'usv-m2', 'usv-status',
  'auv-pry', 'auv-depth', 'auv-alt', 'auv-obs',
  'auv-usbl-dist', 'auv-usbl-angle', 'auv-batt', 'auv-status', 'auv-motors',
];

function updateCursor(epochSec) {
  const ts = new Date(epochSec * 1000).toISOString();
  const shape = {
    type: 'line', x0: ts, x1: ts, y0: 0, y1: 1,
    yref: 'paper', line: {color: '#e94560', width: 1, dash: 'dot'},
  };
  ALL_GRAPH_IDS.forEach(id => {
    const el = document.getElementById(id);
    if (el && el._fullLayout) {
      Plotly.relayout(id, {'shapes': [shape]});
    }
  });
}
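Note that `Plotly.relayout(id, {'shapes': [shape]})` replaces the whole shapes array, so any shape a graph already carries (for example a shaded interval on the existing charts) would be dropped on every cursor move. Whether the existing viewer uses such shapes is not stated in the plan. If it does, one option is to tag the cursor with a name and swap only that entry. A minimal sketch, where `CURSOR_NAME`, `cursorShape` and `withCursor` are hypothetical names:

```javascript
const CURSOR_NAME = 'replay-cursor'; // hypothetical tag used to find our own shape

// Same vertical dotted line as updateCursor, plus a name.
function cursorShape(epochSec) {
  const ts = new Date(epochSec * 1000).toISOString();
  return {
    type: 'line', name: CURSOR_NAME, x0: ts, x1: ts, y0: 0, y1: 1,
    yref: 'paper', line: {color: '#e94560', width: 1, dash: 'dot'},
  };
}

// Keep every shape that is not the cursor, then append the cursor at its new position.
function withCursor(existingShapes, epochSec) {
  const kept = (existingShapes || []).filter(s => s.name !== CURSOR_NAME);
  return [...kept, cursorShape(epochSec)];
}

const before = [{type: 'rect', name: 'dive-window'}, cursorShape(10)];
const after = withCursor(before, 20);
console.log(after.map(s => s.name), after[after.length - 1].x0);
```

Inside the forEach, the call would become `Plotly.relayout(id, {shapes: withCursor(el.layout.shapes, epochSec)})`, reading the current shapes from the graph div's `layout` property.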
  • Step 2: Hook into the existing slider event

In the existing JS, find the noUiSlider event handler (search for slider.noUiSlider.on or noUiSlider). In the update or set callback, add:

// Inside the slider callback, add:
const [lo] = slider.noUiSlider.get(true);  // lower handle value, in epoch ms
updateCursor(lo / 1000);
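The destructuring above assumes a two-handle slider. As far as I know, noUiSlider's `get()` returns an array for multi-handle sliders but a single value for a one-handle slider, in which case `const [lo] = ...` would throw. A small defensive sketch that accepts both shapes, assuming (as the comment above states) that slider values are epoch milliseconds; `lowHandleEpochSec` is a hypothetical helper:

```javascript
// Hypothetical helper: normalize whatever slider.noUiSlider.get(true) returned
// into epoch seconds for updateCursor.
function lowHandleEpochSec(raw) {
  const lo = Array.isArray(raw) ? raw[0] : raw; // lower handle, or the only handle
  const ms = Number(lo);                        // tolerate formatted strings
  if (!Number.isFinite(ms)) {
    throw new TypeError(`Unexpected slider value: ${String(raw)}`);
  }
  return ms / 1000;
}

const fromRange = lowHandleEpochSec([1714200000000, 1714286400000]);
const fromSingle = lowHandleEpochSec('1714200000000');
console.log(fromRange, fromSingle);
```

The callback line would then read `updateCursor(lowHandleEpochSec(slider.noUiSlider.get(true)));`.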
  • Step 3: Commit
git add viewer/index.html
git commit -m "feat: viewer - X cursor synced between slider and all graphs"

Task 11: loadSortieData - final wiring

Files:

  • Modify: viewer/index.html

  • Step 1: Add the loadSortieData function

async function loadSortieData(sortieId) {
  const prog = document.getElementById('sync-progress');
  try {
    prog.textContent = 'Loading USV…';
    const usvResp = await fetch(`${API2}/sorties/${encodeURIComponent(sortieId)}/usv`);
    if (usvResp.ok) {
      const usvData = await usvResp.json();
      renderUSV(usvData.signals);
    } else {
      // USV data is optional: log the failure and carry on with the AUVs.
      console.warn(`USV data unavailable for ${sortieId}: HTTP ${usvResp.status}`);
    }
    prog.textContent = 'Loading AUV…';
    await loadAuvTabs(sortieId);
    prog.textContent = `${sortieId} loaded`;
  } catch (e) {
    prog.textContent = `Loading error: ${e.message}`;
  }
}
  • Step 2: Hook into the sortie change (if already processed)

In the sortie-select event listener (Task 7 Step 5), add after btn.disabled = !sel.value:

    if (sel.value) {
      // Load the data if this sortie has already been processed
      const opt = sel.options[sel.selectedIndex];
      if (opt.textContent.includes('✓')) {
        loadSortieData(sel.value);
      }
    }
  • Step 3: Push to feat/flag-local
git add viewer/index.html
git commit -m "feat: viewer - loadSortieData final wiring"
git push origin feat/flag-local

Task 12: Deployment on .83

  • Step 1: On .83, clone the repository and check out the feat/flag-local branch
ssh user@192.168.0.83
cd /opt
git clone https://floppyrj45:TOKEN@gitea.nowyouknow.fr/floppyrj45/cosma-nav-tools.git
cd cosma-nav-tools
git checkout feat/flag-local
  • Step 2: Check that rclone is configured
rclone lsd "gdrive:Cosma - Internal/06-Operations/06 - Sorties" | head -5
  • Step 3: Create the data directory
mkdir -p /data/sorties
  • Step 4: Build + start Docker
docker compose up -d --build pipeline-runner
docker compose logs -f pipeline-runner

Expected: Uvicorn running on http://0.0.0.0:8767.

  • Step 5: Smoke test
curl http://192.168.0.83:8767/sorties | python3 -m json.tool | head -10
  • Step 6: Open the viewer on port 8765 and test the Sync button on #71

Go to http://192.168.0.83:8765, select #71-golrest, click Sync & Process, watch the progress bar, and check the USV/AUV graphs.