Files

150 lines
5.3 KiB
Python

import asyncio
import re
import shutil
import subprocess
from pathlib import Path
from .config import GDRIVE_REMOTE, OUTPUT_DIR, TOOLS_DIR
from .processor import write_auv_json, write_usv_json
async def _emit(queue: asyncio.Queue, step: str, pct: int, msg: str = ""):
await queue.put({"step": step, "pct": pct, "msg": msg})
def _find_nav_log(ship_dir: Path) -> Path | None:
for p in ship_dir.glob("*_navigation_log.csv"):
return p
return None
def _find_usbl_csv(ship_dir: Path) -> Path | None:
for p in ship_dir.glob("*_usbl.csv"):
return p
return None
def _detect_auvs(sub_dir: Path) -> list[str]:
"""Return AUV IDs from SUB/ subfolders (e.g. '20260416_125418_AUV010''AUV010')."""
auvs = []
for d in sub_dir.iterdir():
if d.is_dir():
m = re.search(r"(AUV\d+)", d.name, re.IGNORECASE)
if m:
auvs.append(m.group(1).upper())
return sorted(set(auvs))
def _detect_session_dir(sub_dir: Path, auv_id: str) -> Path | None:
for d in sub_dir.iterdir():
if d.is_dir() and auv_id.upper() in d.name.upper():
return d
return None
def _run_script(script_name: str, args: list[str]) -> subprocess.CompletedProcess:
    """Run a helper script from ``TOOLS_DIR`` with ``python3``.

    Args:
        script_name: File name of the script, resolved against ``TOOLS_DIR``.
        args: Command-line arguments appended after the script path.

    Returns:
        The ``CompletedProcess`` with stdout/stderr captured as text. The
        exit status is not checked here; callers inspect ``returncode``.
    """
    script_path = TOOLS_DIR / script_name
    command = ["python3", str(script_path)] + args
    return subprocess.run(command, capture_output=True, text=True)
