Compare commits

..

8 Commits

Author SHA1 Message Date
Poulpe
171f90ce9f stage03b: trim videos per run + ours rough cut
LRV proxy (GoPro low-res 768x432 H.264) + ffmpeg -c copy keyframe-aligned.
Inputs: 02_runs.json + 03_video_index.json.
Outputs: per-run mp4 + ours_<gp>.mp4 chrono concat.
Tested on 20260505-Lepradet: 5 files + 2 ours (~11 GB total).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 16:05:41 +00:00
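The keyframe-aligned stream copy mentioned in this commit can be sketched as follows. This is a minimal illustration and not the code of stage03b: the helper names `build_trim_cmd` and `build_concat_cmd`, the file names, and the run boundaries are all hypothetical. It uses only standard ffmpeg options (`-ss`, `-t`, `-c copy`, `-avoid_negative_ts`, and the concat demuxer). With `-c copy` and `-ss` placed before `-i`, ffmpeg seeks to a keyframe at or before the requested time, so the cut is fast but not frame-accurate.

```python
from pathlib import Path


def build_trim_cmd(src: Path, dst: Path, start_s: float, duration_s: float) -> list[str]:
    """Return an ffmpeg argv that cuts a clip without re-encoding (hypothetical helper)."""
    return [
        "ffmpeg", "-y",
        "-ss", f"{start_s:.3f}",   # input seeking: lands on a keyframe at or before start_s
        "-i", str(src),
        "-t", f"{duration_s:.3f}",
        "-c", "copy",              # stream copy, no re-encode
        "-avoid_negative_ts", "make_zero",
        str(dst),
    ]


def build_concat_cmd(list_file: Path, dst: Path) -> list[str]:
    """Return an ffmpeg argv that joins the clips listed in list_file (concat demuxer)."""
    return ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
            "-i", str(list_file), "-c", "copy", str(dst)]


# Hypothetical inputs: an LRV proxy and a run starting 12 s into the file.
cmd = build_trim_cmd(Path("GL019817.LRV"), Path("AUV210_run_00.mp4"), 12.0, 79.0)
print(" ".join(cmd))
```

The returned list can be passed to `subprocess.run(cmd, check=True)`. Building the argv separately keeps the command easy to log and to test without invoking ffmpeg.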
Ubuntu
754f3c7272 stage01: filter old/, merge AUV0xx<->AUV2xx, focus on mission date
- _is_old_path: skip any path containing /old/
- _normalize_auv_id: AUV010->AUV210, AUV012->AUV212, AUV013->AUV213
- _parse_mission_date: regex for YYYYMMDD in the mission name
- _mission_date_window: filter out coverage outside [date-2h, date+26h] UTC
- manifest: mission_date + mission_date_window_utc + n_filtered_old + n_filtered_out_of_date
2026-05-15 10:48:22 +00:00
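As a worked example of the ID merge and the date window described in this commit, the sketch below condenses the two rules into standalone functions. The names `mission_window` and `normalize_auv_id` are illustrative and do not come from the commit. For mission `20260505-Lepradet` the window runs from 22:00 UTC the previous day to 02:00 UTC the following day.

```python
import re
from datetime import datetime, timedelta, timezone

MISSION_DATE_RE = re.compile(r"^(\d{8})-")


def mission_window(mission: str):
    """Return (start, end) = [date-2h, date+26h] in UTC, or None if no date prefix."""
    m = MISSION_DATE_RE.match(mission)
    if not m:
        return None
    day = datetime.strptime(m.group(1), "%Y%m%d").replace(tzinfo=timezone.utc)
    return day - timedelta(hours=2), day + timedelta(hours=26)


def normalize_auv_id(auv: str) -> str:
    """Map AUV0xx to AUV2xx; leave every other ID unchanged."""
    m = re.fullmatch(r"AUV(\d{3})", auv, re.I)
    if not m:
        return auv
    n = int(m.group(1))
    return f"AUV2{n:02d}" if n < 100 else f"AUV{n:03d}"


start, end = mission_window("20260505-Lepradet")
print(start.isoformat(), end.isoformat())
# 2026-05-04T22:00:00+00:00 2026-05-06T02:00:00+00:00
print(normalize_auv_id("AUV010"), normalize_auv_id("AUV212"))
# AUV210 AUV212
```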
Ubuntu
90621dea12 stage01: internal timestamp extraction + mission/AUV windows
- ffprobe SMPTE for GoPro MP4 (preferred)
- mcap.reader for ROS2 bags
- pymavlink for ArduSub BIN (mtime fallback on failure)
- head/tail for USBL and MAG CSV
- filename regex for Kogger KLF
- global mission_window + auv_windows with gap detection (>60s)
2026-05-15 10:10:01 +00:00
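The gap detection mentioned in the last bullet can be illustrated with a small standalone sketch. It assumes each file contributes one `(start, end)` interval; the function name `find_gaps` and the sample times are hypothetical. Overlapping intervals are merged by tracking the furthest end seen so far, and a gap is reported only when the next start lies more than 60 s after it.

```python
from datetime import datetime, timedelta


def find_gaps(windows, min_gap_s=60):
    """Return (gap_start, gap_end) pairs longer than min_gap_s between covered spans."""
    gaps = []
    ordered = sorted(windows)
    if not ordered:
        return gaps
    cur_end = ordered[0][1]
    for start, end in ordered[1:]:
        if start - cur_end > timedelta(seconds=min_gap_s):
            gaps.append((cur_end, start))
        cur_end = max(cur_end, end)  # extend the covered span
    return gaps


t0 = datetime(2026, 5, 5, 8, 0, 0)
wins = [
    (t0, t0 + timedelta(minutes=10)),
    (t0 + timedelta(minutes=5), t0 + timedelta(minutes=12)),               # overlaps the first
    (t0 + timedelta(minutes=12, seconds=30), t0 + timedelta(minutes=20)),  # 30 s later: no gap
    (t0 + timedelta(minutes=25), t0 + timedelta(minutes=30)),              # 5 min later: gap
]
gaps = find_gaps(wins)
print(gaps)
```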
Ubuntu
15b4ddfd70 stage01: add MAG (cosmag csv ar/av/side) + SSS (Kogger klf/bin) collectors
Manifest 01 extended with two new fields:
- sss_files: {klf,bin} relative paths under raw_data/sss/**
- mag_files: {ar,av,side,other} csv under raw_data/mag/**

Tests:
- 20260505-Lepradet: sss=1 mag=0 (empty mag expected)
- 20260508-sttropez: sss=53 mag=73 (match scout)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 09:53:01 +00:00
Ubuntu
65bda7ff71 stage02: strict filter v5 (pct=80 dur=60 depth=-3m) + stage02b diag plots
Stricter defaults to eliminate surface/yo-yo runs:
- min_near_bottom_pct: 50 -> 80 %
- min_sustained_duration: 30 -> 60 s
- min_mission_depth: -2 -> -3 m
- min_displacement_m: 5.0 documented (future stage06+)

New script 02b_runs_diag.py: 4-panel PNG per run, OK + rejected
(rel_alt+threshold, MAVROS state, depth histo, verdict criteria)
+ index.html for visual inspection (Flag).

Test Lepradet: 5 -> 1 run OK (AUV210_run_00 79s -13m 81pct)
Page published: laboratoire.freeboxos.fr/02-runs-diag-lepradet/
2026-05-15 09:53:01 +00:00
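The three depth criteria listed in this commit can be read as follows: the run must stay below `min_mission_depth` for a consecutive stretch of at least `min_sustained_duration`, and for at least `min_near_bottom_pct` of its total length. The sketch below is one possible reading, not the stage02 implementation. The function name `passes_strict_filter`, the sample format, and the synthetic profiles are assumptions.

```python
def passes_strict_filter(samples, min_depth=-3.0, min_sustained_s=60.0, min_pct=80.0):
    """samples: time-ordered (epoch_s, rel_alt_m) pairs. Returns (ok, stats)."""
    below_time = 0.0   # total time spent below min_depth
    longest = 0.0      # longest consecutive stretch below min_depth
    streak = 0.0
    for (t0, alt0), (t1, _alt1) in zip(samples, samples[1:]):
        dt = t1 - t0
        if alt0 < min_depth:
            below_time += dt
            streak += dt
            longest = max(longest, streak)
        else:
            streak = 0.0
    total = samples[-1][0] - samples[0][0] if len(samples) > 1 else 0.0
    pct = 100.0 * below_time / total if total > 0 else 0.0
    ok = longest >= min_sustained_s and pct >= min_pct
    return ok, {"near_bottom_pct": pct, "longest_sustained_s": longest}


# Synthetic 1 Hz profiles (hypothetical data).
dive = [(t, -1.0 if t < 10 else -13.0) for t in range(101)]            # 10 s descent, then bottom
yoyo = [(t, -5.0 if (t // 20) % 2 == 0 else 0.0) for t in range(101)]  # 20 s down, 20 s up

print(passes_strict_filter(dive))
print(passes_strict_filter(yoyo))
```

The yo-yo profile spends 60 % of its time below the threshold but never for more than 20 s in a row, so it fails both criteria, which is the kind of run the stricter defaults aim to reject.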
Ubuntu
2858217897 stage02: filter pre-water runs before the first real submersion
- find_first_submersion_epoch(): first rel_alt < -2m held for >= 5s continuously
- detect_runs_state_based(): rejects runs before first_sub_epoch, truncates overlapping ones
- CLI: --first-submersion-depth (default -2.0) + --first-submersion-duration (default 5.0)
- JSON output: first_submersion_epoch + pre_water_rejected_count per AUV
- Lepradet: AUV212 run_00 truncated 25s, AUV213 run_00 truncated 11s
2026-05-15 09:53:01 +00:00
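The pre-water filter described above can be sketched in two steps: find the first moment where `rel_alt` stays below the depth threshold for the required duration, then reject or truncate runs relative to that moment. The code below is an illustration under assumptions, not the commit's implementation. The names `first_submersion_sketch` and `apply_pre_water_filter` are hypothetical, the returned epoch is taken to be the start of the qualifying streak, and runs are left untouched when no submersion is found.

```python
def first_submersion_sketch(samples, depth=-2.0, duration_s=5.0):
    """Return the epoch where rel_alt first stays below depth for duration_s, else None."""
    streak_start = None
    for ts, rel_alt in samples:
        if rel_alt < depth:
            if streak_start is None:
                streak_start = ts
            if ts - streak_start >= duration_s:
                return streak_start
        else:
            streak_start = None
    return None


def apply_pre_water_filter(runs, first_sub_epoch):
    """Drop runs ending before first_sub_epoch; truncate runs that straddle it."""
    if first_sub_epoch is None:
        return list(runs), 0  # assumption: nothing to filter without a submersion
    kept, rejected = [], 0
    for start, end in runs:
        if end <= first_sub_epoch:
            rejected += 1
        else:
            kept.append((max(start, first_sub_epoch), end))
    return kept, rejected


# Synthetic 1 Hz profile: surface, a 3 s dip, surface again, then a real dive from t=15.
samples = [(t, -0.5) for t in range(10)]
samples += [(t, -2.5) for t in range(10, 13)]
samples += [(t, -1.0) for t in range(13, 15)]
samples += [(t, -4.0) for t in range(15, 31)]

epoch = first_submersion_sketch(samples)
kept, rejected = apply_pre_water_filter([(2, 8), (10, 40), (50, 60)], epoch)
print(epoch, kept, rejected)
```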
Poulpe
568ff9469b auto-iter 2026-05-14: iteration-log iter10 — RoPE fix + PR#13 merge + .83 blocker
Co-Authored-By: Poulpe <claude@nowyouknow.fr>
2026-05-14 04:56:09 +00:00
Poulpe
2611a72aa2 auto-iter 2026-05-14: max_frame_num 1024→2048 fix RoPE overflow GX019817
Root cause: 3D RoPE precomputed for max_frame_num+100=1124 positions.
GX019817 has 1357 frames after trim → index overflow → tensor mismatch.
2048 → supports up to 2148 frames (covers all current segments).

Co-Authored-By: Poulpe <claude@nowyouknow.fr>
2026-05-14 04:52:20 +00:00
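The arithmetic behind this fix can be checked in a few lines. The margin of 100 positions and the frame count of 1357 are taken from the commit message; the helper names `rope_capacity` and `fits` are illustrative only and are not part of the inference code.

```python
ROPE_MARGIN = 100  # from the commit message: positions = max_frame_num + 100


def rope_capacity(max_frame_num: int) -> int:
    """Number of precomputed RoPE positions for a given max_frame_num."""
    return max_frame_num + ROPE_MARGIN


def fits(n_frames: int, max_frame_num: int) -> bool:
    """True if a segment of n_frames stays within the precomputed positions."""
    return n_frames <= rope_capacity(max_frame_num)


for max_frame_num in (1024, 2048):
    print(max_frame_num, rope_capacity(max_frame_num), fits(1357, max_frame_num))
# 1024 1124 False
# 2048 2148 True
```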
6 changed files with 2282 additions and 1 deletions

@@ -15,7 +15,7 @@ frame_extract:
   bottom_visible_pct_min: 25
 inference:
   ply_conf_threshold: 1.5
-  max_frame_num: 1024
+  max_frame_num: 2048
   mode: streaming
   keyframe_interval: 1
   min_frames_for_inference: 32

@@ -155,3 +155,19 @@
2. Retry GX019817 after update
3. Start stage06_align preparation (pose extraction pipeline)
4. Test ReefMapGS on known-good segment (GX029839 85M pts)
## Iteration 10 - 2026-05-14 04:55 UTC
- **Signal detected**: RoPE tensor mismatch on GX019817 (1357 frames) = overflow of max_frame_num=1024 → RoPE precomputes only 1124 positions (max_frame_num+100). Source confirmed: `aggregator/stream.py` line 226, `max_total_frames=self.max_frame_num+100=1124 < 1357`.
- **Patch**:
  - AUTO-COMMIT 2611a72: `thresholds.yaml`: `max_frame_num: 1024 → 2048` (supports up to 2148 frames)
  - MERGE PR#13: `fix/05-inference-viser-kill-offload` → `feature/auto-pipeline` (kill_stale_demo_py + offload_to_cpu read from yaml + background+poll SSH)
- **Type**: auto-commit (yaml) + merge of Gitea PR #13
- **Sanity check**: SKIP. cosma@192.168.0.83 SSH banner exchange timeout (VM at 97% RAM, TCP OK but no process responds, sshd frozen). Retrying GX019817 is impossible until .83 is restored.
- **Infrastructure**: 4 orphaned viser_ply.py processes killed on .84 (freed ~29GB RAM). VM .83 unreachable, which blocks the pipeline retry.
- **Watch**: lingbot-map GitHub updated 2026-05-08 (docs+deps only, no RoPE fix); arxiv AUV nav fusion 9/10 (2605.04672); VGGT CVPR 2025 7/10
- **Blockers**: cosma@.83 SSH frozen → cannot retrieve the GX019817 frames or relaunch stage05. Requires human intervention (.83 sshd restart or VM reset).
- **Next suggestions**:
  1. ⚠️ Intervention: unblock SSH on .83 (restart sshd or reset the VM via Proxmox)
  2. Once restored: retry GX019817 inference with max_frame_num=2048
  3. If .83 stays dead: clone lingbot-map on the workspace → push to Gitea → update .84/.87 from the local network (the workers cannot reach GitHub)
  4. Evaluate ReefMapGS v0.8 (underwater-specific) on GX029839 (85M pts reference)

@@ -0,0 +1,850 @@
#!/usr/bin/env python3
"""Stage 01: select mission and produce a raw manifest.

Scans `<ssd_base>/<mission>/raw_data/` and writes
`<out>/<mission>/01_manifest.json` listing all AUVs, GoPro folders, MCAP bags,
BIN files, USBL logs.

Usage:
    python3 01_select_mission.py \
        --mission 20260505-Lepradet \
        --ssd-base /mnt/ssd \
        --out /home/cosma/cosma-qc/data
"""
from __future__ import annotations

import argparse
import json
import re
import subprocess
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta
from pathlib import Path

# Local helper for stage time/memory tracking
sys.path.insert(0, str(Path(__file__).parent))
from _meta import track_stage  # noqa: E402

# AUV physical (GoPro) and AUV MCAP IDs: pattern matching only, never write to SSD.
AUV_PHYS_RE = re.compile(r"AUV(\d{3})", re.I)
GP_FOLDER_RE = re.compile(r"^GP(?P<gp>\d+)[_-]AUV(?P<auv>\d{3})$", re.I)
BAG_AUV_RE = re.compile(r"_AUV(?P<auv>\d{3})(?:[_/]|$)", re.I)
USBL_FILE_RE = re.compile(r"usbl", re.I)
KLF_TS_RE = re.compile(r"(\d{8})_(\d{6})")
MISSION_DATE_RE = re.compile(r"^(\d{8})-")

# KLF throughput estimate: ~5MB/min
KLF_BYTES_PER_SEC = 5 * 1024 * 1024 / 60


def iso_utc_now() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _is_old_path(path: Path) -> bool:
    """Return True if path contains an 'old' component (skip these files)."""
    return "old" in path.parts


def _normalize_auv_id(auv_str: str) -> str:
    """AUV0xx -> AUV2xx; AUV2xx unchanged; others unchanged."""
    m = re.fullmatch(r"AUV(\d{3})", auv_str, re.I)
    if not m:
        return auv_str
    n = int(m.group(1))
    if 0 <= n < 100:
        return f"AUV2{n:02d}"  # AUV010 -> AUV210
    return f"AUV{n:03d}"


def _parse_mission_date(mission: str) -> "datetime | None":
    """Parse YYYYMMDD from mission folder name. Returns UTC midnight or None."""
    m = MISSION_DATE_RE.match(mission)
    if not m:
        print(f" [warn] mission_date: cannot parse date from '{mission}', no date filter")
        return None
    try:
        return datetime.strptime(m.group(1), "%Y%m%d").replace(tzinfo=timezone.utc)
    except ValueError:
        print(f" [warn] mission_date: invalid date '{m.group(1)}' in '{mission}'")
        return None


def _mission_date_window(mission_date: datetime) -> "tuple[datetime, datetime]":
    """Return [date-2h, date+26h] UTC window with +/-2h timezone tolerance."""
    start = mission_date - timedelta(hours=2)
    end = mission_date + timedelta(hours=26)
    return start, end


def mtime_fallback(path: Path) -> "dict | None":
    """Fallback: use mtime as t_start, duration=0. Returns None if stat() fails."""
    try:
        mt = path.stat().st_mtime
        t = datetime.fromtimestamp(mt, tz=timezone.utc)
        return {
            "t_start": t.isoformat(timespec="seconds"),
            "t_end": t.isoformat(timespec="seconds"),
            "duration_s": 0,
            "source": "mtime_fallback",
        }
    except OSError:
        return None


def parse_smpte(smpte: str, fps: float = 25.0) -> "float | None":
    """SMPTE HH:MM:SS:FF -> seconds since midnight (None if malformed)."""
    parts = smpte.split(":")
    if len(parts) != 4:
        return None
    try:
        h, m, s, f = (int(p) for p in parts)
    except ValueError:
        # Non-numeric field: let the caller fall back to creation_time
        return None
    return h * 3600 + m * 60 + s + f / fps


def extract_mp4(path: Path) -> "dict | None":
    """Extract timestamps from GoPro MP4 via ffprobe."""
    try:
        out = subprocess.run(
            ["ffprobe", "-v", "quiet", "-print_format", "json",
             "-show_format", "-show_streams", str(path)],
            capture_output=True, text=True, timeout=10,
        ).stdout
        if not out:
            return mtime_fallback(path)
        data = json.loads(out)
        fmt = data.get("format", {})
        duration = float(fmt.get("duration", 0) or 0)
        # Try SMPTE timecode first
        smpte = None
        for s in data.get("streams", []):
            tc = s.get("tags", {}).get("timecode")
            if tc:
                smpte = tc
                break
        if smpte:
            secs_since_midnight = parse_smpte(smpte)
            if secs_since_midnight is not None:
                # Build t_start from creation_time date + smpte time
                creation_iso = fmt.get("tags", {}).get("creation_time", "")
                try:
                    date_part = datetime.fromisoformat(
                        creation_iso.replace("Z", "+00:00")
                    ).date()
                    midnight = datetime(
                        date_part.year, date_part.month, date_part.day,
                        tzinfo=timezone.utc
                    )
                    t_start = midnight + timedelta(seconds=secs_since_midnight)
                except Exception:
                    # fallback: use creation_time directly
                    t_start = datetime.fromisoformat(
                        creation_iso.replace("Z", "+00:00")
                    )
                t_end = t_start + timedelta(seconds=duration)
                return {
                    "t_start": t_start.isoformat(timespec="seconds"),
                    "t_end": t_end.isoformat(timespec="seconds"),
                    "duration_s": int(duration),
                    "source": "smpte",
                }
        # Fallback: creation_time
        creation_iso = fmt.get("tags", {}).get("creation_time", "")
        if creation_iso:
            t_start = datetime.fromisoformat(creation_iso.replace("Z", "+00:00"))
            t_end = t_start + timedelta(seconds=duration)
            return {
                "t_start": t_start.isoformat(timespec="seconds"),
                "t_end": t_end.isoformat(timespec="seconds"),
                "duration_s": int(duration),
                "source": "creation_time",
            }
    except Exception:
        pass
    return mtime_fallback(path)


def extract_mcap(path: Path) -> "dict | None":
    """Extract timestamps from MCAP bag via mcap.reader."""
    try:
        from mcap.reader import make_reader
        with open(path, "rb") as f:
            reader = make_reader(f)
            summary = reader.get_summary()
            if summary and summary.statistics:
                start_ns = summary.statistics.message_start_time
                end_ns = summary.statistics.message_end_time
                if start_ns and end_ns:
                    t_start = datetime.fromtimestamp(start_ns / 1e9, tz=timezone.utc)
                    t_end = datetime.fromtimestamp(end_ns / 1e9, tz=timezone.utc)
                    dur = int((end_ns - start_ns) / 1e9)
                    return {
                        "t_start": t_start.isoformat(timespec="seconds"),
                        "t_end": t_end.isoformat(timespec="seconds"),
                        "duration_s": dur,
                        "source": "mcap_summary",
                    }
    except Exception:
        pass
    return mtime_fallback(path)


def extract_bin(path: Path) -> "dict | None":
    """BIN ArduSub: no absolute timestamp available (TimeUS = boot-relative).
    Use mtime fallback."""
    return mtime_fallback(path)


def _parse_csv_timestamp(line: str) -> "datetime | None":
    """Parse first column of a CSV line as ISO or YYYYMMDD HH:MM:SS.mmm."""
    col = line.split(",")[0].strip().strip('"')
    # Try ISO format (2026-05-08 05:46:29.384209)
    for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S.%f",
                "%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%d %H:%M:%S"):
        try:
            dt = datetime.strptime(col, fmt)
            return dt.replace(tzinfo=timezone.utc)
        except ValueError:
            pass
    # Try COSMA MAG format: '20260508 07:52:28.456'
    try:
        dt = datetime.strptime(col, "%Y%m%d %H:%M:%S.%f")
        return dt.replace(tzinfo=timezone.utc)
    except ValueError:
        pass
    # Try float epoch (OverflowError covers inf and out-of-range values)
    try:
        return datetime.fromtimestamp(float(col), tz=timezone.utc)
    except (ValueError, OSError, OverflowError):
        pass
    return None


def extract_csv(path: Path) -> "dict | None":
    """Extract timestamps from CSV (USBL/MAG) via head+tail."""
    try:
        result = subprocess.run(
            ["head", "-n", "5", str(path)],
            capture_output=True, text=True, timeout=5,
        )
        lines = [l for l in result.stdout.splitlines() if l.strip()]
        tail_result = subprocess.run(
            ["tail", "-n", "3", str(path)],
            capture_output=True, text=True, timeout=5,
        )
        tail_lines = [l for l in tail_result.stdout.splitlines() if l.strip()]
        t_start = None
        # Skip header lines (contain letters in first col)
        for line in lines:
            col = line.split(",")[0].strip()
            if not col or col[0].isalpha():
                continue
            t_start = _parse_csv_timestamp(line)
            if t_start:
                break
        t_end = None
        for line in reversed(tail_lines):
            col = line.split(",")[0].strip()
            if not col or col[0].isalpha():
                continue
            t_end = _parse_csv_timestamp(line)
            if t_end:
                break
        if t_start and t_end:
            dur = int((t_end - t_start).total_seconds())
            return {
                "t_start": t_start.isoformat(timespec="seconds"),
                "t_end": t_end.isoformat(timespec="seconds"),
                "duration_s": max(0, dur),
                "source": "csv_inline",
            }
    except Exception:
        pass
    return mtime_fallback(path)


def extract_klf(path: Path) -> "dict | None":
    """Extract timestamp from KLF Kogger filename: kogger_sss_YYYYMMDD_HHMMSS.klf"""
    try:
        m = KLF_TS_RE.search(path.name)
        if m:
            dt = datetime.strptime(m.group(1) + m.group(2), "%Y%m%d%H%M%S")
            t_start = dt.replace(tzinfo=timezone.utc)
            # Estimate duration from file size
            try:
                size = path.stat().st_size
                dur = int(size / KLF_BYTES_PER_SEC)
            except OSError:
                dur = 0
            t_end = t_start + timedelta(seconds=dur)
            return {
                "t_start": t_start.isoformat(timespec="seconds"),
                "t_end": t_end.isoformat(timespec="seconds"),
                "duration_s": dur,
                "source": "filename_parse",
            }
    except Exception:
        pass
    return mtime_fallback(path)


def extract_timestamps(path: Path, kind: str) -> "dict | None":
    """Dispatch timestamp extraction by file kind."""
    try:
        if kind == "mp4":
            return extract_mp4(path)
        elif kind == "mcap":
            return extract_mcap(path)
        elif kind == "bin":
            return extract_bin(path)
        elif kind in ("usbl", "mag", "csv"):
            return extract_csv(path)
        elif kind == "klf":
            return extract_klf(path)
        else:
            return mtime_fallback(path)
    except Exception:
        return mtime_fallback(path)


def build_coverage(manifest: dict, ssd_base: Path) -> "tuple[dict, dict]":
    """Build coverage dict and per-AUV file lists for window computation."""
    mission = manifest["mission"]
    ssd_path = ssd_base / mission / "raw_data"
    coverage: "dict[str, dict]" = {}
    # auv_file_windows[auv_id] = list of (t_start, t_end, source_label)
    auv_file_windows: "dict[str, list]" = defaultdict(list)
    tasks: "list[tuple[str, Path, str, str | None]]" = []  # (rel, path, kind, auv_id)

    # Videos: index folders by (auv_id, gp_key) so that a filename is only
    # looked up in its own camera folder. Covers layout A (GPn_AUVxxx/) and
    # layout B (AUVxxx/GPn/), and avoids mixing up identical filenames
    # recorded by different cameras.
    vroot = ssd_path / "medias" / "videos"
    video_dirs: "dict[tuple[str, str], list[Path]]" = defaultdict(list)
    for folder in safe_listdir(vroot):
        if not folder.is_dir() or _is_old_path(folder):
            continue
        m = GP_FOLDER_RE.match(folder.name)
        if m:
            auv = _normalize_auv_id(f"AUV{int(m.group('auv')):03d}")
            video_dirs[(auv, f"GP{int(m.group('gp'))}")].append(folder)
            continue
        am = AUV_PHYS_RE.fullmatch(folder.name)
        if am:
            auv = _normalize_auv_id(f"AUV{int(am.group(1)):03d}")
            for gp_sub in safe_listdir(folder):
                gm = re.fullmatch(r"GP(\d+)", gp_sub.name, re.I)
                if gm and gp_sub.is_dir() and not _is_old_path(gp_sub):
                    video_dirs[(auv, f"GP{int(gm.group(1))}")].append(gp_sub)
    for auv_id, gps in manifest["videos"].items():
        for gp_key, files in gps.items():
            for fname in files:
                for folder in video_dirs.get((auv_id, gp_key), []):
                    p = folder / fname
                    if p.exists():
                        rel = str(p.relative_to(ssd_path))
                        tasks.append((rel, p, "mp4", auv_id))
                        break

    # MCAP bags
    bag_root = ssd_path / "logs" / "SUB" / "bag"
    for auv_id, bag_dirs in manifest["mcap_bags"].items():
        for bag_dir in bag_dirs:
            bag_path = bag_root / bag_dir
            if bag_path.exists():
                for mcap_file in sorted(bag_path.glob("*.mcap")):
                    rel = str(mcap_file.relative_to(ssd_path))
                    tasks.append((rel, mcap_file, "mcap", auv_id))

    # BIN files
    sub_root = ssd_path / "logs" / "SUB"
    for auv_id, bins in manifest["bin_files"].items():
        for bname in bins:
            for p in sub_root.rglob(bname):
                if p.is_file():
                    rel = str(p.relative_to(ssd_path))
                    tasks.append((rel, p, "bin", auv_id))
                    break

    # USBL logs
    for rel_str in manifest["usbl_logs"]:
        p = ssd_path / rel_str
        if p.exists():
            # try to infer AUV from path
            m = AUV_PHYS_RE.search(rel_str)
            auv_id = _normalize_auv_id(f"AUV{int(m.group(1)):03d}") if m else None
            tasks.append((rel_str, p, "usbl", auv_id))

    # MAG files
    for category, files in manifest["mag_files"].items():
        for rel_str in files:
            p = ssd_path / rel_str
            if p.exists():
                tasks.append((rel_str, p, "mag", None))

    # SSS KLF files
    for rel_str in manifest["sss_files"].get("klf", []):
        p = ssd_path / rel_str
        if p.exists():
            tasks.append((rel_str, p, "klf", None))

    # Parallel extraction
    def _extract(task):
        rel, path, kind, auv_id = task
        cov = extract_timestamps(path, kind)
        return rel, cov, auv_id

    with ThreadPoolExecutor(max_workers=8) as ex:
        futures = {ex.submit(_extract, t): t for t in tasks}
        for fut in as_completed(futures):
            rel, cov, auv_id = fut.result()
            if cov:
                coverage[rel] = cov
                if auv_id and cov.get("source") != "mtime_fallback":
                    auv_file_windows[auv_id].append((
                        cov["t_start"], cov["t_end"], rel
                    ))
    return coverage, dict(auv_file_windows)


def compute_mission_window(coverage: dict) -> "dict | None":
    """Global mission window: min t_start, max t_end over all files."""
    starts = []
    ends = []
    for cov in coverage.values():
        if cov.get("source") == "mtime_fallback":
            continue
        try:
            starts.append(datetime.fromisoformat(cov["t_start"]))
            ends.append(datetime.fromisoformat(cov["t_end"]))
        except Exception:
            pass
    if not starts:
        return None
    t_start = min(starts)
    t_end = max(ends)
    return {
        "t_start": t_start.isoformat(timespec="seconds"),
        "t_end": t_end.isoformat(timespec="seconds"),
        "duration_s": int((t_end - t_start).total_seconds()),
    }


def compute_auv_windows(auv_file_windows: dict) -> dict:
    """Per-AUV windows with gap detection (>60s gap between sorted windows)."""
    result = {}
    for auv_id, windows in sorted(auv_file_windows.items()):
        # Sort by t_start
        sorted_wins = sorted(windows, key=lambda x: x[0])
        starts = [datetime.fromisoformat(w[0]) for w in sorted_wins]
        ends = [datetime.fromisoformat(w[1]) for w in sorted_wins]
        t_start = min(starts)
        t_end = max(ends)
        dur = int((t_end - t_start).total_seconds())
        # Detect gaps: merge overlapping windows first, then find gaps
        gaps = []
        # Build merged intervals
        intervals = sorted(zip(starts, ends, [w[2] for w in sorted_wins]))
        cur_start, cur_end, cur_label = intervals[0]
        prev_label = cur_label
        for i_start, i_end, i_label in intervals[1:]:
            if i_start - cur_end > timedelta(seconds=60):
                gaps.append({
                    "from": cur_end.isoformat(timespec="seconds"),
                    "to": i_start.isoformat(timespec="seconds"),
                    "duration_s": int((i_start - cur_end).total_seconds()),
                    "between": [prev_label, i_label],
                })
            if i_end > cur_end:
                cur_end = i_end
            prev_label = i_label
        # Collect source categories
        sources = sorted(set(
            "videos" if w[2].endswith(".MP4") or w[2].endswith(".mp4") else
            "mcap" if w[2].endswith(".mcap") else
            "bin" if w[2].endswith(".BIN") or w[2].endswith(".bin") else
            "usbl" if "usbl" in w[2].lower() else
            "other"
            for w in sorted_wins
        ))
        result[auv_id] = {
            "t_start": t_start.isoformat(timespec="seconds"),
            "t_end": t_end.isoformat(timespec="seconds"),
            "duration_s": dur,
            "sources": sources,
            "gaps": gaps,
        }
    return result


def safe_listdir(p: Path) -> "list[Path]":
    if not p.exists() or not p.is_dir():
        return []
    try:
        return sorted(p.iterdir())
    except OSError:
        return []


def collect_videos(ssd_path: Path) -> "tuple[dict, int, float]":
    """videos[AUV][GP1|GP2] -> sorted list of MP4 filenames. Two layouts tried."""
    videos = defaultdict(lambda: defaultdict(list))
    total_n = 0
    total_bytes = 0
    # Layout A: medias/videos/GP{n}{_|-}AUV{xxx}/*.MP4
    vroot_a = ssd_path / "medias" / "videos"
    for sub in safe_listdir(vroot_a):
        if not sub.is_dir():
            continue
        if _is_old_path(sub):
            continue
        m = GP_FOLDER_RE.match(sub.name)
        if not m:
            continue
        gp_key = f"GP{int(m.group('gp'))}"
        auv_id = _normalize_auv_id(f"AUV{int(m.group('auv')):03d}")
        for f in safe_listdir(sub):
            if _is_old_path(f):
                continue
            if f.is_file() and f.suffix.upper() == ".MP4":
                videos[auv_id][gp_key].append(f.name)
                total_n += 1
                try:
                    total_bytes += f.stat().st_size
                except OSError:
                    pass
    # Layout B: medias/videos/AUV{xxx}/GP{n}/*.MP4 (fallback)
    for sub in safe_listdir(vroot_a):
        if not sub.is_dir():
            continue
        if _is_old_path(sub):
            continue
        am = AUV_PHYS_RE.fullmatch(sub.name)
        if not am:
            continue
        auv_id = _normalize_auv_id(f"AUV{int(am.group(1)):03d}")
        for gp_sub in safe_listdir(sub):
            if not gp_sub.is_dir():
                continue
            if _is_old_path(gp_sub):
                continue
            gm = re.fullmatch(r"GP(\d+)", gp_sub.name, re.I)
            if not gm:
                continue
            gp_key = f"GP{int(gm.group(1))}"
            for f in safe_listdir(gp_sub):
                if _is_old_path(f):
                    continue
                if f.is_file() and f.suffix.upper() == ".MP4":
                    videos[auv_id][gp_key].append(f.name)
                    total_n += 1
                    try:
                        total_bytes += f.stat().st_size
                    except OSError:
                        pass
    # Sort + dedup
    out = {}
    for auv in sorted(videos.keys()):
        out[auv] = {}
        for gp in sorted(videos[auv].keys()):
            out[auv][gp] = sorted(set(videos[auv][gp]))
    return out, total_n, total_bytes / 1e9


def collect_mcap_bags(ssd_path: Path) -> "tuple[dict, int]":
    """mcap_bags[AUV{nnn}] -> sorted list of bag-dir names."""
    bags = defaultdict(list)
    n = 0
    broot = ssd_path / "logs" / "SUB" / "bag"
    for sub in safe_listdir(broot):
        if not sub.is_dir():
            continue
        if _is_old_path(sub):
            continue
        m = BAG_AUV_RE.search(sub.name)
        if not m:
            continue
        auv_id = _normalize_auv_id(f"AUV{int(m.group('auv')):03d}")
        bags[auv_id].append(sub.name)
        n += 1
    return {k: sorted(set(v)) for k, v in sorted(bags.items())}, n


def collect_bin_files(ssd_path: Path) -> "tuple[dict, int]":
    """bin_files[AUV{nnn}] -> sorted .BIN filenames."""
    bins = defaultdict(list)
    n = 0
    # Layout: logs/SUB/AUV{xxx}/*.BIN
    sub_root = ssd_path / "logs" / "SUB"
    for sub in safe_listdir(sub_root):
        if not sub.is_dir():
            continue
        if _is_old_path(sub):
            continue
        am = AUV_PHYS_RE.fullmatch(sub.name)
        if not am:
            continue
        auv_id = _normalize_auv_id(f"AUV{int(am.group(1)):03d}")
        for f in safe_listdir(sub):
            if _is_old_path(f):
                continue
            if f.is_file() and f.suffix.upper() == ".BIN":
                bins[auv_id].append(f.name)
                n += 1
    # Also try logs/SUB/bin/*.BIN (flat layout)
    flat = ssd_path / "logs" / "SUB" / "bin"
    for f in safe_listdir(flat):
        if _is_old_path(f):
            continue
        if f.is_file() and f.suffix.upper() == ".BIN":
            bins.setdefault("AUV000", []).append(f.name)
            n += 1
    return {k: sorted(set(v)) for k, v in sorted(bins.items())}, n


def collect_usbl_logs(ssd_path: Path) -> "list[str]":
    """Return list of USBL csv paths relative to ssd_path."""
    out = []
    logs_root = ssd_path / "logs"
    if not logs_root.exists():
        return out
    for p in logs_root.rglob("*"):
        try:
            if not p.is_file():
                continue
        except OSError:
            continue
        if _is_old_path(p):
            continue
        name_l = p.name.lower()
        if "usbl" in name_l and name_l.endswith(".csv"):
            try:
                out.append(str(p.relative_to(ssd_path)))
            except ValueError:
                out.append(str(p))
    return sorted(set(out))


def collect_audio_logs(ssd_path: Path) -> "list[str]":
    out = []
    aud = ssd_path / "audios"
    for p in aud.rglob("*") if aud.exists() else []:
        try:
            if not p.is_file():
                continue
        except (OSError, ValueError):
            continue
        if _is_old_path(p):
            continue
        try:
            out.append(str(p.relative_to(ssd_path)))
        except ValueError:
            continue
    return sorted(out)


def collect_sss_files(ssd_path: Path) -> "tuple[dict, int]":
    """klf + bin under ssd_path/sss/, recursive."""
    sss_root = ssd_path / "sss"
    klf = []
    bin_ = []
    if sss_root.exists():
        try:
            for p in sss_root.rglob("*.klf"):
                try:
                    if p.is_file() and not _is_old_path(p):
                        klf.append(str(p.relative_to(ssd_path)))
                except (OSError, ValueError):
                    continue
            for p in sss_root.rglob("*.bin"):
                try:
                    if p.is_file() and not _is_old_path(p):
                        bin_.append(str(p.relative_to(ssd_path)))
                except (OSError, ValueError):
                    continue
        except OSError:
            pass
    result = {"klf": sorted(set(klf)), "bin": sorted(set(bin_))}
    return result, len(result["klf"]) + len(result["bin"])


def collect_mag_files(ssd_path: Path) -> "tuple[dict, int]":
    """csv under ssd_path/mag/, categorised ar/av/side/other."""
    mag_root = ssd_path / "mag"
    out = {"ar": [], "av": [], "side": [], "other": []}
    if mag_root.exists():
        try:
            for p in mag_root.rglob("*.csv"):
                try:
                    if not p.is_file():
                        continue
                    if _is_old_path(p):
                        continue
                    rel = str(p.relative_to(ssd_path))
                    parts = p.parts
                    if "ar" in parts:
                        out["ar"].append(rel)
                    elif "av" in parts:
                        out["av"].append(rel)
                    elif "side" in parts:
                        out["side"].append(rel)
                    else:
                        out["other"].append(rel)
                except (OSError, ValueError):
                    continue
        except OSError:
            pass
    result = {k: sorted(set(v)) for k, v in out.items()}
    total = sum(len(v) for v in result.values())
    return result, total


def build_manifest(mission: str, ssd_base: Path) -> dict:
    ssd_path = ssd_base / mission / "raw_data"
    if not ssd_path.exists():
        raise FileNotFoundError(f"raw_data not found: {ssd_path}")
    videos, n_vids, total_gb = collect_videos(ssd_path)
    mcap_bags, n_bags = collect_mcap_bags(ssd_path)
    bin_files, n_bins = collect_bin_files(ssd_path)
    usbl_logs = collect_usbl_logs(ssd_path)
    audio_logs = collect_audio_logs(ssd_path)
    sss_files, n_sss = collect_sss_files(ssd_path)
    mag_files, n_mag = collect_mag_files(ssd_path)
    # Parse mission date for date-focus filter
    mission_date = _parse_mission_date(mission)
    if mission_date:
        win_start, win_end = _mission_date_window(mission_date)
        mission_date_str = mission_date.strftime("%Y-%m-%d")
        mission_date_window_utc = {
            "start": win_start.isoformat(timespec="seconds"),
            "end": win_end.isoformat(timespec="seconds"),
        }
        print(f" mission_date: {mission_date_str} | window: {win_start.isoformat(timespec='seconds')} -> {win_end.isoformat(timespec='seconds')}")
    else:
        mission_date_str = None
        mission_date_window_utc = None
        win_start = win_end = None
    manifest = {
        "mission": mission,
        "generated_at": iso_utc_now(),
        "ssd_path": str(ssd_path),
        "mission_date": mission_date_str,
        "mission_date_window_utc": mission_date_window_utc,
        "videos": videos,
        "mcap_bags": mcap_bags,
        "bin_files": bin_files,
        "usbl_logs": usbl_logs,
        "audio_logs": audio_logs,
        "sss_files": sss_files,
        "mag_files": mag_files,
        "totals": {
            "n_videos": n_vids,
            "n_mcap_bags": n_bags,
            "n_bin_files": n_bins,
            "n_usbl_logs": len(usbl_logs),
            "n_audio_logs": len(audio_logs),
            "n_sss_files": n_sss,
            "n_mag_files": n_mag,
            "total_video_size_gb": round(total_gb, 2),
        },
    }
    # --- Coverage extraction ---
    coverage, auv_file_windows = build_coverage(manifest, ssd_base)
    # --- Date-focus filter on coverage ---
    n_filtered_old = 0  # already handled at collect time via _is_old_path
    n_filtered_out_of_date = 0
    if win_start and win_end:
        filtered_coverage = {}
        for rel, cov in coverage.items():
            if cov.get("source") == "mtime_fallback":
                filtered_coverage[rel] = cov
                continue
            try:
                t_s = datetime.fromisoformat(cov["t_start"])
                # Ensure tz-aware
                if t_s.tzinfo is None:
                    t_s = t_s.replace(tzinfo=timezone.utc)
                if win_start <= t_s <= win_end:
                    filtered_coverage[rel] = cov
                else:
                    n_filtered_out_of_date += 1
            except Exception:
                filtered_coverage[rel] = cov
        if n_filtered_out_of_date:
            print(f" date_filter: removed {n_filtered_out_of_date} entries outside mission date window")
        coverage = filtered_coverage
        # Rebuild auv_file_windows from filtered coverage
        kept_rels = set(coverage.keys())
        auv_file_windows_filtered = defaultdict(list)
        for auv_id, wins in auv_file_windows.items():
            for w in wins:
                if w[2] in kept_rels:
                    auv_file_windows_filtered[auv_id].append(w)
        auv_file_windows = dict(auv_file_windows_filtered)
    mission_window = compute_mission_window(coverage)
    auv_windows = compute_auv_windows(auv_file_windows)
    # Coverage stats
    import collections
    source_counts = collections.Counter(v["source"] for v in coverage.values())
    n_fallback = source_counts.get("mtime_fallback", 0)
    n_total_cov = len(coverage)
    if n_total_cov:
        print(f" coverage: {n_total_cov} files | fallback={n_fallback} "
              f"({100*n_fallback//n_total_cov}%) | sources={dict(source_counts)}")
    manifest["coverage"] = coverage
    manifest["mission_window"] = mission_window
    manifest["auv_windows"] = auv_windows
    manifest["totals"]["n_filtered_old"] = n_filtered_old
    manifest["totals"]["n_filtered_out_of_date"] = n_filtered_out_of_date
    return manifest


def main(argv=None):
    ap = argparse.ArgumentParser(description="Stage 01 -- select mission, produce raw manifest.")
    ap.add_argument("--mission", required=True, help="Mission folder name (e.g. 20260505-Lepradet)")
    ap.add_argument("--ssd-base", default="/mnt/ssd", help="SSD base path (default /mnt/ssd)")
    ap.add_argument("--out", default="/home/cosma/cosma-qc/data",
                    help="Output base dir (manifest written to <out>/<mission>/01_manifest.json)")
    args = ap.parse_args(argv)
    ssd_base = Path(args.ssd_base)
    out_base = Path(args.out)
    out_dir = out_base / args.mission
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / "01_manifest.json"
    with track_stage("01_select_mission", args.mission, out_dir,
                     output_files=[out_path]):
        manifest = build_manifest(args.mission, ssd_base)
        # Write to a temp file, then rename, so readers never see a partial manifest.
        tmp = out_path.with_suffix(".json.tmp")
        # ensure_ascii=False emits non-ASCII characters, so pin the encoding.
        tmp.write_text(json.dumps(manifest, indent=2, ensure_ascii=False), encoding="utf-8")
        tmp.replace(out_path)
    t = manifest["totals"]
    print(f"[ok] {out_path}")
    print(f" videos={t['n_videos']} ({t['total_video_size_gb']} GB) "
          f"bags={t['n_mcap_bags']} bins={t['n_bin_files']} "
          f"usbl={t['n_usbl_logs']} audio={t['n_audio_logs']} "
          f"sss={t['n_sss_files']} mag={t['n_mag_files']}")
    print(f" filtered: old={t.get('n_filtered_old', 0)} out_of_date={t.get('n_filtered_out_of_date', 0)}")
    if manifest.get("mission_window"):
        mw = manifest["mission_window"]
        print(f" mission_window: {mw['t_start']} -> {mw['t_end']} ({mw['duration_s']}s)")
    return 0


if __name__ == "__main__":
    sys.exit(main())

@@ -0,0 +1,835 @@
#!/usr/bin/env python3
"""Stage 02: mission run detection (v4 state-based).

Detects runs via /mavros/state transitions (armed + mode) + rel_alt validation.
Falls back to depth-only detection if the state topic is missing.

Usage:
    python3 02_mission_run_detect.py --mission 20260508-sttropez [--ssd-base /mnt/ssd] [--out data/] [--dry-run]
    python3 02_mission_run_detect.py --mission 20260505-Lepradet --out data/ \
        --state-modes ALT_HOLD --require-armed --require-descent \
        --min-mission-depth -2.0 --min-sustained-duration 30 --min-near-bottom-pct 50 --force
"""
import argparse
import glob
import json
import re
import statistics
import sys
from datetime import datetime, timezone
from pathlib import Path

from mcap.reader import make_reader
from mcap_ros2.decoder import DecoderFactory

# Local helper for stage time/memory tracking
sys.path.insert(0, str(Path(__file__).parent))
from _meta import track_stage  # noqa: E402

# Physical ID → logistics ID mapping
AUV_PHYSICAL_MAP = {
    "AUV010": "AUV210",
    "AUV012": "AUV212",
    "AUV013": "AUV213",
}

TOPIC_REL_ALT = "/mavros/global_position/rel_alt"
TOPIC_STATE = "/mavros/state"

THRESHOLD = -0.3       # rel_alt < THRESHOLD → submerged
NEED_STREAK = 30       # consecutive seconds needed to confirm start/end
MIN_DURATION = 60      # minimum seconds per run
SMOOTH_WINDOW_S = 3    # median smoothing window (seconds; at 1 Hz = samples)

# Default modes considered "active mission"
DEFAULT_STATE_MODES = {"AUTO", "GUIDED"}
# Modes that END a run (if require-armed=False)
STATE_STOP_MODES = {"SURFACE", "MANUAL"}

# "Real mission" filtering (adjustable via argparse)
DEFAULT_MIN_MISSION_DEPTH = -3.0        # rel_alt must reach this threshold (m); strict v5 2026-05-14
DEFAULT_MIN_SUSTAINED_DURATION = 60.0   # for at least N consecutive seconds; strict v5
DEFAULT_MIN_NEAR_BOTTOM_PCT = 80.0      # min % of the run where rel_alt < min_mission_depth; strict v5
DEFAULT_MIN_DISPLACEMENT_M = 5.0        # future stage06+: min USBL displacement; documented only in v5

# "Before first dive" filtering
DEFAULT_FIRST_SUBMERSION_DEPTH = -2.0     # rel_alt threshold to count as submerged
DEFAULT_FIRST_SUBMERSION_DURATION = 5.0   # min continuous duration (s) to confirm submersion
# ---------------------------------------------------------------------------
# Lecture MCAP rel_alt
# ---------------------------------------------------------------------------
def parse_mcap_relalt(mcap_path: Path):
"""Lit rel_alt depuis un fichier mcap. Retourne liste de (epoch_s, rel_alt_m)."""
data = []
try:
with open(mcap_path, "rb") as fp:
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(topics=[TOPIC_REL_ALT]):
ts = message.log_time / 1e9
data.append((ts, float(ros_msg.data)))
except Exception as exc:
print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr)
return data
# ---------------------------------------------------------------------------
# MCAP reading: /mavros/state
# ---------------------------------------------------------------------------
def read_mavros_state(mcap_path: Path, t_start: float = None, t_end: float = None):
"""
Read /mavros/state from an MCAP file.
Return a list of (epoch_s, armed: bool, mode: str).
Optionally filter to [t_start, t_end].
"""
data = []
try:
with open(mcap_path, "rb") as fp:
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(topics=[TOPIC_STATE]):
ts = message.log_time / 1e9
if t_start is not None and ts < t_start:
continue
if t_end is not None and ts > t_end:
continue
data.append((ts, bool(ros_msg.armed), str(ros_msg.mode)))
except Exception as exc:
# The bag may be corrupt: warn and continue, as the other readers do
print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr)
return data
def parse_mcap_both(mcap_path: Path):
"""Read both rel_alt and state from one MCAP file in a single pass."""
alt_data = []
state_data = []
try:
with open(mcap_path, "rb") as fp:
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(
topics=[TOPIC_REL_ALT, TOPIC_STATE]):
ts = message.log_time / 1e9
if _channel.topic == TOPIC_REL_ALT:
alt_data.append((ts, float(ros_msg.data)))
elif _channel.topic == TOPIC_STATE:
state_data.append((ts, bool(ros_msg.armed), str(ros_msg.mode)))
except Exception as exc:
print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr)
return alt_data, state_data
def extract_auv_id(folder_name: str):
"""'20260508_054551_AUV010' → 'AUV010'."""
m = re.search(r"(AUV\d+)$", folder_name)
return m.group(1) if m else None
# ---------------------------------------------------------------------------
# Signal processing
# ---------------------------------------------------------------------------
def resample_1hz(data):
"""Resample (epoch, val) → 1 Hz by linear interpolation. Return (times[], vals[])."""
if not data:
return [], []
data = sorted(data)
t0 = int(data[0][0])
t1 = int(data[-1][0])
times = list(range(t0, t1 + 1))
vals = []
idx = 0
for t in times:
while idx < len(data) - 1 and data[idx + 1][0] < t:
idx += 1
if t <= data[0][0]:
v = data[0][1]
elif t >= data[-1][0]:
v = data[-1][1]
else:
t0_, v0 = data[idx]
t1_, v1 = data[idx + 1]
dt = t1_ - t0_
v = v0 + ((t - t0_) / dt) * (v1 - v0) if dt > 0 else v0
vals.append(v)
return times, vals
def median_filter(vals, window=3):
half = window // 2
out = []
n = len(vals)
for i in range(n):
lo = max(0, i - half)
hi = min(n, i + half + 1)
w = sorted(vals[lo:hi])
out.append(w[len(w) // 2])
return out
def find_first_submersion_epoch(rel_alt_times, rel_alt_vals, threshold=-2.0, min_duration=5.0):
"""
Find the FIRST time rel_alt < threshold holds for >= min_duration continuous seconds.
Return the epoch at which that submersion starts, or None if never submerged.
Operates on smoothed 1 Hz data (times/vals already resampled).
"""
n = len(rel_alt_times)
if n == 0:
return None
streak_start = None
streak_count = 0
for i in range(n):
if rel_alt_vals[i] < threshold:
if streak_count == 0:
streak_start = rel_alt_times[i]
streak_count += 1
if streak_count >= min_duration:
return float(streak_start)
else:
streak_count = 0
streak_start = None
return None
def detect_runs_depth_only(times, vals, threshold=THRESHOLD, need_streak=NEED_STREAK, min_duration=MIN_DURATION):
"""
Detect immersion runs from rel_alt alone (fallback).
Algorithm:
- need_streak consecutive samples < threshold confirm entry; need_streak consecutive samples >= threshold confirm exit
- min_duration seconds minimum per run
Returns a list of (start_epoch, end_epoch).
"""
n = len(times)
if n == 0:
return []
under = [v < threshold for v in vals]
runs = []
in_run = False
run_start = None
streak_under = 0
streak_surface = 0
i = 0
while i < n:
if not in_run:
if under[i]:
streak_under += 1
if streak_under >= need_streak:
run_start = times[i - need_streak + 1]
in_run = True
streak_surface = 0
else:
streak_under = 0
else:
if under[i]:
streak_surface = 0
else:
streak_surface += 1
if streak_surface >= need_streak:
run_end = times[i - need_streak + 1]
dur = run_end - run_start
if dur >= min_duration:
runs.append((run_start, run_end))
in_run = False
run_start = None
streak_under = 0
streak_surface = 0
i += 1
if in_run and run_start is not None:
run_end = times[-1]
dur = run_end - run_start
if dur >= min_duration:
runs.append((run_start, run_end))
return runs
# ---------------------------------------------------------------------------
# State-based run detection
# ---------------------------------------------------------------------------
def detect_runs_state_based(state_data, times_alt, vals_smooth, mission_modes, require_armed,
require_descent, min_duration, min_depth_m=-0.3):
"""
Detect runs via /mavros/state:
- Continuous intervals with armed=True (if require_armed) AND mode in mission_modes
- Validated by a rel_alt surface → bottom transition (if require_descent)
- min_duration filters out short windows
Returns:
list of (start_epoch, end_epoch, dominant_mode)
"""
if not state_data:
return []
state_data_sorted = sorted(state_data)
# Build the state sequence
# Identify "mission active" intervals
intervals = []
cur_start = None
cur_mode_counts = {}
for ts, armed, mode in state_data_sorted:
active = (not require_armed or armed) and (mode in mission_modes)
if active and cur_start is None:
cur_start = ts
cur_mode_counts = {mode: 1}
elif active and cur_start is not None:
cur_mode_counts[mode] = cur_mode_counts.get(mode, 0) + 1
elif not active and cur_start is not None:
# End of the interval
intervals.append((cur_start, ts, cur_mode_counts))
cur_start = None
cur_mode_counts = {}
# Flush the last interval
if cur_start is not None:
intervals.append((cur_start, state_data_sorted[-1][0], cur_mode_counts))
# Filter by minimum duration
runs = []
for start_t, end_t, mode_counts in intervals:
dur = end_t - start_t
if dur < min_duration:
continue
dominant_mode = max(mode_counts, key=mode_counts.get) if mode_counts else "UNKNOWN"
if require_descent:
# Check for a surface → bottom transition within the window
window_alts = [(t, v) for t, v in zip(times_alt, vals_smooth)
if start_t <= t <= end_t]
if not window_alts:
# No altitude data in this window: skip
continue
alt_vals = [v for _, v in window_alts]
# Start = surface (rel_alt > -1 m within the first 60 samples)
first_segment = alt_vals[:60]
starts_near_surface = any(v > -1.0 for v in first_segment)
reaches_depth = any(v <= min_depth_m for v in alt_vals)
if not (starts_near_surface and reaches_depth):
continue
runs.append((start_t, end_t, dominant_mode))
return runs
# ---------------------------------------------------------------------------
# "Real mission" validation: filters out surface oscillations
# ---------------------------------------------------------------------------
def compute_sustained_depth(times, vals_smooth, start_e, end_e, min_depth_m):
"""
For a run [start_e, end_e], find the longest continuous segment
where rel_alt <= min_depth_m (e.g. -2.0 m).
Return (deepest rel_alt within that segment, segment duration in s at 1 Hz).
"""
run_pairs = [(t, v) for t, v in zip(times, vals_smooth) if start_e <= t <= end_e]
if not run_pairs:
return 0.0, 0.0
best_dur = 0
best_min = 0.0
cur_dur = 0
cur_min = 0.0
for _t, v in run_pairs:
if v <= min_depth_m:
cur_dur += 1
cur_min = min(cur_min, v) if cur_dur > 1 else v
else:
if cur_dur > best_dur:
best_dur = cur_dur
best_min = cur_min
cur_dur = 0
cur_min = 0.0
if cur_dur > best_dur:
best_dur = cur_dur
best_min = cur_min
return round(best_min, 2), float(best_dur)
def compute_near_bottom_pct(times, vals_smooth, start_e, end_e, min_depth_m):
"""Compute the % of samples in [start_e, end_e] where rel_alt < min_depth_m."""
run_vals = [v for t, v in zip(times, vals_smooth) if start_e <= t <= end_e]
if not run_vals:
return 0.0
n_below = sum(1 for v in run_vals if v < min_depth_m)
return round(100.0 * n_below / len(run_vals), 1)
# ---------------------------------------------------------------------------
# Processing AUV
# ---------------------------------------------------------------------------
def process_auv(auv_mcap_id: str, bag_dirs: list, ssd_base: Path,
min_mission_depth: float, min_sustained_duration: float,
min_near_bottom_pct: float = 0.0,
state_modes: set = None, require_armed: bool = True,
require_descent: bool = True,
first_submersion_depth: float = DEFAULT_FIRST_SUBMERSION_DEPTH,
first_submersion_duration: float = DEFAULT_FIRST_SUBMERSION_DURATION):
"""
Aggregate all bags for one AUV and detect runs via state (with a depth-only fallback).
Drop runs that lie entirely before the first real submersion and truncate runs that straddle it.
"""
physical_id = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id)
if state_modes is None:
state_modes = DEFAULT_STATE_MODES
all_alt_data = []
all_state_data = []
bags_list = []
n_parsed = 0
n_skipped = 0
# Walk bags in chronological order
for bag_dir in sorted(bag_dirs):
bags_list.append(str(bag_dir.name))
mcap_files = sorted(bag_dir.glob("*.mcap"))
for mcap_path in mcap_files:
alt_pts, state_pts = parse_mcap_both(mcap_path)
if alt_pts or state_pts:
all_alt_data.extend(alt_pts)
all_state_data.extend(state_pts)
n_parsed += 1
else:
n_skipped += 1
if (n_parsed + n_skipped) % 50 == 0:
print(f" [{auv_mcap_id}] ... {n_parsed + n_skipped} bags processed", file=sys.stderr)
has_state = len(all_state_data) > 0
print(f" [{auv_mcap_id}] {n_parsed} mcap OK, {n_skipped} skip, "
f"{len(all_alt_data)} rel_alt pts, {len(all_state_data)} state pts", file=sys.stderr)
if not all_alt_data:
print(f" [{auv_mcap_id}] [WARN] aucune donnée rel_alt", file=sys.stderr)
return {
"mcap_id": auv_mcap_id,
"physical_id": physical_id,
"bags": bags_list,
"total_immersion_s": 0,
"runs": [],
"runs_rejected": [],
"detection_method": "no_data",
"first_submersion_epoch": None,
"pre_water_rejected_count": 0,
}
# Deduplicate and sort rel_alt
seen = set()
deduped_alt = []
for t, v in sorted(all_alt_data):
key = round(t, 3)
if key not in seen:
seen.add(key)
deduped_alt.append((t, v))
# Resample 1Hz
times, vals = resample_1hz(deduped_alt)
print(f" [{auv_mcap_id}] {len(times)} samples 1Hz [{times[0]:.0f}..{times[-1]:.0f}]", file=sys.stderr)
# 3 s median smoothing
vals_smooth = median_filter(vals, window=SMOOTH_WINDOW_S)
# Find the first real submersion
first_sub_epoch = find_first_submersion_epoch(
times, vals_smooth,
threshold=first_submersion_depth,
min_duration=first_submersion_duration,
)
if first_sub_epoch is not None:
print(f" [{auv_mcap_id}] Première submersion réelle: {first_sub_epoch:.0f} "
f"({datetime.fromtimestamp(first_sub_epoch, tz=timezone.utc).isoformat()})",
file=sys.stderr)
else:
print(f" [{auv_mcap_id}] [WARN] Aucune submersion réelle détectée "
f"(threshold={first_submersion_depth}m, min_dur={first_submersion_duration}s)",
file=sys.stderr)
# Run detection
detection_method = "depth_only"
if has_state:
# Sort state data
all_state_data.sort(key=lambda x: x[0])
# Collect the modes present
modes_in_data = set(m for _, _, m in all_state_data)
modes_active = modes_in_data & state_modes
print(f" [{auv_mcap_id}] state modes found: {modes_in_data}", file=sys.stderr)
print(f" [{auv_mcap_id}] state modes matching --state-modes: {modes_active}", file=sys.stderr)
if modes_active:
raw_runs_with_mode = detect_runs_state_based(
all_state_data, times, vals_smooth,
mission_modes=state_modes,
require_armed=require_armed,
require_descent=require_descent,
min_duration=MIN_DURATION,
min_depth_m=min_mission_depth,
)
raw_runs = [(s, e) for s, e, _m in raw_runs_with_mode]
run_modes = {(s, e): m for s, e, m in raw_runs_with_mode}
detection_method = "state_based"
print(f" [{auv_mcap_id}] {len(raw_runs)} candidats state-based", file=sys.stderr)
else:
print(f" [{auv_mcap_id}] [WARN] aucun mode {state_modes} dans state data "
f"→ fallback depth-only", file=sys.stderr)
raw_runs = detect_runs_depth_only(times, vals_smooth)
run_modes = {}
detection_method = "depth_only_fallback_no_modes"
else:
print(f" [{auv_mcap_id}] [WARN] topic /mavros/state vide → fallback depth-only", file=sys.stderr)
raw_runs = detect_runs_depth_only(times, vals_smooth)
run_modes = {}
print(f" [{auv_mcap_id}] {len(raw_runs)} runs candidats ({detection_method})", file=sys.stderr)
# Pre-water filtering: drop or truncate runs before the first submersion
pre_water_rejected_count = 0
if first_sub_epoch is not None:
filtered_runs = []
for start_e, end_e in raw_runs:
if end_e <= first_sub_epoch:
# Entire run precedes the first dive → reject
pre_water_rejected_count += 1
print(f" [{auv_mcap_id}] PRE-WATER REJECT run [{start_e:.0f}..{end_e:.0f}] "
f"(end before first_sub={first_sub_epoch:.0f})", file=sys.stderr)
elif start_e < first_sub_epoch < end_e:
# Run straddles the first dive → truncate its start
new_start = first_sub_epoch
print(f" [{auv_mcap_id}] PRE-WATER TRUNCATE run [{start_e:.0f}..{end_e:.0f}] "
f"→ [{new_start:.0f}..{end_e:.0f}]", file=sys.stderr)
# Re-key run_modes to the truncated interval if needed
if (start_e, end_e) in run_modes:
run_modes[(new_start, end_e)] = run_modes.pop((start_e, end_e))
filtered_runs.append((new_start, end_e))
else:
filtered_runs.append((start_e, end_e))
raw_runs = filtered_runs
print(f" [{auv_mcap_id}] {pre_water_rejected_count} runs rejetés pre-water, "
f"{len(raw_runs)} restants", file=sys.stderr)
# Build enriched run entries and apply the "real mission" filter
runs_out = []
runs_rejected = []
for idx, (start_e, end_e) in enumerate(raw_runs):
dur = end_e - start_e
run_id = f"{physical_id}_run_{idx:02d}"
dominant_mode = run_modes.get((start_e, end_e), "UNKNOWN")
run_vals = [v for t, v in zip(times, vals_smooth) if start_e <= t <= end_e]
max_depth = round(min(run_vals), 2) if run_vals else 0.0
mean_depth = round(statistics.mean(run_vals), 2) if run_vals else 0.0
sustained_below_m, sustained_duration_s = compute_sustained_depth(
times, vals_smooth, start_e, end_e, min_mission_depth
)
pct_near_bottom = compute_near_bottom_pct(
times, vals_smooth, start_e, end_e, min_mission_depth
)
run_entry = {
"run_id": run_id,
"start_epoch": float(start_e),
"end_epoch": float(end_e),
"duration_s": round(dur, 1),
"max_depth_m": max_depth,
"mean_depth_m": mean_depth,
"sustained_below_m": sustained_below_m,
"sustained_duration_s": sustained_duration_s,
"pct_near_bottom": pct_near_bottom,
"dominant_mode": dominant_mode,
"detection_method": detection_method,
}
reject_reason = None
if sustained_duration_s < min_sustained_duration:
reject_reason = (
f"sustained_depth only {sustained_duration_s:.0f}s "
f"(need {min_sustained_duration:.0f}s below {min_mission_depth}m)"
)
elif min_near_bottom_pct > 0.0 and pct_near_bottom < min_near_bottom_pct:
reject_reason = (
f"not_enough_immersion: only {pct_near_bottom:.1f}% "
f"time below {min_mission_depth}m (need {min_near_bottom_pct:.1f}%)"
)
if reject_reason is None:
runs_out.append(run_entry)
else:
run_entry["rejected_reason"] = reject_reason
runs_rejected.append(run_entry)
print(
f" [{auv_mcap_id}] REJECT {run_id}: max_depth={max_depth}m "
f"sustained={sustained_duration_s:.0f}s pct={pct_near_bottom:.1f}% "
f"reason={reject_reason[:60]}",
file=sys.stderr,
)
print(
f" [{auv_mcap_id}] {len(runs_out)} runs OK, {len(runs_rejected)} rejetés ({detection_method})",
file=sys.stderr,
)
total_immersion_s = round(sum(r["duration_s"] for r in runs_out), 1)
return {
"mcap_id": auv_mcap_id,
"physical_id": physical_id,
"bags": bags_list,
"total_immersion_s": total_immersion_s,
"runs": runs_out,
"runs_rejected": runs_rejected,
"detection_method": detection_method,
"first_submersion_epoch": first_sub_epoch,
"pre_water_rejected_count": pre_water_rejected_count,
}
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _run(args):
bags_root = Path(args.ssd_base) / args.mission / "raw_data" / "logs" / "SUB" / "bag"
if not bags_root.exists():
print(f"[ERROR] bags dir not found: {bags_root}", file=sys.stderr)
sys.exit(1)
state_modes = set(m.strip() for m in args.state_modes.split(",") if m.strip())
print(f"[stage02] Mission: {args.mission}")
print(f"[stage02] Bags root: {bags_root}", file=sys.stderr)
print(f"[stage02] State modes: {state_modes}, require_armed={args.require_armed}, "
f"require_descent={args.require_descent}", file=sys.stderr)
near_bottom_str = (f" + near_bottom >= {args.min_near_bottom_pct:.0f}%"
if args.min_near_bottom_pct > 0 else "")
print(
f"[stage02] Filtre mission: sustained >= {args.min_sustained_duration}s "
f"below {args.min_mission_depth}m{near_bottom_str}",
file=sys.stderr,
)
print(
f"[stage02] Filtre pre-water: first_submersion threshold={args.first_submersion_depth}m "
f"min_duration={args.first_submersion_duration}s",
file=sys.stderr,
)
# Group bag folders by AUV ID
auv_dirs = {}
for d in sorted(bags_root.iterdir()):
if not d.is_dir():
continue
auv_id = extract_auv_id(d.name)
if auv_id:
auv_dirs.setdefault(auv_id, []).append(d)
print(f"[stage02] AUVs: {sorted(auv_dirs.keys())}")
# Process each AUV
auvs_out = {}
all_runs_flat = []
all_rejected_flat = []
for auv_mcap_id in sorted(auv_dirs.keys()):
physical_id = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id)
print(f"\n[stage02] Processing {auv_mcap_id} -> {physical_id} ({len(auv_dirs[auv_mcap_id])} bag dirs)...")
result = process_auv(
auv_mcap_id,
auv_dirs[auv_mcap_id],
Path(args.ssd_base),
min_mission_depth=args.min_mission_depth,
min_sustained_duration=args.min_sustained_duration,
min_near_bottom_pct=args.min_near_bottom_pct,
state_modes=state_modes,
require_armed=args.require_armed,
require_descent=args.require_descent,
first_submersion_depth=args.first_submersion_depth,
first_submersion_duration=args.first_submersion_duration,
)
auvs_out[physical_id] = result
all_runs_flat.extend(result["runs"])
all_rejected_flat.extend(result.get("runs_rejected", []))
# Sort all runs by start_epoch
all_runs_sorted = sorted(all_runs_flat, key=lambda r: r["start_epoch"])
all_rejected_sorted = sorted(all_rejected_flat, key=lambda r: r["start_epoch"])
# Build the output JSON
output = {
"mission": args.mission,
"generated_at": datetime.now(timezone.utc).isoformat(),
"filter_params": {
"min_mission_depth_m": args.min_mission_depth,
"min_sustained_duration_s": args.min_sustained_duration,
"min_near_bottom_pct": args.min_near_bottom_pct,
"state_modes": sorted(state_modes),
"require_armed": args.require_armed,
"require_descent": args.require_descent,
"first_submersion_depth_m": args.first_submersion_depth,
"first_submersion_duration_s": args.first_submersion_duration,
},
"auvs": auvs_out,
"all_runs_sorted": all_runs_sorted,
"all_runs_rejected": all_rejected_sorted,
}
# Summary
total_runs = len(all_runs_sorted)
total_rejected = len(all_rejected_sorted)
all_depths = [r["max_depth_m"] for r in all_runs_sorted]
global_max_depth = min(all_depths) if all_depths else 0.0
total_immersion = sum(r["duration_s"] for r in all_runs_sorted)
total_pre_water = sum(v.get("pre_water_rejected_count", 0) for v in auvs_out.values())
print(f"\n{'='*60}")
print(f"[stage02] SUMMARY — {args.mission}")
print(f"{'='*60}")
for phys_id, auv in auvs_out.items():
deep = [r for r in auv["runs"] if r["max_depth_m"] < -10.0]
rej = len(auv.get("runs_rejected", []))
method = auv.get("detection_method", "?")
fsub = auv.get("first_submersion_epoch")
fsub_str = (datetime.fromtimestamp(fsub, tz=timezone.utc).isoformat()
if fsub else "None")
pre_w = auv.get("pre_water_rejected_count", 0)
print(f" {phys_id}: {auv['total_immersion_s']}s immersion, "
f"{len(auv['runs'])} runs OK, {rej} rejetés, runs>10m: {len(deep)} [{method}]")
print(f" first_submersion: {fsub_str} | pre_water_rejected: {pre_w}")
print(f" Total runs OK: {total_runs}")
print(f" Total runs rejetés: {total_rejected}")
print(f" Total pre-water rejetés: {total_pre_water}")
print(f" Global max depth: {global_max_depth:.1f}m")
print(f" Total immersion: {total_immersion:.0f}s")
if total_runs == 0:
print("[WARN] Aucun run validé!")
else:
print(f"[QC OK] {total_runs} runs validés")
for r in all_runs_sorted:
print(f" {r['run_id']}: {r['duration_s']:.0f}s depth={r['max_depth_m']}m "
f"pct={r['pct_near_bottom']:.1f}% mode={r.get('dominant_mode','?')} "
f"method={r.get('detection_method','?')}")
if args.dry_run:
print(f"\n[dry-run] JSON non écrit.")
return
# Write the JSON
out_dir = Path(args.out) / args.mission
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / "02_runs.json"
with open(out_path, "w") as f:
json.dump(output, f, indent=2)
print(f"[stage02] Written: {out_path}")
def main():
parser = argparse.ArgumentParser(description="Stage 02 — Detect mission underwater runs (v4 state-based)")
parser.add_argument("--mission", required=True, help="Mission folder, e.g. 20260508-sttropez")
parser.add_argument("--ssd-base", default="/mnt/ssd", help="SSD root path (READ-ONLY)")
parser.add_argument("--out", default="data/", help="Output base dir")
parser.add_argument("--dry-run", action="store_true", help="Print stats without writing the file")
parser.add_argument(
"--min-mission-depth",
type=float,
default=DEFAULT_MIN_MISSION_DEPTH,
help=f"Depth threshold (m, negative) for validating a run (default: {DEFAULT_MIN_MISSION_DEPTH})",
)
parser.add_argument(
"--min-sustained-duration",
type=float,
default=DEFAULT_MIN_SUSTAINED_DURATION,
help=f"Min consecutive duration (s) below min-mission-depth (default: {DEFAULT_MIN_SUSTAINED_DURATION})",
)
parser.add_argument(
"--min-near-bottom-pct",
type=float,
default=DEFAULT_MIN_NEAR_BOTTOM_PCT,
help=f"Min %% of the run where rel_alt < min-mission-depth (0=disabled, default: {DEFAULT_MIN_NEAR_BOTTOM_PCT})",
)
parser.add_argument(
"--state-modes",
type=str,
default=",".join(sorted(DEFAULT_STATE_MODES)),
help="Comma-separated ArduSub modes treated as mission (e.g. AUTO,GUIDED or ALT_HOLD). "
f"Default: {','.join(sorted(DEFAULT_STATE_MODES))}",
)
parser.add_argument(
"--require-armed",
action="store_true",
default=True,
help="Require armed=True (default: True)",
)
parser.add_argument(
"--no-require-armed",
action="store_false",
dest="require_armed",
help="Do not require armed=True",
)
parser.add_argument(
"--require-descent",
action="store_true",
default=True,
help="Require a surface→bottom transition within the window (default: True)",
)
parser.add_argument(
"--no-require-descent",
action="store_false",
dest="require_descent",
help="Do not require a surface→bottom transition",
)
parser.add_argument(
"--first-submersion-depth",
type=float,
default=DEFAULT_FIRST_SUBMERSION_DEPTH,
help=f"rel_alt threshold (m) for detecting the first real submersion "
f"(default: {DEFAULT_FIRST_SUBMERSION_DEPTH})",
)
parser.add_argument(
"--first-submersion-duration",
type=float,
default=DEFAULT_FIRST_SUBMERSION_DURATION,
help=f"Min continuous duration (s) below first-submersion-depth to confirm launch "
f"(default: {DEFAULT_FIRST_SUBMERSION_DURATION})",
)
parser.add_argument(
"--force",
action="store_true",
help="Force recomputation even if the file exists",
)
args = parser.parse_args()
if args.dry_run:
_run(args)
return
out_dir = Path(args.out) / args.mission
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / "02_runs.json"
with track_stage("02_mission_run_detect", args.mission, out_dir,
output_files=[out_path]):
_run(args)
if __name__ == "__main__":
main()

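Review note on `detect_runs_depth_only` above: the depth-only fallback is a two-state detector with hysteresis. Entry and exit each need `need_streak` consecutive confirming samples, and the boundary is back-dated to the first sample of the confirming streak. The standalone sketch below restates that logic with indentation restored so it can be exercised in isolation. The function name `detect_runs` and the small defaults (`need_streak=3`, `min_duration=5`) are illustrative assumptions, not the module's values (30 and 60).

```python
def detect_runs(times, vals, threshold=-0.3, need_streak=3, min_duration=5):
    """Return (start, end) pairs during which vals stays below threshold.

    Illustrative re-statement of the depth-only fallback; the defaults here
    are deliberately small and are NOT the pipeline's values.
    """
    runs = []
    run_start = None  # None means "currently at the surface"
    streak = 0        # consecutive samples contradicting the current state
    for i, v in enumerate(vals):
        under = v < threshold
        if run_start is None:
            # At the surface: count consecutive submerged samples.
            streak = streak + 1 if under else 0
            if streak >= need_streak:
                # Back-date the start to the first sample of the streak.
                run_start = times[i - need_streak + 1]
                streak = 0
        else:
            # In a run: count consecutive surfaced samples.
            streak = streak + 1 if not under else 0
            if streak >= need_streak:
                # Back-date the end to the first surfaced sample.
                run_end = times[i - need_streak + 1]
                if run_end - run_start >= min_duration:
                    runs.append((run_start, run_end))
                run_start = None
                streak = 0
    # A run still open when the data ends is closed at the last sample.
    if run_start is not None and times[-1] - run_start >= min_duration:
        runs.append((run_start, times[-1]))
    return runs


if __name__ == "__main__":
    # 2 s at the surface, 10 s submerged, 8 s at the surface (1 Hz samples).
    t = list(range(20))
    alt = [0.0, 0.0] + [-1.0] * 10 + [0.0] * 8
    print(detect_runs(t, alt))
```

A dip shorter than `need_streak` samples never opens a run, and a surfacing shorter than `need_streak` samples never closes one, which is why the module's 30 s streak tolerates brief surface bobbing.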
352
pipeline/stages/02b_runs_diag.py Executable file

@@ -0,0 +1,352 @@
#!/usr/bin/env python3
"""Stage 02b — Diagnostic plots per run candidate.
Reads 02_runs.json + replays MCAP rel_alt + /mavros/state to produce
4-panel PNG per run (validated + rejected).
Output:
data/<MISSION>/02_runs_diag/<RUN_ID>.png
data/<MISSION>/02_runs_diag/index.json
data/<MISSION>/02_runs_diag/index.html
Usage:
python3 02b_runs_diag.py --mission 20260505-Lepradet \
[--ssd-base /mnt/ssd] [--out data/] [--padding-s 120]
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from mcap.reader import make_reader
from mcap_ros2.decoder import DecoderFactory
TOPIC_REL_ALT = "/mavros/global_position/rel_alt"
TOPIC_STATE = "/mavros/state"
TOPIC_HEADING = "/mavros/global_position/compass_hdg"
AUV_PHYSICAL_MAP = {"AUV010": "AUV210", "AUV012": "AUV212", "AUV013": "AUV213"}
PHYS_TO_MCAP = {v: k for k, v in AUV_PHYSICAL_MAP.items()}
def extract_auv_id(folder_name):
m = re.search(r"(AUV\d+)$", folder_name)
return m.group(1) if m else None
def gather_signals_for_auv(bags_root, auv_mcap_id, t_start, t_end):
"""Read rel_alt + state + heading from all bags for this AUV in [t_start,t_end]."""
alt = [] # (t, v)
state = [] # (t, armed, mode)
hdg = [] # (t, v_deg)
for d in sorted(bags_root.iterdir()):
if not d.is_dir():
continue
if extract_auv_id(d.name) != auv_mcap_id:
continue
for mcap_path in sorted(d.glob("*.mcap")):
try:
with open(mcap_path, "rb") as fp:
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
for _schema, channel, message, ros_msg in reader.iter_decoded_messages(
topics=[TOPIC_REL_ALT, TOPIC_STATE, TOPIC_HEADING]):
ts = message.log_time / 1e9
if ts < t_start or ts > t_end:
continue
if channel.topic == TOPIC_REL_ALT:
alt.append((ts, float(ros_msg.data)))
elif channel.topic == TOPIC_STATE:
state.append((ts, bool(ros_msg.armed), str(ros_msg.mode)))
elif channel.topic == TOPIC_HEADING:
hdg.append((ts, float(ros_msg.data)))
except Exception as exc:
print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr)
alt.sort()
state.sort()
hdg.sort()
return alt, state, hdg
def plot_run_diag(run, alt, state, hdg, filter_params, status, reject_reason, out_path):
"""4-panel diagnostic plot."""
start_e = run["start_epoch"]
end_e = run["end_epoch"]
dur = end_e - start_e
run_id = run["run_id"]
# Convert epoch → relative seconds from start_e for x-axis readability
def to_rel(t):
return t - start_e
fig, axes = plt.subplots(4, 1, figsize=(12, 11), gridspec_kw={"height_ratios": [3, 1, 2, 2]})
fig.suptitle(
f"{run_id} | {status} | start={datetime.fromtimestamp(start_e, tz=timezone.utc).strftime('%H:%M:%S UTC')} duration={dur:.0f}s",
fontsize=12, fontweight="bold",
color=("#16a34a" if status == "OK" else "#dc2626"),
)
# === Panel A: rel_alt time series ===
ax = axes[0]
if alt:
t_a = [to_rel(t) for t, _ in alt]
v_a = [v for _, v in alt]
ax.plot(t_a, v_a, color="#1e40af", lw=1.0, label="rel_alt")
threshold = filter_params["min_mission_depth_m"]
ax.axhline(threshold, color="#dc2626", ls="--", lw=1.0,
label=f"threshold {threshold}m")
ax.axhline(0, color="#94a3b8", ls=":", lw=0.8, label="surface")
# Mark run window
ax.axvspan(0, dur, alpha=0.10, color="#16a34a" if status == "OK" else "#dc2626")
ax.set_ylabel("rel_alt (m)")
ax.set_title(
f"(a) Depth — max={run['max_depth_m']}m mean={run['mean_depth_m']}m "
f"sustained={run['sustained_duration_s']:.0f}s below {threshold}m "
f"pct_near_bottom={run['pct_near_bottom']:.1f}%"
)
ax.legend(loc="lower right", fontsize=8)
ax.grid(True, alpha=0.3)
ax.set_xlim(-30, dur + 30)
# === Panel B: state armed + mode ===
ax = axes[1]
if state:
t_s = [to_rel(t) for t, _, _ in state]
armed_y = [1 if a else 0 for _, a, _ in state]
ax.step(t_s, armed_y, where="post", color="#16a34a", lw=1.2, label="armed")
# Annotate dominant mode + mode transitions
prev_mode = None
for t, _, m in state:
if m != prev_mode:
ax.axvline(to_rel(t), color="#7c3aed", lw=0.5, alpha=0.4)
ax.text(to_rel(t), 0.5, m, rotation=90, fontsize=7,
color="#7c3aed", va="center", alpha=0.7)
prev_mode = m
ax.set_ylim(-0.2, 1.3)
ax.set_yticks([0, 1])
ax.set_yticklabels(["disarmed", "armed"])
ax.set_title(f"(b) MAVROS state — dominant mode: {run.get('dominant_mode','?')}")
ax.grid(True, alpha=0.3)
ax.set_xlim(-30, dur + 30)
# === Panel C: depth distribution + cumulative time below threshold ===
ax = axes[2]
# filter alt to run window
run_vals = [v for t, v in alt if start_e <= t <= end_e]
if run_vals:
ax.hist(run_vals, bins=40, color="#3b82f6", alpha=0.7, edgecolor="white")
ax.axvline(threshold, color="#dc2626", ls="--", lw=1.0,
label=f"threshold {threshold}m")
n_total = len(run_vals)
n_below = sum(1 for v in run_vals if v < threshold)
ax.set_title(
f"(c) Depth distribution within run | {n_below}/{n_total} samples below threshold "
f"= {100*n_below/n_total:.1f}%"
)
ax.legend(loc="upper right", fontsize=8)
ax.set_xlabel("rel_alt (m)")
ax.set_ylabel("samples")
ax.grid(True, alpha=0.3)
# === Panel D: verdict box ===
ax = axes[3]
ax.axis("off")
color = "#16a34a" if status == "OK" else "#dc2626"
verdict_text = f"VERDICT: {status}"
ax.text(0.02, 0.85, verdict_text, fontsize=18, fontweight="bold",
color=color, transform=ax.transAxes)
# Build criteria summary
crit_lines = []
min_dur = filter_params["min_sustained_duration_s"]
min_pct = filter_params["min_near_bottom_pct"]
min_depth = filter_params["min_mission_depth_m"]
pass_dur = run["sustained_duration_s"] >= min_dur
pass_pct = run["pct_near_bottom"] >= min_pct
pass_depth = run["max_depth_m"] < min_depth
crit_lines.append(
f"{'OK' if pass_depth else 'KO':3s} max_depth {run['max_depth_m']}m < {min_depth}m"
)
crit_lines.append(
f"{'OK' if pass_dur else 'KO':3s} sustained_duration {run['sustained_duration_s']:.0f}s >= {min_dur:.0f}s"
)
crit_lines.append(
f"{'OK' if pass_pct else 'KO':3s} pct_near_bottom {run['pct_near_bottom']:.1f}% >= {min_pct:.0f}%"
)
crit_lines.append(
f" duration {run['duration_s']:.0f}s | mode {run.get('dominant_mode','?')} | method {run.get('detection_method','?')}"
)
for i, line in enumerate(crit_lines):
ax.text(0.02, 0.65 - i * 0.13, line, fontsize=10, family="monospace",
transform=ax.transAxes)
if reject_reason:
ax.text(0.02, 0.05, f"REASON: {reject_reason}", fontsize=9,
color="#dc2626", transform=ax.transAxes, family="monospace")
# Label the last time-series panel (b); axes[-2] is the histogram (c), whose x-axis is rel_alt
axes[1].set_xlabel("time since run start (s)")
plt.tight_layout()
plt.savefig(out_path, dpi=90, bbox_inches="tight")
plt.close(fig)
HTML_TEMPLATE = """<!doctype html>
<html lang="fr">
<head>
<meta charset="utf-8">
<title>02 Runs Diagnostic — {mission}</title>
<style>
body {{ font-family: -apple-system, system-ui, sans-serif; background: #0f172a; color: #e2e8f0; margin: 0; padding: 20px; }}
h1 {{ color: #38bdf8; margin: 0 0 4px 0; }}
.subtitle {{ color: #94a3b8; margin-bottom: 20px; font-size: 14px; }}
.params {{ background: #1e293b; padding: 10px 14px; border-radius: 6px; margin-bottom: 24px; font-family: monospace; font-size: 12px; color: #cbd5e1; }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(560px, 1fr)); gap: 20px; }}
.card {{ background: #1e293b; border-radius: 8px; padding: 12px; border-left: 6px solid #475569; }}
.card.ok {{ border-left-color: #16a34a; }}
.card.ko {{ border-left-color: #dc2626; }}
.card h3 {{ margin: 0 0 6px 0; font-size: 16px; }}
.card h3.ok {{ color: #4ade80; }}
.card h3.ko {{ color: #f87171; }}
.card img {{ width: 100%; height: auto; border-radius: 4px; }}
.meta {{ color: #94a3b8; font-size: 12px; margin: 4px 0 8px 0; font-family: monospace; }}
.reason {{ color: #fca5a5; font-size: 12px; margin-top: 6px; font-family: monospace; }}
.summary {{ display: flex; gap: 16px; margin-bottom: 20px; }}
.pill {{ background: #1e293b; padding: 8px 14px; border-radius: 999px; font-size: 13px; }}
.pill.ok {{ color: #4ade80; }}
.pill.ko {{ color: #f87171; }}
</style>
</head>
<body>
<h1>02 Runs Diagnostic — {mission}</h1>
<div class="subtitle">Generated {generated_at} · filter v5 strict</div>
<div class="summary">
<div class="pill ok">OK: {n_ok}</div>
<div class="pill ko">Rejected: {n_ko}</div>
<div class="pill">Total candidates: {n_total}</div>
</div>
<div class="params">{params}</div>
<div class="grid">
{cards}
</div>
</body>
</html>
"""
CARD_TEMPLATE = """<div class="card {cls}">
<h3 class="{cls}">{run_id} | {status}</h3>
<div class="meta">duration={duration}s | max_depth={max_depth}m | sustained={sustained}s | pct_near={pct}% | mode={mode}</div>
<img src="{png}" alt="{run_id}">
{reason_block}
</div>"""


def build_html(mission, filter_params, generated_at, ok_runs, ko_runs, out_dir):
    cards = []
    entries = [(r, "OK", None) for r in ok_runs] + [
        (r, "REJECTED", r.get("rejected_reason")) for r in ko_runs
    ]
    for run, status, reason in entries:
        cls = "ok" if status == "OK" else "ko"
        reason_block = f'<div class="reason">REASON: {reason}</div>' if reason else ""
        cards.append(CARD_TEMPLATE.format(
            cls=cls, run_id=run["run_id"], status=status,
            duration=f"{run['duration_s']:.0f}",
            max_depth=run["max_depth_m"],
            sustained=f"{run['sustained_duration_s']:.0f}",
            pct=f"{run['pct_near_bottom']:.1f}",
            mode=run.get("dominant_mode", "?"),
            png=f"{run['run_id']}.png",
            reason_block=reason_block,
        ))
    html = HTML_TEMPLATE.format(
        mission=mission,
        generated_at=generated_at,
        params=json.dumps(filter_params, indent=2),
        n_ok=len(ok_runs),
        n_ko=len(ko_runs),
        n_total=len(ok_runs) + len(ko_runs),
        cards="\n".join(cards),
    )
    (out_dir / "index.html").write_text(html, encoding="utf-8")


def main():
    parser = argparse.ArgumentParser(description="Stage 02b - runs diagnostic plots")
    parser.add_argument("--mission", required=True)
    parser.add_argument("--ssd-base", default="/mnt/ssd")
    parser.add_argument("--out", default="data/")
    parser.add_argument("--padding-s", type=float, default=120.0,
                        help="padding (s) before/after each run, for context")
    args = parser.parse_args()

    mission_dir = Path(args.out) / args.mission
    runs_json = mission_dir / "02_runs.json"
    if not runs_json.exists():
        print(f"[ERROR] {runs_json} missing - run 02_mission_run_detect first", file=sys.stderr)
        sys.exit(1)
    bags_root = Path(args.ssd_base) / args.mission / "raw_data" / "logs" / "SUB" / "bag"
    if not bags_root.exists():
        print(f"[ERROR] bags dir not found: {bags_root}", file=sys.stderr)
        sys.exit(1)

    data = json.loads(runs_json.read_text())
    filter_params = data["filter_params"]
    out_dir = mission_dir / "02_runs_diag"
    out_dir.mkdir(parents=True, exist_ok=True)
    ok_runs = data["all_runs_sorted"]
    ko_runs = data["all_runs_rejected"]
    print(f"[stage02b] {len(ok_runs)} OK + {len(ko_runs)} rejected = {len(ok_runs)+len(ko_runs)} runs to plot")

    # Group by AUV → 1 read per AUV bag set covering all runs
    runs_by_auv = {}
    for run in ok_runs + ko_runs:
        # run_id "AUV210_run_00" → AUV210
        phys = run["run_id"].split("_")[0]
        runs_by_auv.setdefault(phys, []).append(run)

    pad = args.padding_s
    for phys_id, runs in runs_by_auv.items():
        mcap_id = PHYS_TO_MCAP.get(phys_id, phys_id)
        t_start = min(r["start_epoch"] for r in runs) - pad
        t_end = max(r["end_epoch"] for r in runs) + pad
        print(f" [{phys_id}] reading {mcap_id} bags [{t_start:.0f}..{t_end:.0f}] for {len(runs)} runs")
        alt, state, hdg = gather_signals_for_auv(bags_root, mcap_id, t_start, t_end)
        print(f" {len(alt)} alt pts, {len(state)} state pts, {len(hdg)} hdg pts")
        for run in runs:
            # Slice each signal to the padded window of this run
            r_start = run["start_epoch"] - pad
            r_end = run["end_epoch"] + pad
            alt_r = [(t, v) for t, v in alt if r_start <= t <= r_end]
            state_r = [(t, a, m) for t, a, m in state if r_start <= t <= r_end]
            hdg_r = [(t, v) for t, v in hdg if r_start <= t <= r_end]
            status = "REJECTED" if run in ko_runs else "OK"
            reason = run.get("rejected_reason") if status == "REJECTED" else None
            out_png = out_dir / f"{run['run_id']}.png"
            plot_run_diag(run, alt_r, state_r, hdg_r, filter_params, status, reason, out_png)
            print(f" [{status}] {run['run_id']}.png written")

    # Build index.html + index.json
    generated_at = datetime.now(timezone.utc).isoformat()
    index = {
        "mission": args.mission,
        "generated_at": generated_at,
        "filter_params": filter_params,
        "ok_runs": [r["run_id"] for r in ok_runs],
        "rejected_runs": [r["run_id"] for r in ko_runs],
    }
    (out_dir / "index.json").write_text(json.dumps(index, indent=2))
    build_html(args.mission, filter_params, generated_at, ok_runs, ko_runs, out_dir)
    print(f"[stage02b] Done: {out_dir}/index.html")


if __name__ == "__main__":
    main()
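
Note on the per-run slicing in `main()` above: each signal list is scanned in full once per run. The sketch below is not part of the commit; it shows how the same padded window could be taken with `bisect`, assuming the samples are sorted by timestamp (the diff does not confirm this). `slice_window` and the sample values are hypothetical.

```python
from bisect import bisect_left, bisect_right


def slice_window(times, samples, t_start, t_end):
    """Return the samples whose timestamp lies in [t_start, t_end].

    `times` must be the sorted list of timestamps of `samples`,
    built once per signal and reused for every run (assumption).
    """
    lo = bisect_left(times, t_start)   # first index with t >= t_start
    hi = bisect_right(times, t_end)    # first index with t > t_end
    return samples[lo:hi]


# Made-up altitude samples as (epoch, value) pairs.
alt = [(100.0, 1.2), (160.0, 1.1), (220.0, 0.9), (400.0, 1.5)]
alt_times = [t for t, _ in alt]

run = {"start_epoch": 200.0, "end_epoch": 260.0}
pad = 60.0
window = slice_window(alt_times, alt, run["start_epoch"] - pad, run["end_epoch"] + pad)
print(window)  # [(160.0, 1.1), (220.0, 0.9)]
```

Both bounds are inclusive, matching the `r_start <= t <= r_end` test used by the stage.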

pipeline/stages/03b_trim_runs.py Executable file

@@ -0,0 +1,228 @@
#!/usr/bin/env python3
"""Stage 03b - Trim videos per run (LRV proxies + -c copy, fast).

Inputs:
  data/<MISSION>/02_runs.json
  data/<MISSION>/03_video_index.json

Strategy:
  - Use GoPro LRV proxy files (768x432 H.264 ~720 kbps) instead of 4K HEVC originals.
  - ffmpeg -c copy per chapter (keyframe-aligned cut) + concat demuxer.
  - Output: per-run .mp4 + ours_<gp>.mp4 (concat of per-run).

Falls back to MP4 source if matching LRV is missing.
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
OUT_ROOT = Path("/mnt/ssd/cosma-qc-out/03b_trim_runs")


def run_ff(cmd: list[str]) -> None:
    r = subprocess.run(cmd, capture_output=True, text=True)
    if r.returncode != 0:
        sys.stderr.write(" ".join(cmd) + "\n")
        sys.stderr.write(r.stderr[-3000:] + "\n")
        raise RuntimeError(f"ffmpeg failed rc={r.returncode}")


def lrv_for_chapter(mp4_path: Path) -> Path | None:
    """Return matching .LRV path if it exists (GoPro low-res proxy)."""
    name = mp4_path.name
    if not name.startswith("GX") or not name.upper().endswith(".MP4"):
        return None
    lrv_name = "GL" + name[2:-4] + ".LRV"
    p = mp4_path.parent / lrv_name
    return p if p.exists() else None


def overlap_clips(run_start: float, run_end: float, chapters: list[dict]) -> list[tuple[dict, float, float]]:
    """Return [(chapter, start_off_s, duration_s)] for chapters overlapping the run."""
    out = []
    for ch in sorted(chapters, key=lambda c: c["start_epoch"]):
        a = max(run_start, ch["start_epoch"])
        b = min(run_end, ch["end_epoch"])
        if b - a <= 1.0:
            continue
        start_off = max(0.0, run_start - ch["start_epoch"])
        dur = b - a
        out.append((ch, start_off, dur))
    return out


def cut_clip(src: Path, start_off: float, duration: float, dst: Path) -> None:
    """Cut [start_off, start_off+duration] from src using -c copy (keyframe-aligned)."""
    cmd = [
        "ffmpeg", "-y", "-loglevel", "error",
        # -ss before -i is input seeking; with stream copy the cut starts on a keyframe
        "-ss", f"{start_off:.3f}",
        "-i", str(src),
        "-t", f"{duration:.3f}",
        "-c", "copy",
        "-avoid_negative_ts", "make_zero",
        "-an",  # drop audio
        str(dst),
    ]
    run_ff(cmd)


def concat_demux(parts: list[Path], dst: Path) -> None:
    """Concat parts with ffmpeg concat demuxer (-c copy)."""
    if not parts:
        return
    if len(parts) == 1:
        shutil.copy2(parts[0], dst)
        return
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
        for p in parts:
            # A single quote inside a quoted concat-list path must be written as '\''
            escaped = str(p.resolve()).replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")
        listfile = f.name
    try:
        cmd = [
            "ffmpeg", "-y", "-loglevel", "error",
            "-f", "concat", "-safe", "0",
            "-i", listfile,
            "-c", "copy",
            "-movflags", "+faststart",
            "-an",
            str(dst),
        ]
        run_ff(cmd)
    finally:
        os.unlink(listfile)


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--mission", required=True)
    ap.add_argument("--data-root", default="/home/cosma/cosma-qc/data")
    ap.add_argument("--skip-existing", action="store_true")
    ap.add_argument("--prefer-source", choices=["lrv", "mp4"], default="lrv",
                    help="lrv = use .LRV proxy (default, fast); mp4 = use 4K originals")
    args = ap.parse_args()

    mission = args.mission
    data_dir = Path(args.data_root) / mission
    runs = json.loads((data_dir / "02_runs.json").read_text())["runs"]
    vidx = json.loads((data_dir / "03_video_index.json").read_text())
    videos = vidx["videos"]

    by_auv_gp: dict[tuple[str, str], list[dict]] = defaultdict(list)
    for v in videos:
        by_auv_gp[(v["auv"], v["gp"])].append(v)
    all_gps = sorted({v["gp"] for v in videos})

    out_dir = OUT_ROOT / mission
    out_dir.mkdir(parents=True, exist_ok=True)
    tmp_dir = out_dir / "_tmp"
    tmp_dir.mkdir(exist_ok=True)

    link = data_dir / "03b_trim_runs"
    if link.is_symlink():
        link.unlink()
    elif link.exists():
        shutil.rmtree(link)
    link.symlink_to(out_dir)

    manifest = {
        "mission": mission,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "output_root": str(out_dir),
        "source": args.prefer_source,
        "runs": [],
        "ours": {},
    }

    runs_by_chrono = sorted(runs, key=lambda r: r["start_epoch"])
    ours_parts: dict[str, list[tuple[float, Path, str]]] = defaultdict(list)

    for run in runs_by_chrono:
        run_id = run["run_id"]
        auv = run["auv"]
        r_start = run["start_epoch"]
        r_end = run["end_epoch"]
        run_entry = {"run_id": run_id, "auv": auv, "duration_s": run["duration_s"], "outputs": []}

        for gp in all_gps:
            chapters = by_auv_gp.get((auv, gp), [])
            if not chapters:
                continue
            clips = overlap_clips(r_start, r_end, chapters)
            if not clips:
                continue

            out_name = f"{run_id}_{auv}_{gp}.mp4"
            out_path = out_dir / out_name

            if args.skip_existing and out_path.exists() and out_path.stat().st_size > 0:
                print(f"[skip] {out_name}", flush=True)
            else:
                # Pick source per chapter
                resolved: list[tuple[Path, float, float, str]] = []
                src_tags: list[str] = []
                for ch, soff, dur in clips:
                    mp4 = Path(ch["filepath"])
                    src = mp4
                    tag = "mp4"
                    if args.prefer_source == "lrv":
                        lrv = lrv_for_chapter(mp4)
                        if lrv:
                            src = lrv
                            tag = "lrv"
                    resolved.append((src, soff, dur, tag))
                    src_tags.append(tag)
                print(
                    f"[cut ] {out_name} chapters={len(resolved)} src={','.join(src_tags)}",
                    flush=True,
                )
                tmp_parts: list[Path] = []
                for i, (src, soff, dur, _) in enumerate(resolved):
                    tp = tmp_dir / f"{run_id}_{auv}_{gp}_p{i:02d}.mp4"
                    cut_clip(src, soff, dur, tp)
                    tmp_parts.append(tp)
                concat_demux(tmp_parts, out_path)
                for p in tmp_parts:
                    p.unlink(missing_ok=True)

            sz_mb = round(out_path.stat().st_size / 1024 / 1024, 1)
            run_entry["outputs"].append({"gp": gp, "file": out_name, "size_mb": sz_mb})
            ours_parts[gp].append((r_start, out_path, f"{run_id} {auv}"))

        manifest["runs"].append(run_entry)

    for gp, parts in ours_parts.items():
        parts.sort()
        ordered_paths = [p for _, p, _ in parts]
        ours_path = out_dir / f"ours_{gp}.mp4"
        print(f"[ours] {ours_path.name} <- {len(ordered_paths)} clip(s)", flush=True)
        concat_demux(ordered_paths, ours_path)
        sz_mb = round(ours_path.stat().st_size / 1024 / 1024, 1)
        manifest["ours"][gp] = {
            "file": ours_path.name,
            "size_mb": sz_mb,
            "segments": [lbl for _, _, lbl in parts],
        }

    (data_dir / "03b_trim_runs.json").write_text(json.dumps(manifest, indent=2))
    print(f"\n[done] manifest: {data_dir / '03b_trim_runs.json'}", flush=True)
    print(f"[done] outputs: {out_dir}", flush=True)

    try:
        tmp_dir.rmdir()
    except OSError:
        pass


if __name__ == "__main__":
    main()
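
Worked example for the chapter-overlap rule and the LRV naming used in `03b_trim_runs.py`. This is a standalone sketch, not part of the commit: it restates the two rules on made-up data so the offsets can be checked by hand. The `name` key, the chapter file names, and the epoch values are assumptions for illustration; the real index entries carry `filepath`, `start_epoch`, and `end_epoch`.

```python
def overlap_clips(run_start, run_end, chapters):
    """Return (name, start_off_s, duration_s) for chapters overlapping the run.

    Same rule as the stage: overlaps of 1 s or less are skipped.
    """
    out = []
    for ch in sorted(chapters, key=lambda c: c["start_epoch"]):
        a = max(run_start, ch["start_epoch"])
        b = min(run_end, ch["end_epoch"])
        if b - a <= 1.0:
            continue
        out.append((ch["name"], a - ch["start_epoch"], b - a))
    return out


def lrv_name(mp4_name):
    """Map a GX*.MP4 chapter name to its GL*.LRV proxy name, else None."""
    if not mp4_name.startswith("GX") or not mp4_name.upper().endswith(".MP4"):
        return None
    return "GL" + mp4_name[2:-4] + ".LRV"


# Two consecutive chapters of one recording (hypothetical values).
chapters = [
    {"name": "GX010042.MP4", "start_epoch": 1000.0, "end_epoch": 1500.0},
    {"name": "GX020042.MP4", "start_epoch": 1500.0, "end_epoch": 2000.0},
]

# A run that starts 400 s into the first chapter and ends 200 s into the second.
clips = overlap_clips(1400.0, 1700.0, chapters)
print(clips)
# [('GX010042.MP4', 400.0, 100.0), ('GX020042.MP4', 0.0, 200.0)]

print(lrv_name("GX010042.MP4"))  # GL010042.LRV
```

The run yields one clip per chapter; the stage then cuts each clip with `-c copy` and joins them with the concat demuxer.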