diff --git a/pipeline/stages/02_mission_run_detect.py b/pipeline/stages/02_mission_run_detect.py new file mode 100755 index 0000000..b9e3ca4 --- /dev/null +++ b/pipeline/stages/02_mission_run_detect.py @@ -0,0 +1,834 @@ +#!/usr/bin/env python3 +"""Stage 02 — Mission run detection (v4 state-based). + +Détecte runs via transitions /mavros/state (armed + mode) + validation rel_alt. +Fallback depth-only si topic state absent. + +Usage: + python3 02_mission_run_detect.py --mission 20260508-sttropez [--ssd-base /mnt/ssd] [--out data/] [--dry-run] + python3 02_mission_run_detect.py --mission 20260505-Lepradet --out data/ \ + --state-modes ALT_HOLD --require-armed --require-descent \ + --min-mission-depth -2.0 --min-sustained-duration 30 --min-near-bottom-pct 50 --force +""" + +import argparse +import glob +import json +import re +import statistics +import sys +from datetime import datetime, timezone +from pathlib import Path + +from mcap.reader import make_reader +from mcap_ros2.decoder import DecoderFactory + +# Local helper for stage time/memory tracking +sys.path.insert(0, str(Path(__file__).parent)) +from _meta import track_stage # noqa: E402 + +# Mapping ID physique → ID logistique +AUV_PHYSICAL_MAP = { + "AUV010": "AUV210", + "AUV012": "AUV212", + "AUV013": "AUV213", +} + +TOPIC_REL_ALT = "/mavros/global_position/rel_alt" +TOPIC_STATE = "/mavros/state" +THRESHOLD = -0.3 # rel_alt < THRESHOLD → immersé +NEED_STREAK = 30 # secondes consécutives pour confirmer début/fin +MIN_DURATION = 60 # secondes minimum par run +SMOOTH_WINDOW_S = 3 # fenêtre lissage médian (secondes à 1Hz = samples) + +# Default modes considérés comme "mission active" +DEFAULT_STATE_MODES = {"AUTO", "GUIDED"} +# Modes qui TERMINENT un run (si require-armed=False) +STATE_STOP_MODES = {"SURFACE", "MANUAL"} + +# Filtrage "vraie mission" (modifiables via argparse) +DEFAULT_MIN_MISSION_DEPTH = -2.0 # rel_alt doit atteindre ce seuil (m) +DEFAULT_MIN_SUSTAINED_DURATION = 30.0 # pendant au moins N secondes consécutives +DEFAULT_MIN_NEAR_BOTTOM_PCT = 0.0 # % min du run où rel_alt < min_mission_depth (0 = désactivé) + +# Filtrage "avant première plongée" +DEFAULT_FIRST_SUBMERSION_DEPTH = -2.0 # seuil rel_alt pour considérer submergé +DEFAULT_FIRST_SUBMERSION_DURATION = 5.0 # durée min continue (s) pour confirmer submersion + + +# --------------------------------------------------------------------------- +# Lecture MCAP rel_alt +# --------------------------------------------------------------------------- + +def parse_mcap_relalt(mcap_path: Path): + """Lit rel_alt depuis un fichier mcap. Retourne liste de (epoch_s, rel_alt_m).""" + data = [] + try: + with open(mcap_path, "rb") as fp: + reader = make_reader(fp, decoder_factories=[DecoderFactory()]) + for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(topics=[TOPIC_REL_ALT]): + ts = message.log_time / 1e9 + data.append((ts, float(ros_msg.data))) + except Exception as exc: + print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr) + return data + + +# --------------------------------------------------------------------------- +# Lecture MCAP /mavros/state +# --------------------------------------------------------------------------- + +def read_mavros_state(mcap_path: Path, t_start: float = None, t_end: float = None): + """ + Lit /mavros/state depuis un fichier mcap. + Retourne liste de (epoch_s, armed:bool, mode:str). + Filtrage optionnel sur [t_start, t_end]. + """ + data = [] + try: + with open(mcap_path, "rb") as fp: + reader = make_reader(fp, decoder_factories=[DecoderFactory()]) + for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(topics=[TOPIC_STATE]): + ts = message.log_time / 1e9 + if t_start is not None and ts < t_start: + continue + if t_end is not None and ts > t_end: + continue + data.append((ts, bool(ros_msg.armed), str(ros_msg.mode))) + except Exception as exc: + # Silent — bag could be corrupt + pass + return data + + +def parse_mcap_both(mcap_path: Path): + """Lit rel_alt ET state depuis un seul fichier mcap en un seul pass.""" + alt_data = [] + state_data = [] + try: + with open(mcap_path, "rb") as fp: + reader = make_reader(fp, decoder_factories=[DecoderFactory()]) + for _schema, _channel, message, ros_msg in reader.iter_decoded_messages( + topics=[TOPIC_REL_ALT, TOPIC_STATE]): + ts = message.log_time / 1e9 + if _channel.topic == TOPIC_REL_ALT: + alt_data.append((ts, float(ros_msg.data))) + elif _channel.topic == TOPIC_STATE: + state_data.append((ts, bool(ros_msg.armed), str(ros_msg.mode))) + except Exception as exc: + print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr) + return alt_data, state_data + + +def extract_auv_id(folder_name: str): + """'20260508_054551_AUV010' → 'AUV010'.""" + m = re.search(r"(AUV\d+)$", folder_name) + return m.group(1) if m else None + + +# --------------------------------------------------------------------------- +# Signal processing +# --------------------------------------------------------------------------- + +def resample_1hz(data): + """Resample (epoch, val) → 1Hz par interpolation linéaire. Retourne (times[], vals[]).""" + if not data: + return [], [] + data = sorted(data) + t0 = int(data[0][0]) + t1 = int(data[-1][0]) + times = list(range(t0, t1 + 1)) + vals = [] + idx = 0 + for t in times: + while idx < len(data) - 1 and data[idx + 1][0] < t: + idx += 1 + if t <= data[0][0]: + v = data[0][1] + elif t >= data[-1][0]: + v = data[-1][1] + else: + t0_, v0 = data[idx] + t1_, v1 = data[idx + 1] + dt = t1_ - t0_ + v = v0 + ((t - t0_) / dt) * (v1 - v0) if dt > 0 else v0 + vals.append(v) + return times, vals + + +def median_filter(vals, window=3): + half = window // 2 + out = [] + n = len(vals) + for i in range(n): + lo = max(0, i - half) + hi = min(n, i + half + 1) + w = sorted(vals[lo:hi]) + out.append(w[len(w) // 2]) + return out + + +def find_first_submersion_epoch(rel_alt_times, rel_alt_vals, threshold=-2.0, min_duration=5.0): + """ + Trouve la PREMIÈRE fois où rel_alt < threshold pendant >= min_duration secondes en continu. + Retourne epoch du début de cette submersion, ou None si jamais submergé. + + Travaille sur données 1Hz lissées (times/vals déjà resampleés). + """ + n = len(rel_alt_times) + if n == 0: + return None + + streak_start = None + streak_count = 0 + + for i in range(n): + if rel_alt_vals[i] < threshold: + if streak_count == 0: + streak_start = rel_alt_times[i] + streak_count += 1 + if streak_count >= min_duration: + return float(streak_start) + else: + streak_count = 0 + streak_start = None + + return None + + +def detect_runs_depth_only(times, vals, threshold=THRESHOLD, need_streak=NEED_STREAK, min_duration=MIN_DURATION): + """ + Détecte les runs d'immersion via rel_alt seul (fallback). + + Algorithme: + - need_streak samples consécutifs < threshold pour confirmer entrée/sortie + - min_duration secondes minimum par run + Returns liste de (start_epoch, end_epoch). + """ + n = len(times) + if n == 0: + return [] + + under = [v < threshold for v in vals] + + runs = [] + in_run = False + run_start = None + streak_under = 0 + streak_surface = 0 + + i = 0 + while i < n: + if not in_run: + if under[i]: + streak_under += 1 + if streak_under >= need_streak: + run_start = times[i - need_streak + 1] + in_run = True + streak_surface = 0 + else: + streak_under = 0 + else: + if under[i]: + streak_surface = 0 + else: + streak_surface += 1 + if streak_surface >= need_streak: + run_end = times[i - need_streak + 1] + dur = run_end - run_start + if dur >= min_duration: + runs.append((run_start, run_end)) + in_run = False + run_start = None + streak_under = 0 + streak_surface = 0 + i += 1 + + if in_run and run_start is not None: + run_end = times[-1] + dur = run_end - run_start + if dur >= min_duration: + runs.append((run_start, run_end)) + + return runs + + +# --------------------------------------------------------------------------- +# State-based run detection +# --------------------------------------------------------------------------- + +def detect_runs_state_based(state_data, times_alt, vals_smooth, mission_modes, require_armed, + require_descent, min_duration, min_depth_m=-0.3): + """ + Détecte runs via /mavros/state: + - Intervalles continus armed=True (si require_armed) AND mode in mission_modes + - Valide via transition rel_alt surface → fond (si require_descent) + - min_duration filtre courtes fenêtres + + Returns: + list of (start_epoch, end_epoch, dominant_mode) + """ + if not state_data: + return [] + + state_data_sorted = sorted(state_data) + + # Construire séquence d'états + # Identifier intervalles "mission active" + intervals = [] + cur_start = None + cur_mode_counts = {} + + for ts, armed, mode in state_data_sorted: + active = (not require_armed or armed) and (mode in mission_modes) + + if active and cur_start is None: + cur_start = ts + cur_mode_counts = {mode: 1} + elif active and cur_start is not None: + cur_mode_counts[mode] = cur_mode_counts.get(mode, 0) + 1 + elif not active and cur_start is not None: + # Fin de l'intervalle + intervals.append((cur_start, ts, cur_mode_counts)) + cur_start = None + cur_mode_counts = {} + + # Flush dernier intervalle + if cur_start is not None: + intervals.append((cur_start, state_data_sorted[-1][0], cur_mode_counts)) + + # Filtrer par durée min + runs = [] + for start_t, end_t, mode_counts in intervals: + dur = end_t - start_t + if dur < min_duration: + continue + + dominant_mode = max(mode_counts, key=mode_counts.get) if mode_counts else "UNKNOWN" + + if require_descent: + # Vérifier transition surface → fond dans la fenêtre + window_alts = [(t, v) for t, v in zip(times_alt, vals_smooth) + if start_t <= t <= end_t] + if not window_alts: + # Pas de données alt dans cette fenêtre — skip + continue + alt_vals = [v for _, v in window_alts] + # Début = surface (rel_alt > -1m dans les 60 premiers samples) + first_segment = alt_vals[:60] + last_segment = alt_vals[-60:] + starts_near_surface = any(v > -1.0 for v in first_segment) + reaches_depth = any(v <= min_depth_m for v in alt_vals) + if not (starts_near_surface and reaches_depth): + continue + + runs.append((start_t, end_t, dominant_mode)) + + return runs + + +# --------------------------------------------------------------------------- +# Validation "vraie mission" — filtre oscillations surface +# --------------------------------------------------------------------------- + +def compute_sustained_depth(times, vals_smooth, start_e, end_e, min_depth_m): + """ + Pour un run [start_e, end_e], calcule le plus long segment continu + où rel_alt <= min_depth_m (ex: -2.0m). + """ + run_pairs = [(t, v) for t, v in zip(times, vals_smooth) if start_e <= t <= end_e] + if not run_pairs: + return 0.0, 0.0 + + best_dur = 0 + best_min = 0.0 + cur_dur = 0 + cur_min = 0.0 + + for _t, v in run_pairs: + if v <= min_depth_m: + cur_dur += 1 + cur_min = min(cur_min, v) if cur_dur > 1 else v + else: + if cur_dur > best_dur: + best_dur = cur_dur + best_min = cur_min + cur_dur = 0 + cur_min = 0.0 + + if cur_dur > best_dur: + best_dur = cur_dur + best_min = cur_min + + return round(best_min, 2), float(best_dur) + + +def compute_near_bottom_pct(times, vals_smooth, start_e, end_e, min_depth_m): + """Calcule le % de samples dans [start_e, end_e] où rel_alt < min_depth_m.""" + run_vals = [v for t, v in zip(times, vals_smooth) if start_e <= t <= end_e] + if not run_vals: + return 0.0 + n_below = sum(1 for v in run_vals if v < min_depth_m) + return round(100.0 * n_below / len(run_vals), 1) + + +# --------------------------------------------------------------------------- +# Processing AUV +# --------------------------------------------------------------------------- + +def process_auv(auv_mcap_id: str, bag_dirs: list, ssd_base: Path, + min_mission_depth: float, min_sustained_duration: float, + min_near_bottom_pct: float = 0.0, + state_modes: set = None, require_armed: bool = True, + require_descent: bool = True, + first_submersion_depth: float = DEFAULT_FIRST_SUBMERSION_DEPTH, + first_submersion_duration: float = DEFAULT_FIRST_SUBMERSION_DURATION): + """ + Agréger tous les bags d'un AUV, détecter runs via state (avec fallback depth-only). + Filtre les runs entiers avant la première vraie submersion. + """ + physical_id = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id) + if state_modes is None: + state_modes = DEFAULT_STATE_MODES + + all_alt_data = [] + all_state_data = [] + bags_list = [] + n_parsed = 0 + n_skipped = 0 + + # Parcourir bags dans l'ordre chronologique + for bag_dir in sorted(bag_dirs): + bags_list.append(str(bag_dir.name)) + mcap_files = sorted(bag_dir.glob("*.mcap")) + for mcap_path in mcap_files: + alt_pts, state_pts = parse_mcap_both(mcap_path) + if alt_pts or state_pts: + all_alt_data.extend(alt_pts) + all_state_data.extend(state_pts) + n_parsed += 1 + else: + n_skipped += 1 + if (n_parsed + n_skipped) % 50 == 0: + print(f" [{auv_mcap_id}] ... {n_parsed + n_skipped} bags processed", file=sys.stderr) + + has_state = len(all_state_data) > 0 + print(f" [{auv_mcap_id}] {n_parsed} mcap OK, {n_skipped} skip, " + f"{len(all_alt_data)} rel_alt pts, {len(all_state_data)} state pts", file=sys.stderr) + + if not all_alt_data: + print(f" [{auv_mcap_id}] [WARN] aucune donnée rel_alt", file=sys.stderr) + return { + "mcap_id": auv_mcap_id, + "physical_id": physical_id, + "bags": bags_list, + "total_immersion_s": 0, + "runs": [], + "runs_rejected": [], + "detection_method": "no_data", + "first_submersion_epoch": None, + "pre_water_rejected_count": 0, + } + + # Dédup + tri rel_alt + seen = set() + deduped_alt = [] + for t, v in sorted(all_alt_data): + key = round(t, 3) + if key not in seen: + seen.add(key) + deduped_alt.append((t, v)) + + # Resample 1Hz + times, vals = resample_1hz(deduped_alt) + print(f" [{auv_mcap_id}] {len(times)} samples 1Hz [{times[0]:.0f}..{times[-1]:.0f}]", file=sys.stderr) + + # Lissage médian 3s + vals_smooth = median_filter(vals, window=SMOOTH_WINDOW_S) + + # Trouver première submersion réelle + first_sub_epoch = find_first_submersion_epoch( + times, vals_smooth, + threshold=first_submersion_depth, + min_duration=first_submersion_duration, + ) + if first_sub_epoch is not None: + print(f" [{auv_mcap_id}] Première submersion réelle: {first_sub_epoch:.0f} " + f"({datetime.fromtimestamp(first_sub_epoch, tz=timezone.utc).isoformat()})", + file=sys.stderr) + else: + print(f" [{auv_mcap_id}] [WARN] Aucune submersion réelle détectée " + f"(threshold={first_submersion_depth}m, min_dur={first_submersion_duration}s)", + file=sys.stderr) + + # Détection runs + detection_method = "depth_only" + if has_state: + # Trier state data + all_state_data.sort(key=lambda x: x[0]) + + # Extraire modes présents + modes_in_data = set(m for _, _, m in all_state_data) + modes_active = modes_in_data & state_modes + print(f" [{auv_mcap_id}] state modes found: {modes_in_data}", file=sys.stderr) + print(f" [{auv_mcap_id}] state modes matching --state-modes: {modes_active}", file=sys.stderr) + + if modes_active: + raw_runs_with_mode = detect_runs_state_based( + all_state_data, times, vals_smooth, + mission_modes=state_modes, + require_armed=require_armed, + require_descent=require_descent, + min_duration=MIN_DURATION, + min_depth_m=min_mission_depth, + ) + raw_runs = [(s, e) for s, e, _m in raw_runs_with_mode] + run_modes = {(s, e): m for s, e, m in raw_runs_with_mode} + detection_method = "state_based" + print(f" [{auv_mcap_id}] {len(raw_runs)} candidats state-based", file=sys.stderr) + else: + print(f" [{auv_mcap_id}] [WARN] aucun mode {state_modes} dans state data " + f"→ fallback depth-only", file=sys.stderr) + raw_runs = detect_runs_depth_only(times, vals_smooth) + run_modes = {} + detection_method = "depth_only_fallback_no_modes" + else: + print(f" [{auv_mcap_id}] [WARN] topic /mavros/state vide → fallback depth-only", file=sys.stderr) + raw_runs = detect_runs_depth_only(times, vals_smooth) + run_modes = {} + + print(f" [{auv_mcap_id}] {len(raw_runs)} runs candidats ({detection_method})", file=sys.stderr) + + # Filtrage pre-water: exclure/tronquer runs avant première submersion + pre_water_rejected_count = 0 + if first_sub_epoch is not None: + filtered_runs = [] + for start_e, end_e in raw_runs: + if end_e <= first_sub_epoch: + # Run entier avant première plongée → rejeter + pre_water_rejected_count += 1 + phys_id_log = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id) + print(f" [{auv_mcap_id}] PRE-WATER REJECT run [{start_e:.0f}..{end_e:.0f}] " + f"(end before first_sub={first_sub_epoch:.0f})", file=sys.stderr) + elif start_e < first_sub_epoch < end_e: + # Run chevauche → tronquer start + new_start = first_sub_epoch + print(f" [{auv_mcap_id}] PRE-WATER TRUNCATE run [{start_e:.0f}..{end_e:.0f}] " + f"→ [{new_start:.0f}..{end_e:.0f}]", file=sys.stderr) + # Mettre à jour run_modes si besoin + if (start_e, end_e) in run_modes: + run_modes[(new_start, end_e)] = run_modes.pop((start_e, end_e)) + filtered_runs.append((new_start, end_e)) + else: + filtered_runs.append((start_e, end_e)) + raw_runs = filtered_runs + print(f" [{auv_mcap_id}] {pre_water_rejected_count} runs rejetés pre-water, " + f"{len(raw_runs)} restants", file=sys.stderr) + + # Construire runs enrichis + filtre "vraie mission" + runs_out = [] + runs_rejected = [] + for idx, (start_e, end_e) in enumerate(raw_runs): + dur = end_e - start_e + run_id = f"{physical_id}_run_{idx:02d}" + dominant_mode = run_modes.get((start_e, end_e), "UNKNOWN") + + run_vals = [v for t, v in zip(times, vals_smooth) if start_e <= t <= end_e] + max_depth = round(min(run_vals), 2) if run_vals else 0.0 + mean_depth = round(statistics.mean(run_vals), 2) if run_vals else 0.0 + + sustained_below_m, sustained_duration_s = compute_sustained_depth( + times, vals_smooth, start_e, end_e, min_mission_depth + ) + pct_near_bottom = compute_near_bottom_pct( + times, vals_smooth, start_e, end_e, min_mission_depth + ) + + run_entry = { + "run_id": run_id, + "start_epoch": float(start_e), + "end_epoch": float(end_e), + "duration_s": round(dur, 1), + "max_depth_m": max_depth, + "mean_depth_m": mean_depth, + "sustained_below_m": sustained_below_m, + "sustained_duration_s": sustained_duration_s, + "pct_near_bottom": pct_near_bottom, + "dominant_mode": dominant_mode, + "detection_method": detection_method, + } + + reject_reason = None + if sustained_duration_s < min_sustained_duration: + reject_reason = ( + f"sustained_depth only {sustained_duration_s:.0f}s " + f"(need {min_sustained_duration:.0f}s below {min_mission_depth}m)" + ) + elif min_near_bottom_pct > 0.0 and pct_near_bottom < min_near_bottom_pct: + reject_reason = ( + f"not_enough_immersion: only {pct_near_bottom:.1f}% " + f"time below {min_mission_depth}m (need {min_near_bottom_pct:.1f}%)" + ) + + if reject_reason is None: + runs_out.append(run_entry) + else: + run_entry["rejected_reason"] = reject_reason + runs_rejected.append(run_entry) + print( + f" [{auv_mcap_id}] REJECT {run_id}: max_depth={max_depth}m " + f"sustained={sustained_duration_s:.0f}s pct={pct_near_bottom:.1f}% " + f"reason={reject_reason[:60]}", + file=sys.stderr, + ) + + print( + f" [{auv_mcap_id}] {len(runs_out)} runs OK, {len(runs_rejected)} rejetés ({detection_method})", + file=sys.stderr, + ) + + total_immersion_s = round(sum(r["duration_s"] for r in runs_out), 1) + + return { + "mcap_id": auv_mcap_id, + "physical_id": physical_id, + "bags": bags_list, + "total_immersion_s": total_immersion_s, + "runs": runs_out, + "runs_rejected": runs_rejected, + "detection_method": detection_method, + "first_submersion_epoch": first_sub_epoch, + "pre_water_rejected_count": pre_water_rejected_count, + } + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def _run(args): + bags_root = Path(args.ssd_base) / args.mission / "raw_data" / "logs" / "SUB" / "bag" + if not bags_root.exists(): + print(f"[ERROR] bags dir not found: {bags_root}", file=sys.stderr) + sys.exit(1) + + state_modes = set(m.strip() for m in args.state_modes.split(",") if m.strip()) + + print(f"[stage02] Mission: {args.mission}") + print(f"[stage02] Bags root: {bags_root}", file=sys.stderr) + print(f"[stage02] State modes: {state_modes}, require_armed={args.require_armed}, " + f"require_descent={args.require_descent}", file=sys.stderr) + near_bottom_str = (f" + near_bottom >= {args.min_near_bottom_pct:.0f}%" + if args.min_near_bottom_pct > 0 else "") + print( + f"[stage02] Filtre mission: sustained >= {args.min_sustained_duration}s " + f"below {args.min_mission_depth}m{near_bottom_str}", + file=sys.stderr, + ) + print( + f"[stage02] Filtre pre-water: first_submersion threshold={args.first_submersion_depth}m " + f"min_duration={args.first_submersion_duration}s", + file=sys.stderr, + ) + + # Grouper dossiers bag par AUV ID + auv_dirs = {} + for d in sorted(bags_root.iterdir()): + if not d.is_dir(): + continue + auv_id = extract_auv_id(d.name) + if auv_id: + auv_dirs.setdefault(auv_id, []).append(d) + + print(f"[stage02] AUVs: {sorted(auv_dirs.keys())}") + + # Traiter chaque AUV + auvs_out = {} + all_runs_flat = [] + all_rejected_flat = [] + + for auv_mcap_id in sorted(auv_dirs.keys()): + physical_id = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id) + print(f"\n[stage02] Processing {auv_mcap_id} -> {physical_id} ({len(auv_dirs[auv_mcap_id])} bag dirs)...") + result = process_auv( + auv_mcap_id, + auv_dirs[auv_mcap_id], + Path(args.ssd_base), + min_mission_depth=args.min_mission_depth, + min_sustained_duration=args.min_sustained_duration, + min_near_bottom_pct=args.min_near_bottom_pct, + state_modes=state_modes, + require_armed=args.require_armed, + require_descent=args.require_descent, + first_submersion_depth=args.first_submersion_depth, + first_submersion_duration=args.first_submersion_duration, + ) + auvs_out[physical_id] = result + all_runs_flat.extend(result["runs"]) + all_rejected_flat.extend(result.get("runs_rejected", [])) + + # Trier tous les runs par start_epoch + all_runs_sorted = sorted(all_runs_flat, key=lambda r: r["start_epoch"]) + all_rejected_sorted = sorted(all_rejected_flat, key=lambda r: r["start_epoch"]) + + # Construire output JSON + output = { + "mission": args.mission, + "generated_at": datetime.now(timezone.utc).isoformat(), + "filter_params": { + "min_mission_depth_m": args.min_mission_depth, + "min_sustained_duration_s": args.min_sustained_duration, + "min_near_bottom_pct": args.min_near_bottom_pct, + "state_modes": sorted(state_modes), + "require_armed": args.require_armed, + "require_descent": args.require_descent, + "first_submersion_depth_m": args.first_submersion_depth, + "first_submersion_duration_s": args.first_submersion_duration, + }, + "auvs": auvs_out, + "all_runs_sorted": all_runs_sorted, + "all_runs_rejected": all_rejected_sorted, + } + + # Summary + total_runs = len(all_runs_sorted) + total_rejected = len(all_rejected_sorted) + all_depths = [r["max_depth_m"] for r in all_runs_sorted] + global_max_depth = min(all_depths) if all_depths else 0.0 + total_immersion = sum(r["duration_s"] for r in all_runs_sorted) + total_pre_water = sum(v.get("pre_water_rejected_count", 0) for v in auvs_out.values()) + + print(f"\n{'='*60}") + print(f"[stage02] SUMMARY — {args.mission}") + print(f"{'='*60}") + for phys_id, auv in auvs_out.items(): + deep = [r for r in auv["runs"] if r["max_depth_m"] < -10.0] + rej = len(auv.get("runs_rejected", [])) + method = auv.get("detection_method", "?") + fsub = auv.get("first_submersion_epoch") + fsub_str = (datetime.fromtimestamp(fsub, tz=timezone.utc).isoformat() + if fsub else "None") + pre_w = auv.get("pre_water_rejected_count", 0) + print(f" {phys_id}: {auv['total_immersion_s']}s immersion, " + f"{len(auv['runs'])} runs OK, {rej} rejetés, runs>10m: {len(deep)} [{method}]") + print(f" first_submersion: {fsub_str} | pre_water_rejected: {pre_w}") + print(f" Total runs OK: {total_runs}") + print(f" Total runs rejetés: {total_rejected}") + print(f" Total pre-water rejetés: {total_pre_water}") + print(f" Global max depth: {global_max_depth:.1f}m") + print(f" Total immersion: {total_immersion:.0f}s") + + if total_runs == 0: + print("[WARN] Aucun run validé!") + else: + print(f"[QC OK] {total_runs} runs validés") + for r in all_runs_sorted: + print(f" {r['run_id']}: {r['duration_s']:.0f}s depth={r['max_depth_m']}m " + f"pct={r['pct_near_bottom']:.1f}% mode={r.get('dominant_mode','?')} " + f"method={r.get('detection_method','?')}") + + if args.dry_run: + print(f"\n[dry-run] JSON non écrit.") + return + + # Écrire JSON + out_dir = Path(args.out) / args.mission + out_dir.mkdir(parents=True, exist_ok=True) + out_path = out_dir / "02_runs.json" + + with open(out_path, "w") as f: + json.dump(output, f, indent=2) + + print(f"[stage02] Written: {out_path}") + + +def main(): + parser = argparse.ArgumentParser(description="Stage 02 — Detect mission underwater runs (v4 state-based)") + parser.add_argument("--mission", required=True, help="Mission folder, e.g. 20260508-sttropez") + parser.add_argument("--ssd-base", default="/mnt/ssd", help="SSD root path (READ-ONLY)") + parser.add_argument("--out", default="data/", help="Output base dir") + parser.add_argument("--dry-run", action="store_true", help="Print stats sans écrire fichier") + parser.add_argument( + "--min-mission-depth", + type=float, + default=DEFAULT_MIN_MISSION_DEPTH, + help=f"Profondeur seuil (m, négatif) pour valider un run (default: {DEFAULT_MIN_MISSION_DEPTH})", + ) + parser.add_argument( + "--min-sustained-duration", + type=float, + default=DEFAULT_MIN_SUSTAINED_DURATION, + help=f"Durée min (s) consécutive sous min-mission-depth (default: {DEFAULT_MIN_SUSTAINED_DURATION})", + ) + parser.add_argument( + "--min-near-bottom-pct", + type=float, + default=DEFAULT_MIN_NEAR_BOTTOM_PCT, + help=f"% min du run où rel_alt < min-mission-depth (0=désactivé, default: {DEFAULT_MIN_NEAR_BOTTOM_PCT})", + ) + parser.add_argument( + "--state-modes", + type=str, + default=",".join(sorted(DEFAULT_STATE_MODES)), + help="CSV modes ArduSub considérés mission (ex: AUTO,GUIDED ou ALT_HOLD). " + f"Default: {','.join(sorted(DEFAULT_STATE_MODES))}", + ) + parser.add_argument( + "--require-armed", + action="store_true", + default=True, + help="Exiger armed=True (défaut: True)", + ) + parser.add_argument( + "--no-require-armed", + action="store_false", + dest="require_armed", + help="Ne pas exiger armed=True", + ) + parser.add_argument( + "--require-descent", + action="store_true", + default=True, + help="Exiger transition surface→fond dans fenêtre (défaut: True)", + ) + parser.add_argument( + "--no-require-descent", + action="store_false", + dest="require_descent", + help="Ne pas exiger transition surface→fond", + ) + parser.add_argument( + "--first-submersion-depth", + type=float, + default=DEFAULT_FIRST_SUBMERSION_DEPTH, + help=f"Seuil rel_alt (m) pour détecter première submersion réelle " + f"(default: {DEFAULT_FIRST_SUBMERSION_DEPTH})", + ) + parser.add_argument( + "--first-submersion-duration", + type=float, + default=DEFAULT_FIRST_SUBMERSION_DURATION, + help=f"Durée min continue (s) sous first-submersion-depth pour confirmer mise à l'eau " + f"(default: {DEFAULT_FIRST_SUBMERSION_DURATION})", + ) + parser.add_argument( + "--force", + action="store_true", + help="Forcer recalcul même si fichier existe", + ) + args = parser.parse_args() + + if args.dry_run: + _run(args) + return + + out_dir = Path(args.out) / args.mission + out_dir.mkdir(parents=True, exist_ok=True) + out_path = out_dir / "02_runs.json" + with track_stage("02_mission_run_detect", args.mission, out_dir, + output_files=[out_path]): + _run(args) + + +if __name__ == "__main__": + main()