async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None:
    """Run the full pipeline: rclone sync → parse → process → write JSON.gz.

    Progress is reported by putting ``{"step", "pct", "msg"}`` events on
    *queue* through ``_emit``. Fatal problems (rclone cannot be started or
    fails, SHIP directory missing) are reported with the ``"error"`` step,
    after which the function returns early.

    External commands run through ``asyncio.to_thread`` so the event loop
    stays free to deliver the queued progress events while they execute.

    Args:
        sortie_id: Sortie identifier, used as sub-path below both
            ``GDRIVE_REMOTE`` and ``OUTPUT_DIR``.
        queue: Queue receiving the progress events.
    """
    raw_dir = OUTPUT_DIR / sortie_id / "raw"
    proc_dir = OUTPUT_DIR / sortie_id / "processed"
    proc_dir.mkdir(parents=True, exist_ok=True)

    # Step 1: rclone sync
    await _emit(queue, "sync", 0, "rclone sync en cours…")
    gdrive_path = f"{GDRIVE_REMOTE}/{sortie_id}"
    try:
        result = await asyncio.to_thread(
            subprocess.run,
            ["rclone", "sync", gdrive_path, str(raw_dir), "--progress"],
            capture_output=True,
            text=True,
        )
    except OSError as exc:
        # rclone missing or not executable: report it like any other
        # rclone failure instead of leaving the consumer without an event.
        await _emit(queue, "error", 0, f"rclone: {str(exc)[:200]}")
        return
    if result.returncode != 0:
        await _emit(queue, "error", 0, f"rclone: {result.stderr[:200]}")
        return
    await _emit(queue, "sync", 50, "rclone terminé")

    # Detect SHIP/SUB dirs inside raw/
    ship_dir = next(raw_dir.rglob("logs/SHIP"), None)
    sub_dir = next(raw_dir.rglob("logs/SUB"), None)
    if ship_dir is None or not ship_dir.exists():
        await _emit(queue, "error", 50, "Dossier SHIP introuvable après sync")
        return

    # Step 2: USV parsing
    await _emit(queue, "usv_parse", 55, "Parsing logs USV…")
    nav_log = _find_nav_log(ship_dir)
    usbl_csv_raw = _find_usbl_csv(ship_dir)
    usbl_parsed_path = proc_dir / "combined_usbl.csv"
    usbl_parse_failed = False
    if usbl_csv_raw:
        usbl_run = await asyncio.to_thread(
            _run_script,
            "parse_kogger_usbl.py",
            [str(usbl_csv_raw), "-o", str(usbl_parsed_path)],
        )
        if usbl_run.returncode != 0:
            # Do not fall back to a combined_usbl.csv left by an earlier run.
            usbl_parse_failed = True
            await _emit(
                queue, "usv_parse", 55,
                f"parse_kogger_usbl: {usbl_run.stderr[:200]}",
            )
    usbl_usable = not usbl_parse_failed and usbl_parsed_path.exists()

    # The date is the first 10 characters of the last path component.
    date_str = sortie_id.split("/")[-1][:10]
    if nav_log:
        write_usv_json(
            nav_log_path=nav_log,
            usbl_csv_path=usbl_parsed_path if usbl_usable else None,
            output_path=proc_dir / "usv.json.gz",
            sortie_id=sortie_id,
            date=date_str,
        )
        await _emit(queue, "usv_parse", 70, "USV OK")
    else:
        # Nothing was written, so do not claim success.
        await _emit(queue, "usv_parse", 70, "Log de navigation USV introuvable")

    # Step 3: AUV parsing
    if sub_dir is not None and sub_dir.exists():
        auv_ids = _detect_auvs(sub_dir)
        total = len(auv_ids) or 1
        # extract_mcap_signals writes to tools/../output/mcap_signals.json (hardcoded)
        shared_out = TOOLS_DIR.parent / "output" / "mcap_signals.json"
        for i, auv_id in enumerate(auv_ids):
            pct = 70 + int(20 * i / total)
            await _emit(queue, "auv_parse", pct, f"AUV {auv_id}")
            session_dir = _detect_session_dir(sub_dir, auv_id)
            if not session_dir:
                continue
            mcap_out = proc_dir / f"mcap_{auv_id}.json"
            # Drop the shared output of a previous extraction so it can never
            # be mistaken for this AUV's result.
            shared_out.unlink(missing_ok=True)
            extraction = await asyncio.to_thread(
                _run_script,
                "extract_mcap_signals.py",
                ["--session-dir", str(session_dir), "--max-pts", "0"],
            )
            if extraction.returncode != 0:
                await _emit(
                    queue, "auv_parse", pct,
                    f"AUV {auv_id}: extract_mcap_signals: {extraction.stderr[:200]}",
                )
                continue
            default_out = shared_out
            if not default_out.exists():
                default_out = session_dir / "mcap_signals.json"
            if default_out.exists():
                shutil.copy(default_out, mcap_out)
            if mcap_out.exists():
                write_auv_json(
                    mcap_json_path=mcap_out,
                    output_path=proc_dir / f"auv_{auv_id}.json.gz",
                    auv_id=auv_id,
                    sortie_id=sortie_id,
                    date=date_str,
                )

    await _emit(queue, "write", 100, "Pipeline terminé")
def scan_sorties() -> list[dict]:
    """List available sorties on GDrive via ``rclone lsd`` (slow, ~30 s).

    Each output line that splits into five whitespace-separated fields
    contributes one entry; the fifth field is taken as the sortie name.

    Returns:
        A list of ``{"id": name, "processed": bool}`` dicts, where
        ``processed`` tells whether ``processed/usv.json.gz`` exists for
        that sortie under ``OUTPUT_DIR``.
    """
    listing = subprocess.run(
        ["rclone", "lsd", GDRIVE_REMOTE],
        capture_output=True,
        text=True,
    )
    found = []
    for raw_line in listing.stdout.splitlines():
        fields = raw_line.strip().split(None, 4)
        if len(fields) != 5:
            continue
        sortie_name = fields[4].strip()
        marker = OUTPUT_DIR / sortie_name / "processed" / "usv.json.gz"
        found.append({"id": sortie_name, "processed": marker.exists()})
    return found
def scan_sorties_local() -> list[dict]:
    """List sorties already synced locally in ``OUTPUT_DIR`` (instant, no rclone).

    Returns:
        One ``{"id": dirname, "processed": bool}`` dict per subdirectory of
        ``OUTPUT_DIR``, in sorted order; ``processed`` tells whether
        ``processed/usv.json.gz`` exists inside it. An empty list is
        returned when ``OUTPUT_DIR`` does not exist.
    """
    if not OUTPUT_DIR.exists():
        return []
    return [
        {
            "id": entry.name,
            "processed": (entry / "processed" / "usv.json.gz").exists(),
        }
        for entry in sorted(OUTPUT_DIR.iterdir())
        if entry.is_dir()
    ]