feat: pipeline-runner — runner rclone + orchestration
This commit is contained in:
137
pipeline_runner/runner.py
Normal file
137
pipeline_runner/runner.py
Normal file
@@ -0,0 +1,137 @@
|
||||
import asyncio
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
|
||||
from .processor import write_auv_json, write_usv_json
|
||||
|
||||
|
||||
async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
    """Publish one progress event on *queue*.

    The event is a dict with the keys ``step``, ``pct`` and ``msg``.

    Args:
        queue: Queue the event is put on.
        step: Name of the pipeline step being reported.
        pct: Progress value attached to the event.
        msg: Optional human-readable message (empty by default).
    """
    event = dict(step=step, pct=pct, msg=msg)
    await queue.put(event)
|
||||
|
||||
|
||||
def _find_nav_log(ship_dir: Path) -> Path | None:
|
||||
for p in ship_dir.glob("*_navigation_log.csv"):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def _find_usbl_csv(ship_dir: Path) -> Path | None:
|
||||
for p in ship_dir.glob("*_usbl.csv"):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def _detect_auvs(sub_dir: Path) -> list[str]:
    """Collect the AUV IDs named by the sub-directories of *sub_dir*.

    For each directory, the first ``AUV<digits>`` occurrence in its name
    (matched case-insensitively) is taken as the ID and upper-cased, e.g.
    ``'20260416_125418_AUV010'`` gives ``'AUV010'``. Plain files are ignored.

    Returns:
        The distinct IDs found, sorted.
    """
    id_pattern = re.compile(r"(AUV\d+)", re.IGNORECASE)
    found = set()
    for entry in sub_dir.iterdir():
        if not entry.is_dir():
            continue
        hit = id_pattern.search(entry.name)
        if hit is not None:
            found.add(hit.group(1).upper())
    return sorted(found)
|
||||
|
||||
|
||||
def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
|
||||
for d in sub_dir.iterdir():
|
||||
if d.is_dir() and auv_id.upper() in d.name.upper():
|
||||
return d
|
||||
return None
|
||||
|
||||
|
||||
def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
    """Run a helper script located in ``TOOLS_DIR`` with ``python3``.

    The call blocks until the script exits. stdout and stderr are captured as
    text, and a non-zero exit status does not raise: callers inspect the
    returned ``CompletedProcess`` themselves.

    Args:
        script_name: File name of the script, relative to ``TOOLS_DIR``.
        args: Command-line arguments passed to the script.
    """
    script_path = TOOLS_DIR / script_name
    return subprocess.run(
        ["python3", str(script_path), *args],
        capture_output=True,
        text=True,
    )
|
||||
|
||||
|
||||
# The MCAP extractor writes to one hard-coded file shared by every run, so the
# delete -> extract -> copy sequence must not interleave between pipelines.
_MCAP_EXTRACT_LOCK = asyncio.Lock()


