Compare commits
31 Commits
6f2f6d2d72
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f66bb52fec | ||
|
|
dd6f0cf435 | ||
|
|
f5788a01f4 | ||
| f5debc8afc | |||
|
|
63270beeff | ||
|
|
4164f32694 | ||
|
|
b45a5368ee | ||
|
|
7a5d442fbd | ||
| b21e306a86 | |||
|
|
a79f63e59e | ||
|
|
31b5a221b8 | ||
|
|
b962997008 | ||
|
|
6978b36650 | ||
| 70608de221 | |||
|
|
0a0dac7fda | ||
|
|
62091a09b7 | ||
|
|
3e1da53cc7 | ||
|
|
24f9394c75 | ||
|
|
c9dca1d071 | ||
|
|
682050ef14 | ||
|
|
4aec9d6295 | ||
|
|
af2bb6581f | ||
|
|
bd3a2359d9 | ||
|
|
02e357b874 | ||
|
|
34e709b7c8 | ||
|
|
07df61cbc4 | ||
|
|
8a5ed6174c | ||
|
|
103bf1cedd | ||
|
|
3198164aff | ||
|
|
be2cd1d156 | ||
|
|
b46f136b76 |
5
.gitignore
vendored
5
.gitignore
vendored
@@ -2,8 +2,3 @@ data/
|
||||
output/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
.venv/
|
||||
data/
|
||||
viewer/data/
|
||||
viewer/screenshots/
|
||||
screenshots/
|
||||
|
||||
17
docker-compose.yml
Normal file
17
docker-compose.yml
Normal file
@@ -0,0 +1,17 @@
|
||||
version: "3.9"
|
||||
services:
|
||||
pipeline-runner:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: pipeline_runner/Dockerfile
|
||||
container_name: cosma-pipeline-runner
|
||||
ports:
|
||||
- "8767:8767"
|
||||
volumes:
|
||||
- /mnt/nas-cosma/cosma/sorties:/data/sorties
|
||||
- /home/floppyrj45/.config/rclone/rclone.conf:/root/.config/rclone/rclone.conf:ro
|
||||
environment:
|
||||
- GDRIVE_REMOTE=cosma:06-Operations/06 - Sorties
|
||||
- OUTPUT_DIR=/data/sorties
|
||||
- TOOLS_DIR=/app/tools
|
||||
restart: unless-stopped
|
||||
1219
docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md
Normal file
1219
docs/superpowers/plans/2026-04-27-gdrive-pipeline-replay.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,184 @@
|
||||
# Design : GDrive Pipeline Replay
|
||||
|
||||
**Date :** 2026-04-27
|
||||
**Repo :** cosma-nav-tools
|
||||
**Branche :** feat/flag-local
|
||||
|
||||
---
|
||||
|
||||
## Objectif
|
||||
|
||||
Bouton dans le viewer 8765 qui déclenche un sync GDrive → pipeline Python → affichage replay des signaux USV et AUV sur la même page, synchronisés avec le slider 24h existant.
|
||||
|
||||
---
|
||||
|
||||
## Architecture globale
|
||||
|
||||
```
|
||||
GDrive (G:)
|
||||
└─ 06-Operations/06-Sorties/#XX-lieu/YYYYMMDD-lieu/raw_data/
|
||||
├─ SHIP/*.csv (USV nav + USBL + full)
|
||||
└─ SUB/*.mcap (AUV ROS2) + bin/
|
||||
|
||||
↓ rclone sync (déclenché par bouton viewer)
|
||||
|
||||
.83 /data/sorties/#XX/raw/
|
||||
├─ SHIP/
|
||||
└─ SUB/
|
||||
|
||||
↓ pipeline Python (scripts existants cosma-nav-tools/tools/)
|
||||
|
||||
.83 /data/sorties/#XX/processed/
|
||||
├─ usv.json.gz ← signaux USV downsamplés LTTB
|
||||
├─ auv_AUV010.json.gz ← signaux AUV010 downsamplés LTTB
|
||||
├─ auv_AUVxxx.json.gz ← un fichier par AUV détecté
|
||||
└─ tracks.geojson ← USV + AUV tracks (Leaflet)
|
||||
|
||||
↓ servi par
|
||||
|
||||
port 8767 pipeline-runner FastAPI (cosma-nav-tools/pipeline_runner/)
|
||||
port 8765 viewer/index.html ← page unique, fetch 8766 + 8767
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Section 1 : Pipeline-runner (port 8767)
|
||||
|
||||
### Structure
|
||||
|
||||
```
|
||||
cosma-nav-tools/
|
||||
pipeline_runner/
|
||||
main.py ← FastAPI app + endpoints
|
||||
runner.py ← logique rclone + orchestration scripts
|
||||
config.py ← chemins GDrive, output dir, rclone remote
|
||||
docker-compose.yml ← service pipeline-runner port 8767
|
||||
data/sorties/ ← gitignored
|
||||
```
|
||||
|
||||
### Endpoints
|
||||
|
||||
| Méthode | Route | Description |
|
||||
|---------|-------|-------------|
|
||||
| `GET` | `/sorties` | Liste sorties détectées sur GDrive (scan dossier) |
|
||||
| `POST` | `/run/{sortie_id}` | Lance rclone pull + pipeline complet |
|
||||
| `GET` | `/events/{sortie_id}` | SSE stream progress (étape + %) |
|
||||
| `GET` | `/sorties/{id}/usv` | Données USV processed (JSON gzip) |
|
||||
| `GET` | `/sorties/{id}/auvs` | Liste AUVs disponibles pour cette sortie |
|
||||
| `GET` | `/sorties/{id}/auv/{auv_id}` | Données AUV processed (JSON gzip) |
|
||||
| `GET` | `/sorties/{id}/tracks` | GeoJSON tracks USV + AUV |
|
||||
|
||||
### Flow interne POST /run/{sortie_id}
|
||||
|
||||
```
|
||||
1. rclone sync GDrive/#id → /data/sorties/#id/raw/ SSE: "sync" 0→50%
|
||||
2. parse_usv_nav.py + extract_usv_pwm.py SSE: "usv_parse" 50→65%
|
||||
3. parse_kogger_usbl.py + merge_nav_usbl.py SSE: "usbl_merge" 65→80%
|
||||
4. extract_mcap_signals.py (par AUV détecté) SSE: "auv_parse" 80→90%
|
||||
5. usbl_to_json.py SSE: "usbl_json" 90→95%
|
||||
6. downsample LTTB + écriture .json.gz SSE: "write" 95→100%
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Section 2 : Format données
|
||||
|
||||
### Signaux (usv.json.gz, auv_*.json.gz)
|
||||
|
||||
```json
|
||||
{
|
||||
"meta": {
|
||||
"sortie": "#71-golrest",
|
||||
"date": "2026-04-16",
|
||||
"vehicle": "USV001",
|
||||
"t_start": 1713268477,
|
||||
"t_end": 1713282877
|
||||
},
|
||||
"signals": {
|
||||
"yaw": [{ "t": 1713268477, "v": 142.3 }, ...],
|
||||
"heading": [...],
|
||||
"gps_status": [{ "t": ..., "v": "3D_FIX" }, ...],
|
||||
"battery_v": [...],
|
||||
"usbl_dist": [...],
|
||||
"usbl_angle": [...],
|
||||
"motor_1": [...],
|
||||
"motor_2": [...],
|
||||
"auv_status": [{ "t": ..., "v": "MISSION" }, ...]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
- `t` = epoch UNIX secondes
|
||||
- Max **4000 points par signal** via LTTB (Plotly optimal)
|
||||
- Valeurs discrètes (status, mode) = strings → step-plot Plotly
|
||||
- Endpoint optionnel `/range?from=&to=` pour zoom fin sur brutes
|
||||
|
||||
### Tracks
|
||||
|
||||
- **GeoJSON** FeatureCollection : une Feature LineString par véhicule
|
||||
- Poids estimé : ~300KB gzip pour 4h à 1Hz
|
||||
|
||||
---
|
||||
|
||||
## Section 3 : Viewer 8765 — extensions
|
||||
|
||||
### Layout (page unique)
|
||||
|
||||
```
|
||||
datebar ← inchangé
|
||||
header ← + dropdown sortie + [Sync & Process] + progress bar SSE
|
||||
map Leaflet ← inchangé
|
||||
slider 24h ← shared X axis, curseur vertical sur tous les graphs
|
||||
─────────────────────────────────────────────
|
||||
USV PANEL
|
||||
[Yaw] [Heading]
|
||||
[GPS status] [Battery voltage]
|
||||
[USBL dist] [USBL angle]
|
||||
[Motor 1] [Motor 2]
|
||||
[AUV status: disarm/mission/goto — step-plot]
|
||||
─────────────────────────────────────────────
|
||||
AUV PANEL ← tabs [AUV010] [AUV011] … si multi-AUV
|
||||
[Pitch/Roll/Yaw — 3 traces] [Depth]
|
||||
[Altitude] [Obstacle dist]
|
||||
[USBL dist] [USBL angle]
|
||||
[Battery voltage] [Arm/Disarm/Mode — step-plot]
|
||||
[Motors ×6 PWM — 1 graph multi-trace M1…M6]
|
||||
```
|
||||
|
||||
### Synchronisation slider
|
||||
|
||||
- Slider 24h existant → événement → `updateCursor(t)` sur tous les graphs Plotly via `Plotly.relayout` shapes
|
||||
- Données chargées une fois en mémoire au click de sortie, pas de re-fetch au scrub
|
||||
|
||||
### Bouton Sync & Process
|
||||
|
||||
- `POST /run/{sortie_id}` → ouvre SSE `/events/{sortie_id}`
|
||||
- Progress bar inline dans le header (ex: `rclone 34% → usv_parse…`)
|
||||
- À 100% → fetch automatique USV + AUV + tracks → render graphs
|
||||
|
||||
---
|
||||
|
||||
## Section 4 : Déploiement .83
|
||||
|
||||
```yaml
|
||||
# docker-compose.yml (ajout)
|
||||
pipeline-runner:
|
||||
build: ./pipeline_runner
|
||||
ports:
|
||||
- "8767:8767"
|
||||
volumes:
|
||||
- /data/sorties:/data/sorties
|
||||
- /mnt/gdrive:/mnt/gdrive # rclone mount ou rclone exec
|
||||
environment:
|
||||
- GDRIVE_REMOTE=gdrive:Cosma - Internal/06-Operations/06 - Sorties
|
||||
- OUTPUT_DIR=/data/sorties
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Hors scope (v1)
|
||||
|
||||
- Authentification / accès multi-user
|
||||
- HDF5 archivage (peut venir en v2)
|
||||
- Replay animé temps-réel (curseur qui avance automatiquement)
|
||||
- Tests unitaires pipeline
|
||||
9
pipeline_runner/Dockerfile
Normal file
9
pipeline_runner/Dockerfile
Normal file
@@ -0,0 +1,9 @@
|
||||
FROM python:3.12-slim
|
||||
WORKDIR /app
|
||||
RUN apt-get update && apt-get install -y rclone && rm -rf /var/lib/apt/lists/*
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
COPY tools/ ./tools/
|
||||
COPY vendor/ ./vendor/
|
||||
COPY pipeline_runner/ ./pipeline_runner/
|
||||
CMD ["uvicorn", "pipeline_runner.main:app", "--host", "0.0.0.0", "--port", "8767"]
|
||||
0
pipeline_runner/__init__.py
Normal file
0
pipeline_runner/__init__.py
Normal file
7
pipeline_runner/config.py
Normal file
7
pipeline_runner/config.py
Normal file
@@ -0,0 +1,7 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
GDRIVE_REMOTE = os.getenv("GDRIVE_REMOTE", "gdrive:Cosma - Internal/06-Operations/06 - Sorties")
|
||||
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "/data/sorties"))
|
||||
TOOLS_DIR = Path(os.getenv("TOOLS_DIR", Path(__file__).parent.parent / "tools"))
|
||||
LTTB_MAX_PTS = 4000
|
||||
221
pipeline_runner/main.py
Normal file
221
pipeline_runner/main.py
Normal file
@@ -0,0 +1,221 @@
|
||||
import asyncio
|
||||
import csv
|
||||
import gzip
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import AsyncGenerator
|
||||
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
|
||||
from .config import OUTPUT_DIR
|
||||
from .runner import run_pipeline, scan_sorties, scan_sorties_local
|
||||
|
||||
app = FastAPI(title="COSMA Pipeline Runner")
|
||||
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
||||
|
||||
# Active pipeline jobs: sortie_id → asyncio.Queue
|
||||
_jobs: dict[str, asyncio.Queue] = {}
|
||||
|
||||
# Cache sorties avec TTL 10min
|
||||
_sorties_cache: list | None = None
|
||||
_sorties_cache_ts: float = 0.0
|
||||
_SORTIES_TTL = 600.0 # 10 minutes
|
||||
_sorties_refresh_lock: asyncio.Lock | None = None
|
||||
|
||||
|
||||
def _get_lock() -> asyncio.Lock:
|
||||
global _sorties_refresh_lock
|
||||
if _sorties_refresh_lock is None:
|
||||
_sorties_refresh_lock = asyncio.Lock()
|
||||
return _sorties_refresh_lock
|
||||
|
||||
|
||||
async def _refresh_sorties_cache() -> None:
|
||||
"""Refresh cache in background (holds lock to avoid parallel rclone calls)."""
|
||||
global _sorties_cache, _sorties_cache_ts
|
||||
lock = _get_lock()
|
||||
async with lock:
|
||||
# Double-check after acquiring lock
|
||||
if time.monotonic() - _sorties_cache_ts < _SORTIES_TTL:
|
||||
return
|
||||
result = await asyncio.to_thread(scan_sorties)
|
||||
_sorties_cache = result
|
||||
_sorties_cache_ts = time.monotonic()
|
||||
|
||||
|
||||
@app.get("/sorties")
|
||||
async def list_sorties():
|
||||
global _sorties_cache, _sorties_cache_ts
|
||||
now = time.monotonic()
|
||||
if _sorties_cache is None:
|
||||
# Premier appel: bloquant (cache vide)
|
||||
await _refresh_sorties_cache()
|
||||
elif now - _sorties_cache_ts >= _SORTIES_TTL:
|
||||
# Cache périmé: retourne le cache, refresh en arrière-plan
|
||||
asyncio.create_task(_refresh_sorties_cache())
|
||||
return _sorties_cache or []
|
||||
|
||||
|
||||
@app.get("/sorties/local")
|
||||
async def list_sorties_local():
|
||||
"""Scan /data/sorties local (NAS, instantané) sans rclone."""
|
||||
sorties = await asyncio.to_thread(scan_sorties_local)
|
||||
return sorties
|
||||
|
||||
|
||||
@app.post("/run/{sortie_id:path}")
|
||||
async def run_sortie(sortie_id: str):
|
||||
if sortie_id in _jobs:
|
||||
return {"status": "already_running"}
|
||||
queue: asyncio.Queue = asyncio.Queue()
|
||||
_jobs[sortie_id] = queue
|
||||
asyncio.create_task(_run_and_cleanup(sortie_id, queue))
|
||||
return {"status": "started"}
|
||||
|
||||
|
||||
async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue):
|
||||
try:
|
||||
await run_pipeline(sortie_id, queue)
|
||||
finally:
|
||||
await asyncio.sleep(30)
|
||||
_jobs.pop(sortie_id, None)
|
||||
|
||||
|
||||
@app.get("/events/{sortie_id:path}")
|
||||
async def sse_events(sortie_id: str):
|
||||
if sortie_id not in _jobs:
|
||||
raise HTTPException(404, "No active job for this sortie")
|
||||
|
||||
async def generate() -> AsyncGenerator[str, None]:
|
||||
queue = _jobs[sortie_id]
|
||||
while True:
|
||||
try:
|
||||
event = await asyncio.wait_for(queue.get(), timeout=60)
|
||||
yield f"data: {json.dumps(event)}\n\n"
|
||||
if event.get("step") in ("error", "write") and event.get("pct") in (0, 100):
|
||||
break
|
||||
except asyncio.TimeoutError:
|
||||
yield "data: {\"step\":\"ping\"}\n\n"
|
||||
|
||||
return StreamingResponse(generate(), media_type="text/event-stream")
|
||||
|
||||
|
||||
def _read_gz(path: Path) -> dict:
|
||||
with gzip.open(path) as f:
|
||||
return json.loads(f.read())
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/usv")
|
||||
async def get_usv(sortie_id: str):
|
||||
p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz"
|
||||
if not p.exists():
|
||||
raise HTTPException(404, "USV data not found — run pipeline first")
|
||||
return JSONResponse(_read_gz(p))
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/auvs")
|
||||
async def list_auvs(sortie_id: str):
|
||||
proc = OUTPUT_DIR / sortie_id / "processed"
|
||||
auvs = [p.name.removesuffix(".json.gz").removeprefix("auv_")
|
||||
for p in proc.glob("auv_*.json.gz")]
|
||||
return sorted(auvs)
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/auv/{auv_id}")
|
||||
async def get_auv(sortie_id: str, auv_id: str):
|
||||
p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz"
|
||||
if not p.exists():
|
||||
raise HTTPException(404, f"AUV {auv_id} data not found")
|
||||
return JSONResponse(_read_gz(p))
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/tracks")
|
||||
async def get_tracks(sortie_id: str):
|
||||
p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson"
|
||||
if not p.exists():
|
||||
raise HTTPException(404, "tracks.geojson not found")
|
||||
with open(p) as f:
|
||||
return JSONResponse(json.load(f))
|
||||
|
||||
|
||||
def _ts_nav(ts_str: str) -> float:
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
|
||||
try:
|
||||
return datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc).timestamp()
|
||||
except ValueError:
|
||||
continue
|
||||
return 0.0
|
||||
|
||||
|
||||
def _read_usv_track(nav_log_path: Path, max_pts: int = 2000) -> list[dict]:
|
||||
"""Read navigation_log.csv → [{t_ms, lat, lon, heading, source}] downsampled."""
|
||||
pts: list[dict] = []
|
||||
lat_map: dict[float, float] = {}
|
||||
lon_map: dict[float, float] = {}
|
||||
heading_map: dict[float, float] = {}
|
||||
with open(nav_log_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.reader(f)
|
||||
next(reader, None) # skip header
|
||||
for row in reader:
|
||||
if len(row) < 3:
|
||||
continue
|
||||
ts_str, field, val = row[0], row[1], row[2]
|
||||
if field not in ("Lat", "Lon", "Heading"):
|
||||
continue
|
||||
t = _ts_nav(ts_str)
|
||||
try:
|
||||
v = float(val)
|
||||
except ValueError:
|
||||
continue
|
||||
if field == "Lat":
|
||||
lat_map[t] = v
|
||||
elif field == "Lon":
|
||||
lon_map[t] = v
|
||||
else:
|
||||
heading_map[t] = v
|
||||
# Join on lat timestamps (master)
|
||||
source = nav_log_path.parent.name
|
||||
for t, lat in sorted(lat_map.items()):
|
||||
lon = lon_map.get(t)
|
||||
if lon is None:
|
||||
# nearest lon within 1s
|
||||
near = min(lon_map.keys(), key=lambda x: abs(x - t), default=None)
|
||||
if near is None or abs(near - t) > 1.0:
|
||||
continue
|
||||
lon = lon_map[near]
|
||||
pts.append({
|
||||
"t_ms": int(t * 1000),
|
||||
"lat": lat,
|
||||
"lon": lon,
|
||||
"heading": heading_map.get(t),
|
||||
"source": source,
|
||||
})
|
||||
# Simple stride downsampling
|
||||
if len(pts) > max_pts:
|
||||
step = len(pts) // max_pts
|
||||
pts = pts[::step]
|
||||
return pts
|
||||
|
||||
|
||||
_track_cache: dict[str, list[dict]] = {}
|
||||
|
||||
|
||||
@app.get("/sorties/{sortie_id:path}/usv_track")
|
||||
async def get_usv_track(sortie_id: str):
|
||||
"""Return USV GPS track [{t_ms, lat, lon, heading, source}] for map polylines."""
|
||||
if sortie_id in _track_cache:
|
||||
return JSONResponse(_track_cache[sortie_id])
|
||||
raw_dir = OUTPUT_DIR / sortie_id / "raw"
|
||||
nav_logs = list(raw_dir.rglob("*_navigation_log.csv")) if raw_dir.exists() else []
|
||||
if not nav_logs:
|
||||
raise HTTPException(404, "No navigation_log.csv found — run pipeline first")
|
||||
pts: list[dict] = []
|
||||
for nav_log in nav_logs:
|
||||
pts.extend(await asyncio.to_thread(_read_usv_track, nav_log))
|
||||
pts.sort(key=lambda p: p["t_ms"])
|
||||
_track_cache[sortie_id] = pts
|
||||
return JSONResponse(pts)
|
||||
174
pipeline_runner/processor.py
Normal file
174
pipeline_runner/processor.py
Normal file
@@ -0,0 +1,174 @@
|
||||
import csv
|
||||
import gzip
|
||||
import json
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
from .config import LTTB_MAX_PTS
|
||||
|
||||
|
||||
def _lttb(points: list[dict], max_pts: int) -> list[dict]:
|
||||
"""Largest Triangle Three Buckets downsampling."""
|
||||
n = len(points)
|
||||
if n <= max_pts:
|
||||
return points
|
||||
ts = np.array([p["t"] for p in points], dtype=float)
|
||||
vs = np.array([p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points], dtype=float)
|
||||
bucket_size = (n - 2) / (max_pts - 2)
|
||||
out = [points[0]]
|
||||
a = 0
|
||||
for i in range(max_pts - 2):
|
||||
avg_start = int((i + 1) * bucket_size) + 1
|
||||
avg_end = min(int((i + 2) * bucket_size) + 1, n)
|
||||
avg_t = np.mean(ts[avg_start:avg_end])
|
||||
avg_v = np.mean(vs[avg_start:avg_end])
|
||||
rng_start = int(i * bucket_size) + 1
|
||||
rng_end = min(int((i + 1) * bucket_size) + 1, n)
|
||||
max_area = -1.0
|
||||
best = rng_start
|
||||
at, av = ts[a], vs[a]
|
||||
for j in range(rng_start, rng_end):
|
||||
area = abs((at - avg_t) * (vs[j] - av) - (ts[j] - at) * (avg_v - av)) * 0.5
|
||||
if area > max_area:
|
||||
max_area = area
|
||||
best = j
|
||||
out.append(points[best])
|
||||
a = best
|
||||
out.append(points[-1])
|
||||
return out
|
||||
|
||||
|
||||
def _ts_to_epoch(ts_str: str) -> float:
|
||||
"""Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds."""
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
|
||||
try:
|
||||
dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc)
|
||||
return dt.timestamp()
|
||||
except ValueError:
|
||||
continue
|
||||
return 0.0
|
||||
|
||||
|
||||
def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]:
|
||||
"""Read long-format navigation_log.csv → dict of signal arrays."""
|
||||
signals: dict[str, list] = {}
|
||||
wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix",
|
||||
"Armed", "Mode", "M1", "M2"}
|
||||
with open(nav_log_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.reader(f)
|
||||
next(reader, None) # skip header
|
||||
for row in reader:
|
||||
if len(row) < 3:
|
||||
continue
|
||||
ts_str, field, val = row[0], row[1], row[2]
|
||||
if field not in wanted:
|
||||
continue
|
||||
t = _ts_to_epoch(ts_str)
|
||||
try:
|
||||
v: Any = float(val)
|
||||
except ValueError:
|
||||
v = val.strip()
|
||||
signals.setdefault(field, []).append({"t": t, "v": v})
|
||||
return signals
|
||||
|
||||
|
||||
def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]:
|
||||
"""Read combined_usbl.csv → Dist and Azimuth signal arrays."""
|
||||
dist_pts, az_pts = [], []
|
||||
with open(usbl_csv_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
try:
|
||||
t = _ts_to_epoch(row["Timestamp"])
|
||||
d = float(row["Dist"]) if row["Dist"] else None
|
||||
az = float(row["Azimuth"]) if row["Azimuth"] else None
|
||||
except (KeyError, ValueError):
|
||||
continue
|
||||
if d is not None and not math.isnan(d):
|
||||
dist_pts.append({"t": t, "v": d})
|
||||
if az is not None and not math.isnan(az):
|
||||
az_pts.append({"t": t, "v": az})
|
||||
return {"usbl_dist": dist_pts, "usbl_angle": az_pts}
|
||||
|
||||
|
||||
def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]:
|
||||
"""Read mcap_signals.json → depth, motors M1-M6, state signals."""
|
||||
with open(mcap_json_path) as f:
|
||||
data = json.load(f)
|
||||
signals: dict[str, list[dict]] = {}
|
||||
|
||||
def _unpack(key: str, series: list):
|
||||
pts = []
|
||||
for item in series:
|
||||
t = item.get("t_ms", item.get("t", 0))
|
||||
if isinstance(t, (int, float)) and t > 1e9:
|
||||
t = t / 1000.0 # ms → s
|
||||
pts.append({"t": float(t), "v": item.get("v", item.get("value", 0))})
|
||||
signals[key] = pts
|
||||
|
||||
if "depth" in data:
|
||||
_unpack("depth", data["depth"])
|
||||
if "pwm_auv" in data:
|
||||
for m_key, series in data["pwm_auv"].items():
|
||||
_unpack(m_key.lower(), series)
|
||||
if "state" in data:
|
||||
_unpack("arm_status", data["state"])
|
||||
# New signals added by extended extract_mcap_signals.py
|
||||
for key in ("pitch", "roll", "yaw", "altitude", "battery_v", "obstacle_dist"):
|
||||
if key in data:
|
||||
_unpack(key, data[key])
|
||||
return signals
|
||||
|
||||
|
||||
def write_usv_json(
|
||||
nav_log_path: Path,
|
||||
usbl_csv_path: Path | None,
|
||||
output_path: Path,
|
||||
sortie_id: str,
|
||||
date: str,
|
||||
) -> None:
|
||||
signals = _read_nav_log(nav_log_path)
|
||||
if usbl_csv_path and usbl_csv_path.exists():
|
||||
signals.update(_read_usbl_csv(usbl_csv_path))
|
||||
|
||||
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
|
||||
meta = {
|
||||
"sortie": sortie_id,
|
||||
"date": date,
|
||||
"vehicle": "USV",
|
||||
"t_start": min(t_all) if t_all else 0,
|
||||
"t_end": max(t_all) if t_all else 0,
|
||||
}
|
||||
|
||||
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
|
||||
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with gzip.open(output_path, "wb") as f:
|
||||
f.write(payload)
|
||||
|
||||
|
||||
def write_auv_json(
|
||||
mcap_json_path: Path,
|
||||
output_path: Path,
|
||||
auv_id: str,
|
||||
sortie_id: str,
|
||||
date: str,
|
||||
) -> None:
|
||||
signals = _read_mcap_signals(mcap_json_path)
|
||||
t_all = [p["t"] for pts in signals.values() for p in pts if pts]
|
||||
meta = {
|
||||
"sortie": sortie_id,
|
||||
"date": date,
|
||||
"vehicle": auv_id,
|
||||
"t_start": min(t_all) if t_all else 0,
|
||||
"t_end": max(t_all) if t_all else 0,
|
||||
}
|
||||
downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
|
||||
payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with gzip.open(output_path, "wb") as f:
|
||||
f.write(payload)
|
||||
149
pipeline_runner/runner.py
Normal file
149
pipeline_runner/runner.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import asyncio
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
|
||||
from .processor import write_auv_json, write_usv_json
|
||||
|
||||
|
||||
async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
|
||||
await queue.put({"step": step, "pct": pct, "msg": msg})
|
||||
|
||||
|
||||
def _find_nav_log(ship_dir: Path) -> Path | None:
|
||||
for p in ship_dir.glob("*_navigation_log.csv"):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def _find_usbl_csv(ship_dir: Path) -> Path | None:
|
||||
for p in ship_dir.glob("*_usbl.csv"):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def _detect_auvs(sub_dir: Path) -> list[str]:
|
||||
"""Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010' → 'AUV010')."""
|
||||
auvs = []
|
||||
for d in sub_dir.iterdir():
|
||||
if d.is_dir():
|
||||
m = re.search(r"(AUV\d+)", d.name, re.IGNORECASE)
|
||||
if m:
|
||||
auvs.append(m.group(1).upper())
|
||||
return sorted(set(auvs))
|
||||
|
||||
|
||||
def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
|
||||
for d in sub_dir.iterdir():
|
||||
if d.is_dir() and auv_id.upper() in d.name.upper():
|
||||
return d
|
||||
return None
|
||||
|
||||
|
||||
def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
|
||||
cmd = ["python3", str(TOOLS_DIR / script_name)] + args
|
||||
return subprocess.run(cmd, capture_output=True, text=True)
|
||||
|
||||
|
||||
async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
|
||||
"""Full pipeline: rclone sync → parse → process → write JSON.gz"""
|
||||
raw_dir = OUTPUT_DIR / sortie_id / "raw"
|
||||
proc_dir = OUTPUT_DIR / sortie_id / "processed"
|
||||
proc_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Step 1: rclone sync
|
||||
await _emit(queue, "sync", 0, "rclone sync en cours…")
|
||||
gdrive_path = f'{GDRIVE_REMOTE}/{sortie_id}'
|
||||
result = subprocess.run(
|
||||
["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
if result.returncode != 0:
|
||||
await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
|
||||
return
|
||||
await _emit(queue, "sync", 50, "rclone terminé")
|
||||
|
||||
# Detect SHIP/SUB dirs inside raw/
|
||||
ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
|
||||
sub_dir = next(raw_dir.rglob("logs/SUB"), None)
|
||||
|
||||
if not ship_dir or not ship_dir.exists():
|
||||
await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
|
||||
return
|
||||
|
||||
# Step 2: USV parsing
|
||||
await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
|
||||
nav_log = _find_nav_log(ship_dir)
|
||||
usbl_csv_raw = _find_usbl_csv(ship_dir)
|
||||
usbl_parsed_path = proc_dir / "combined_usbl.csv"
|
||||
|
||||
if usbl_csv_raw:
|
||||
_run_script("parse_kogger_usbl.py", [str(usbl_csv_raw), "-o", str(usbl_parsed_path)])
|
||||
|
||||
date_str = sortie_id.split("/")[-1][:10] if "/" in sortie_id else sortie_id[:10]
|
||||
if nav_log:
|
||||
write_usv_json(
|
||||
nav_log_path=nav_log,
|
||||
usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None,
|
||||
output_path=proc_dir / "usv.json.gz",
|
||||
sortie_id=sortie_id,
|
||||
date=date_str,
|
||||
)
|
||||
await _emit(queue, "usv_parse", 70, "USV OK")
|
||||
|
||||
# Step 3: AUV parsing
|
||||
if sub_dir and sub_dir.exists():
|
||||
auv_ids = _detect_auvs(sub_dir)
|
||||
total = len(auv_ids) or 1
|
||||
for i, auv_id in enumerate(auv_ids):
|
||||
await _emit(queue, "auv_parse", 70 + int(20 * i / total), f"AUV {auv_id}…")
|
||||
session_dir = _detect_session_dir(sub_dir, auv_id)
|
||||
if not session_dir:
|
||||
continue
|
||||
mcap_out = proc_dir / f"mcap_{auv_id}.json"
|
||||
_run_script("extract_mcap_signals.py", ["--session-dir", str(session_dir), "--max-pts", "0"])
|
||||
# extract_mcap_signals writes to tools/../output/mcap_signals.json (hardcoded)
|
||||
default_out = TOOLS_DIR.parent / "output" / "mcap_signals.json"
|
||||
if not default_out.exists():
|
||||
default_out = session_dir / "mcap_signals.json"
|
||||
if default_out.exists():
|
||||
shutil.copy(default_out, mcap_out)
|
||||
if mcap_out.exists():
|
||||
write_auv_json(
|
||||
mcap_json_path=mcap_out,
|
||||
output_path=proc_dir / f"auv_{auv_id}.json.gz",
|
||||
auv_id=auv_id,
|
||||
sortie_id=sortie_id,
|
||||
date=date_str,
|
||||
)
|
||||
await _emit(queue, "write", 100, "Pipeline terminé")
|
||||
|
||||
|
||||
def scan_sorties() -> list[dict]:
|
||||
"""List available sorties on GDrive via rclone lsd (lent ~30s)."""
|
||||
result = subprocess.run(
|
||||
["rclone", "lsd", GDRIVE_REMOTE],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
sorties = []
|
||||
for line in result.stdout.splitlines():
|
||||
parts = line.strip().split(None, 4)
|
||||
if len(parts) == 5:
|
||||
name = parts[4].strip()
|
||||
processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists()
|
||||
sorties.append({"id": name, "processed": processed})
|
||||
return sorties
|
||||
|
||||
|
||||
def scan_sorties_local() -> list[dict]:
|
||||
"""List sorties already synced locally in OUTPUT_DIR (instantané, pas rclone)."""
|
||||
if not OUTPUT_DIR.exists():
|
||||
return []
|
||||
sorties = []
|
||||
for d in sorted(OUTPUT_DIR.iterdir()):
|
||||
if d.is_dir():
|
||||
processed = (d / "processed" / "usv.json.gz").exists()
|
||||
sorties.append({"id": d.name, "processed": processed})
|
||||
return sorties
|
||||
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@@ -0,0 +1,4 @@
|
||||
fastapi==0.115.0
|
||||
uvicorn[standard]==0.30.6
|
||||
aiofiles==24.1.0
|
||||
numpy==2.1.1
|
||||
BIN
screenshots/viewer-v5-wait.png
Normal file
BIN
screenshots/viewer-v5-wait.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 116 KiB |
BIN
screenshots/viewer-v5.png
Normal file
BIN
screenshots/viewer-v5.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 37 KiB |
@@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Extract AUV signals from MCAP files: depth, PWM, state."""
|
||||
import argparse, glob, json, os, sys
|
||||
import argparse, glob, json, math, os, sys
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
@@ -26,7 +26,16 @@ def main():
|
||||
depth_raw = []
|
||||
pwm_raw = []
|
||||
state_raw = []
|
||||
TOPICS = ['/mavros/imu/static_pressure', '/mavros/rc/out', '/mavros/state']
|
||||
signals = {}
|
||||
TOPICS = [
|
||||
'/mavros/imu/static_pressure',
|
||||
'/mavros/rc/out',
|
||||
'/mavros/state',
|
||||
'/mavros/imu/data',
|
||||
'/mavros/altitude',
|
||||
'/mavros/battery',
|
||||
'/mavros/distance_sensor/hrlv_ez4_pub',
|
||||
]
|
||||
|
||||
for mcap_file in mcap_files:
|
||||
try:
|
||||
@@ -53,6 +62,40 @@ def main():
|
||||
state_raw.append({'t': t_ms, 'mode': str(ros_msg.mode), 'armed': bool(ros_msg.armed)})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/imu/data':
|
||||
try:
|
||||
q = ros_msg.orientation
|
||||
sinr = 2*(q.w*q.x + q.y*q.z)
|
||||
cosr = 1 - 2*(q.x*q.x + q.y*q.y)
|
||||
roll = math.degrees(math.atan2(sinr, cosr))
|
||||
sinp = 2*(q.w*q.y - q.z*q.x)
|
||||
pitch = math.degrees(math.asin(max(-1, min(1, sinp))))
|
||||
siny = 2*(q.w*q.z + q.x*q.y)
|
||||
cosy = 1 - 2*(q.y*q.y + q.z*q.z)
|
||||
yaw = math.degrees(math.atan2(siny, cosy))
|
||||
signals.setdefault('pitch', []).append({'t': t_ms, 'v': pitch})
|
||||
signals.setdefault('roll', []).append({'t': t_ms, 'v': roll})
|
||||
signals.setdefault('yaw', []).append({'t': t_ms, 'v': yaw})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/altitude':
|
||||
try:
|
||||
signals.setdefault('altitude', []).append(
|
||||
{'t': t_ms, 'v': ros_msg.relative})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/battery':
|
||||
try:
|
||||
signals.setdefault('battery_v', []).append(
|
||||
{'t': t_ms, 'v': ros_msg.voltage})
|
||||
except Exception:
|
||||
pass
|
||||
elif topic == '/mavros/distance_sensor/hrlv_ez4_pub':
|
||||
try:
|
||||
signals.setdefault('obstacle_dist', []).append(
|
||||
{'t': t_ms, 'v': ros_msg.range})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f" Skip {os.path.basename(mcap_file)}: {e}")
|
||||
|
||||
@@ -69,7 +112,8 @@ def main():
|
||||
pwm_samples = sample(pwm_raw, args.max_pts)
|
||||
state = state_raw # events, keep all
|
||||
|
||||
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw]
|
||||
signals_flat = [pt for pts in signals.values() for pt in pts]
|
||||
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw + signals_flat]
|
||||
t_min = min(all_t) if all_t else 0
|
||||
t_max = max(all_t) if all_t else 0
|
||||
|
||||
@@ -94,6 +138,7 @@ def main():
|
||||
'depth': depth,
|
||||
'pwm_auv': {'channels': channels, 'samples': pwm_samples},
|
||||
'state': state,
|
||||
**{k: sample(v, args.max_pts) for k, v in signals.items()},
|
||||
}
|
||||
|
||||
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
|
||||
|
||||
1
vendor/Kogger-Protocol
vendored
Submodule
1
vendor/Kogger-Protocol
vendored
Submodule
Submodule vendor/Kogger-Protocol added at d62576fee5
BIN
vendor/Kogger-Protocol/Kogger SB protocol.odt
vendored
BIN
vendor/Kogger-Protocol/Kogger SB protocol.odt
vendored
Binary file not shown.
BIN
vendor/Kogger-Protocol/Kogger SB protocol.pdf
vendored
BIN
vendor/Kogger-Protocol/Kogger SB protocol.pdf
vendored
Binary file not shown.
51
vendor/Kogger-Protocol/README.md
vendored
51
vendor/Kogger-Protocol/README.md
vendored
@@ -1,51 +0,0 @@
|
||||
# Open Serial Binary Protocol (SBP) specification
|
||||
|
||||
## Protocol frame structure
|
||||
|
||||
SYNC1 | SYNC2 | ROUTE | MODE | ID | LENGTH | PAYLOAD | CHECK1 | CHECK2|
|
||||
|----------|----------|----------|----------|----------|----------|----------|----------|----------|
|
||||
U1 | U1 | U1 | U1 | U1 | U1 | BYTE[LENGTH] | U1 | U1 |
|
||||
0xBB | 0x55 | BITFIELD | BITFIELD | 1 … 255 | 0 … 128 | BYTEARRAY | 0 … 0xFF | 0 … 0xFF |
|
||||
|
||||
### ROUTE
|
||||
Name | Bits | Description
|
||||
|----------|----------|----------|
|
||||
DEV_ADDRESS | 0:3 bit | Device address. Default and broadcast address is 0x0.
|
||||
RESERVED | 2 bit | Reserved
|
||||
|
||||
### MODE
|
||||
Name | Bits | Description
|
||||
|----------|----------|----------|
|
||||
TYPE | 0:1 bit | 0 — Reserved, 1 — CONTENT: DEVICE → HOST, 2 — SETTING: HOST → DEVICE, 3 —GETTING: HOST → DEVICE
|
||||
RESERVED | 2 bit | Reserved
|
||||
VERSION | 3:5 bit | Field defines the payload data version
|
||||
MARK | 6 bit | Once device is switched on, this flag is always in reset state (ZERO). It can be set to active state (ONE) by the host (see the CMD_MARK command) and the slave device keeps the flag in active state in every frame until hardware reset occurs or is reset by the host. Therefore the host monitors the device's actual settings.
|
||||
RESPONSE | 7 bit | HOST → DEVICE: Set the flag to active state (ONE) in order to get the result of processing the command. The flag doesn't affect the response if one is provided by the TYPE field. DEVICE → HOST: The flag is in reset state (ZERO) by default. Payload goes according to the command specification. If flag is set, the payload contains the result of command processing (see CMD_RESP command).
|
||||
|
||||
## Checksum
|
||||
The checksum algorithm used is the Fletcher-16.
|
||||
Example source code for calculating the checksum:
|
||||
```
|
||||
uint8_t CHECK1 = 0;
|
||||
uint8_t CHECK2 = 0;
|
||||
void CheckSumUpdate(uint8_t byte) {
|
||||
CHECK1 += byte;
|
||||
CHECK2 += CHECK1;
|
||||
}
|
||||
```
|
||||
|
||||
## Number Formats
|
||||
- Multi-byte values are ordered in Little Endian format
|
||||
- Floating point values are transmitted in IEEE754 single or double precision
|
||||
- bit-field in LSB format
|
||||
|
||||
Name | Type | Size (Bytes) | Range
|
||||
|----------|----------|----------|----------|
|
||||
S1 | int8_t | 1 | -128 ... 127
|
||||
U1 | uint8_t | 1 | 0 … 255
|
||||
S2 | int16_t | 2 | -32768 … 32767
|
||||
U2 | uint16_t | 2 | 0 … 65535
|
||||
S4 | int32_t | 4 | -2'147'483'648 ... 2'147'483'647
|
||||
U4 | uint32_t | 4 | 0 … 4'294'967'295
|
||||
F4 | float | 4 | -1*2^+127 ... 2^+127
|
||||
D8 | double | 8 | -1*2^+1023 ... 2^+1023
|
||||
1218
viewer/index.html
1218
viewer/index.html
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user