stage02: filtre pre-water runs avant première submersion réelle

- find_first_submersion_epoch(): premier rel_alt < -2m pendant >= 5s continu
- detect_runs_state_based(): rejette runs avant first_sub_epoch, tronque chevauchants
- CLI: --first-submersion-depth (default -2.0) + --first-submersion-duration (default 5.0)
- JSON output: first_submersion_epoch + pre_water_rejected_count par AUV
- Lepradet: AUV212 run_00 tronqué 25s, AUV213 run_00 tronqué 11s
This commit is contained in:
Ubuntu
2026-05-14 20:15:27 +00:00
parent 568ff9469b
commit 2858217897

View File

@@ -0,0 +1,834 @@
#!/usr/bin/env python3
"""Stage 02 — Mission run detection (v4 state-based).
Détecte runs via transitions /mavros/state (armed + mode) + validation rel_alt.
Fallback depth-only si topic state absent.
Usage:
python3 02_mission_run_detect.py --mission 20260508-sttropez [--ssd-base /mnt/ssd] [--out data/] [--dry-run]
python3 02_mission_run_detect.py --mission 20260505-Lepradet --out data/ \
--state-modes ALT_HOLD --require-armed --require-descent \
--min-mission-depth -2.0 --min-sustained-duration 30 --min-near-bottom-pct 50 --force
"""
import argparse
import glob
import json
import re
import statistics
import sys
from datetime import datetime, timezone
from pathlib import Path
from mcap.reader import make_reader
from mcap_ros2.decoder import DecoderFactory
# Local helper for stage time/memory tracking
sys.path.insert(0, str(Path(__file__).parent))
from _meta import track_stage # noqa: E402
# Mapping ID physique → ID logistique
AUV_PHYSICAL_MAP = {
"AUV010": "AUV210",
"AUV012": "AUV212",
"AUV013": "AUV213",
}
TOPIC_REL_ALT = "/mavros/global_position/rel_alt"
TOPIC_STATE = "/mavros/state"
THRESHOLD = -0.3 # rel_alt < THRESHOLD → immersé
NEED_STREAK = 30 # secondes consécutives pour confirmer début/fin
MIN_DURATION = 60 # secondes minimum par run
SMOOTH_WINDOW_S = 3 # fenêtre lissage médian (secondes à 1Hz = samples)
# Default modes considérés comme "mission active"
DEFAULT_STATE_MODES = {"AUTO", "GUIDED"}
# Modes qui TERMINENT un run (si require-armed=False)
STATE_STOP_MODES = {"SURFACE", "MANUAL"}
# Filtrage "vraie mission" (modifiables via argparse)
DEFAULT_MIN_MISSION_DEPTH = -2.0 # rel_alt doit atteindre ce seuil (m)
DEFAULT_MIN_SUSTAINED_DURATION = 30.0 # pendant au moins N secondes consécutives
DEFAULT_MIN_NEAR_BOTTOM_PCT = 0.0 # % min du run où rel_alt < min_mission_depth (0 = désactivé)
# Filtrage "avant première plongée"
DEFAULT_FIRST_SUBMERSION_DEPTH = -2.0 # seuil rel_alt pour considérer submergé
DEFAULT_FIRST_SUBMERSION_DURATION = 5.0 # durée min continue (s) pour confirmer submersion
# ---------------------------------------------------------------------------
# Lecture MCAP rel_alt
# ---------------------------------------------------------------------------
def parse_mcap_relalt(mcap_path: Path):
"""Lit rel_alt depuis un fichier mcap. Retourne liste de (epoch_s, rel_alt_m)."""
data = []
try:
with open(mcap_path, "rb") as fp:
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(topics=[TOPIC_REL_ALT]):
ts = message.log_time / 1e9
data.append((ts, float(ros_msg.data)))
except Exception as exc:
print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr)
return data
# ---------------------------------------------------------------------------
# Lecture MCAP /mavros/state
# ---------------------------------------------------------------------------
def read_mavros_state(mcap_path: Path, t_start: float = None, t_end: float = None):
"""
Lit /mavros/state depuis un fichier mcap.
Retourne liste de (epoch_s, armed:bool, mode:str).
Filtrage optionnel sur [t_start, t_end].
"""
data = []
try:
with open(mcap_path, "rb") as fp:
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(topics=[TOPIC_STATE]):
ts = message.log_time / 1e9
if t_start is not None and ts < t_start:
continue
if t_end is not None and ts > t_end:
continue
data.append((ts, bool(ros_msg.armed), str(ros_msg.mode)))
except Exception as exc:
# Silent — bag could be corrupt
pass
return data
def parse_mcap_both(mcap_path: Path):
"""Lit rel_alt ET state depuis un seul fichier mcap en un seul pass."""
alt_data = []
state_data = []
try:
with open(mcap_path, "rb") as fp:
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(
topics=[TOPIC_REL_ALT, TOPIC_STATE]):
ts = message.log_time / 1e9
if _channel.topic == TOPIC_REL_ALT:
alt_data.append((ts, float(ros_msg.data)))
elif _channel.topic == TOPIC_STATE:
state_data.append((ts, bool(ros_msg.armed), str(ros_msg.mode)))
except Exception as exc:
print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr)
return alt_data, state_data
def extract_auv_id(folder_name: str):
"""'20260508_054551_AUV010''AUV010'."""
m = re.search(r"(AUV\d+)$", folder_name)
return m.group(1) if m else None
# ---------------------------------------------------------------------------
# Signal processing
# ---------------------------------------------------------------------------
def resample_1hz(data):
"""Resample (epoch, val) → 1Hz par interpolation linéaire. Retourne (times[], vals[])."""
if not data:
return [], []
data = sorted(data)
t0 = int(data[0][0])
t1 = int(data[-1][0])
times = list(range(t0, t1 + 1))
vals = []
idx = 0
for t in times:
while idx < len(data) - 1 and data[idx + 1][0] < t:
idx += 1
if t <= data[0][0]:
v = data[0][1]
elif t >= data[-1][0]:
v = data[-1][1]
else:
t0_, v0 = data[idx]
t1_, v1 = data[idx + 1]
dt = t1_ - t0_
v = v0 + ((t - t0_) / dt) * (v1 - v0) if dt > 0 else v0
vals.append(v)
return times, vals
def median_filter(vals, window=3):
half = window // 2
out = []
n = len(vals)
for i in range(n):
lo = max(0, i - half)
hi = min(n, i + half + 1)
w = sorted(vals[lo:hi])
out.append(w[len(w) // 2])
return out
def find_first_submersion_epoch(rel_alt_times, rel_alt_vals, threshold=-2.0, min_duration=5.0):
"""
Trouve la PREMIÈRE fois où rel_alt < threshold pendant >= min_duration secondes en continu.
Retourne epoch du début de cette submersion, ou None si jamais submergé.
Travaille sur données 1Hz lissées (times/vals déjà resampleés).
"""
n = len(rel_alt_times)
if n == 0:
return None
streak_start = None
streak_count = 0
for i in range(n):
if rel_alt_vals[i] < threshold:
if streak_count == 0:
streak_start = rel_alt_times[i]
streak_count += 1
if streak_count >= min_duration:
return float(streak_start)
else:
streak_count = 0
streak_start = None
return None
def detect_runs_depth_only(times, vals, threshold=THRESHOLD, need_streak=NEED_STREAK, min_duration=MIN_DURATION):
"""
Détecte les runs d'immersion via rel_alt seul (fallback).
Algorithme:
- need_streak samples consécutifs < threshold pour confirmer entrée/sortie
- min_duration secondes minimum par run
Returns liste de (start_epoch, end_epoch).
"""
n = len(times)
if n == 0:
return []
under = [v < threshold for v in vals]
runs = []
in_run = False
run_start = None
streak_under = 0
streak_surface = 0
i = 0
while i < n:
if not in_run:
if under[i]:
streak_under += 1
if streak_under >= need_streak:
run_start = times[i - need_streak + 1]
in_run = True
streak_surface = 0
else:
streak_under = 0
else:
if under[i]:
streak_surface = 0
else:
streak_surface += 1
if streak_surface >= need_streak:
run_end = times[i - need_streak + 1]
dur = run_end - run_start
if dur >= min_duration:
runs.append((run_start, run_end))
in_run = False
run_start = None
streak_under = 0
streak_surface = 0
i += 1
if in_run and run_start is not None:
run_end = times[-1]
dur = run_end - run_start
if dur >= min_duration:
runs.append((run_start, run_end))
return runs
# ---------------------------------------------------------------------------
# State-based run detection
# ---------------------------------------------------------------------------
def detect_runs_state_based(state_data, times_alt, vals_smooth, mission_modes, require_armed,
require_descent, min_duration, min_depth_m=-0.3):
"""
Détecte runs via /mavros/state:
- Intervalles continus armed=True (si require_armed) AND mode in mission_modes
- Valide via transition rel_alt surface → fond (si require_descent)
- min_duration filtre courtes fenêtres
Returns:
list of (start_epoch, end_epoch, dominant_mode)
"""
if not state_data:
return []
state_data_sorted = sorted(state_data)
# Construire séquence d'états
# Identifier intervalles "mission active"
intervals = []
cur_start = None
cur_mode_counts = {}
for ts, armed, mode in state_data_sorted:
active = (not require_armed or armed) and (mode in mission_modes)
if active and cur_start is None:
cur_start = ts
cur_mode_counts = {mode: 1}
elif active and cur_start is not None:
cur_mode_counts[mode] = cur_mode_counts.get(mode, 0) + 1
elif not active and cur_start is not None:
# Fin de l'intervalle
intervals.append((cur_start, ts, cur_mode_counts))
cur_start = None
cur_mode_counts = {}
# Flush dernier intervalle
if cur_start is not None:
intervals.append((cur_start, state_data_sorted[-1][0], cur_mode_counts))
# Filtrer par durée min
runs = []
for start_t, end_t, mode_counts in intervals:
dur = end_t - start_t
if dur < min_duration:
continue
dominant_mode = max(mode_counts, key=mode_counts.get) if mode_counts else "UNKNOWN"
if require_descent:
# Vérifier transition surface → fond dans la fenêtre
window_alts = [(t, v) for t, v in zip(times_alt, vals_smooth)
if start_t <= t <= end_t]
if not window_alts:
# Pas de données alt dans cette fenêtre — skip
continue
alt_vals = [v for _, v in window_alts]
# Début = surface (rel_alt > -1m dans les 60 premiers samples)
first_segment = alt_vals[:60]
last_segment = alt_vals[-60:]
starts_near_surface = any(v > -1.0 for v in first_segment)
reaches_depth = any(v <= min_depth_m for v in alt_vals)
if not (starts_near_surface and reaches_depth):
continue
runs.append((start_t, end_t, dominant_mode))
return runs
# ---------------------------------------------------------------------------
# Validation "vraie mission" — filtre oscillations surface
# ---------------------------------------------------------------------------
def compute_sustained_depth(times, vals_smooth, start_e, end_e, min_depth_m):
"""
Pour un run [start_e, end_e], calcule le plus long segment continu
où rel_alt <= min_depth_m (ex: -2.0m).
"""
run_pairs = [(t, v) for t, v in zip(times, vals_smooth) if start_e <= t <= end_e]
if not run_pairs:
return 0.0, 0.0
best_dur = 0
best_min = 0.0
cur_dur = 0
cur_min = 0.0
for _t, v in run_pairs:
if v <= min_depth_m:
cur_dur += 1
cur_min = min(cur_min, v) if cur_dur > 1 else v
else:
if cur_dur > best_dur:
best_dur = cur_dur
best_min = cur_min
cur_dur = 0
cur_min = 0.0
if cur_dur > best_dur:
best_dur = cur_dur
best_min = cur_min
return round(best_min, 2), float(best_dur)
def compute_near_bottom_pct(times, vals_smooth, start_e, end_e, min_depth_m):
"""Calcule le % de samples dans [start_e, end_e] où rel_alt < min_depth_m."""
run_vals = [v for t, v in zip(times, vals_smooth) if start_e <= t <= end_e]
if not run_vals:
return 0.0
n_below = sum(1 for v in run_vals if v < min_depth_m)
return round(100.0 * n_below / len(run_vals), 1)
# ---------------------------------------------------------------------------
# Processing AUV
# ---------------------------------------------------------------------------
def process_auv(auv_mcap_id: str, bag_dirs: list, ssd_base: Path,
min_mission_depth: float, min_sustained_duration: float,
min_near_bottom_pct: float = 0.0,
state_modes: set = None, require_armed: bool = True,
require_descent: bool = True,
first_submersion_depth: float = DEFAULT_FIRST_SUBMERSION_DEPTH,
first_submersion_duration: float = DEFAULT_FIRST_SUBMERSION_DURATION):
"""
Agréger tous les bags d'un AUV, détecter runs via state (avec fallback depth-only).
Filtre les runs entiers avant la première vraie submersion.
"""
physical_id = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id)
if state_modes is None:
state_modes = DEFAULT_STATE_MODES
all_alt_data = []
all_state_data = []
bags_list = []
n_parsed = 0
n_skipped = 0
# Parcourir bags dans l'ordre chronologique
for bag_dir in sorted(bag_dirs):
bags_list.append(str(bag_dir.name))
mcap_files = sorted(bag_dir.glob("*.mcap"))
for mcap_path in mcap_files:
alt_pts, state_pts = parse_mcap_both(mcap_path)
if alt_pts or state_pts:
all_alt_data.extend(alt_pts)
all_state_data.extend(state_pts)
n_parsed += 1
else:
n_skipped += 1
if (n_parsed + n_skipped) % 50 == 0:
print(f" [{auv_mcap_id}] ... {n_parsed + n_skipped} bags processed", file=sys.stderr)
has_state = len(all_state_data) > 0
print(f" [{auv_mcap_id}] {n_parsed} mcap OK, {n_skipped} skip, "
f"{len(all_alt_data)} rel_alt pts, {len(all_state_data)} state pts", file=sys.stderr)
if not all_alt_data:
print(f" [{auv_mcap_id}] [WARN] aucune donnée rel_alt", file=sys.stderr)
return {
"mcap_id": auv_mcap_id,
"physical_id": physical_id,
"bags": bags_list,
"total_immersion_s": 0,
"runs": [],
"runs_rejected": [],
"detection_method": "no_data",
"first_submersion_epoch": None,
"pre_water_rejected_count": 0,
}
# Dédup + tri rel_alt
seen = set()
deduped_alt = []
for t, v in sorted(all_alt_data):
key = round(t, 3)
if key not in seen:
seen.add(key)
deduped_alt.append((t, v))
# Resample 1Hz
times, vals = resample_1hz(deduped_alt)
print(f" [{auv_mcap_id}] {len(times)} samples 1Hz [{times[0]:.0f}..{times[-1]:.0f}]", file=sys.stderr)
# Lissage médian 3s
vals_smooth = median_filter(vals, window=SMOOTH_WINDOW_S)
# Trouver première submersion réelle
first_sub_epoch = find_first_submersion_epoch(
times, vals_smooth,
threshold=first_submersion_depth,
min_duration=first_submersion_duration,
)
if first_sub_epoch is not None:
print(f" [{auv_mcap_id}] Première submersion réelle: {first_sub_epoch:.0f} "
f"({datetime.fromtimestamp(first_sub_epoch, tz=timezone.utc).isoformat()})",
file=sys.stderr)
else:
print(f" [{auv_mcap_id}] [WARN] Aucune submersion réelle détectée "
f"(threshold={first_submersion_depth}m, min_dur={first_submersion_duration}s)",
file=sys.stderr)
# Détection runs
detection_method = "depth_only"
if has_state:
# Trier state data
all_state_data.sort(key=lambda x: x[0])
# Extraire modes présents
modes_in_data = set(m for _, _, m in all_state_data)
modes_active = modes_in_data & state_modes
print(f" [{auv_mcap_id}] state modes found: {modes_in_data}", file=sys.stderr)
print(f" [{auv_mcap_id}] state modes matching --state-modes: {modes_active}", file=sys.stderr)
if modes_active:
raw_runs_with_mode = detect_runs_state_based(
all_state_data, times, vals_smooth,
mission_modes=state_modes,
require_armed=require_armed,
require_descent=require_descent,
min_duration=MIN_DURATION,
min_depth_m=min_mission_depth,
)
raw_runs = [(s, e) for s, e, _m in raw_runs_with_mode]
run_modes = {(s, e): m for s, e, m in raw_runs_with_mode}
detection_method = "state_based"
print(f" [{auv_mcap_id}] {len(raw_runs)} candidats state-based", file=sys.stderr)
else:
print(f" [{auv_mcap_id}] [WARN] aucun mode {state_modes} dans state data "
f"→ fallback depth-only", file=sys.stderr)
raw_runs = detect_runs_depth_only(times, vals_smooth)
run_modes = {}
detection_method = "depth_only_fallback_no_modes"
else:
print(f" [{auv_mcap_id}] [WARN] topic /mavros/state vide → fallback depth-only", file=sys.stderr)
raw_runs = detect_runs_depth_only(times, vals_smooth)
run_modes = {}
print(f" [{auv_mcap_id}] {len(raw_runs)} runs candidats ({detection_method})", file=sys.stderr)
# Filtrage pre-water: exclure/tronquer runs avant première submersion
pre_water_rejected_count = 0
if first_sub_epoch is not None:
filtered_runs = []
for start_e, end_e in raw_runs:
if end_e <= first_sub_epoch:
# Run entier avant première plongée → rejeter
pre_water_rejected_count += 1
phys_id_log = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id)
print(f" [{auv_mcap_id}] PRE-WATER REJECT run [{start_e:.0f}..{end_e:.0f}] "
f"(end before first_sub={first_sub_epoch:.0f})", file=sys.stderr)
elif start_e < first_sub_epoch < end_e:
# Run chevauche → tronquer start
new_start = first_sub_epoch
print(f" [{auv_mcap_id}] PRE-WATER TRUNCATE run [{start_e:.0f}..{end_e:.0f}] "
f"→ [{new_start:.0f}..{end_e:.0f}]", file=sys.stderr)
# Mettre à jour run_modes si besoin
if (start_e, end_e) in run_modes:
run_modes[(new_start, end_e)] = run_modes.pop((start_e, end_e))
filtered_runs.append((new_start, end_e))
else:
filtered_runs.append((start_e, end_e))
raw_runs = filtered_runs
print(f" [{auv_mcap_id}] {pre_water_rejected_count} runs rejetés pre-water, "
f"{len(raw_runs)} restants", file=sys.stderr)
# Construire runs enrichis + filtre "vraie mission"
runs_out = []
runs_rejected = []
for idx, (start_e, end_e) in enumerate(raw_runs):
dur = end_e - start_e
run_id = f"{physical_id}_run_{idx:02d}"
dominant_mode = run_modes.get((start_e, end_e), "UNKNOWN")
run_vals = [v for t, v in zip(times, vals_smooth) if start_e <= t <= end_e]
max_depth = round(min(run_vals), 2) if run_vals else 0.0
mean_depth = round(statistics.mean(run_vals), 2) if run_vals else 0.0
sustained_below_m, sustained_duration_s = compute_sustained_depth(
times, vals_smooth, start_e, end_e, min_mission_depth
)
pct_near_bottom = compute_near_bottom_pct(
times, vals_smooth, start_e, end_e, min_mission_depth
)
run_entry = {
"run_id": run_id,
"start_epoch": float(start_e),
"end_epoch": float(end_e),
"duration_s": round(dur, 1),
"max_depth_m": max_depth,
"mean_depth_m": mean_depth,
"sustained_below_m": sustained_below_m,
"sustained_duration_s": sustained_duration_s,
"pct_near_bottom": pct_near_bottom,
"dominant_mode": dominant_mode,
"detection_method": detection_method,
}
reject_reason = None
if sustained_duration_s < min_sustained_duration:
reject_reason = (
f"sustained_depth only {sustained_duration_s:.0f}s "
f"(need {min_sustained_duration:.0f}s below {min_mission_depth}m)"
)
elif min_near_bottom_pct > 0.0 and pct_near_bottom < min_near_bottom_pct:
reject_reason = (
f"not_enough_immersion: only {pct_near_bottom:.1f}% "
f"time below {min_mission_depth}m (need {min_near_bottom_pct:.1f}%)"
)
if reject_reason is None:
runs_out.append(run_entry)
else:
run_entry["rejected_reason"] = reject_reason
runs_rejected.append(run_entry)
print(
f" [{auv_mcap_id}] REJECT {run_id}: max_depth={max_depth}m "
f"sustained={sustained_duration_s:.0f}s pct={pct_near_bottom:.1f}% "
f"reason={reject_reason[:60]}",
file=sys.stderr,
)
print(
f" [{auv_mcap_id}] {len(runs_out)} runs OK, {len(runs_rejected)} rejetés ({detection_method})",
file=sys.stderr,
)
total_immersion_s = round(sum(r["duration_s"] for r in runs_out), 1)
return {
"mcap_id": auv_mcap_id,
"physical_id": physical_id,
"bags": bags_list,
"total_immersion_s": total_immersion_s,
"runs": runs_out,
"runs_rejected": runs_rejected,
"detection_method": detection_method,
"first_submersion_epoch": first_sub_epoch,
"pre_water_rejected_count": pre_water_rejected_count,
}
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _run(args):
bags_root = Path(args.ssd_base) / args.mission / "raw_data" / "logs" / "SUB" / "bag"
if not bags_root.exists():
print(f"[ERROR] bags dir not found: {bags_root}", file=sys.stderr)
sys.exit(1)
state_modes = set(m.strip() for m in args.state_modes.split(",") if m.strip())
print(f"[stage02] Mission: {args.mission}")
print(f"[stage02] Bags root: {bags_root}", file=sys.stderr)
print(f"[stage02] State modes: {state_modes}, require_armed={args.require_armed}, "
f"require_descent={args.require_descent}", file=sys.stderr)
near_bottom_str = (f" + near_bottom >= {args.min_near_bottom_pct:.0f}%"
if args.min_near_bottom_pct > 0 else "")
print(
f"[stage02] Filtre mission: sustained >= {args.min_sustained_duration}s "
f"below {args.min_mission_depth}m{near_bottom_str}",
file=sys.stderr,
)
print(
f"[stage02] Filtre pre-water: first_submersion threshold={args.first_submersion_depth}m "
f"min_duration={args.first_submersion_duration}s",
file=sys.stderr,
)
# Grouper dossiers bag par AUV ID
auv_dirs = {}
for d in sorted(bags_root.iterdir()):
if not d.is_dir():
continue
auv_id = extract_auv_id(d.name)
if auv_id:
auv_dirs.setdefault(auv_id, []).append(d)
print(f"[stage02] AUVs: {sorted(auv_dirs.keys())}")
# Traiter chaque AUV
auvs_out = {}
all_runs_flat = []
all_rejected_flat = []
for auv_mcap_id in sorted(auv_dirs.keys()):
physical_id = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id)
print(f"\n[stage02] Processing {auv_mcap_id} -> {physical_id} ({len(auv_dirs[auv_mcap_id])} bag dirs)...")
result = process_auv(
auv_mcap_id,
auv_dirs[auv_mcap_id],
Path(args.ssd_base),
min_mission_depth=args.min_mission_depth,
min_sustained_duration=args.min_sustained_duration,
min_near_bottom_pct=args.min_near_bottom_pct,
state_modes=state_modes,
require_armed=args.require_armed,
require_descent=args.require_descent,
first_submersion_depth=args.first_submersion_depth,
first_submersion_duration=args.first_submersion_duration,
)
auvs_out[physical_id] = result
all_runs_flat.extend(result["runs"])
all_rejected_flat.extend(result.get("runs_rejected", []))
# Trier tous les runs par start_epoch
all_runs_sorted = sorted(all_runs_flat, key=lambda r: r["start_epoch"])
all_rejected_sorted = sorted(all_rejected_flat, key=lambda r: r["start_epoch"])
# Construire output JSON
output = {
"mission": args.mission,
"generated_at": datetime.now(timezone.utc).isoformat(),
"filter_params": {
"min_mission_depth_m": args.min_mission_depth,
"min_sustained_duration_s": args.min_sustained_duration,
"min_near_bottom_pct": args.min_near_bottom_pct,
"state_modes": sorted(state_modes),
"require_armed": args.require_armed,
"require_descent": args.require_descent,
"first_submersion_depth_m": args.first_submersion_depth,
"first_submersion_duration_s": args.first_submersion_duration,
},
"auvs": auvs_out,
"all_runs_sorted": all_runs_sorted,
"all_runs_rejected": all_rejected_sorted,
}
# Summary
total_runs = len(all_runs_sorted)
total_rejected = len(all_rejected_sorted)
all_depths = [r["max_depth_m"] for r in all_runs_sorted]
global_max_depth = min(all_depths) if all_depths else 0.0
total_immersion = sum(r["duration_s"] for r in all_runs_sorted)
total_pre_water = sum(v.get("pre_water_rejected_count", 0) for v in auvs_out.values())
print(f"\n{'='*60}")
print(f"[stage02] SUMMARY — {args.mission}")
print(f"{'='*60}")
for phys_id, auv in auvs_out.items():
deep = [r for r in auv["runs"] if r["max_depth_m"] < -10.0]
rej = len(auv.get("runs_rejected", []))
method = auv.get("detection_method", "?")
fsub = auv.get("first_submersion_epoch")
fsub_str = (datetime.fromtimestamp(fsub, tz=timezone.utc).isoformat()
if fsub else "None")
pre_w = auv.get("pre_water_rejected_count", 0)
print(f" {phys_id}: {auv['total_immersion_s']}s immersion, "
f"{len(auv['runs'])} runs OK, {rej} rejetés, runs>10m: {len(deep)} [{method}]")
print(f" first_submersion: {fsub_str} | pre_water_rejected: {pre_w}")
print(f" Total runs OK: {total_runs}")
print(f" Total runs rejetés: {total_rejected}")
print(f" Total pre-water rejetés: {total_pre_water}")
print(f" Global max depth: {global_max_depth:.1f}m")
print(f" Total immersion: {total_immersion:.0f}s")
if total_runs == 0:
print("[WARN] Aucun run validé!")
else:
print(f"[QC OK] {total_runs} runs validés")
for r in all_runs_sorted:
print(f" {r['run_id']}: {r['duration_s']:.0f}s depth={r['max_depth_m']}m "
f"pct={r['pct_near_bottom']:.1f}% mode={r.get('dominant_mode','?')} "
f"method={r.get('detection_method','?')}")
if args.dry_run:
print(f"\n[dry-run] JSON non écrit.")
return
# Écrire JSON
out_dir = Path(args.out) / args.mission
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / "02_runs.json"
with open(out_path, "w") as f:
json.dump(output, f, indent=2)
print(f"[stage02] Written: {out_path}")
def main():
parser = argparse.ArgumentParser(description="Stage 02 — Detect mission underwater runs (v4 state-based)")
parser.add_argument("--mission", required=True, help="Mission folder, e.g. 20260508-sttropez")
parser.add_argument("--ssd-base", default="/mnt/ssd", help="SSD root path (READ-ONLY)")
parser.add_argument("--out", default="data/", help="Output base dir")
parser.add_argument("--dry-run", action="store_true", help="Print stats sans écrire fichier")
parser.add_argument(
"--min-mission-depth",
type=float,
default=DEFAULT_MIN_MISSION_DEPTH,
help=f"Profondeur seuil (m, négatif) pour valider un run (default: {DEFAULT_MIN_MISSION_DEPTH})",
)
parser.add_argument(
"--min-sustained-duration",
type=float,
default=DEFAULT_MIN_SUSTAINED_DURATION,
help=f"Durée min (s) consécutive sous min-mission-depth (default: {DEFAULT_MIN_SUSTAINED_DURATION})",
)
parser.add_argument(
"--min-near-bottom-pct",
type=float,
default=DEFAULT_MIN_NEAR_BOTTOM_PCT,
help=f"% min du run où rel_alt < min-mission-depth (0=désactivé, default: {DEFAULT_MIN_NEAR_BOTTOM_PCT})",
)
parser.add_argument(
"--state-modes",
type=str,
default=",".join(sorted(DEFAULT_STATE_MODES)),
help="CSV modes ArduSub considérés mission (ex: AUTO,GUIDED ou ALT_HOLD). "
f"Default: {','.join(sorted(DEFAULT_STATE_MODES))}",
)
parser.add_argument(
"--require-armed",
action="store_true",
default=True,
help="Exiger armed=True (défaut: True)",
)
parser.add_argument(
"--no-require-armed",
action="store_false",
dest="require_armed",
help="Ne pas exiger armed=True",
)
parser.add_argument(
"--require-descent",
action="store_true",
default=True,
help="Exiger transition surface→fond dans fenêtre (défaut: True)",
)
parser.add_argument(
"--no-require-descent",
action="store_false",
dest="require_descent",
help="Ne pas exiger transition surface→fond",
)
parser.add_argument(
"--first-submersion-depth",
type=float,
default=DEFAULT_FIRST_SUBMERSION_DEPTH,
help=f"Seuil rel_alt (m) pour détecter première submersion réelle "
f"(default: {DEFAULT_FIRST_SUBMERSION_DEPTH})",
)
parser.add_argument(
"--first-submersion-duration",
type=float,
default=DEFAULT_FIRST_SUBMERSION_DURATION,
help=f"Durée min continue (s) sous first-submersion-depth pour confirmer mise à l'eau "
f"(default: {DEFAULT_FIRST_SUBMERSION_DURATION})",
)
parser.add_argument(
"--force",
action="store_true",
help="Forcer recalcul même si fichier existe",
)
args = parser.parse_args()
if args.dry_run:
_run(args)
return
out_dir = Path(args.out) / args.mission
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / "02_runs.json"
with track_stage("02_mission_run_detect", args.mission, out_dir,
output_files=[out_path]):
_run(args)
if __name__ == "__main__":
main()