"""Sortie processing pipeline.

Mirrors a sortie folder from GDrive with rclone, parses the USV (SHIP) and AUV
(SUB) logs with the helper scripts in TOOLS_DIR, and writes the processed
``*.json.gz`` files under ``OUTPUT_DIR/<sortie_id>/processed``.
"""

import asyncio
import logging
import re
import shutil
import subprocess
from pathlib import Path

from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
from .processor import write_auv_json, write_usv_json

logger = logging.getLogger(__name__)

# Matches an AUV identifier such as "AUV010" inside a SUB/ session folder name.
_AUV_ID_RE = re.compile(r"(AUV\d+)", re.IGNORECASE)

# Output path hardcoded inside extract_mcap_signals.py. It is shared by every
# AUV (and every sortie), so its content may belong to a previous run.
_MCAP_SHARED_OUT = TOOLS_DIR.parent / "output" / "mcap_signals.json"


async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = "") -> None:
    """Push one progress event ``{"step", "pct", "msg"}`` onto *queue*."""
    await queue.put({"step": step, "pct": pct, "msg": msg})


def _tail(text: str, limit: int = 200) -> str:
    """Return the last *limit* characters of *text*.

    Tool output (Python tracebacks, rclone's final summary) tends to end with
    the most relevant error, so the tail is more useful than the head.
    """
    return text.strip()[-limit:]


def _first_match(directory: Path, pattern: str) -> Path | None:
    """Return the name-sorted first entry of *directory* matching *pattern*, or None."""
    return min(directory.glob(pattern), default=None)


def _find_nav_log(ship_dir: Path) -> Path | None:
    """Return the ``*_navigation_log.csv`` file in *ship_dir*, if any."""
    return _first_match(ship_dir, "*_navigation_log.csv")


def _find_usbl_csv(ship_dir: Path) -> Path | None:
    """Return the ``*_usbl.csv`` file in *ship_dir*, if any."""
    return _first_match(ship_dir, "*_usbl.csv")


def _auv_id_of(name: str) -> str | None:
    """Return the upper-cased AUV ID found in *name* (e.g. 'AUV010'), or None."""
    m = _AUV_ID_RE.search(name)
    return m.group(1).upper() if m else None


def _detect_auvs(sub_dir: Path) -> list[str]:
    """Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010' → 'AUV010').

    IDs are upper-cased, de-duplicated and sorted.
    """
    ids: set[str] = set()
    for d in sub_dir.iterdir():
        if d.is_dir() and (auv_id := _auv_id_of(d.name)):
            ids.add(auv_id)
    return sorted(ids)


def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
    """Return the SUB/ session folder belonging to *auv_id*, or None.

    The folder's extracted AUV ID must equal *auv_id* exactly: a substring test
    would let 'AUV01' match a folder named '..._AUV010'. If several sessions
    exist for the same AUV, the first by name is returned so the pick is stable.
    """
    wanted = auv_id.upper()
    for d in sorted(sub_dir.iterdir()):
        if d.is_dir() and _auv_id_of(d.name) == wanted:
            return d
    return None


def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
    """Run ``python3 TOOLS_DIR/<script_name> <args...>`` and capture its text output.

    Blocking; call it through ``asyncio.to_thread`` from coroutines.
    """
    cmd = ["python3", str(TOOLS_DIR / script_name), *args]
    return subprocess.run(cmd, capture_output=True, text=True)


def _is_processed(sortie_dir: Path) -> bool:
    """True when the sortie's USV output has already been written locally."""
    return (sortie_dir / "processed" / "usv.json.gz").exists()


def _find_logs_dir(raw_dir: Path, name: str) -> Path | None:
    """Return the first ``logs/<name>`` directory found anywhere under *raw_dir*, or None."""
    return next((p for p in raw_dir.rglob(f"logs/{name}") if p.is_dir()), None)


