"""Pipeline runner: rclone sync -> USV/AUV log parsing -> gzipped JSON output.

All blocking work (rclone, helper scripts, JSON writers) is pushed to worker
threads with ``asyncio.to_thread`` so that the event loop stays free to
deliver the progress events placed on the queue.
"""
import asyncio
import re
import shutil
import subprocess
from pathlib import Path

from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
from .processor import write_auv_json, write_usv_json


async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
    """Put one progress event (``step``, ``pct``, ``msg``) on *queue*."""
    await queue.put({"step": step, "pct": pct, "msg": msg})


def _find_nav_log(ship_dir: Path) -> Path | None:
    """Return the navigation log CSV found directly in *ship_dir*, or None.

    When several files match, the lexicographically smallest path is chosen
    so the result does not depend on directory listing order.
    """
    return min(ship_dir.glob("*_navigation_log.csv"), default=None)


def _find_usbl_csv(ship_dir: Path) -> Path | None:
    """Return the raw USBL CSV found directly in *ship_dir*, or None.

    When several files match, the lexicographically smallest path is chosen
    so the result does not depend on directory listing order.
    """
    return min(ship_dir.glob("*_usbl.csv"), default=None)


def _detect_auvs(sub_dir: Path) -> list[str]:
    """Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010' → 'AUV010').

    IDs are upper-cased, de-duplicated and sorted.
    """
    found = {
        m.group(1).upper()
        for d in sub_dir.iterdir()
        if d.is_dir() and (m := re.search(r"(AUV\d+)", d.name, re.IGNORECASE))
    }
    return sorted(found)


def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
    """Return the session directory of *auv_id* inside *sub_dir*, or None.

    The ID must not be followed by another digit, so 'AUV01' does not match
    a directory named '..._AUV010'. Directories are examined in sorted order,
    which makes the choice deterministic when an AUV has several sessions.
    """
    pattern = re.compile(rf"{re.escape(auv_id)}(?!\d)", re.IGNORECASE)
    for d in sorted(sub_dir.iterdir()):
        if d.is_dir() and pattern.search(d.name):
            return d
    return None


def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
    """Run ``python3 TOOLS_DIR/<script_name> <args>`` and capture its output.

    Blocking; callers on the event loop should wrap it in
    ``asyncio.to_thread``. The exit status is not checked here.
    """
    cmd = ["python3", str(TOOLS_DIR / script_name)] + args
    return subprocess.run(cmd, capture_output=True, text=True)


async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
    """Full pipeline: rclone sync → parse → process → write JSON.gz

    Progress is reported as dict events on *queue* (see ``_emit``). A failed
    rclone sync or a missing SHIP directory emits an "error" event and stops
    the pipeline. Failures of the helper scripts are reported on the current
    step and the affected input is skipped, so that stale artifacts from an
    earlier run (or an earlier AUV) are never reused as fresh data.
    """
    raw_dir = OUTPUT_DIR / sortie_id / "raw"
    proc_dir = OUTPUT_DIR / sortie_id / "processed"
    proc_dir.mkdir(parents=True, exist_ok=True)

    # Step 1: rclone sync
    await _emit(queue, "sync", 0, "rclone sync en cours…")
    gdrive_path = f"{GDRIVE_REMOTE}/{sortie_id}"
    result = await asyncio.to_thread(
        subprocess.run,
        ["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
        return
    await _emit(queue, "sync", 50, "rclone terminé")

    # Detect SHIP/SUB dirs inside raw/
    ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
    sub_dir = next(raw_dir.rglob("logs/SUB"), None)

    if not ship_dir or not ship_dir.exists():
        await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
        return

    # Step 2: USV parsing
    await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
    nav_log = _find_nav_log(ship_dir)
    usbl_csv_raw = _find_usbl_csv(ship_dir)
    usbl_parsed_path = proc_dir / "combined_usbl.csv"

    # Only trust the parsed USBL file if it was produced during this run.
    usbl_ready = False
    if usbl_csv_raw:
        usbl_run = await asyncio.to_thread(
            _run_script,
            "parse_kogger_usbl.py",
            [str(usbl_csv_raw), "-o", str(usbl_parsed_path)],
        )
        if usbl_run.returncode == 0:
            usbl_ready = usbl_parsed_path.exists()
        else:
            await _emit(
                queue, "usv_parse", 55,
                f"parse_kogger_usbl: {usbl_run.stderr[:200]}",
            )

    # Last path component, first 10 characters (works with or without "/").
    date_str = sortie_id.split("/")[-1][:10]
    if nav_log:
        await asyncio.to_thread(
            write_usv_json,
            nav_log_path=nav_log,
            usbl_csv_path=usbl_parsed_path if usbl_ready else None,
            output_path=proc_dir / "usv.json.gz",
            sortie_id=sortie_id,
            date=date_str,
        )
    await _emit(queue, "usv_parse", 70, "USV OK")

    # Step 3: AUV parsing
    if sub_dir and sub_dir.exists():
        auv_ids = _detect_auvs(sub_dir)
        total = len(auv_ids) or 1
        # extract_mcap_signals writes to tools/../output/mcap_signals.json (hardcoded)
        shared_out = TOOLS_DIR.parent / "output" / "mcap_signals.json"
        for i, auv_id in enumerate(auv_ids):
            pct = 70 + int(20 * i / total)
            await _emit(queue, "auv_parse", pct, f"AUV {auv_id}…")
            session_dir = _detect_session_dir(sub_dir, auv_id)
            if not session_dir:
                continue
            mcap_out = proc_dir / f"mcap_{auv_id}.json"
            # The shared output is reused by every AUV: drop the previous one
            # so a failed extraction cannot be mistaken for a fresh result.
            shared_out.unlink(missing_ok=True)
            extract = await asyncio.to_thread(
                _run_script,
                "extract_mcap_signals.py",
                ["--session-dir", str(session_dir), "--max-pts", "0"],
            )
            if extract.returncode != 0:
                await _emit(
                    queue, "auv_parse", pct,
                    f"extract_mcap_signals ({auv_id}): {extract.stderr[:200]}",
                )
                continue
            produced = shared_out
            if not produced.exists():
                # Fallback location, next to the session data.
                produced = session_dir / "mcap_signals.json"
            if not produced.exists():
                continue
            shutil.copy(produced, mcap_out)
            await asyncio.to_thread(
                write_auv_json,
                mcap_json_path=mcap_out,
                output_path=proc_dir / f"auv_{auv_id}.json.gz",
                auv_id=auv_id,
                sortie_id=sortie_id,
                date=date_str,
            )
    await _emit(queue, "write", 100, "Pipeline terminé")


async def scan_sorties() -> list[dict]:
    """List available sorties on GDrive via rclone lsd.

    Each entry is ``{"id": <directory name>, "processed": <bool>}`` where
    ``processed`` tells whether ``usv.json.gz`` already exists locally for
    that sortie. Lines of rclone output that do not split into five fields
    are ignored.
    """
    result = await asyncio.to_thread(
        subprocess.run,
        ["rclone", "lsd", GDRIVE_REMOTE],
        capture_output=True,
        text=True,
    )
    sorties = []
    for line in result.stdout.splitlines():
        # Four metadata columns, then the name (which may contain spaces).
        parts = line.strip().split(None, 4)
        if len(parts) == 5:
            name = parts[4].strip()
            processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists()
            sorties.append({"id": name, "processed": processed})
    return sorties