#!/usr/bin/env python3
r"""Stage 01 — Select mission and produce a raw manifest.

Scans `<ssd-base>/<mission>/raw_data/` and writes
`<out>/<mission>/01_manifest.json` listing all AUVs, GoPro folders, MCAP bags,
BIN files, USBL logs, audio files, `sss/` files and `mag/` CSVs.

The scan is read-only: nothing is ever written below `<ssd-base>`.

Usage:
    python3 01_select_mission.py \
        --mission 20260505-Lepradet \
        --ssd-base /mnt/ssd \
        --out /home/cosma/cosma-qc/data
"""
from __future__ import annotations

import argparse
import json
import re
import sys
from collections import defaultdict
from collections.abc import Iterator
from datetime import datetime, timezone
from pathlib import Path

# Local helper for stage time/memory tracking
sys.path.insert(0, str(Path(__file__).parent))
from _meta import track_stage  # noqa: E402

# AUV physical (GoPro) and AUV MCAP IDs — pattern matching only, never write to SSD.
AUV_PHYS_RE = re.compile(r"AUV(\d{3})", re.I)
# Folder such as "GP1_AUV012" or "gp2-auv003".
GP_FOLDER_RE = re.compile(r"^GP(?P<gp>\d+)[_-]AUV(?P<auv>\d{3})$", re.I)
# Bag directory name containing "_AUV012" followed by "_", "/" or end of name.
BAG_AUV_RE = re.compile(r"_AUV(?P<auv>\d{3})(?:[_/]|$)", re.I)
# Not used by the collectors below (they use a plain substring test); kept so
# the module-level name stays available.
USBL_FILE_RE = re.compile(r"usbl", re.I)
# Per-camera sub-folder ("GP1", "gp2", ...) inside an AUV{xxx} video folder.
_GP_SUBDIR_RE = re.compile(r"GP(\d+)", re.I)


def iso_utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def safe_listdir(p: Path) -> list[Path]:
    """Return the sorted entries of directory *p*.

    Returns an empty list when *p* does not exist, is not a directory, or
    cannot be read (any ``OSError``).
    """
    try:
        return sorted(p.iterdir())
    except OSError:
        return []


def _iter_files(root: Path) -> Iterator[Path]:
    """Yield every regular file below *root*, recursively.

    Best effort: a missing *root* yields nothing, entries that cannot be
    inspected are skipped, and an ``OSError`` raised by the directory walk
    itself ends the iteration instead of propagating.
    """
    if not root.is_dir():
        return
    try:
        for entry in root.rglob("*"):
            try:
                if entry.is_file():
                    yield entry
            except OSError:
                continue
    except OSError:
        return


def _list_mp4s(folder: Path) -> tuple[list[str], int]:
    """Return ``(names, total_bytes)`` for the ``.MP4`` files directly in *folder*.

    The extension test is case-insensitive. A file whose size cannot be read
    is still listed but contributes 0 bytes.
    """
    names: list[str] = []
    size = 0
    for f in safe_listdir(folder):
        if f.is_file() and f.suffix.upper() == ".MP4":
            names.append(f.name)
            try:
                size += f.stat().st_size
            except OSError:
                pass  # size is informational only; keep the file in the list
    return names, size


def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int, float]:
    """Collect GoPro videos found under ``ssd_path/medias/videos``.

    Two folder layouts are recognised, both always scanned:

    * Layout A: ``medias/videos/GP{n}{_|-}AUV{xxx}/*.MP4``
    * Layout B: ``medias/videos/AUV{xxx}/GP{n}/*.MP4``

    Returns:
        ``(videos, n_files, total_gb)`` where ``videos[AUVxxx][GPn]`` is the
        sorted, de-duplicated list of MP4 file names, ``n_files`` is the
        number of MP4 files encountered (before de-duplication) and
        ``total_gb`` is their cumulative size in units of 1e9 bytes.
        Folders without any MP4 file do not appear in ``videos``.
    """
    videos: dict[str, dict[str, list[str]]] = defaultdict(lambda: defaultdict(list))
    total_n = 0
    total_bytes = 0

    for sub in safe_listdir(ssd_path / "medias" / "videos"):
        if not sub.is_dir():
            continue

        # Resolve the AUV id and the (camera key, folder) pairs to scan.
        folder_match = GP_FOLDER_RE.match(sub.name)
        if folder_match:
            # Layout A: the folder itself holds the MP4 files.
            auv_id = f"AUV{int(folder_match.group('auv')):03d}"
            targets = [(f"GP{int(folder_match.group('gp'))}", sub)]
        else:
            auv_match = AUV_PHYS_RE.fullmatch(sub.name)
            if not auv_match:
                continue
            # Layout B: one sub-folder per camera.
            auv_id = f"AUV{int(auv_match.group(1)):03d}"
            targets = []
            for gp_sub in safe_listdir(sub):
                if not gp_sub.is_dir():
                    continue
                gp_match = _GP_SUBDIR_RE.fullmatch(gp_sub.name)
                if gp_match:
                    targets.append((f"GP{int(gp_match.group(1))}", gp_sub))

        for gp_key, folder in targets:
            names, size = _list_mp4s(folder)
            if names:  # only create manifest entries for non-empty folders
                videos[auv_id][gp_key].extend(names)
                total_n += len(names)
                total_bytes += size

    # Sort + dedup (the same file name may be seen through both layouts).
    out: dict[str, dict[str, list[str]]] = {}
    for auv, per_gp in sorted(videos.items()):
        out[auv] = {gp: sorted(set(names)) for gp, names in sorted(per_gp.items())}
    return out, total_n, total_bytes / 1e9