def _file_signature(path: Path) -> tuple[int, int, int] | None:
    """Return (mtime_ns, size, inode) of *path*, or None if it does not exist."""
    try:
        st = path.stat()
    except FileNotFoundError:
        return None
    return (st.st_mtime_ns, st.st_size, st.st_ino)


def _extract_mcap(session_dir: Path, mcap_out: Path) -> subprocess.CompletedProcess:
    """Run extract_mcap_signals.py on *session_dir* and copy its output to *mcap_out*.

    The script writes to a shared hardcoded path, so that file is only trusted if
    this call changed it; otherwise ``session_dir/mcap_signals.json`` is used when
    present. This prevents a previous AUV's signals from being copied for this
    one when the script fails. Blocking; returns the script's result.
    NOTE(review): two pipelines running at once would still race on the shared
    file — the script's hardcoded output path is the root cause.
    """
    before = _file_signature(_MCAP_SHARED_OUT)
    result = _run_script(
        "extract_mcap_signals.py",
        ["--session-dir", str(session_dir), "--max-pts", "0"],
    )
    after = _file_signature(_MCAP_SHARED_OUT)
    if after is not None and after != before:
        source = _MCAP_SHARED_OUT
    else:
        source = session_dir / "mcap_signals.json"
    if source.exists():
        shutil.copy(source, mcap_out)
    return result


