From 02e357b8748eb91be9be69566813606bd36ab12e Mon Sep 17 00:00:00 2001 From: Flagabat Date: Mon, 27 Apr 2026 22:23:18 +0200 Subject: [PATCH 01/11] =?UTF-8?q?docs:=20spec=20pipeline=20GDrive=20?= =?UTF-8?q?=E2=86=92=20replay=20USV/AUV=20dans=20viewer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...026-04-27-gdrive-pipeline-replay-design.md | 184 ++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-27-gdrive-pipeline-replay-design.md diff --git a/docs/superpowers/specs/2026-04-27-gdrive-pipeline-replay-design.md b/docs/superpowers/specs/2026-04-27-gdrive-pipeline-replay-design.md new file mode 100644 index 0000000..c5a2bfe --- /dev/null +++ b/docs/superpowers/specs/2026-04-27-gdrive-pipeline-replay-design.md @@ -0,0 +1,184 @@ +# Design : GDrive Pipeline Replay + +**Date :** 2026-04-27 +**Repo :** cosma-nav-tools +**Branche :** feat/flag-local + +--- + +## Objectif + +Bouton dans le viewer 8765 qui déclenche un sync GDrive → pipeline Python → affichage replay des signaux USV et AUV sur la même page, synchronisés avec le slider 24h existant. + +--- + +## Architecture globale + +``` +GDrive (G:) + └─ 06-Operations/06-Sorties/#XX-lieu/YYYYMMDD-lieu/raw_data/ + ├─ SHIP/*.csv (USV nav + USBL + full) + └─ SUB/*.mcap (AUV ROS2) + bin/ + + ↓ rclone sync (déclenché par bouton viewer) + +.83 /data/sorties/#XX/raw/ + ├─ SHIP/ + └─ SUB/ + + ↓ pipeline Python (scripts existants cosma-nav-tools/tools/) + +.83 /data/sorties/#XX/processed/ + ├─ usv.json.gz ← signaux USV downsamplés LTTB + ├─ auv_AUV010.json.gz ← signaux AUV010 downsamplés LTTB + ├─ auv_AUVxxx.json.gz ← un fichier par AUV détecté + └─ tracks.geojson ← USV + AUV tracks (Leaflet) + + ↓ servi par + +port 8767 pipeline-runner FastAPI (cosma-nav-tools/pipeline_runner/) +port 8765 viewer/index.html ← page unique, fetch 8766 + 8767 +``` + +--- + +## Section 1 : Pipeline-runner (port 8767) + +### Structure + +``` +cosma-nav-tools/ + pipeline_runner/ + main.py ← FastAPI app + endpoints + runner.py ← logique rclone + orchestration scripts + config.py ← chemins GDrive, output dir, rclone remote + docker-compose.yml ← service pipeline-runner port 8767 + data/sorties/ ← gitignored +``` + +### Endpoints + +| Méthode | Route | Description | +|---------|-------|-------------| +| `GET` | `/sorties` | Liste sorties détectées sur GDrive (scan dossier) | +| `POST` | `/run/{sortie_id}` | Lance rclone pull + pipeline complet | +| `GET` | `/events/{sortie_id}` | SSE stream progress (étape + %) | +| `GET` | `/sorties/{id}/usv` | Données USV processed (JSON gzip) | +| `GET` | `/sorties/{id}/auvs` | Liste AUVs disponibles pour cette sortie | +| `GET` | `/sorties/{id}/auv/{auv_id}` | Données AUV processed (JSON gzip) | +| `GET` | `/sorties/{id}/tracks` | GeoJSON tracks USV + AUV | + +### Flow interne POST /run/{sortie_id} + +``` +1. rclone sync GDrive/#id → /data/sorties/#id/raw/ SSE: "sync" 0→50% +2. parse_usv_nav.py + extract_usv_pwm.py SSE: "usv_parse" 50→65% +3. parse_kogger_usbl.py + merge_nav_usbl.py SSE: "usbl_merge" 65→80% +4. extract_mcap_signals.py (par AUV détecté) SSE: "auv_parse" 80→90% +5. usbl_to_json.py SSE: "usbl_json" 90→95% +6. downsample LTTB + écriture .json.gz SSE: "write" 95→100% +``` + +--- + +## Section 2 : Format données + +### Signaux (usv.json.gz, auv_*.json.gz) + +```json +{ + "meta": { + "sortie": "#71-golrest", + "date": "2026-04-16", + "vehicle": "USV001", + "t_start": 1713268477, + "t_end": 1713282877 + }, + "signals": { + "yaw": [{ "t": 1713268477, "v": 142.3 }, ...], + "heading": [...], + "gps_status": [{ "t": ..., "v": "3D_FIX" }, ...], + "battery_v": [...], + "usbl_dist": [...], + "usbl_angle": [...], + "motor_1": [...], + "motor_2": [...], + "auv_status": [{ "t": ..., "v": "MISSION" }, ...] + } +} +``` + +- `t` = epoch UNIX secondes +- Max **4000 points par signal** via LTTB (Plotly optimal) +- Valeurs discrètes (status, mode) = strings → step-plot Plotly +- Endpoint optionnel `/range?from=&to=` pour zoom fin sur brutes + +### Tracks + +- **GeoJSON** FeatureCollection : une Feature LineString par véhicule +- Poids estimé : ~300KB gzip pour 4h à 1Hz + +--- + +## Section 3 : Viewer 8765 — extensions + +### Layout (page unique) + +``` +datebar ← inchangé +header ← + dropdown sortie + [Sync & Process] + progress bar SSE +map Leaflet ← inchangé +slider 24h ← shared X axis, curseur vertical sur tous les graphs +───────────────────────────────────────────── +USV PANEL + [Yaw] [Heading] + [GPS status] [Battery voltage] + [USBL dist] [USBL angle] + [Motor 1] [Motor 2] + [AUV status: disarm/mission/goto — step-plot] +───────────────────────────────────────────── +AUV PANEL ← tabs [AUV010] [AUV011] … si multi-AUV + [Pitch/Roll/Yaw — 3 traces] [Depth] + [Altitude] [Obstacle dist] + [USBL dist] [USBL angle] + [Battery voltage] [Arm/Disarm/Mode — step-plot] + [Motors ×6 PWM — 1 graph multi-trace M1…M6] +``` + +### Synchronisation slider + +- Slider 24h existant → événement → `updateCursor(t)` sur tous les graphs Plotly via `Plotly.relayout` shapes +- Données chargées une fois en mémoire au click de sortie, pas de re-fetch au scrub + +### Bouton Sync & Process + +- `POST /run/{sortie_id}` → ouvre SSE `/events/{sortie_id}` +- Progress bar inline dans le header (ex: `rclone 34% → usv_parse…`) +- À 100% → fetch automatique USV + AUV + tracks → render graphs + +--- + +## Section 4 : Déploiement .83 + +```yaml +# docker-compose.yml (ajout) +pipeline-runner: + build: ./pipeline_runner + ports: + - "8767:8767" + volumes: + - /data/sorties:/data/sorties + - /mnt/gdrive:/mnt/gdrive # rclone mount ou rclone exec + environment: + - GDRIVE_REMOTE=gdrive:Cosma - Internal/06-Operations/06 - Sorties + - OUTPUT_DIR=/data/sorties +``` + +--- + +## Hors scope (v1) + +- Authentification / accès multi-user +- HDF5 archivage (peut venir en v2) +- Replay animé temps-réel (curseur qui avance automatiquement) +- Tests unitaires pipeline From bd3a2359d912724645d1161a6c447c713e98d4f8 Mon Sep 17 00:00:00 2001 From: Flagabat Date: Mon, 27 Apr 2026 22:29:41 +0200 Subject: [PATCH 02/11] =?UTF-8?q?docs:=20plan=20impl=C3=A9mentation=20pipe?= =?UTF-8?q?line=20GDrive=20=E2=86=92=20replay=20USV/AUV=20(12=20t=C3=A2che?= =?UTF-8?q?s)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../2026-04-27-gdrive-pipeline-replay.md | 1219 +++++++++++++++++ 1 file changed, 1219 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md diff --git a/docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md b/docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md new file mode 100644 index 0000000..56db67c --- /dev/null +++ b/docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md @@ -0,0 +1,1219 @@ +# GDrive Pipeline Replay — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Bouton dans viewer 8765 → rclone sync GDrive → pipeline Python existant → affichage panels USV/AUV synchronisés avec slider 24h. + +**Architecture:** Micro-service FastAPI `pipeline-runner` (port 8767) dans cosma-nav-tools orchestre rclone + scripts tools/ existants + écriture JSON.gz downsamplé LTTB. Le viewer 8765 ajoute sélecteur sortie, bouton Sync, progress SSE, et deux panels Plotly (USV 9 graphs + AUV 10 graphs + tabs multi-AUV), tous synchronisés sur le slider 24h existant. + +**Tech Stack:** FastAPI, uvicorn, aiofiles, numpy (LTTB), rclone (pré-installé sur .83), Plotly.js (déjà dans viewer), noUiSlider (déjà dans viewer). + +**Prérequis sur .83:** `rclone` configuré avec remote `gdrive` pointant sur Google Drive COSMA. Vérifier: `rclone lsd gdrive:"Cosma - Internal/06-Operations/06 - Sorties"`. + +--- + +## Fichiers créés / modifiés + +| Fichier | Action | Rôle | +|---------|--------|------| +| `requirements.txt` | Créer | Dépendances Python | +| `docker-compose.yml` | Créer | Service pipeline-runner port 8767 | +| `tools/extract_mcap_signals.py` | Modifier | Ajouter topics pitch/roll/yaw, altitude, obstacle, battery | +| `pipeline_runner/__init__.py` | Créer | Package marker | +| `pipeline_runner/config.py` | Créer | Chemins GDrive, output dir | +| `pipeline_runner/processor.py` | Créer | LTTB + extraction signaux → JSON.gz | +| `pipeline_runner/runner.py` | Créer | rclone sync + orchestration scripts tools/ | +| `pipeline_runner/main.py` | Créer | FastAPI endpoints | +| `viewer/index.html` | Modifier | Sortie selector + bouton + SSE + panels USV/AUV | + +--- + +## Task 0: Étendre extract_mcap_signals.py — topics AUV manquants + +**Files:** +- Modify: `tools/extract_mcap_signals.py` + +Le script actuel extrait uniquement depth/PWM/state. Il faut ajouter pitch/roll/yaw (IMU), altitude, obstacle, batterie. + +- [ ] **Step 1: Ouvrir tools/extract_mcap_signals.py et localiser TOPICS** + +Trouver la ligne: +```python +TOPICS = ['/mavros/imu/static_pressure', '/mavros/rc/out', '/mavros/state'] +``` +Remplacer par: +```python +TOPICS = [ + '/mavros/imu/static_pressure', # depth (pression) + '/mavros/rc/out', # PWM moteurs + '/mavros/state', # arm/mode + '/mavros/imu/data', # orientation quaternion → pitch/roll/yaw + '/mavros/altitude', # altitude relative + '/mavros/battery', # tension batterie + '/mavros/distance_sensor/hrlv_ez4_pub', # obstacle avoidance +] +``` + +- [ ] **Step 2: Ajouter les handlers dans la boucle iter_decoded_messages** + +Trouver le bloc `elif topic == '/mavros/state':` et ajouter après: + +```python + elif topic == '/mavros/imu/data': + import math + q = ros_msg.orientation + sinr = 2*(q.w*q.x + q.y*q.z) + cosr = 1 - 2*(q.x*q.x + q.y*q.y) + roll = math.degrees(math.atan2(sinr, cosr)) + sinp = 2*(q.w*q.y - q.z*q.x) + pitch = math.degrees(math.asin(max(-1, min(1, sinp)))) + siny = 2*(q.w*q.z + q.x*q.y) + cosy = 1 - 2*(q.y*q.y + q.z*q.z) + yaw = math.degrees(math.atan2(siny, cosy)) + signals.setdefault('pitch', []).append({'t_ms': t_ms, 'v': pitch}) + signals.setdefault('roll', []).append({'t_ms': t_ms, 'v': roll}) + signals.setdefault('yaw', []).append({'t_ms': t_ms, 'v': yaw}) + elif topic == '/mavros/altitude': + signals.setdefault('altitude', []).append( + {'t_ms': t_ms, 'v': ros_msg.relative}) + elif topic == '/mavros/battery': + signals.setdefault('battery_v', []).append( + {'t_ms': t_ms, 'v': ros_msg.voltage}) + elif topic == '/mavros/distance_sensor/hrlv_ez4_pub': + signals.setdefault('obstacle_dist', []).append( + {'t_ms': t_ms, 'v': ros_msg.range}) +``` + +> **Note:** Si `/mavros/distance_sensor/hrlv_ez4_pub` n'existe pas dans les MCAP, remplacer par le topic réel. Vérifier avec `mcap info .mcap | grep distance`. + +- [ ] **Step 3: Vérifier que les nouvelles clés sont dans le JSON output** + +La variable `signals` est déjà écrite en JSON à la fin du script. Les nouvelles clés seront automatiquement incluses. + +- [ ] **Step 4: Commit** + +```bash +git add tools/extract_mcap_signals.py +git commit -m "feat: extract_mcap_signals — pitch/roll/yaw, altitude, obstacle, battery" +``` + +--- + +## Task 1: requirements.txt + docker-compose.yml + +**Files:** +- Create: `requirements.txt` +- Create: `docker-compose.yml` + +- [ ] **Step 1: Créer requirements.txt** + +``` +fastapi==0.115.0 +uvicorn[standard]==0.30.6 +aiofiles==24.1.0 +numpy==2.1.1 +``` + +- [ ] **Step 2: Créer docker-compose.yml** + +```yaml +version: "3.9" +services: + pipeline-runner: + build: + context: . + dockerfile: pipeline_runner/Dockerfile + ports: + - "8767:8767" + volumes: + - /data/sorties:/data/sorties + environment: + - GDRIVE_REMOTE=gdrive:Cosma - Internal/06-Operations/06 - Sorties + - OUTPUT_DIR=/data/sorties + - TOOLS_DIR=/app/tools + restart: unless-stopped +``` + +- [ ] **Step 3: Créer pipeline_runner/Dockerfile** + +```dockerfile +FROM python:3.12-slim +WORKDIR /app +RUN apt-get update && apt-get install -y rclone && rm -rf /var/lib/apt/lists/* +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY tools/ ./tools/ +COPY vendor/ ./vendor/ +COPY pipeline_runner/ ./pipeline_runner/ +CMD ["uvicorn", "pipeline_runner.main:app", "--host", "0.0.0.0", "--port", "8767"] +``` + +- [ ] **Step 4: Créer pipeline_runner/__init__.py** (vide) + +```python +``` + +- [ ] **Step 5: Commit** + +```bash +git add requirements.txt docker-compose.yml pipeline_runner/ +git commit -m "feat: pipeline-runner — scaffold docker + deps" +``` + +--- + +## Task 2: config.py + +**Files:** +- Create: `pipeline_runner/config.py` + +- [ ] **Step 1: Créer config.py** + +```python +import os +from pathlib import Path + +GDRIVE_REMOTE = os.getenv("GDRIVE_REMOTE", "gdrive:Cosma - Internal/06-Operations/06 - Sorties") +OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/data/sorties")) +TOOLS_DIR = Path(os.getenv("TOOLS_DIR", Path(__file__).parent.parent / "tools")) +LTTB_MAX_PTS = 4000 +``` + +- [ ] **Step 2: Commit** + +```bash +git add pipeline_runner/config.py +git commit -m "feat: pipeline-runner — config" +``` + +--- + +## Task 3: processor.py — LTTB + extraction signaux + +**Files:** +- Create: `pipeline_runner/processor.py` + +Le processor lit les CSV/JSON produits par les scripts tools/ et écrit `usv.json.gz` et `auv_AUVxxx.json.gz`. + +**Champs USV extraits depuis `_navigation_log.csv`** (format: `timestamp,data,value` long): +- `Yaw`, `Heading`, `Roll`, `Pitch` +- `BattVoltage` +- `gps_fix` (valeur string: ex "3D_FIX") +- `Armed` (0/1) +- `Mode` (string: ex "MANUAL", "AUTO") +- `M1`, `M2` (commandes moteur USV) + +**USBL depuis `combined_usbl.csv`** produit par `parse_kogger_usbl.py`: +- colonnes: `Timestamp, Dist, Azimuth, Elev, SNR, FrameID, ...` + +**Champs AUV depuis `mcap_signals.json`** produit par `extract_mcap_signals.py`: +- `depth`, `pwm_auv` (dict M1-M6), `state` (arm/mode) + +- [ ] **Step 1: Créer processor.py** + +```python +import csv +import gzip +import json +import math +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import numpy as np + +from .config import LTTB_MAX_PTS + + +def _lttb(points: list[dict], max_pts: int) -> list[dict]: + """Largest Triangle Three Buckets downsampling.""" + n = len(points) + if n <= max_pts: + return points + ts = np.array([p["t"] for p in points], dtype=float) + vs = np.array([p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points], dtype=float) + bucket_size = (n - 2) / (max_pts - 2) + out = [points[0]] + a = 0 + for i in range(max_pts - 2): + avg_start = int((i + 1) * bucket_size) + 1 + avg_end = min(int((i + 2) * bucket_size) + 1, n) + avg_t = np.mean(ts[avg_start:avg_end]) + avg_v = np.mean(vs[avg_start:avg_end]) + rng_start = int(i * bucket_size) + 1 + rng_end = min(int((i + 1) * bucket_size) + 1, n) + max_area = -1.0 + best = rng_start + at, av = ts[a], vs[a] + for j in range(rng_start, rng_end): + area = abs((at - avg_t) * (vs[j] - av) - (ts[j] - at) * (avg_v - av)) * 0.5 + if area > max_area: + max_area = area + best = j + out.append(points[best]) + a = best + out.append(points[-1]) + return out + + +def _ts_to_epoch(ts_str: str) -> float: + """Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds.""" + for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"): + try: + dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc) + return dt.timestamp() + except ValueError: + continue + return 0.0 + + +def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]: + """Read long-format navigation_log.csv → dict of signal arrays.""" + signals: dict[str, list] = {} + wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix", + "Armed", "Mode", "M1", "M2"} + with open(nav_log_path, newline="", encoding="utf-8") as f: + reader = csv.reader(f) + next(reader, None) # skip header + for row in reader: + if len(row) < 3: + continue + ts_str, field, val = row[0], row[1], row[2] + if field not in wanted: + continue + t = _ts_to_epoch(ts_str) + try: + v: Any = float(val) + except ValueError: + v = val.strip() + signals.setdefault(field, []).append({"t": t, "v": v}) + return signals + + +def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]: + """Read combined_usbl.csv → Dist and Azimuth signal arrays.""" + dist_pts, az_pts = [], [] + with open(usbl_csv_path, newline="", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + try: + t = _ts_to_epoch(row["Timestamp"]) + d = float(row["Dist"]) if row["Dist"] else None + az = float(row["Azimuth"]) if row["Azimuth"] else None + except (KeyError, ValueError): + continue + if d is not None and not math.isnan(d): + dist_pts.append({"t": t, "v": d}) + if az is not None and not math.isnan(az): + az_pts.append({"t": t, "v": az}) + return {"usbl_dist": dist_pts, "usbl_angle": az_pts} + + +def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]: + """Read mcap_signals.json → depth, motors M1-M6, state signals.""" + with open(mcap_json_path) as f: + data = json.load(f) + signals: dict[str, list[dict]] = {} + + def _unpack(key: str, series: list): + pts = [] + for item in series: + t = item.get("t_ms", item.get("t", 0)) + if isinstance(t, (int, float)) and t > 1e9: + t = t / 1000.0 # ms → s + pts.append({"t": float(t), "v": item.get("v", item.get("value", 0))}) + signals[key] = pts + + if "depth" in data: + _unpack("depth", data["depth"]) + if "pwm_auv" in data: + for m_key, series in data["pwm_auv"].items(): + _unpack(m_key.lower(), series) + if "state" in data: + _unpack("arm_status", data["state"]) + return signals + + +def write_usv_json( + nav_log_path: Path, + usbl_csv_path: Path | None, + output_path: Path, + sortie_id: str, + date: str, +) -> None: + signals = _read_nav_log(nav_log_path) + if usbl_csv_path and usbl_csv_path.exists(): + signals.update(_read_usbl_csv(usbl_csv_path)) + + t_all = [p["t"] for pts in signals.values() for p in pts if pts] + meta = { + "sortie": sortie_id, + "date": date, + "vehicle": "USV", + "t_start": min(t_all) if t_all else 0, + "t_end": max(t_all) if t_all else 0, + } + + downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()} + payload = json.dumps({"meta": meta, "signals": downsampled}).encode() + output_path.parent.mkdir(parents=True, exist_ok=True) + with gzip.open(output_path, "wb") as f: + f.write(payload) + + +def write_auv_json( + mcap_json_path: Path, + output_path: Path, + auv_id: str, + sortie_id: str, + date: str, +) -> None: + signals = _read_mcap_signals(mcap_json_path) + t_all = [p["t"] for pts in signals.values() for p in pts if pts] + meta = { + "sortie": sortie_id, + "date": date, + "vehicle": auv_id, + "t_start": min(t_all) if t_all else 0, + "t_end": max(t_all) if t_all else 0, + } + downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()} + payload = json.dumps({"meta": meta, "signals": downsampled}).encode() + output_path.parent.mkdir(parents=True, exist_ok=True) + with gzip.open(output_path, "wb") as f: + f.write(payload) +``` + +- [ ] **Step 2: Commit** + +```bash +git add pipeline_runner/processor.py +git commit -m "feat: pipeline-runner — processor LTTB + signal extraction" +``` + +--- + +## Task 4: runner.py — rclone + orchestration + +**Files:** +- Create: `pipeline_runner/runner.py` + +- [ ] **Step 1: Créer runner.py** + +```python +import asyncio +import re +import subprocess +from pathlib import Path +from typing import AsyncGenerator + +from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR +from .processor import write_auv_json, write_usv_json + + +async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""): + await queue.put({"step": step, "pct": pct, "msg": msg}) + + +def _find_nav_log(ship_dir: Path) -> Path | None: + for p in ship_dir.glob("*_navigation_log.csv"): + return p + return None + + +def _find_usbl_csv(ship_dir: Path) -> Path | None: + for p in ship_dir.glob("*_usbl.csv"): + return p + return None + + +def _detect_auvs(sub_dir: Path) -> list[str]: + """Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010' → 'AUV010').""" + auvs = [] + for d in sub_dir.iterdir(): + if d.is_dir(): + m = re.search(r"(AUV\d+)", d.name, re.IGNORECASE) + if m: + auvs.append(m.group(1).upper()) + return sorted(set(auvs)) + + +def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None: + for d in sub_dir.iterdir(): + if d.is_dir() and auv_id.upper() in d.name.upper(): + return d + return None + + +def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess: + cmd = ["python3", str(TOOLS_DIR / script_name)] + args + return subprocess.run(cmd, capture_output=True, text=True) + + +async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None: + """Full pipeline: rclone sync → parse → process → write JSON.gz""" + raw_dir = OUTPUT_DIR / sortie_id / "raw" + proc_dir = OUTPUT_DIR / sortie_id / "processed" + proc_dir.mkdir(parents=True, exist_ok=True) + + # Step 1: rclone sync + await _emit(queue, "sync", 0, "rclone sync en cours…") + gdrive_path = f'{GDRIVE_REMOTE}/{sortie_id}' + result = subprocess.run( + ["rclone", "sync", gdrive_path, str(raw_dir), "--progress"], + capture_output=True, text=True + ) + if result.returncode != 0: + await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}") + return + await _emit(queue, "sync", 50, "rclone terminé") + + # Detect session dir (YYYYMMDD_HHMMSS_AUVxxx) inside raw/ + # Sorties have structure: raw/raw_data/logs/SHIP + SUB + ship_dir = next(raw_dir.rglob("logs/SHIP"), None) + sub_dir = next(raw_dir.rglob("logs/SUB"), None) + + if not ship_dir or not ship_dir.exists(): + await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync") + return + + # Step 2: USV parsing + await _emit(queue, "usv_parse", 55, "Parsing logs USV…") + nav_log = _find_nav_log(ship_dir) + usbl_csv_raw = _find_usbl_csv(ship_dir) + usbl_parsed_path = proc_dir / "combined_usbl.csv" + + if usbl_csv_raw: + _run_script("parse_kogger_usbl.py", [str(usbl_csv_raw), "-o", str(usbl_parsed_path)]) + + date_str = sortie_id.split("/")[-1][:10] if "/" in sortie_id else sortie_id[:10] + if nav_log: + write_usv_json( + nav_log_path=nav_log, + usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None, + output_path=proc_dir / "usv.json.gz", + sortie_id=sortie_id, + date=date_str, + ) + await _emit(queue, "usv_parse", 70, "USV OK") + + # Step 3: AUV parsing + if sub_dir and sub_dir.exists(): + auv_ids = _detect_auvs(sub_dir) + total = len(auv_ids) or 1 + for i, auv_id in enumerate(auv_ids): + await _emit(queue, "auv_parse", 70 + int(20 * i / total), f"AUV {auv_id}…") + session_dir = _detect_session_dir(sub_dir, auv_id) + if not session_dir: + continue + mcap_out = proc_dir / f"mcap_{auv_id}.json" + _run_script("extract_mcap_signals.py", ["--session-dir", str(session_dir), "--max-pts", "0"]) + # extract_mcap_signals writes to output/ by default; look for it + default_out = Path("output") / "mcap_signals.json" + if not default_out.exists(): + default_out = session_dir / "mcap_signals.json" + if default_out.exists(): + import shutil + shutil.copy(default_out, mcap_out) + if mcap_out.exists(): + write_auv_json( + mcap_json_path=mcap_out, + output_path=proc_dir / f"auv_{auv_id}.json.gz", + auv_id=auv_id, + sortie_id=sortie_id, + date=date_str, + ) + await _emit(queue, "write", 100, "Pipeline terminé") + + +async def scan_sorties() -> list[dict]: + """List available sorties on GDrive via rclone lsd.""" + result = subprocess.run( + ["rclone", "lsd", GDRIVE_REMOTE], + capture_output=True, text=True + ) + sorties = [] + for line in result.stdout.splitlines(): + parts = line.strip().split(None, 4) + if len(parts) == 5: + name = parts[4].strip() + processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists() + sorties.append({"id": name, "processed": processed}) + return sorties +``` + +- [ ] **Step 2: Commit** + +```bash +git add pipeline_runner/runner.py +git commit -m "feat: pipeline-runner — runner rclone + orchestration" +``` + +--- + +## Task 5: main.py — FastAPI endpoints + +**Files:** +- Create: `pipeline_runner/main.py` + +- [ ] **Step 1: Créer main.py** + +```python +import asyncio +import gzip +import json +from pathlib import Path +from typing import AsyncGenerator + +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse, StreamingResponse + +from .config import OUTPUT_DIR +from .runner import run_pipeline, scan_sorties + +app = FastAPI(title="COSMA Pipeline Runner") +app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) + +# Active pipeline jobs: sortie_id → asyncio.Queue +_jobs: dict[str, asyncio.Queue] = {} + + +@app.get("/sorties") +async def list_sorties(): + return await scan_sorties() + + +@app.post("/run/{sortie_id:path}") +async def run_sortie(sortie_id: str): + if sortie_id in _jobs: + return {"status": "already_running"} + queue: asyncio.Queue = asyncio.Queue() + _jobs[sortie_id] = queue + asyncio.create_task(_run_and_cleanup(sortie_id, queue)) + return {"status": "started"} + + +async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue): + try: + await run_pipeline(sortie_id, queue) + finally: + await asyncio.sleep(30) + _jobs.pop(sortie_id, None) + + +@app.get("/events/{sortie_id:path}") +async def sse_events(sortie_id: str): + if sortie_id not in _jobs: + raise HTTPException(404, "No active job for this sortie") + + async def generate() -> AsyncGenerator[str, None]: + queue = _jobs[sortie_id] + while True: + try: + event = await asyncio.wait_for(queue.get(), timeout=60) + yield f"data: {json.dumps(event)}\n\n" + if event.get("step") in ("error", "write") and event.get("pct") in (0, 100): + break + except asyncio.TimeoutError: + yield "data: {\"step\":\"ping\"}\n\n" + + return StreamingResponse(generate(), media_type="text/event-stream") + + +def _read_gz(path: Path) -> dict: + with gzip.open(path) as f: + return json.loads(f.read()) + + +@app.get("/sorties/{sortie_id:path}/usv") +async def get_usv(sortie_id: str): + p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz" + if not p.exists(): + raise HTTPException(404, "USV data not found — run pipeline first") + return JSONResponse(_read_gz(p)) + + +@app.get("/sorties/{sortie_id:path}/auvs") +async def list_auvs(sortie_id: str): + proc = OUTPUT_DIR / sortie_id / "processed" + auvs = [p.stem.removeprefix("auv_").removesuffix(".json") + for p in proc.glob("auv_*.json.gz")] + return sorted(auvs) + + +@app.get("/sorties/{sortie_id:path}/auv/{auv_id}") +async def get_auv(sortie_id: str, auv_id: str): + p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz" + if not p.exists(): + raise HTTPException(404, f"AUV {auv_id} data not found") + return JSONResponse(_read_gz(p)) + + +@app.get("/sorties/{sortie_id:path}/tracks") +async def get_tracks(sortie_id: str): + p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson" + if not p.exists(): + raise HTTPException(404, "tracks.geojson not found") + with open(p) as f: + return JSONResponse(json.load(f)) +``` + +- [ ] **Step 2: Commit** + +```bash +git add pipeline_runner/main.py +git commit -m "feat: pipeline-runner — FastAPI endpoints + SSE" +``` + +--- + +## Task 6: Test pipeline local sur sortie #71 + +Avant de modifier le viewer, valider que le service tourne et répond. + +- [ ] **Step 1: Installer deps localement** + +```bash +pip install fastapi uvicorn[standard] aiofiles numpy +``` + +- [ ] **Step 2: Lancer le service en local** + +```bash +cd /c/Users/flopp/cosma-nav-tools +OUTPUT_DIR=/tmp/cosma-test \ +GDRIVE_REMOTE="gdrive:Cosma - Internal/06-Operations/06 - Sorties" \ +TOOLS_DIR=tools \ +uvicorn pipeline_runner.main:app --port 8767 --reload +``` + +- [ ] **Step 3: Tester /sorties** + +```bash +curl http://localhost:8767/sorties | python3 -m json.tool | head -20 +``` +Attendu: liste de dossiers dont `#71-golrest`. + +- [ ] **Step 4: Lancer pipeline sur #71** + +```bash +curl -X POST "http://localhost:8767/run/%2371-golrest" +# puis stream les events +curl "http://localhost:8767/events/%2371-golrest" +``` +Attendu: stream SSE avec étapes `sync → usv_parse → auv_parse → write 100%`. + +- [ ] **Step 5: Vérifier output USV** + +```bash +curl "http://localhost:8767/sorties/%2371-golrest/usv" | python3 -m json.tool | head -30 +``` +Attendu: `{"meta": {...}, "signals": {"Yaw": [...], "Heading": [...], ...}}`. + +- [ ] **Step 6: Commit si OK** + +```bash +git commit --allow-empty -m "test: pipeline #71 validé en local" +``` + +--- + +## Task 7: viewer — sélecteur sortie + bouton + SSE progress + +**Files:** +- Modify: `viewer/index.html` + +La constante `API2 = 'http://192.168.0.83:8767'` est ajoutée. Le header reçoit un dropdown sortie et le bouton Sync. + +- [ ] **Step 1: Ajouter `API2` et les styles dans `` (après la ligne `const API = ...`)** + +Trouver dans le JS de index.html: +```js +const API = 'http://192.168.0.83:8766'; +``` +Ajouter juste après: +```js +const API2 = 'http://192.168.0.83:8767'; +``` + +- [ ] **Step 2: Ajouter styles CSS pour les nouveaux éléments (dans le bloc ``: +```css + #sortie-select { + background: #0f3460; border: 1px solid #e94560; color: #e0e0e0; + font-family: monospace; font-size: 11px; padding: 2px 6px; border-radius: 2px; + cursor: pointer; max-width: 160px; + } + #btn-sync { + background: #0f3460; border: 1px solid #e94560; color: #e94560; + padding: 2px 9px; cursor: pointer; font-family: monospace; font-size: 11px; + border-radius: 2px; + } + #btn-sync:hover { background: #e94560; color: #1a1a2e; } + #btn-sync:disabled { opacity: 0.4; cursor: not-allowed; } + #sync-progress { + font-size: 10px; color: #06d6a0; flex: 1; + white-space: nowrap; overflow: hidden; text-overflow: ellipsis; + } + .panel-header { + background: #0d0d20; border-top: 1px solid #0f3460; border-bottom: 1px solid #0f3460; + padding: 4px 14px; font-size: 11px; font-weight: bold; color: #e94560; + display: flex; align-items: center; gap: 8px; + } + .auv-tab { + font-family: monospace; font-size: 10px; padding: 2px 8px; cursor: pointer; + border: 1px solid #0f3460; background: transparent; color: #a0c4ff; border-radius: 2px; + } + .auv-tab.active { background: #0f3460; color: #e0e0e0; } + .graphs-grid { + display: grid; grid-template-columns: repeat(2, 1fr); gap: 4px; + padding: 4px 8px; background: #12122a; + } + .graph-cell { height: 130px; background: #1a1a2e; } + .graph-cell.wide { grid-column: span 2; height: 130px; } +``` + +- [ ] **Step 3: Ajouter HTML dans le `#header` (sortie dropdown + bouton + progress)** + +Dans le `