def collect_mcap_bags(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
    """Collect bag directories under ``ssd_path/logs/SUB/bag``.

    A directory is kept when its name matches ``BAG_AUV_RE``.

    Returns:
        ``(bags, n)`` where ``bags[AUVnnn]`` is the sorted list of bag
        directory names and ``n`` is the number of matching directories.
    """
    bags: dict[str, list[str]] = defaultdict(list)
    n = 0
    for sub in safe_listdir(ssd_path / "logs" / "SUB" / "bag"):
        if not sub.is_dir():
            continue
        m = BAG_AUV_RE.search(sub.name)
        if not m:
            continue
        bags[f"AUV{int(m.group('auv')):03d}"].append(sub.name)
        n += 1
    return {k: sorted(set(v)) for k, v in sorted(bags.items())}, n


def collect_bin_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
    """Collect ``.BIN`` files under ``ssd_path/logs/SUB``.

    Two layouts are scanned:

    * ``logs/SUB/AUV{xxx}/*.BIN`` — filed under that AUV id;
    * ``logs/SUB/bin/*.BIN`` (flat) — filed under the key ``"AUV000"``.

    Returns:
        ``(bins, n)`` where ``bins[AUVnnn]`` is the sorted, de-duplicated list
        of file names and ``n`` is the number of ``.BIN`` files encountered.
    """
    bins: dict[str, list[str]] = defaultdict(list)
    n = 0
    sub_root = ssd_path / "logs" / "SUB"

    # Layout: logs/SUB/AUV{xxx}/*.BIN
    for sub in safe_listdir(sub_root):
        if not sub.is_dir():
            continue
        am = AUV_PHYS_RE.fullmatch(sub.name)
        if not am:
            continue
        auv_id = f"AUV{int(am.group(1)):03d}"
        for f in safe_listdir(sub):
            if f.is_file() and f.suffix.upper() == ".BIN":
                bins[auv_id].append(f.name)
                n += 1

    # Also try logs/SUB/bin/*.BIN (flat layout, no AUV id in the path).
    for f in safe_listdir(sub_root / "bin"):
        if f.is_file() and f.suffix.upper() == ".BIN":
            bins["AUV000"].append(f.name)
            n += 1

    return {k: sorted(set(v)) for k, v in sorted(bins.items())}, n


def collect_usbl_logs(ssd_path: Path) -> list[str]:
    """Return USBL CSV paths found anywhere under ``ssd_path/logs``.

    A file qualifies when its lower-cased name contains ``"usbl"`` and ends
    with ``".csv"``. Paths are relative to *ssd_path*, sorted and unique.
    """
    found: set[str] = set()
    for p in _iter_files(ssd_path / "logs"):
        name_l = p.name.lower()
        if "usbl" in name_l and name_l.endswith(".csv"):
            # Walked paths always start with ssd_path, so relative_to() cannot fail.
            found.add(str(p.relative_to(ssd_path)))
    return sorted(found)


def collect_audio_logs(ssd_path: Path) -> list[str]:
    """Return every file under ``ssd_path/audios``, as sorted paths relative to *ssd_path*."""
    return sorted(str(p.relative_to(ssd_path)) for p in _iter_files(ssd_path / "audios"))


def collect_sss_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
    """Collect ``.klf`` and ``.bin`` files under ``ssd_path/sss``, recursively.

    Extensions are matched case-insensitively, like the other collectors in
    this module.

    Returns:
        ``(result, n)`` where ``result`` has the keys ``"klf"`` and ``"bin"``,
        each a sorted list of unique paths relative to *ssd_path*, and ``n``
        is the total number of listed paths.
    """
    found: dict[str, set[str]] = {"klf": set(), "bin": set()}
    for p in _iter_files(ssd_path / "sss"):
        name_l = p.name.lower()
        for ext, bucket in found.items():
            if name_l.endswith("." + ext):
                bucket.add(str(p.relative_to(ssd_path)))
    result = {ext: sorted(paths) for ext, paths in found.items()}
    return result, sum(len(paths) for paths in result.values())


def collect_mag_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
    """Collect CSV files under ``ssd_path/mag``, categorised ar/av/side/other.

    A file goes to the first of ``"ar"``, ``"av"``, ``"side"`` that is the
    exact name of one of its parent directories *below ssd_path*; otherwise it
    goes to ``"other"``. Directories above *ssd_path* are deliberately ignored
    so that the mount point or mission name cannot affect the category.

    Returns:
        ``(result, total)`` where each category maps to a sorted list of
        unique paths relative to *ssd_path* and ``total`` is their count.
    """
    categories = ("ar", "av", "side")
    buckets: dict[str, set[str]] = {"ar": set(), "av": set(), "side": set(), "other": set()}
    for p in _iter_files(ssd_path / "mag"):
        if not p.name.lower().endswith(".csv"):
            continue
        rel = p.relative_to(ssd_path)
        parent_dirs = rel.parts[:-1]  # drop the file name itself
        category = next((c for c in categories if c in parent_dirs), "other")
        buckets[category].add(str(rel))
    result = {k: sorted(v) for k, v in buckets.items()}
    total = sum(len(v) for v in result.values())
    return result, total


def build_manifest(mission: str, ssd_base: Path) -> dict:
    """Scan ``ssd_base/mission/raw_data`` and return the manifest dictionary.

    Args:
        mission: Mission folder name directly under *ssd_base*.
        ssd_base: Base directory holding the mission folders.

    Returns:
        A JSON-serialisable dict with the per-category file listings and a
        ``"totals"`` summary.

    Raises:
        FileNotFoundError: if the ``raw_data`` directory does not exist.
    """
    ssd_path = ssd_base / mission / "raw_data"
    if not ssd_path.is_dir():
        raise FileNotFoundError(f"raw_data not found: {ssd_path}")

    videos, n_vids, total_gb = collect_videos(ssd_path)
    mcap_bags, n_bags = collect_mcap_bags(ssd_path)
    bin_files, n_bins = collect_bin_files(ssd_path)
    usbl_logs = collect_usbl_logs(ssd_path)
    audio_logs = collect_audio_logs(ssd_path)
    sss_files, n_sss = collect_sss_files(ssd_path)
    mag_files, n_mag = collect_mag_files(ssd_path)

    return {
        "mission": mission,
        "generated_at": iso_utc_now(),
        "ssd_path": str(ssd_path),
        "videos": videos,
        "mcap_bags": mcap_bags,
        "bin_files": bin_files,
        "usbl_logs": usbl_logs,
        "audio_logs": audio_logs,
        "sss_files": sss_files,
        "mag_files": mag_files,
        "totals": {
            "n_videos": n_vids,
            "n_mcap_bags": n_bags,
            "n_bin_files": n_bins,
            "n_usbl_logs": len(usbl_logs),
            "n_audio_logs": len(audio_logs),
            "n_sss_files": n_sss,
            "n_mag_files": n_mag,
            "total_video_size_gb": round(total_gb, 2),
        },
    }


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: build the manifest and write it to disk.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 on success. Errors (e.g. a missing ``raw_data`` directory) propagate
        as exceptions.
    """
    ap = argparse.ArgumentParser(description="Stage 01 — select mission, produce raw manifest.")
    ap.add_argument("--mission", required=True, help="Mission folder name (e.g. 20260505-Lepradet)")
    ap.add_argument("--ssd-base", default="/mnt/ssd", help="SSD base path (default /mnt/ssd)")
    ap.add_argument("--out", default="/home/cosma/cosma-qc/data",
                    help="Output base dir (manifest written to <out>/<mission>/01_manifest.json)")
    args = ap.parse_args(argv)

    ssd_base = Path(args.ssd_base)
    out_dir = Path(args.out) / args.mission
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / "01_manifest.json"

    # track_stage comes from the sibling _meta module (not shown here);
    # per the import comment it tracks stage time/memory — TODO confirm details.
    with track_stage("01_select_mission", args.mission, out_dir,
                     output_files=[out_path]):
        manifest = build_manifest(args.mission, ssd_base)
        # Write to a temp file then rename, so readers never see a partial manifest.
        tmp = out_path.with_suffix(".json.tmp")
        # ensure_ascii=False emits raw non-ASCII characters, so the encoding
        # must be explicit rather than locale-dependent.
        tmp.write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8")
        tmp.replace(out_path)

    t = manifest["totals"]
    print(f"[ok] {out_path}")
    print(f"     videos={t['n_videos']} ({t['total_video_size_gb']} GB)  "
          f"bags={t['n_mcap_bags']}  bins={t['n_bin_files']}  "
          f"usbl={t['n_usbl_logs']}  audio={t['n_audio_logs']}  "
          f"sss={t['n_sss_files']}  mag={t['n_mag_files']}")
    return 0


if __name__ == "__main__":
    sys.exit(main())