async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
    """Run the full pipeline for one sortie: rclone sync → parse → process → write JSON.gz.

    Progress is reported as events on *queue* through ``_emit``, using the
    steps ``sync``, ``usv_parse``, ``auv_parse`` and finally ``write`` at
    100 %. An ``error`` event is emitted, and the function returns early, when
    rclone exits with a non-zero status or when no ``logs/SHIP`` directory is
    found under the synced data.

    The external commands (rclone and the helper scripts) are blocking, so
    they run in a worker thread via ``asyncio.to_thread``; this keeps the
    event loop free to serve the queue consumer while they execute.

    Args:
        sortie_id: Sortie path below ``GDRIVE_REMOTE``; also used as the
            sub-path below ``OUTPUT_DIR`` for the ``raw`` and ``processed``
            directories.
        queue: Queue that receives the progress events.
    """
    sortie_root = OUTPUT_DIR / sortie_id
    raw_dir = sortie_root / "raw"
    proc_dir = sortie_root / "processed"
    proc_dir.mkdir(parents=True, exist_ok=True)

    # Step 1: rclone sync
    await _emit(queue, "sync", 0, "rclone sync en cours…")
    gdrive_path = f"{GDRIVE_REMOTE}/{sortie_id}"
    result = await asyncio.to_thread(
        subprocess.run,
        ["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
        return
    await _emit(queue, "sync", 50, "rclone terminé")

    # Detect SHIP/SUB dirs inside raw/
    ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
    sub_dir = next(raw_dir.rglob("logs/SUB"), None)

    if not ship_dir or not ship_dir.exists():
        await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
        return

    # Step 2: USV parsing
    await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
    nav_log = _find_nav_log(ship_dir)
    usbl_csv_raw = _find_usbl_csv(ship_dir)
    usbl_parsed_path = proc_dir / "combined_usbl.csv"

    if usbl_csv_raw:
        await asyncio.to_thread(
            _run_script,
            "parse_kogger_usbl.py",
            [str(usbl_csv_raw), "-o", str(usbl_parsed_path)],
        )

    # First 10 characters of the last path component of the sortie ID
    # (presumably a date prefix — confirm against the sortie naming scheme).
    date_str = sortie_id.split("/")[-1][:10]
    if nav_log:
        write_usv_json(
            nav_log_path=nav_log,
            usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None,
            output_path=proc_dir / "usv.json.gz",
            sortie_id=sortie_id,
            date=date_str,
        )
    await _emit(queue, "usv_parse", 70, "USV OK")

    # Step 3: AUV parsing
    if sub_dir and sub_dir.exists():
        auv_ids = _detect_auvs(sub_dir)
        total = len(auv_ids) or 1
        # extract_mcap_signals writes to tools/../output/mcap_signals.json (hardcoded)
        shared_out = TOOLS_DIR.parent / "output" / "mcap_signals.json"
        for i, auv_id in enumerate(auv_ids):
            await _emit(queue, "auv_parse", 70 + int(20 * i / total), f"AUV {auv_id}…")
            session_dir = _detect_session_dir(sub_dir, auv_id)
            if not session_dir:
                continue
            mcap_out = proc_dir / f"mcap_{auv_id}.json"
            async with _MCAP_EXTRACT_LOCK:
                # Remove the result of any earlier extraction first: otherwise a
                # failed run for this AUV would silently reuse another AUV's file.
                try:
                    shared_out.unlink(missing_ok=True)
                except OSError:
                    # Best effort: if the file cannot be removed, fall through
                    # and let the extractor try to overwrite it.
                    pass
                await asyncio.to_thread(
                    _run_script,
                    "extract_mcap_signals.py",
                    ["--session-dir", str(session_dir), "--max-pts", "0"],
                )
                # Fall back to a file inside the session directory when the
                # shared location was not written.
                default_out = shared_out
                if not default_out.exists():
                    default_out = session_dir / "mcap_signals.json"
                if default_out.exists():
                    shutil.copy(default_out, mcap_out)
            if mcap_out.exists():
                write_auv_json(
                    mcap_json_path=mcap_out,
                    output_path=proc_dir / f"auv_{auv_id}.json.gz",
                    auv_id=auv_id,
                    sortie_id=sortie_id,
                    date=date_str,
                )
    await _emit(queue, "write", 100, "Pipeline terminé")
|
||||
|
||||
|
||||
async def scan_sorties() -> list[dict]:
    """List available sorties on GDrive via ``rclone lsd``.

    The blocking rclone call runs in a worker thread via
    ``asyncio.to_thread`` so the event loop is not stalled while the remote
    is being listed.

    Returns:
        One dict per listed directory, with the keys ``id`` (the directory
        name) and ``processed`` (whether
        ``OUTPUT_DIR/<id>/processed/usv.json.gz`` exists). The exit status of
        rclone is not checked; when it prints nothing on stdout the list is
        empty.
    """
    result = await asyncio.to_thread(
        subprocess.run,
        ["rclone", "lsd", GDRIVE_REMOTE],
        capture_output=True,
        text=True,
    )
    sorties = []
    for line in result.stdout.splitlines():
        # Each lsd row has five columns: size, date, time, count, name.
        # Splitting at most four times keeps names containing spaces intact.
        parts = line.strip().split(None, 4)
        if len(parts) == 5:
            name = parts[4].strip()
            processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists()
            sorties.append({"id": name, "processed": processed})
    return sorties
|
||||
Reference in New Issue
Block a user