async def _sync_raw(sortie_id: str, raw_dir: Path, queue: asyncio.Queue) -> bool:
    """Mirror ``GDRIVE_REMOTE/<sortie_id>`` into *raw_dir* with ``rclone sync``.

    Emits progress events; on failure emits an "error" event and returns False.
    """
    await _emit(queue, "sync", 0, "rclone sync en cours…")
    gdrive_path = f"{GDRIVE_REMOTE}/{sortie_id}"
    # rclone can run for minutes: keep it off the event loop.
    result = await asyncio.to_thread(
        subprocess.run,
        ["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        await _emit(queue, "error", 0, f"rclone: {_tail(result.stderr)}")
        return False
    await _emit(queue, "sync", 50, "rclone terminé")
    return True


async def _parse_usv(
    ship_dir: Path, proc_dir: Path, sortie_id: str, date_str: str, queue: asyncio.Queue
) -> None:
    """Parse the SHIP logs and write ``usv.json.gz`` (progress 55 → 70 %)."""
    await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
    nav_log = _find_nav_log(ship_dir)
    usbl_csv_raw = _find_usbl_csv(ship_dir)
    usbl_parsed_path = proc_dir / "combined_usbl.csv"

    if usbl_csv_raw:
        res = await asyncio.to_thread(
            _run_script,
            "parse_kogger_usbl.py",
            [str(usbl_csv_raw), "-o", str(usbl_parsed_path)],
        )
        if res.returncode != 0:
            await _emit(queue, "usv_parse", 55, f"parse_kogger_usbl.py: {_tail(res.stderr)}")

    if not nav_log:
        await _emit(queue, "usv_parse", 70, "Log de navigation introuvable, USV ignoré")
        return

    await asyncio.to_thread(
        write_usv_json,
        nav_log_path=nav_log,
        usbl_csv_path=usbl_parsed_path if usbl_parsed_path.exists() else None,
        output_path=proc_dir / "usv.json.gz",
        sortie_id=sortie_id,
        date=date_str,
    )
    await _emit(queue, "usv_parse", 70, "USV OK")


async def _parse_auvs(
    sub_dir: Path, proc_dir: Path, sortie_id: str, date_str: str, queue: asyncio.Queue
) -> None:
    """Extract MCAP signals and write ``auv_<ID>.json.gz`` per AUV (progress 70 → 90 %)."""
    auv_ids = _detect_auvs(sub_dir)
    for i, auv_id in enumerate(auv_ids):
        await _emit(queue, "auv_parse", 70 + 20 * i // len(auv_ids), f"AUV {auv_id}…")
        session_dir = _detect_session_dir(sub_dir, auv_id)
        if session_dir is None:
            continue
        mcap_out = proc_dir / f"mcap_{auv_id}.json"
        res = await asyncio.to_thread(_extract_mcap, session_dir, mcap_out)
        if res.returncode != 0:
            await _emit(
                queue,
                "auv_parse",
                70 + 20 * i // len(auv_ids),
                f"extract_mcap_signals.py ({auv_id}): {_tail(res.stderr)}",
            )
        if mcap_out.exists():
            await asyncio.to_thread(
                write_auv_json,
                mcap_json_path=mcap_out,
                output_path=proc_dir / f"auv_{auv_id}.json.gz",
                auv_id=auv_id,
                sortie_id=sortie_id,
                date=date_str,
            )


async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
    """Full pipeline: rclone sync → parse → process → write JSON.gz.

    Progress is reported as ``{"step", "pct", "msg"}`` events on *queue*.
    Blocking work runs in worker threads so the event loop (and whoever drains
    *queue*) stays responsive. Unless the task is cancelled, the run ends with
    either a "write" event at 100 % or an "error" event.
    NOTE(review): *sortie_id* is joined into filesystem and remote paths
    unchecked — confirm it never comes from untrusted input.
    """
    raw_dir = OUTPUT_DIR / sortie_id / "raw"
    proc_dir = OUTPUT_DIR / sortie_id / "processed"
    # Last path component's first 10 chars, e.g. "2026-04-16" (assumed date prefix).
    date_str = sortie_id.split("/")[-1][:10]

    try:
        proc_dir.mkdir(parents=True, exist_ok=True)

        if not await _sync_raw(sortie_id, raw_dir, queue):
            return

        # Locate SHIP/SUB log dirs inside raw/ (rglob may walk a large tree).
        ship_dir = await asyncio.to_thread(_find_logs_dir, raw_dir, "SHIP")
        sub_dir = await asyncio.to_thread(_find_logs_dir, raw_dir, "SUB")
        if ship_dir is None:
            await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
            return

        await _parse_usv(ship_dir, proc_dir, sortie_id, date_str, queue)

        if sub_dir is not None:
            await _parse_auvs(sub_dir, proc_dir, sortie_id, date_str, queue)
    except Exception as exc:  # top-level boundary: the client must learn the run ended
        logger.exception("Pipeline failed for sortie %s", sortie_id)
        await _emit(queue, "error", 0, f"{type(exc).__name__}: {exc}"[:200])
        return

    await _emit(queue, "write", 100, "Pipeline terminé")


async def scan_sorties() -> list[dict]:
    """List available sorties on GDrive via ``rclone lsd`` (slow, ~30 s).

    Each entry is ``{"id": <folder name>, "processed": <usv.json.gz exists locally>}``.
    The rclone call runs in a worker thread. A failing rclone call yields an
    empty list (its exit status is not checked).
    """
    result = await asyncio.to_thread(
        subprocess.run, ["rclone", "lsd", GDRIVE_REMOTE], capture_output=True, text=True
    )
    sorties = []
    for line in result.stdout.splitlines():
        # `rclone lsd` lines have 4 metadata columns, then the directory name
        # (which may itself contain spaces, hence maxsplit=4).
        parts = line.strip().split(None, 4)
        if len(parts) != 5:
            continue
        name = parts[4].strip()
        sorties.append({"id": name, "processed": _is_processed(OUTPUT_DIR / name)})
    return sorties


def scan_sorties_local() -> list[dict]:
    """List sorties already synced locally in OUTPUT_DIR (instant, no rclone).

    Returns name-sorted ``{"id", "processed"}`` entries, or [] if OUTPUT_DIR is missing.
    """
    if not OUTPUT_DIR.exists():
        return []
    return [
        {"id": d.name, "processed": _is_processed(d)}
        for d in sorted(OUTPUT_DIR.iterdir())
        if d.is_dir()
    ]