Compare commits
11 Commits
auto-iter-
...
feature/au
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
171f90ce9f | ||
|
|
754f3c7272 | ||
|
|
90621dea12 | ||
|
|
15b4ddfd70 | ||
|
|
65bda7ff71 | ||
|
|
2858217897 | ||
|
|
568ff9469b | ||
|
|
2611a72aa2 | ||
| 50ca77490d | |||
|
|
503d6d64c2 | ||
|
|
13323f2edf |
@@ -15,7 +15,7 @@ frame_extract:
|
|||||||
bottom_visible_pct_min: 25
|
bottom_visible_pct_min: 25
|
||||||
inference:
|
inference:
|
||||||
ply_conf_threshold: 1.5
|
ply_conf_threshold: 1.5
|
||||||
max_frame_num: 1024
|
max_frame_num: 2048
|
||||||
mode: streaming
|
mode: streaming
|
||||||
keyframe_interval: 1
|
keyframe_interval: 1
|
||||||
min_frames_for_inference: 32
|
min_frames_for_inference: 32
|
||||||
|
|||||||
@@ -137,3 +137,37 @@
|
|||||||
- Lancer stage06_align sur 4 PLY
|
- Lancer stage06_align sur 4 PLY
|
||||||
- Veille : RoPE issues arxiv, underwater 3D reconstruction papers
|
- Veille : RoPE issues arxiv, underwater 3D reconstruction papers
|
||||||
- **Suggestion prochaine** : update lingbot-map .84 (git pull) OU switch mee-deepreefmap (pas ce problème)
|
- **Suggestion prochaine** : update lingbot-map .84 (git pull) OU switch mee-deepreefmap (pas ce problème)
|
||||||
|
|
||||||
|
### Findings Stage06 Path
|
||||||
|
- **stage06_align_absolute.py** exists (requires trajectory CSV + MCAP IMU/GPS, outputs ENU-aligned trajectory)
|
||||||
|
- **stage06b_imu_depth_align.py** exists (IMU/depth post-processing)
|
||||||
|
- **blocker** : lingbot PLY output → poses CSV conversion not automated ; need extract viser poses → COLMAP format OR use mee-deepreefmap (simpler pipeline)
|
||||||
|
- **decision** : defer stage06 until trajectory extraction finalized ; prioritize lingbot-map update on .84
|
||||||
|
|
||||||
|
### Veille Signal (6h window)
|
||||||
|
- arxiv 20260513: RoPE optimization papers (rope_xformers, YaRN variants) — pertinent si update lingbot-map
|
||||||
|
- GitHub: LingBot-Map last commit 2026-04-27 (keyframe fix 1 semaine écoulé)
|
||||||
|
- Hugging Face: ReefMapGS v0.8 (underwater 3D specialist, arxiv 2026-05-11)
|
||||||
|
- Decision: monitor RoPE fixes, test ReefMapGS on GX029839 (85M pts reference) vs lingbot
|
||||||
|
|
||||||
|
### Suggestion prochaine
|
||||||
|
1. ⚠️ Priority: Update lingbot-map on .84/.87 (git pull + rebuild venv) — RoPE + keyframe fixes 2026-04-27
|
||||||
|
2. Retry GX019817 après update
|
||||||
|
3. Start stage06_align preparation (pose extraction pipeline)
|
||||||
|
4. Test ReefMapGS on known-good segment (GX029839 85M pts)
|
||||||
|
|
||||||
|
## Itération 10 — 2026-05-14 04:55 UTC
|
||||||
|
- **Signal détecté** : RoPE tensor mismatch GX019817 (1357 frames) = overflow max_frame_num=1024 → RoPE précompute seulement 1124 positions (max_frame_num+100). Source confirmée : `aggregator/stream.py` ligne 226 `max_total_frames=self.max_frame_num+100=1124 < 1357`.
|
||||||
|
- **Patch** :
|
||||||
|
- AUTO-COMMIT 2611a72 : `thresholds.yaml` — `max_frame_num: 1024 → 2048` (supporte jusqu'à 2148 frames)
|
||||||
|
- MERGE PR#13 : `fix/05-inference-viser-kill-offload` → `feature/auto-pipeline` (kill_stale_demo_py + offload_to_cpu depuis yaml + background+poll SSH)
|
||||||
|
- **Type** : auto-commit (yaml) + merge PR Gitea #13
|
||||||
|
- **Sanity check** : SKIP — cosma@192.168.0.83 SSH banner exchange timeout (VM à 97% RAM, TCP OK mais aucun process répond, sshd gelé). Retry GX019817 impossible jusqu'à rétablissement .83.
|
||||||
|
- **Infrastructure** : 4 orphelins viser_ply.py tués sur .84 (libéré ~29GB RAM). VM .83 inaccessible — bloquer retry pipeline.
|
||||||
|
- **Veille** : lingbot-map GitHub mis à jour 2026-05-08 (docs+deps seulement, pas de fix RoPE) ; arxiv AUV nav fusion 9/10 (2605.04672) ; VGGT CVPR 2025 7/10
|
||||||
|
- **Bloquants** : cosma@.83 SSH figé → impossible de retrouver frames GX019817 ni relancer stage05. Nécessite intervention humaine (.83 sshd restart ou VM reset).
|
||||||
|
- **Suggestion prochaine** :
|
||||||
|
1. ⚠️ Intervention : débloquer SSH .83 (restart sshd ou VM reset via Proxmox)
|
||||||
|
2. Après rétablissement : retry GX019817 inference avec max_frame_num=2048
|
||||||
|
3. Si .83 reste mort : cloner lingbot-map sur workspace → push Gitea → update .84/.87 depuis réseau local (les workers ne peuvent pas atteindre GitHub)
|
||||||
|
4. Évaluer ReefMapGS v0.8 (underwater-specific) sur GX029839 (85M pts référence)
|
||||||
|
|||||||
850
pipeline/stages/01_select_mission.py
Executable file
850
pipeline/stages/01_select_mission.py
Executable file
@@ -0,0 +1,850 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Stage 01 — Select mission and produce a raw manifest.
|
||||||
|
|
||||||
|
Scans `<ssd_base>/<mission>/raw_data/` and writes
|
||||||
|
`<out>/<mission>/01_manifest.json` listing all AUVs, GoPro folders, MCAP bags,
|
||||||
|
BIN files, USBL logs.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python3 01_select_mission.py \
|
||||||
|
--mission 20260505-Lepradet \
|
||||||
|
--ssd-base /mnt/ssd \
|
||||||
|
--out /home/cosma/cosma-qc/data
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from collections import defaultdict
|
||||||
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Local helper for stage time/memory tracking
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent))
|
||||||
|
from _meta import track_stage # noqa: E402
|
||||||
|
|
||||||
|
# AUV physical (GoPro) and AUV MCAP IDs — pattern matching only, never write to SSD.
|
||||||
|
AUV_PHYS_RE = re.compile(r"AUV(\d{3})", re.I)
|
||||||
|
GP_FOLDER_RE = re.compile(r"^GP(?P<gp>\d+)[_-]AUV(?P<auv>\d{3})$", re.I)
|
||||||
|
BAG_AUV_RE = re.compile(r"_AUV(?P<auv>\d{3})(?:[_/]|$)", re.I)
|
||||||
|
USBL_FILE_RE = re.compile(r"usbl", re.I)
|
||||||
|
KLF_TS_RE = re.compile(r"(\d{8})_(\d{6})")
|
||||||
|
MISSION_DATE_RE = re.compile(r"^(\d{8})-")
|
||||||
|
|
||||||
|
# KLF throughput estimate: ~5MB/min
|
||||||
|
KLF_BYTES_PER_SEC = 5 * 1024 * 1024 / 60
|
||||||
|
|
||||||
|
|
||||||
|
def iso_utc_now() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat(timespec="seconds")
|
||||||
|
|
||||||
|
|
||||||
|
def _is_old_path(path: Path) -> bool:
|
||||||
|
"""Return True if path contains an 'old' component (skip these files)."""
|
||||||
|
return "old" in path.parts
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_auv_id(auv_str: str) -> str:
|
||||||
|
"""AUV0xx -> AUV2xx; AUV2xx unchanged; others unchanged."""
|
||||||
|
m = re.fullmatch(r"AUV(\d{3})", auv_str, re.I)
|
||||||
|
if not m:
|
||||||
|
return auv_str
|
||||||
|
n = int(m.group(1))
|
||||||
|
if 0 <= n < 100:
|
||||||
|
return f"AUV2{n:02d}" # AUV010 -> AUV210
|
||||||
|
return f"AUV{n:03d}"
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_mission_date(mission: str) -> "datetime | None":
|
||||||
|
"""Parse YYYYMMDD from mission folder name. Returns UTC midnight or None."""
|
||||||
|
m = MISSION_DATE_RE.match(mission)
|
||||||
|
if not m:
|
||||||
|
print(f" [warn] mission_date: cannot parse date from '{mission}', no date filter")
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return datetime.strptime(m.group(1), "%Y%m%d").replace(tzinfo=timezone.utc)
|
||||||
|
except ValueError:
|
||||||
|
print(f" [warn] mission_date: invalid date '{m.group(1)}' in '{mission}'")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _mission_date_window(mission_date: datetime) -> "tuple[datetime, datetime]":
|
||||||
|
"""Return [date-2h, date+26h] UTC window with +/-2h timezone tolerance."""
|
||||||
|
start = mission_date - timedelta(hours=2)
|
||||||
|
end = mission_date + timedelta(hours=26)
|
||||||
|
return start, end
|
||||||
|
|
||||||
|
|
||||||
|
def mtime_fallback(path: Path) -> dict:
|
||||||
|
"""Fallback: use mtime as t_start, duration=0."""
|
||||||
|
try:
|
||||||
|
mt = path.stat().st_mtime
|
||||||
|
t = datetime.fromtimestamp(mt, tz=timezone.utc)
|
||||||
|
return {
|
||||||
|
"t_start": t.isoformat(timespec="seconds"),
|
||||||
|
"t_end": t.isoformat(timespec="seconds"),
|
||||||
|
"duration_s": 0,
|
||||||
|
"source": "mtime_fallback",
|
||||||
|
}
|
||||||
|
except OSError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def parse_smpte(smpte: str, fps: float = 25.0) -> float:
|
||||||
|
"""SMPTE HH:MM:SS:FF -> seconds since midnight."""
|
||||||
|
parts = smpte.split(":")
|
||||||
|
if len(parts) != 4:
|
||||||
|
return None
|
||||||
|
h, m, s, f = int(parts[0]), int(parts[1]), int(parts[2]), int(parts[3])
|
||||||
|
return h * 3600 + m * 60 + s + f / fps
|
||||||
|
|
||||||
|
|
||||||
|
def extract_mp4(path: Path) -> "dict | None":
|
||||||
|
"""Extract timestamps from GoPro MP4 via ffprobe."""
|
||||||
|
try:
|
||||||
|
out = subprocess.run(
|
||||||
|
["ffprobe", "-v", "quiet", "-print_format", "json",
|
||||||
|
"-show_format", "-show_streams", str(path)],
|
||||||
|
capture_output=True, text=True, timeout=10,
|
||||||
|
).stdout
|
||||||
|
if not out:
|
||||||
|
return mtime_fallback(path)
|
||||||
|
data = json.loads(out)
|
||||||
|
fmt = data.get("format", {})
|
||||||
|
duration = float(fmt.get("duration", 0) or 0)
|
||||||
|
|
||||||
|
# Try SMPTE timecode first
|
||||||
|
smpte = None
|
||||||
|
for s in data.get("streams", []):
|
||||||
|
tc = s.get("tags", {}).get("timecode")
|
||||||
|
if tc:
|
||||||
|
smpte = tc
|
||||||
|
break
|
||||||
|
|
||||||
|
if smpte:
|
||||||
|
secs_since_midnight = parse_smpte(smpte)
|
||||||
|
if secs_since_midnight is not None:
|
||||||
|
# Build t_start from creation_time date + smpte time
|
||||||
|
creation_iso = fmt.get("tags", {}).get("creation_time", "")
|
||||||
|
try:
|
||||||
|
date_part = datetime.fromisoformat(
|
||||||
|
creation_iso.replace("Z", "+00:00")
|
||||||
|
).date()
|
||||||
|
midnight = datetime(
|
||||||
|
date_part.year, date_part.month, date_part.day,
|
||||||
|
tzinfo=timezone.utc
|
||||||
|
)
|
||||||
|
t_start = midnight + timedelta(seconds=secs_since_midnight)
|
||||||
|
except Exception:
|
||||||
|
# fallback: use creation_time directly
|
||||||
|
t_start = datetime.fromisoformat(
|
||||||
|
creation_iso.replace("Z", "+00:00")
|
||||||
|
)
|
||||||
|
t_end = t_start + timedelta(seconds=duration)
|
||||||
|
return {
|
||||||
|
"t_start": t_start.isoformat(timespec="seconds"),
|
||||||
|
"t_end": t_end.isoformat(timespec="seconds"),
|
||||||
|
"duration_s": int(duration),
|
||||||
|
"source": "smpte",
|
||||||
|
}
|
||||||
|
|
||||||
|
# Fallback: creation_time
|
||||||
|
creation_iso = fmt.get("tags", {}).get("creation_time", "")
|
||||||
|
if creation_iso:
|
||||||
|
t_start = datetime.fromisoformat(creation_iso.replace("Z", "+00:00"))
|
||||||
|
t_end = t_start + timedelta(seconds=duration)
|
||||||
|
return {
|
||||||
|
"t_start": t_start.isoformat(timespec="seconds"),
|
||||||
|
"t_end": t_end.isoformat(timespec="seconds"),
|
||||||
|
"duration_s": int(duration),
|
||||||
|
"source": "creation_time",
|
||||||
|
}
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return mtime_fallback(path)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_mcap(path: Path) -> "dict | None":
|
||||||
|
"""Extract timestamps from MCAP bag via mcap.reader."""
|
||||||
|
try:
|
||||||
|
from mcap.reader import make_reader
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
reader = make_reader(f)
|
||||||
|
summary = reader.get_summary()
|
||||||
|
if summary and summary.statistics:
|
||||||
|
start_ns = summary.statistics.message_start_time
|
||||||
|
end_ns = summary.statistics.message_end_time
|
||||||
|
if start_ns and end_ns:
|
||||||
|
t_start = datetime.fromtimestamp(start_ns / 1e9, tz=timezone.utc)
|
||||||
|
t_end = datetime.fromtimestamp(end_ns / 1e9, tz=timezone.utc)
|
||||||
|
dur = int((end_ns - start_ns) / 1e9)
|
||||||
|
return {
|
||||||
|
"t_start": t_start.isoformat(timespec="seconds"),
|
||||||
|
"t_end": t_end.isoformat(timespec="seconds"),
|
||||||
|
"duration_s": dur,
|
||||||
|
"source": "mcap_summary",
|
||||||
|
}
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return mtime_fallback(path)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_bin(path: Path) -> "dict | None":
|
||||||
|
"""BIN ArduSub: no absolute timestamp available (TimeUS = boot-relative).
|
||||||
|
Use mtime fallback."""
|
||||||
|
return mtime_fallback(path)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_csv_timestamp(line: str) -> "datetime | None":
|
||||||
|
"""Parse first column of a CSV line as ISO or YYYYMMDD HH:MM:SS.mmm."""
|
||||||
|
col = line.split(",")[0].strip().strip('"')
|
||||||
|
# Try ISO format (2026-05-08 05:46:29.384209)
|
||||||
|
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S.%f",
|
||||||
|
"%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%d %H:%M:%S"):
|
||||||
|
try:
|
||||||
|
dt = datetime.strptime(col, fmt)
|
||||||
|
return dt.replace(tzinfo=timezone.utc)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
# Try COSMA MAG format: '20260508 07:52:28.456'
|
||||||
|
try:
|
||||||
|
dt = datetime.strptime(col, "%Y%m%d %H:%M:%S.%f")
|
||||||
|
return dt.replace(tzinfo=timezone.utc)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
# Try float epoch
|
||||||
|
try:
|
||||||
|
return datetime.fromtimestamp(float(col), tz=timezone.utc)
|
||||||
|
except (ValueError, OSError):
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def extract_csv(path: Path) -> "dict | None":
|
||||||
|
"""Extract timestamps from CSV (USBL/MAG) via head+tail."""
|
||||||
|
try:
|
||||||
|
result = subprocess.run(
|
||||||
|
["head", "-n", "5", str(path)],
|
||||||
|
capture_output=True, text=True, timeout=5,
|
||||||
|
)
|
||||||
|
lines = [l for l in result.stdout.splitlines() if l.strip()]
|
||||||
|
|
||||||
|
tail_result = subprocess.run(
|
||||||
|
["tail", "-n", "3", str(path)],
|
||||||
|
capture_output=True, text=True, timeout=5,
|
||||||
|
)
|
||||||
|
tail_lines = [l for l in tail_result.stdout.splitlines() if l.strip()]
|
||||||
|
|
||||||
|
t_start = None
|
||||||
|
# Skip header lines (contain letters in first col)
|
||||||
|
for line in lines:
|
||||||
|
col = line.split(",")[0].strip()
|
||||||
|
if not col or col[0].isalpha():
|
||||||
|
continue
|
||||||
|
t_start = _parse_csv_timestamp(line)
|
||||||
|
if t_start:
|
||||||
|
break
|
||||||
|
|
||||||
|
t_end = None
|
||||||
|
for line in reversed(tail_lines):
|
||||||
|
col = line.split(",")[0].strip()
|
||||||
|
if not col or col[0].isalpha():
|
||||||
|
continue
|
||||||
|
t_end = _parse_csv_timestamp(line)
|
||||||
|
if t_end:
|
||||||
|
break
|
||||||
|
|
||||||
|
if t_start and t_end:
|
||||||
|
dur = int((t_end - t_start).total_seconds())
|
||||||
|
return {
|
||||||
|
"t_start": t_start.isoformat(timespec="seconds"),
|
||||||
|
"t_end": t_end.isoformat(timespec="seconds"),
|
||||||
|
"duration_s": max(0, dur),
|
||||||
|
"source": "csv_inline",
|
||||||
|
}
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return mtime_fallback(path)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_klf(path: Path) -> "dict | None":
|
||||||
|
"""Extract timestamp from KLF Kogger filename: kogger_sss_YYYYMMDD_HHMMSS.klf"""
|
||||||
|
try:
|
||||||
|
m = KLF_TS_RE.search(path.name)
|
||||||
|
if m:
|
||||||
|
dt = datetime.strptime(m.group(1) + m.group(2), "%Y%m%d%H%M%S")
|
||||||
|
t_start = dt.replace(tzinfo=timezone.utc)
|
||||||
|
# Estimate duration from file size
|
||||||
|
try:
|
||||||
|
size = path.stat().st_size
|
||||||
|
dur = int(size / KLF_BYTES_PER_SEC)
|
||||||
|
except OSError:
|
||||||
|
dur = 0
|
||||||
|
t_end = t_start + timedelta(seconds=dur)
|
||||||
|
return {
|
||||||
|
"t_start": t_start.isoformat(timespec="seconds"),
|
||||||
|
"t_end": t_end.isoformat(timespec="seconds"),
|
||||||
|
"duration_s": dur,
|
||||||
|
"source": "filename_parse",
|
||||||
|
}
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return mtime_fallback(path)
|
||||||
|
|
||||||
|
|
||||||
|
def extract_timestamps(path: Path, kind: str) -> "dict | None":
|
||||||
|
"""Dispatch timestamp extraction by file kind."""
|
||||||
|
try:
|
||||||
|
if kind == "mp4":
|
||||||
|
return extract_mp4(path)
|
||||||
|
elif kind == "mcap":
|
||||||
|
return extract_mcap(path)
|
||||||
|
elif kind == "bin":
|
||||||
|
return extract_bin(path)
|
||||||
|
elif kind in ("usbl", "mag", "csv"):
|
||||||
|
return extract_csv(path)
|
||||||
|
elif kind == "klf":
|
||||||
|
return extract_klf(path)
|
||||||
|
else:
|
||||||
|
return mtime_fallback(path)
|
||||||
|
except Exception:
|
||||||
|
return mtime_fallback(path)
|
||||||
|
|
||||||
|
|
||||||
|
def build_coverage(manifest: dict, ssd_base: Path) -> "tuple[dict, dict]":
|
||||||
|
"""Build coverage dict and per-AUV file lists for window computation."""
|
||||||
|
mission = manifest["mission"]
|
||||||
|
ssd_path = ssd_base / mission / "raw_data"
|
||||||
|
coverage: "dict[str, dict]" = {}
|
||||||
|
# auv_file_windows[auv_id] = list of (t_start, t_end, source_label)
|
||||||
|
auv_file_windows: "dict[str, list]" = defaultdict(list)
|
||||||
|
|
||||||
|
tasks: "list[tuple[str, Path, str, str | None]]" = [] # (rel, path, kind, auv_id)
|
||||||
|
|
||||||
|
# Videos
|
||||||
|
vroot = ssd_path / "medias" / "videos"
|
||||||
|
for auv_id, gps in manifest["videos"].items():
|
||||||
|
for gp_key, files in gps.items():
|
||||||
|
for fname in files:
|
||||||
|
# find the file
|
||||||
|
for sub in (vroot,):
|
||||||
|
for folder in sub.iterdir() if sub.exists() else []:
|
||||||
|
if not folder.is_dir():
|
||||||
|
continue
|
||||||
|
p = folder / fname
|
||||||
|
if p.exists():
|
||||||
|
rel = str(p.relative_to(ssd_path))
|
||||||
|
tasks.append((rel, p, "mp4", auv_id))
|
||||||
|
break
|
||||||
|
|
||||||
|
# MCAP bags
|
||||||
|
bag_root = ssd_path / "logs" / "SUB" / "bag"
|
||||||
|
for auv_id, bag_dirs in manifest["mcap_bags"].items():
|
||||||
|
for bag_dir in bag_dirs:
|
||||||
|
bag_path = bag_root / bag_dir
|
||||||
|
if bag_path.exists():
|
||||||
|
for mcap_file in sorted(bag_path.glob("*.mcap")):
|
||||||
|
rel = str(mcap_file.relative_to(ssd_path))
|
||||||
|
tasks.append((rel, mcap_file, "mcap", auv_id))
|
||||||
|
|
||||||
|
# BIN files
|
||||||
|
sub_root = ssd_path / "logs" / "SUB"
|
||||||
|
for auv_id, bins in manifest["bin_files"].items():
|
||||||
|
for bname in bins:
|
||||||
|
for p in sub_root.rglob(bname):
|
||||||
|
if p.is_file():
|
||||||
|
rel = str(p.relative_to(ssd_path))
|
||||||
|
tasks.append((rel, p, "bin", auv_id))
|
||||||
|
break
|
||||||
|
|
||||||
|
# USBL logs
|
||||||
|
for rel_str in manifest["usbl_logs"]:
|
||||||
|
p = ssd_path / rel_str
|
||||||
|
if p.exists():
|
||||||
|
# try to infer AUV from path
|
||||||
|
m = AUV_PHYS_RE.search(rel_str)
|
||||||
|
auv_id = _normalize_auv_id(f"AUV{int(m.group(1)):03d}") if m else None
|
||||||
|
tasks.append((rel_str, p, "usbl", auv_id))
|
||||||
|
|
||||||
|
# MAG files
|
||||||
|
for category, files in manifest["mag_files"].items():
|
||||||
|
for rel_str in files:
|
||||||
|
p = ssd_path / rel_str
|
||||||
|
if p.exists():
|
||||||
|
tasks.append((rel_str, p, "mag", None))
|
||||||
|
|
||||||
|
# SSS KLF files
|
||||||
|
for rel_str in manifest["sss_files"].get("klf", []):
|
||||||
|
p = ssd_path / rel_str
|
||||||
|
if p.exists():
|
||||||
|
tasks.append((rel_str, p, "klf", None))
|
||||||
|
|
||||||
|
# Parallel extraction
|
||||||
|
def _extract(task):
|
||||||
|
rel, path, kind, auv_id = task
|
||||||
|
cov = extract_timestamps(path, kind)
|
||||||
|
return rel, cov, auv_id
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=8) as ex:
|
||||||
|
futures = {ex.submit(_extract, t): t for t in tasks}
|
||||||
|
for fut in as_completed(futures):
|
||||||
|
rel, cov, auv_id = fut.result()
|
||||||
|
if cov:
|
||||||
|
coverage[rel] = cov
|
||||||
|
if auv_id and cov.get("source") != "mtime_fallback":
|
||||||
|
auv_file_windows[auv_id].append((
|
||||||
|
cov["t_start"], cov["t_end"], rel
|
||||||
|
))
|
||||||
|
|
||||||
|
return coverage, dict(auv_file_windows)
|
||||||
|
|
||||||
|
|
||||||
|
def compute_mission_window(coverage: dict) -> "dict | None":
|
||||||
|
"""Global mission window: min t_start, max t_end over all files."""
|
||||||
|
starts = []
|
||||||
|
ends = []
|
||||||
|
for cov in coverage.values():
|
||||||
|
if cov.get("source") == "mtime_fallback":
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
starts.append(datetime.fromisoformat(cov["t_start"]))
|
||||||
|
ends.append(datetime.fromisoformat(cov["t_end"]))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
if not starts:
|
||||||
|
return None
|
||||||
|
t_start = min(starts)
|
||||||
|
t_end = max(ends)
|
||||||
|
return {
|
||||||
|
"t_start": t_start.isoformat(timespec="seconds"),
|
||||||
|
"t_end": t_end.isoformat(timespec="seconds"),
|
||||||
|
"duration_s": int((t_end - t_start).total_seconds()),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def compute_auv_windows(auv_file_windows: dict) -> dict:
|
||||||
|
"""Per-AUV windows with gap detection (>60s gap between sorted windows)."""
|
||||||
|
result = {}
|
||||||
|
for auv_id, windows in sorted(auv_file_windows.items()):
|
||||||
|
# Sort by t_start
|
||||||
|
sorted_wins = sorted(windows, key=lambda x: x[0])
|
||||||
|
starts = [datetime.fromisoformat(w[0]) for w in sorted_wins]
|
||||||
|
ends = [datetime.fromisoformat(w[1]) for w in sorted_wins]
|
||||||
|
|
||||||
|
t_start = min(starts)
|
||||||
|
t_end = max(ends)
|
||||||
|
dur = int((t_end - t_start).total_seconds())
|
||||||
|
|
||||||
|
# Detect gaps: merge overlapping windows first, then find gaps
|
||||||
|
gaps = []
|
||||||
|
# Build merged intervals
|
||||||
|
intervals = sorted(zip(starts, ends, [w[2] for w in sorted_wins]))
|
||||||
|
cur_start, cur_end, cur_label = intervals[0]
|
||||||
|
prev_label = cur_label
|
||||||
|
for i_start, i_end, i_label in intervals[1:]:
|
||||||
|
if i_start - cur_end > timedelta(seconds=60):
|
||||||
|
gaps.append({
|
||||||
|
"from": cur_end.isoformat(timespec="seconds"),
|
||||||
|
"to": i_start.isoformat(timespec="seconds"),
|
||||||
|
"duration_s": int((i_start - cur_end).total_seconds()),
|
||||||
|
"between": [prev_label, i_label],
|
||||||
|
})
|
||||||
|
if i_end > cur_end:
|
||||||
|
cur_end = i_end
|
||||||
|
prev_label = i_label
|
||||||
|
|
||||||
|
# Collect source categories
|
||||||
|
sources = sorted(set(
|
||||||
|
"videos" if w[2].endswith(".MP4") or w[2].endswith(".mp4") else
|
||||||
|
"mcap" if w[2].endswith(".mcap") else
|
||||||
|
"bin" if w[2].endswith(".BIN") or w[2].endswith(".bin") else
|
||||||
|
"usbl" if "usbl" in w[2].lower() else
|
||||||
|
"other"
|
||||||
|
for w in sorted_wins
|
||||||
|
))
|
||||||
|
|
||||||
|
result[auv_id] = {
|
||||||
|
"t_start": t_start.isoformat(timespec="seconds"),
|
||||||
|
"t_end": t_end.isoformat(timespec="seconds"),
|
||||||
|
"duration_s": dur,
|
||||||
|
"sources": sources,
|
||||||
|
"gaps": gaps,
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def safe_listdir(p: Path) -> "list[Path]":
|
||||||
|
if not p.exists() or not p.is_dir():
|
||||||
|
return []
|
||||||
|
try:
|
||||||
|
return sorted(p.iterdir())
|
||||||
|
except OSError:
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def collect_videos(ssd_path: Path) -> "tuple[dict, int, float]":
|
||||||
|
"""videos[AUV][GP1|GP2] -> sorted list of MP4 filenames. Two layouts tried."""
|
||||||
|
videos = defaultdict(lambda: defaultdict(list))
|
||||||
|
total_n = 0
|
||||||
|
total_bytes = 0
|
||||||
|
|
||||||
|
# Layout A: medias/videos/GP{n}{_|-}AUV{xxx}/*.MP4
|
||||||
|
vroot_a = ssd_path / "medias" / "videos"
|
||||||
|
for sub in safe_listdir(vroot_a):
|
||||||
|
if not sub.is_dir():
|
||||||
|
continue
|
||||||
|
if _is_old_path(sub):
|
||||||
|
continue
|
||||||
|
m = GP_FOLDER_RE.match(sub.name)
|
||||||
|
if not m:
|
||||||
|
continue
|
||||||
|
gp_key = f"GP{int(m.group('gp'))}"
|
||||||
|
auv_id = _normalize_auv_id(f"AUV{int(m.group('auv')):03d}")
|
||||||
|
for f in safe_listdir(sub):
|
||||||
|
if _is_old_path(f):
|
||||||
|
continue
|
||||||
|
if f.is_file() and f.suffix.upper() == ".MP4":
|
||||||
|
videos[auv_id][gp_key].append(f.name)
|
||||||
|
total_n += 1
|
||||||
|
try:
|
||||||
|
total_bytes += f.stat().st_size
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Layout B: medias/videos/AUV{xxx}/GP{n}/*.MP4 (fallback)
|
||||||
|
for sub in safe_listdir(vroot_a):
|
||||||
|
if not sub.is_dir():
|
||||||
|
continue
|
||||||
|
if _is_old_path(sub):
|
||||||
|
continue
|
||||||
|
am = AUV_PHYS_RE.fullmatch(sub.name)
|
||||||
|
if not am:
|
||||||
|
continue
|
||||||
|
auv_id = _normalize_auv_id(f"AUV{int(am.group(1)):03d}")
|
||||||
|
for gp_sub in safe_listdir(sub):
|
||||||
|
if not gp_sub.is_dir():
|
||||||
|
continue
|
||||||
|
if _is_old_path(gp_sub):
|
||||||
|
continue
|
||||||
|
gm = re.fullmatch(r"GP(\d+)", gp_sub.name, re.I)
|
||||||
|
if not gm:
|
||||||
|
continue
|
||||||
|
gp_key = f"GP{int(gm.group(1))}"
|
||||||
|
for f in safe_listdir(gp_sub):
|
||||||
|
if _is_old_path(f):
|
||||||
|
continue
|
||||||
|
if f.is_file() and f.suffix.upper() == ".MP4":
|
||||||
|
videos[auv_id][gp_key].append(f.name)
|
||||||
|
total_n += 1
|
||||||
|
try:
|
||||||
|
total_bytes += f.stat().st_size
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Sort + dedup
|
||||||
|
out = {}
|
||||||
|
for auv in sorted(videos.keys()):
|
||||||
|
out[auv] = {}
|
||||||
|
for gp in sorted(videos[auv].keys()):
|
||||||
|
out[auv][gp] = sorted(set(videos[auv][gp]))
|
||||||
|
return out, total_n, total_bytes / 1e9
|
||||||
|
|
||||||
|
|
||||||
|
def collect_mcap_bags(ssd_path: Path) -> "tuple[dict, int]":
|
||||||
|
"""mcap_bags[AUV{nnn}] -> sorted list of bag-dir names."""
|
||||||
|
bags = defaultdict(list)
|
||||||
|
n = 0
|
||||||
|
broot = ssd_path / "logs" / "SUB" / "bag"
|
||||||
|
for sub in safe_listdir(broot):
|
||||||
|
if not sub.is_dir():
|
||||||
|
continue
|
||||||
|
if _is_old_path(sub):
|
||||||
|
continue
|
||||||
|
m = BAG_AUV_RE.search(sub.name)
|
||||||
|
if not m:
|
||||||
|
continue
|
||||||
|
auv_id = _normalize_auv_id(f"AUV{int(m.group('auv')):03d}")
|
||||||
|
bags[auv_id].append(sub.name)
|
||||||
|
n += 1
|
||||||
|
return {k: sorted(set(v)) for k, v in sorted(bags.items())}, n
|
||||||
|
|
||||||
|
|
||||||
|
def collect_bin_files(ssd_path: Path) -> "tuple[dict, int]":
|
||||||
|
"""bin_files[AUV{nnn}] -> sorted .BIN filenames."""
|
||||||
|
bins = defaultdict(list)
|
||||||
|
n = 0
|
||||||
|
# Layout: logs/SUB/AUV{xxx}/*.BIN
|
||||||
|
sub_root = ssd_path / "logs" / "SUB"
|
||||||
|
for sub in safe_listdir(sub_root):
|
||||||
|
if not sub.is_dir():
|
||||||
|
continue
|
||||||
|
if _is_old_path(sub):
|
||||||
|
continue
|
||||||
|
am = AUV_PHYS_RE.fullmatch(sub.name)
|
||||||
|
if not am:
|
||||||
|
continue
|
||||||
|
auv_id = _normalize_auv_id(f"AUV{int(am.group(1)):03d}")
|
||||||
|
for f in safe_listdir(sub):
|
||||||
|
if _is_old_path(f):
|
||||||
|
continue
|
||||||
|
if f.is_file() and f.suffix.upper() == ".BIN":
|
||||||
|
bins[auv_id].append(f.name)
|
||||||
|
n += 1
|
||||||
|
# Also try logs/SUB/bin/*.BIN (flat layout)
|
||||||
|
flat = ssd_path / "logs" / "SUB" / "bin"
|
||||||
|
for f in safe_listdir(flat):
|
||||||
|
if _is_old_path(f):
|
||||||
|
continue
|
||||||
|
if f.is_file() and f.suffix.upper() == ".BIN":
|
||||||
|
bins.setdefault("AUV000", []).append(f.name)
|
||||||
|
n += 1
|
||||||
|
return {k: sorted(set(v)) for k, v in sorted(bins.items())}, n
|
||||||
|
|
||||||
|
|
||||||
|
def collect_usbl_logs(ssd_path: Path) -> "list[str]":
|
||||||
|
"""Return list of USBL csv paths relative to ssd_path."""
|
||||||
|
out = []
|
||||||
|
logs_root = ssd_path / "logs"
|
||||||
|
if not logs_root.exists():
|
||||||
|
return out
|
||||||
|
for p in logs_root.rglob("*"):
|
||||||
|
try:
|
||||||
|
if not p.is_file():
|
||||||
|
continue
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
if _is_old_path(p):
|
||||||
|
continue
|
||||||
|
name_l = p.name.lower()
|
||||||
|
if "usbl" in name_l and name_l.endswith(".csv"):
|
||||||
|
try:
|
||||||
|
out.append(str(p.relative_to(ssd_path)))
|
||||||
|
except ValueError:
|
||||||
|
out.append(str(p))
|
||||||
|
return sorted(set(out))
|
||||||
|
|
||||||
|
|
||||||
|
def collect_audio_logs(ssd_path: Path) -> "list[str]":
|
||||||
|
out = []
|
||||||
|
aud = ssd_path / "audios"
|
||||||
|
for p in aud.rglob("*") if aud.exists() else []:
|
||||||
|
try:
|
||||||
|
if not p.is_file():
|
||||||
|
continue
|
||||||
|
except (OSError, ValueError):
|
||||||
|
continue
|
||||||
|
if _is_old_path(p):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
out.append(str(p.relative_to(ssd_path)))
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
return sorted(out)
|
||||||
|
|
||||||
|
|
||||||
|
def collect_sss_files(ssd_path: Path) -> "tuple[dict, int]":
|
||||||
|
"""klf + bin under ssd_path/sss/, recursive."""
|
||||||
|
sss_root = ssd_path / "sss"
|
||||||
|
klf = []
|
||||||
|
bin_ = []
|
||||||
|
if sss_root.exists():
|
||||||
|
try:
|
||||||
|
for p in sss_root.rglob("*.klf"):
|
||||||
|
try:
|
||||||
|
if p.is_file() and not _is_old_path(p):
|
||||||
|
klf.append(str(p.relative_to(ssd_path)))
|
||||||
|
except (OSError, ValueError):
|
||||||
|
continue
|
||||||
|
for p in sss_root.rglob("*.bin"):
|
||||||
|
try:
|
||||||
|
if p.is_file() and not _is_old_path(p):
|
||||||
|
bin_.append(str(p.relative_to(ssd_path)))
|
||||||
|
except (OSError, ValueError):
|
||||||
|
continue
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
result = {"klf": sorted(set(klf)), "bin": sorted(set(bin_))}
|
||||||
|
return result, len(result["klf"]) + len(result["bin"])
|
||||||
|
|
||||||
|
|
||||||
|
def collect_mag_files(ssd_path: Path) -> "tuple[dict, int]":
|
||||||
|
"""csv under ssd_path/mag/, categorised ar/av/side/other."""
|
||||||
|
mag_root = ssd_path / "mag"
|
||||||
|
out = {"ar": [], "av": [], "side": [], "other": []}
|
||||||
|
if mag_root.exists():
|
||||||
|
try:
|
||||||
|
for p in mag_root.rglob("*.csv"):
|
||||||
|
try:
|
||||||
|
if not p.is_file():
|
||||||
|
continue
|
||||||
|
if _is_old_path(p):
|
||||||
|
continue
|
||||||
|
rel = str(p.relative_to(ssd_path))
|
||||||
|
parts = p.parts
|
||||||
|
if "ar" in parts:
|
||||||
|
out["ar"].append(rel)
|
||||||
|
elif "av" in parts:
|
||||||
|
out["av"].append(rel)
|
||||||
|
elif "side" in parts:
|
||||||
|
out["side"].append(rel)
|
||||||
|
else:
|
||||||
|
out["other"].append(rel)
|
||||||
|
except (OSError, ValueError):
|
||||||
|
continue
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
result = {k: sorted(set(v)) for k, v in out.items()}
|
||||||
|
total = sum(len(v) for v in result.values())
|
||||||
|
return result, total
|
||||||
|
|
||||||
|
|
||||||
|
def build_manifest(mission: str, ssd_base: Path) -> dict:
|
||||||
|
ssd_path = ssd_base / mission / "raw_data"
|
||||||
|
if not ssd_path.exists():
|
||||||
|
raise FileNotFoundError(f"raw_data not found: {ssd_path}")
|
||||||
|
|
||||||
|
videos, n_vids, total_gb = collect_videos(ssd_path)
|
||||||
|
mcap_bags, n_bags = collect_mcap_bags(ssd_path)
|
||||||
|
bin_files, n_bins = collect_bin_files(ssd_path)
|
||||||
|
usbl_logs = collect_usbl_logs(ssd_path)
|
||||||
|
audio_logs = collect_audio_logs(ssd_path)
|
||||||
|
sss_files, n_sss = collect_sss_files(ssd_path)
|
||||||
|
mag_files, n_mag = collect_mag_files(ssd_path)
|
||||||
|
|
||||||
|
# Parse mission date for date-focus filter
|
||||||
|
mission_date = _parse_mission_date(mission)
|
||||||
|
if mission_date:
|
||||||
|
win_start, win_end = _mission_date_window(mission_date)
|
||||||
|
mission_date_str = mission_date.strftime("%Y-%m-%d")
|
||||||
|
mission_date_window_utc = {
|
||||||
|
"start": win_start.isoformat(timespec="seconds"),
|
||||||
|
"end": win_end.isoformat(timespec="seconds"),
|
||||||
|
}
|
||||||
|
print(f" mission_date: {mission_date_str} | window: {win_start.isoformat(timespec='seconds')} -> {win_end.isoformat(timespec='seconds')}")
|
||||||
|
else:
|
||||||
|
mission_date_str = None
|
||||||
|
mission_date_window_utc = None
|
||||||
|
win_start = win_end = None
|
||||||
|
|
||||||
|
manifest = {
|
||||||
|
"mission": mission,
|
||||||
|
"generated_at": iso_utc_now(),
|
||||||
|
"ssd_path": str(ssd_path),
|
||||||
|
"mission_date": mission_date_str,
|
||||||
|
"mission_date_window_utc": mission_date_window_utc,
|
||||||
|
"videos": videos,
|
||||||
|
"mcap_bags": mcap_bags,
|
||||||
|
"bin_files": bin_files,
|
||||||
|
"usbl_logs": usbl_logs,
|
||||||
|
"audio_logs": audio_logs,
|
||||||
|
"sss_files": sss_files,
|
||||||
|
"mag_files": mag_files,
|
||||||
|
"totals": {
|
||||||
|
"n_videos": n_vids,
|
||||||
|
"n_mcap_bags": n_bags,
|
||||||
|
"n_bin_files": n_bins,
|
||||||
|
"n_usbl_logs": len(usbl_logs),
|
||||||
|
"n_audio_logs": len(audio_logs),
|
||||||
|
"n_sss_files": n_sss,
|
||||||
|
"n_mag_files": n_mag,
|
||||||
|
"total_video_size_gb": round(total_gb, 2),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
# --- Coverage extraction ---
|
||||||
|
coverage, auv_file_windows = build_coverage(manifest, ssd_base)
|
||||||
|
|
||||||
|
# --- Date-focus filter on coverage ---
|
||||||
|
n_filtered_old = 0 # already handled at collect time via _is_old_path
|
||||||
|
n_filtered_out_of_date = 0
|
||||||
|
if win_start and win_end:
|
||||||
|
filtered_coverage = {}
|
||||||
|
for rel, cov in coverage.items():
|
||||||
|
if cov.get("source") == "mtime_fallback":
|
||||||
|
filtered_coverage[rel] = cov
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
t_s = datetime.fromisoformat(cov["t_start"])
|
||||||
|
# Ensure tz-aware
|
||||||
|
if t_s.tzinfo is None:
|
||||||
|
t_s = t_s.replace(tzinfo=timezone.utc)
|
||||||
|
if win_start <= t_s <= win_end:
|
||||||
|
filtered_coverage[rel] = cov
|
||||||
|
else:
|
||||||
|
n_filtered_out_of_date += 1
|
||||||
|
except Exception:
|
||||||
|
filtered_coverage[rel] = cov
|
||||||
|
if n_filtered_out_of_date:
|
||||||
|
print(f" date_filter: removed {n_filtered_out_of_date} entries outside mission date window")
|
||||||
|
coverage = filtered_coverage
|
||||||
|
|
||||||
|
# Rebuild auv_file_windows from filtered coverage
|
||||||
|
kept_rels = set(coverage.keys())
|
||||||
|
auv_file_windows_filtered = defaultdict(list)
|
||||||
|
for auv_id, wins in auv_file_windows.items():
|
||||||
|
for w in wins:
|
||||||
|
if w[2] in kept_rels:
|
||||||
|
auv_file_windows_filtered[auv_id].append(w)
|
||||||
|
auv_file_windows = dict(auv_file_windows_filtered)
|
||||||
|
|
||||||
|
mission_window = compute_mission_window(coverage)
|
||||||
|
auv_windows = compute_auv_windows(auv_file_windows)
|
||||||
|
|
||||||
|
# Coverage stats
|
||||||
|
import collections
|
||||||
|
source_counts = collections.Counter(v["source"] for v in coverage.values())
|
||||||
|
n_fallback = source_counts.get("mtime_fallback", 0)
|
||||||
|
n_total_cov = len(coverage)
|
||||||
|
if n_total_cov:
|
||||||
|
print(f" coverage: {n_total_cov} files | fallback={n_fallback} "
|
||||||
|
f"({100*n_fallback//n_total_cov}%) | sources={dict(source_counts)}")
|
||||||
|
|
||||||
|
manifest["coverage"] = coverage
|
||||||
|
manifest["mission_window"] = mission_window
|
||||||
|
manifest["auv_windows"] = auv_windows
|
||||||
|
manifest["totals"]["n_filtered_old"] = n_filtered_old
|
||||||
|
manifest["totals"]["n_filtered_out_of_date"] = n_filtered_out_of_date
|
||||||
|
|
||||||
|
return manifest
|
||||||
|
|
||||||
|
|
||||||
|
def main(argv=None):
|
||||||
|
ap = argparse.ArgumentParser(description="Stage 01 -- select mission, produce raw manifest.")
|
||||||
|
ap.add_argument("--mission", required=True, help="Mission folder name (e.g. 20260505-Lepradet)")
|
||||||
|
ap.add_argument("--ssd-base", default="/mnt/ssd", help="SSD base path (default /mnt/ssd)")
|
||||||
|
ap.add_argument("--out", default="/home/cosma/cosma-qc/data",
|
||||||
|
help="Output base dir (manifest written to <out>/<mission>/01_manifest.json)")
|
||||||
|
args = ap.parse_args(argv)
|
||||||
|
|
||||||
|
ssd_base = Path(args.ssd_base)
|
||||||
|
out_base = Path(args.out)
|
||||||
|
out_dir = out_base / args.mission
|
||||||
|
out_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
out_path = out_dir / "01_manifest.json"
|
||||||
|
|
||||||
|
with track_stage("01_select_mission", args.mission, out_dir,
|
||||||
|
output_files=[out_path]):
|
||||||
|
manifest = build_manifest(args.mission, ssd_base)
|
||||||
|
tmp = out_path.with_suffix(".json.tmp")
|
||||||
|
tmp.write_text(json.dumps(manifest, indent=2, ensure_ascii=False))
|
||||||
|
tmp.replace(out_path)
|
||||||
|
|
||||||
|
t = manifest["totals"]
|
||||||
|
print(f"[ok] {out_path}")
|
||||||
|
print(f" videos={t['n_videos']} ({t['total_video_size_gb']} GB) "
|
||||||
|
f"bags={t['n_mcap_bags']} bins={t['n_bin_files']} "
|
||||||
|
f"usbl={t['n_usbl_logs']} audio={t['n_audio_logs']} "
|
||||||
|
f"sss={t['n_sss_files']} mag={t['n_mag_files']}")
|
||||||
|
print(f" filtered: old={t.get('n_filtered_old', 0)} out_of_date={t.get('n_filtered_out_of_date', 0)}")
|
||||||
|
if manifest.get("mission_window"):
|
||||||
|
mw = manifest["mission_window"]
|
||||||
|
print(f" mission_window: {mw['t_start']} -> {mw['t_end']} ({mw['duration_s']}s)")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
835
pipeline/stages/02_mission_run_detect.py
Executable file
835
pipeline/stages/02_mission_run_detect.py
Executable file
@@ -0,0 +1,835 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Stage 02 — Mission run detection (v4 state-based).
|
||||||
|
|
||||||
|
Détecte runs via transitions /mavros/state (armed + mode) + validation rel_alt.
|
||||||
|
Fallback depth-only si topic state absent.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python3 02_mission_run_detect.py --mission 20260508-sttropez [--ssd-base /mnt/ssd] [--out data/] [--dry-run]
|
||||||
|
python3 02_mission_run_detect.py --mission 20260505-Lepradet --out data/ \
|
||||||
|
--state-modes ALT_HOLD --require-armed --require-descent \
|
||||||
|
--min-mission-depth -2.0 --min-sustained-duration 30 --min-near-bottom-pct 50 --force
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import glob
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import statistics
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from mcap.reader import make_reader
|
||||||
|
from mcap_ros2.decoder import DecoderFactory
|
||||||
|
|
||||||
|
# Local helper for stage time/memory tracking
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent))
|
||||||
|
from _meta import track_stage # noqa: E402
|
||||||
|
|
||||||
|
# Mapping ID physique → ID logistique
|
||||||
|
AUV_PHYSICAL_MAP = {
|
||||||
|
"AUV010": "AUV210",
|
||||||
|
"AUV012": "AUV212",
|
||||||
|
"AUV013": "AUV213",
|
||||||
|
}
|
||||||
|
|
||||||
|
TOPIC_REL_ALT = "/mavros/global_position/rel_alt"
|
||||||
|
TOPIC_STATE = "/mavros/state"
|
||||||
|
THRESHOLD = -0.3 # rel_alt < THRESHOLD → immersé
|
||||||
|
NEED_STREAK = 30 # secondes consécutives pour confirmer début/fin
|
||||||
|
MIN_DURATION = 60 # secondes minimum par run
|
||||||
|
SMOOTH_WINDOW_S = 3 # fenêtre lissage médian (secondes à 1Hz = samples)
|
||||||
|
|
||||||
|
# Default modes considérés comme "mission active"
|
||||||
|
DEFAULT_STATE_MODES = {"AUTO", "GUIDED"}
|
||||||
|
# Modes qui TERMINENT un run (si require-armed=False)
|
||||||
|
STATE_STOP_MODES = {"SURFACE", "MANUAL"}
|
||||||
|
|
||||||
|
# Filtrage "vraie mission" (modifiables via argparse)
|
||||||
|
DEFAULT_MIN_MISSION_DEPTH = -3.0 # rel_alt doit atteindre ce seuil (m) — strict v5 2026-05-14
|
||||||
|
DEFAULT_MIN_SUSTAINED_DURATION = 60.0 # pendant au moins N secondes consécutives — strict v5
|
||||||
|
DEFAULT_MIN_NEAR_BOTTOM_PCT = 80.0 # % min du run où rel_alt < min_mission_depth — strict v5
|
||||||
|
DEFAULT_MIN_DISPLACEMENT_M = 5.0 # futur stage06+: déplacement min USBL — documenté seulement v5
|
||||||
|
|
||||||
|
# Filtrage "avant première plongée"
|
||||||
|
DEFAULT_FIRST_SUBMERSION_DEPTH = -2.0 # seuil rel_alt pour considérer submergé
|
||||||
|
DEFAULT_FIRST_SUBMERSION_DURATION = 5.0 # durée min continue (s) pour confirmer submersion
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Lecture MCAP rel_alt
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def parse_mcap_relalt(mcap_path: Path):
|
||||||
|
"""Lit rel_alt depuis un fichier mcap. Retourne liste de (epoch_s, rel_alt_m)."""
|
||||||
|
data = []
|
||||||
|
try:
|
||||||
|
with open(mcap_path, "rb") as fp:
|
||||||
|
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
|
||||||
|
for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(topics=[TOPIC_REL_ALT]):
|
||||||
|
ts = message.log_time / 1e9
|
||||||
|
data.append((ts, float(ros_msg.data)))
|
||||||
|
except Exception as exc:
|
||||||
|
print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr)
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Lecture MCAP /mavros/state
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def read_mavros_state(mcap_path: Path, t_start: float = None, t_end: float = None):
|
||||||
|
"""
|
||||||
|
Lit /mavros/state depuis un fichier mcap.
|
||||||
|
Retourne liste de (epoch_s, armed:bool, mode:str).
|
||||||
|
Filtrage optionnel sur [t_start, t_end].
|
||||||
|
"""
|
||||||
|
data = []
|
||||||
|
try:
|
||||||
|
with open(mcap_path, "rb") as fp:
|
||||||
|
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
|
||||||
|
for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(topics=[TOPIC_STATE]):
|
||||||
|
ts = message.log_time / 1e9
|
||||||
|
if t_start is not None and ts < t_start:
|
||||||
|
continue
|
||||||
|
if t_end is not None and ts > t_end:
|
||||||
|
continue
|
||||||
|
data.append((ts, bool(ros_msg.armed), str(ros_msg.mode)))
|
||||||
|
except Exception as exc:
|
||||||
|
# Silent — bag could be corrupt
|
||||||
|
pass
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def parse_mcap_both(mcap_path: Path):
|
||||||
|
"""Lit rel_alt ET state depuis un seul fichier mcap en un seul pass."""
|
||||||
|
alt_data = []
|
||||||
|
state_data = []
|
||||||
|
try:
|
||||||
|
with open(mcap_path, "rb") as fp:
|
||||||
|
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
|
||||||
|
for _schema, _channel, message, ros_msg in reader.iter_decoded_messages(
|
||||||
|
topics=[TOPIC_REL_ALT, TOPIC_STATE]):
|
||||||
|
ts = message.log_time / 1e9
|
||||||
|
if _channel.topic == TOPIC_REL_ALT:
|
||||||
|
alt_data.append((ts, float(ros_msg.data)))
|
||||||
|
elif _channel.topic == TOPIC_STATE:
|
||||||
|
state_data.append((ts, bool(ros_msg.armed), str(ros_msg.mode)))
|
||||||
|
except Exception as exc:
|
||||||
|
print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr)
|
||||||
|
return alt_data, state_data
|
||||||
|
|
||||||
|
|
||||||
|
def extract_auv_id(folder_name: str):
|
||||||
|
"""'20260508_054551_AUV010' → 'AUV010'."""
|
||||||
|
m = re.search(r"(AUV\d+)$", folder_name)
|
||||||
|
return m.group(1) if m else None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Signal processing
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def resample_1hz(data):
|
||||||
|
"""Resample (epoch, val) → 1Hz par interpolation linéaire. Retourne (times[], vals[])."""
|
||||||
|
if not data:
|
||||||
|
return [], []
|
||||||
|
data = sorted(data)
|
||||||
|
t0 = int(data[0][0])
|
||||||
|
t1 = int(data[-1][0])
|
||||||
|
times = list(range(t0, t1 + 1))
|
||||||
|
vals = []
|
||||||
|
idx = 0
|
||||||
|
for t in times:
|
||||||
|
while idx < len(data) - 1 and data[idx + 1][0] < t:
|
||||||
|
idx += 1
|
||||||
|
if t <= data[0][0]:
|
||||||
|
v = data[0][1]
|
||||||
|
elif t >= data[-1][0]:
|
||||||
|
v = data[-1][1]
|
||||||
|
else:
|
||||||
|
t0_, v0 = data[idx]
|
||||||
|
t1_, v1 = data[idx + 1]
|
||||||
|
dt = t1_ - t0_
|
||||||
|
v = v0 + ((t - t0_) / dt) * (v1 - v0) if dt > 0 else v0
|
||||||
|
vals.append(v)
|
||||||
|
return times, vals
|
||||||
|
|
||||||
|
|
||||||
|
def median_filter(vals, window=3):
|
||||||
|
half = window // 2
|
||||||
|
out = []
|
||||||
|
n = len(vals)
|
||||||
|
for i in range(n):
|
||||||
|
lo = max(0, i - half)
|
||||||
|
hi = min(n, i + half + 1)
|
||||||
|
w = sorted(vals[lo:hi])
|
||||||
|
out.append(w[len(w) // 2])
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def find_first_submersion_epoch(rel_alt_times, rel_alt_vals, threshold=-2.0, min_duration=5.0):
|
||||||
|
"""
|
||||||
|
Trouve la PREMIÈRE fois où rel_alt < threshold pendant >= min_duration secondes en continu.
|
||||||
|
Retourne epoch du début de cette submersion, ou None si jamais submergé.
|
||||||
|
|
||||||
|
Travaille sur données 1Hz lissées (times/vals déjà resampleés).
|
||||||
|
"""
|
||||||
|
n = len(rel_alt_times)
|
||||||
|
if n == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
streak_start = None
|
||||||
|
streak_count = 0
|
||||||
|
|
||||||
|
for i in range(n):
|
||||||
|
if rel_alt_vals[i] < threshold:
|
||||||
|
if streak_count == 0:
|
||||||
|
streak_start = rel_alt_times[i]
|
||||||
|
streak_count += 1
|
||||||
|
if streak_count >= min_duration:
|
||||||
|
return float(streak_start)
|
||||||
|
else:
|
||||||
|
streak_count = 0
|
||||||
|
streak_start = None
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def detect_runs_depth_only(times, vals, threshold=THRESHOLD, need_streak=NEED_STREAK, min_duration=MIN_DURATION):
|
||||||
|
"""
|
||||||
|
Détecte les runs d'immersion via rel_alt seul (fallback).
|
||||||
|
|
||||||
|
Algorithme:
|
||||||
|
- need_streak samples consécutifs < threshold pour confirmer entrée/sortie
|
||||||
|
- min_duration secondes minimum par run
|
||||||
|
Returns liste de (start_epoch, end_epoch).
|
||||||
|
"""
|
||||||
|
n = len(times)
|
||||||
|
if n == 0:
|
||||||
|
return []
|
||||||
|
|
||||||
|
under = [v < threshold for v in vals]
|
||||||
|
|
||||||
|
runs = []
|
||||||
|
in_run = False
|
||||||
|
run_start = None
|
||||||
|
streak_under = 0
|
||||||
|
streak_surface = 0
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
while i < n:
|
||||||
|
if not in_run:
|
||||||
|
if under[i]:
|
||||||
|
streak_under += 1
|
||||||
|
if streak_under >= need_streak:
|
||||||
|
run_start = times[i - need_streak + 1]
|
||||||
|
in_run = True
|
||||||
|
streak_surface = 0
|
||||||
|
else:
|
||||||
|
streak_under = 0
|
||||||
|
else:
|
||||||
|
if under[i]:
|
||||||
|
streak_surface = 0
|
||||||
|
else:
|
||||||
|
streak_surface += 1
|
||||||
|
if streak_surface >= need_streak:
|
||||||
|
run_end = times[i - need_streak + 1]
|
||||||
|
dur = run_end - run_start
|
||||||
|
if dur >= min_duration:
|
||||||
|
runs.append((run_start, run_end))
|
||||||
|
in_run = False
|
||||||
|
run_start = None
|
||||||
|
streak_under = 0
|
||||||
|
streak_surface = 0
|
||||||
|
i += 1
|
||||||
|
|
||||||
|
if in_run and run_start is not None:
|
||||||
|
run_end = times[-1]
|
||||||
|
dur = run_end - run_start
|
||||||
|
if dur >= min_duration:
|
||||||
|
runs.append((run_start, run_end))
|
||||||
|
|
||||||
|
return runs
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# State-based run detection
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def detect_runs_state_based(state_data, times_alt, vals_smooth, mission_modes, require_armed,
|
||||||
|
require_descent, min_duration, min_depth_m=-0.3):
|
||||||
|
"""
|
||||||
|
Détecte runs via /mavros/state:
|
||||||
|
- Intervalles continus armed=True (si require_armed) AND mode in mission_modes
|
||||||
|
- Valide via transition rel_alt surface → fond (si require_descent)
|
||||||
|
- min_duration filtre courtes fenêtres
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list of (start_epoch, end_epoch, dominant_mode)
|
||||||
|
"""
|
||||||
|
if not state_data:
|
||||||
|
return []
|
||||||
|
|
||||||
|
state_data_sorted = sorted(state_data)
|
||||||
|
|
||||||
|
# Construire séquence d'états
|
||||||
|
# Identifier intervalles "mission active"
|
||||||
|
intervals = []
|
||||||
|
cur_start = None
|
||||||
|
cur_mode_counts = {}
|
||||||
|
|
||||||
|
for ts, armed, mode in state_data_sorted:
|
||||||
|
active = (not require_armed or armed) and (mode in mission_modes)
|
||||||
|
|
||||||
|
if active and cur_start is None:
|
||||||
|
cur_start = ts
|
||||||
|
cur_mode_counts = {mode: 1}
|
||||||
|
elif active and cur_start is not None:
|
||||||
|
cur_mode_counts[mode] = cur_mode_counts.get(mode, 0) + 1
|
||||||
|
elif not active and cur_start is not None:
|
||||||
|
# Fin de l'intervalle
|
||||||
|
intervals.append((cur_start, ts, cur_mode_counts))
|
||||||
|
cur_start = None
|
||||||
|
cur_mode_counts = {}
|
||||||
|
|
||||||
|
# Flush dernier intervalle
|
||||||
|
if cur_start is not None:
|
||||||
|
intervals.append((cur_start, state_data_sorted[-1][0], cur_mode_counts))
|
||||||
|
|
||||||
|
# Filtrer par durée min
|
||||||
|
runs = []
|
||||||
|
for start_t, end_t, mode_counts in intervals:
|
||||||
|
dur = end_t - start_t
|
||||||
|
if dur < min_duration:
|
||||||
|
continue
|
||||||
|
|
||||||
|
dominant_mode = max(mode_counts, key=mode_counts.get) if mode_counts else "UNKNOWN"
|
||||||
|
|
||||||
|
if require_descent:
|
||||||
|
# Vérifier transition surface → fond dans la fenêtre
|
||||||
|
window_alts = [(t, v) for t, v in zip(times_alt, vals_smooth)
|
||||||
|
if start_t <= t <= end_t]
|
||||||
|
if not window_alts:
|
||||||
|
# Pas de données alt dans cette fenêtre — skip
|
||||||
|
continue
|
||||||
|
alt_vals = [v for _, v in window_alts]
|
||||||
|
# Début = surface (rel_alt > -1m dans les 60 premiers samples)
|
||||||
|
first_segment = alt_vals[:60]
|
||||||
|
last_segment = alt_vals[-60:]
|
||||||
|
starts_near_surface = any(v > -1.0 for v in first_segment)
|
||||||
|
reaches_depth = any(v <= min_depth_m for v in alt_vals)
|
||||||
|
if not (starts_near_surface and reaches_depth):
|
||||||
|
continue
|
||||||
|
|
||||||
|
runs.append((start_t, end_t, dominant_mode))
|
||||||
|
|
||||||
|
return runs
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Validation "vraie mission" — filtre oscillations surface
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def compute_sustained_depth(times, vals_smooth, start_e, end_e, min_depth_m):
|
||||||
|
"""
|
||||||
|
Pour un run [start_e, end_e], calcule le plus long segment continu
|
||||||
|
où rel_alt <= min_depth_m (ex: -2.0m).
|
||||||
|
"""
|
||||||
|
run_pairs = [(t, v) for t, v in zip(times, vals_smooth) if start_e <= t <= end_e]
|
||||||
|
if not run_pairs:
|
||||||
|
return 0.0, 0.0
|
||||||
|
|
||||||
|
best_dur = 0
|
||||||
|
best_min = 0.0
|
||||||
|
cur_dur = 0
|
||||||
|
cur_min = 0.0
|
||||||
|
|
||||||
|
for _t, v in run_pairs:
|
||||||
|
if v <= min_depth_m:
|
||||||
|
cur_dur += 1
|
||||||
|
cur_min = min(cur_min, v) if cur_dur > 1 else v
|
||||||
|
else:
|
||||||
|
if cur_dur > best_dur:
|
||||||
|
best_dur = cur_dur
|
||||||
|
best_min = cur_min
|
||||||
|
cur_dur = 0
|
||||||
|
cur_min = 0.0
|
||||||
|
|
||||||
|
if cur_dur > best_dur:
|
||||||
|
best_dur = cur_dur
|
||||||
|
best_min = cur_min
|
||||||
|
|
||||||
|
return round(best_min, 2), float(best_dur)
|
||||||
|
|
||||||
|
|
||||||
|
def compute_near_bottom_pct(times, vals_smooth, start_e, end_e, min_depth_m):
|
||||||
|
"""Calcule le % de samples dans [start_e, end_e] où rel_alt < min_depth_m."""
|
||||||
|
run_vals = [v for t, v in zip(times, vals_smooth) if start_e <= t <= end_e]
|
||||||
|
if not run_vals:
|
||||||
|
return 0.0
|
||||||
|
n_below = sum(1 for v in run_vals if v < min_depth_m)
|
||||||
|
return round(100.0 * n_below / len(run_vals), 1)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Processing AUV
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def process_auv(auv_mcap_id: str, bag_dirs: list, ssd_base: Path,
|
||||||
|
min_mission_depth: float, min_sustained_duration: float,
|
||||||
|
min_near_bottom_pct: float = 0.0,
|
||||||
|
state_modes: set = None, require_armed: bool = True,
|
||||||
|
require_descent: bool = True,
|
||||||
|
first_submersion_depth: float = DEFAULT_FIRST_SUBMERSION_DEPTH,
|
||||||
|
first_submersion_duration: float = DEFAULT_FIRST_SUBMERSION_DURATION):
|
||||||
|
"""
|
||||||
|
Agréger tous les bags d'un AUV, détecter runs via state (avec fallback depth-only).
|
||||||
|
Filtre les runs entiers avant la première vraie submersion.
|
||||||
|
"""
|
||||||
|
physical_id = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id)
|
||||||
|
if state_modes is None:
|
||||||
|
state_modes = DEFAULT_STATE_MODES
|
||||||
|
|
||||||
|
all_alt_data = []
|
||||||
|
all_state_data = []
|
||||||
|
bags_list = []
|
||||||
|
n_parsed = 0
|
||||||
|
n_skipped = 0
|
||||||
|
|
||||||
|
# Parcourir bags dans l'ordre chronologique
|
||||||
|
for bag_dir in sorted(bag_dirs):
|
||||||
|
bags_list.append(str(bag_dir.name))
|
||||||
|
mcap_files = sorted(bag_dir.glob("*.mcap"))
|
||||||
|
for mcap_path in mcap_files:
|
||||||
|
alt_pts, state_pts = parse_mcap_both(mcap_path)
|
||||||
|
if alt_pts or state_pts:
|
||||||
|
all_alt_data.extend(alt_pts)
|
||||||
|
all_state_data.extend(state_pts)
|
||||||
|
n_parsed += 1
|
||||||
|
else:
|
||||||
|
n_skipped += 1
|
||||||
|
if (n_parsed + n_skipped) % 50 == 0:
|
||||||
|
print(f" [{auv_mcap_id}] ... {n_parsed + n_skipped} bags processed", file=sys.stderr)
|
||||||
|
|
||||||
|
has_state = len(all_state_data) > 0
|
||||||
|
print(f" [{auv_mcap_id}] {n_parsed} mcap OK, {n_skipped} skip, "
|
||||||
|
f"{len(all_alt_data)} rel_alt pts, {len(all_state_data)} state pts", file=sys.stderr)
|
||||||
|
|
||||||
|
if not all_alt_data:
|
||||||
|
print(f" [{auv_mcap_id}] [WARN] aucune donnée rel_alt", file=sys.stderr)
|
||||||
|
return {
|
||||||
|
"mcap_id": auv_mcap_id,
|
||||||
|
"physical_id": physical_id,
|
||||||
|
"bags": bags_list,
|
||||||
|
"total_immersion_s": 0,
|
||||||
|
"runs": [],
|
||||||
|
"runs_rejected": [],
|
||||||
|
"detection_method": "no_data",
|
||||||
|
"first_submersion_epoch": None,
|
||||||
|
"pre_water_rejected_count": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Dédup + tri rel_alt
|
||||||
|
seen = set()
|
||||||
|
deduped_alt = []
|
||||||
|
for t, v in sorted(all_alt_data):
|
||||||
|
key = round(t, 3)
|
||||||
|
if key not in seen:
|
||||||
|
seen.add(key)
|
||||||
|
deduped_alt.append((t, v))
|
||||||
|
|
||||||
|
# Resample 1Hz
|
||||||
|
times, vals = resample_1hz(deduped_alt)
|
||||||
|
print(f" [{auv_mcap_id}] {len(times)} samples 1Hz [{times[0]:.0f}..{times[-1]:.0f}]", file=sys.stderr)
|
||||||
|
|
||||||
|
# Lissage médian 3s
|
||||||
|
vals_smooth = median_filter(vals, window=SMOOTH_WINDOW_S)
|
||||||
|
|
||||||
|
# Trouver première submersion réelle
|
||||||
|
first_sub_epoch = find_first_submersion_epoch(
|
||||||
|
times, vals_smooth,
|
||||||
|
threshold=first_submersion_depth,
|
||||||
|
min_duration=first_submersion_duration,
|
||||||
|
)
|
||||||
|
if first_sub_epoch is not None:
|
||||||
|
print(f" [{auv_mcap_id}] Première submersion réelle: {first_sub_epoch:.0f} "
|
||||||
|
f"({datetime.fromtimestamp(first_sub_epoch, tz=timezone.utc).isoformat()})",
|
||||||
|
file=sys.stderr)
|
||||||
|
else:
|
||||||
|
print(f" [{auv_mcap_id}] [WARN] Aucune submersion réelle détectée "
|
||||||
|
f"(threshold={first_submersion_depth}m, min_dur={first_submersion_duration}s)",
|
||||||
|
file=sys.stderr)
|
||||||
|
|
||||||
|
# Détection runs
|
||||||
|
detection_method = "depth_only"
|
||||||
|
if has_state:
|
||||||
|
# Trier state data
|
||||||
|
all_state_data.sort(key=lambda x: x[0])
|
||||||
|
|
||||||
|
# Extraire modes présents
|
||||||
|
modes_in_data = set(m for _, _, m in all_state_data)
|
||||||
|
modes_active = modes_in_data & state_modes
|
||||||
|
print(f" [{auv_mcap_id}] state modes found: {modes_in_data}", file=sys.stderr)
|
||||||
|
print(f" [{auv_mcap_id}] state modes matching --state-modes: {modes_active}", file=sys.stderr)
|
||||||
|
|
||||||
|
if modes_active:
|
||||||
|
raw_runs_with_mode = detect_runs_state_based(
|
||||||
|
all_state_data, times, vals_smooth,
|
||||||
|
mission_modes=state_modes,
|
||||||
|
require_armed=require_armed,
|
||||||
|
require_descent=require_descent,
|
||||||
|
min_duration=MIN_DURATION,
|
||||||
|
min_depth_m=min_mission_depth,
|
||||||
|
)
|
||||||
|
raw_runs = [(s, e) for s, e, _m in raw_runs_with_mode]
|
||||||
|
run_modes = {(s, e): m for s, e, m in raw_runs_with_mode}
|
||||||
|
detection_method = "state_based"
|
||||||
|
print(f" [{auv_mcap_id}] {len(raw_runs)} candidats state-based", file=sys.stderr)
|
||||||
|
else:
|
||||||
|
print(f" [{auv_mcap_id}] [WARN] aucun mode {state_modes} dans state data "
|
||||||
|
f"→ fallback depth-only", file=sys.stderr)
|
||||||
|
raw_runs = detect_runs_depth_only(times, vals_smooth)
|
||||||
|
run_modes = {}
|
||||||
|
detection_method = "depth_only_fallback_no_modes"
|
||||||
|
else:
|
||||||
|
print(f" [{auv_mcap_id}] [WARN] topic /mavros/state vide → fallback depth-only", file=sys.stderr)
|
||||||
|
raw_runs = detect_runs_depth_only(times, vals_smooth)
|
||||||
|
run_modes = {}
|
||||||
|
|
||||||
|
print(f" [{auv_mcap_id}] {len(raw_runs)} runs candidats ({detection_method})", file=sys.stderr)
|
||||||
|
|
||||||
|
# Filtrage pre-water: exclure/tronquer runs avant première submersion
|
||||||
|
pre_water_rejected_count = 0
|
||||||
|
if first_sub_epoch is not None:
|
||||||
|
filtered_runs = []
|
||||||
|
for start_e, end_e in raw_runs:
|
||||||
|
if end_e <= first_sub_epoch:
|
||||||
|
# Run entier avant première plongée → rejeter
|
||||||
|
pre_water_rejected_count += 1
|
||||||
|
phys_id_log = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id)
|
||||||
|
print(f" [{auv_mcap_id}] PRE-WATER REJECT run [{start_e:.0f}..{end_e:.0f}] "
|
||||||
|
f"(end before first_sub={first_sub_epoch:.0f})", file=sys.stderr)
|
||||||
|
elif start_e < first_sub_epoch < end_e:
|
||||||
|
# Run chevauche → tronquer start
|
||||||
|
new_start = first_sub_epoch
|
||||||
|
print(f" [{auv_mcap_id}] PRE-WATER TRUNCATE run [{start_e:.0f}..{end_e:.0f}] "
|
||||||
|
f"→ [{new_start:.0f}..{end_e:.0f}]", file=sys.stderr)
|
||||||
|
# Mettre à jour run_modes si besoin
|
||||||
|
if (start_e, end_e) in run_modes:
|
||||||
|
run_modes[(new_start, end_e)] = run_modes.pop((start_e, end_e))
|
||||||
|
filtered_runs.append((new_start, end_e))
|
||||||
|
else:
|
||||||
|
filtered_runs.append((start_e, end_e))
|
||||||
|
raw_runs = filtered_runs
|
||||||
|
print(f" [{auv_mcap_id}] {pre_water_rejected_count} runs rejetés pre-water, "
|
||||||
|
f"{len(raw_runs)} restants", file=sys.stderr)
|
||||||
|
|
||||||
|
# Construire runs enrichis + filtre "vraie mission"
|
||||||
|
runs_out = []
|
||||||
|
runs_rejected = []
|
||||||
|
for idx, (start_e, end_e) in enumerate(raw_runs):
|
||||||
|
dur = end_e - start_e
|
||||||
|
run_id = f"{physical_id}_run_{idx:02d}"
|
||||||
|
dominant_mode = run_modes.get((start_e, end_e), "UNKNOWN")
|
||||||
|
|
||||||
|
run_vals = [v for t, v in zip(times, vals_smooth) if start_e <= t <= end_e]
|
||||||
|
max_depth = round(min(run_vals), 2) if run_vals else 0.0
|
||||||
|
mean_depth = round(statistics.mean(run_vals), 2) if run_vals else 0.0
|
||||||
|
|
||||||
|
sustained_below_m, sustained_duration_s = compute_sustained_depth(
|
||||||
|
times, vals_smooth, start_e, end_e, min_mission_depth
|
||||||
|
)
|
||||||
|
pct_near_bottom = compute_near_bottom_pct(
|
||||||
|
times, vals_smooth, start_e, end_e, min_mission_depth
|
||||||
|
)
|
||||||
|
|
||||||
|
run_entry = {
|
||||||
|
"run_id": run_id,
|
||||||
|
"start_epoch": float(start_e),
|
||||||
|
"end_epoch": float(end_e),
|
||||||
|
"duration_s": round(dur, 1),
|
||||||
|
"max_depth_m": max_depth,
|
||||||
|
"mean_depth_m": mean_depth,
|
||||||
|
"sustained_below_m": sustained_below_m,
|
||||||
|
"sustained_duration_s": sustained_duration_s,
|
||||||
|
"pct_near_bottom": pct_near_bottom,
|
||||||
|
"dominant_mode": dominant_mode,
|
||||||
|
"detection_method": detection_method,
|
||||||
|
}
|
||||||
|
|
||||||
|
reject_reason = None
|
||||||
|
if sustained_duration_s < min_sustained_duration:
|
||||||
|
reject_reason = (
|
||||||
|
f"sustained_depth only {sustained_duration_s:.0f}s "
|
||||||
|
f"(need {min_sustained_duration:.0f}s below {min_mission_depth}m)"
|
||||||
|
)
|
||||||
|
elif min_near_bottom_pct > 0.0 and pct_near_bottom < min_near_bottom_pct:
|
||||||
|
reject_reason = (
|
||||||
|
f"not_enough_immersion: only {pct_near_bottom:.1f}% "
|
||||||
|
f"time below {min_mission_depth}m (need {min_near_bottom_pct:.1f}%)"
|
||||||
|
)
|
||||||
|
|
||||||
|
if reject_reason is None:
|
||||||
|
runs_out.append(run_entry)
|
||||||
|
else:
|
||||||
|
run_entry["rejected_reason"] = reject_reason
|
||||||
|
runs_rejected.append(run_entry)
|
||||||
|
print(
|
||||||
|
f" [{auv_mcap_id}] REJECT {run_id}: max_depth={max_depth}m "
|
||||||
|
f"sustained={sustained_duration_s:.0f}s pct={pct_near_bottom:.1f}% "
|
||||||
|
f"reason={reject_reason[:60]}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
|
||||||
|
print(
|
||||||
|
f" [{auv_mcap_id}] {len(runs_out)} runs OK, {len(runs_rejected)} rejetés ({detection_method})",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
|
||||||
|
total_immersion_s = round(sum(r["duration_s"] for r in runs_out), 1)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"mcap_id": auv_mcap_id,
|
||||||
|
"physical_id": physical_id,
|
||||||
|
"bags": bags_list,
|
||||||
|
"total_immersion_s": total_immersion_s,
|
||||||
|
"runs": runs_out,
|
||||||
|
"runs_rejected": runs_rejected,
|
||||||
|
"detection_method": detection_method,
|
||||||
|
"first_submersion_epoch": first_sub_epoch,
|
||||||
|
"pre_water_rejected_count": pre_water_rejected_count,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Main
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _run(args):
|
||||||
|
bags_root = Path(args.ssd_base) / args.mission / "raw_data" / "logs" / "SUB" / "bag"
|
||||||
|
if not bags_root.exists():
|
||||||
|
print(f"[ERROR] bags dir not found: {bags_root}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
state_modes = set(m.strip() for m in args.state_modes.split(",") if m.strip())
|
||||||
|
|
||||||
|
print(f"[stage02] Mission: {args.mission}")
|
||||||
|
print(f"[stage02] Bags root: {bags_root}", file=sys.stderr)
|
||||||
|
print(f"[stage02] State modes: {state_modes}, require_armed={args.require_armed}, "
|
||||||
|
f"require_descent={args.require_descent}", file=sys.stderr)
|
||||||
|
near_bottom_str = (f" + near_bottom >= {args.min_near_bottom_pct:.0f}%"
|
||||||
|
if args.min_near_bottom_pct > 0 else "")
|
||||||
|
print(
|
||||||
|
f"[stage02] Filtre mission: sustained >= {args.min_sustained_duration}s "
|
||||||
|
f"below {args.min_mission_depth}m{near_bottom_str}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f"[stage02] Filtre pre-water: first_submersion threshold={args.first_submersion_depth}m "
|
||||||
|
f"min_duration={args.first_submersion_duration}s",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Grouper dossiers bag par AUV ID
|
||||||
|
auv_dirs = {}
|
||||||
|
for d in sorted(bags_root.iterdir()):
|
||||||
|
if not d.is_dir():
|
||||||
|
continue
|
||||||
|
auv_id = extract_auv_id(d.name)
|
||||||
|
if auv_id:
|
||||||
|
auv_dirs.setdefault(auv_id, []).append(d)
|
||||||
|
|
||||||
|
print(f"[stage02] AUVs: {sorted(auv_dirs.keys())}")
|
||||||
|
|
||||||
|
# Traiter chaque AUV
|
||||||
|
auvs_out = {}
|
||||||
|
all_runs_flat = []
|
||||||
|
all_rejected_flat = []
|
||||||
|
|
||||||
|
for auv_mcap_id in sorted(auv_dirs.keys()):
|
||||||
|
physical_id = AUV_PHYSICAL_MAP.get(auv_mcap_id, auv_mcap_id)
|
||||||
|
print(f"\n[stage02] Processing {auv_mcap_id} -> {physical_id} ({len(auv_dirs[auv_mcap_id])} bag dirs)...")
|
||||||
|
result = process_auv(
|
||||||
|
auv_mcap_id,
|
||||||
|
auv_dirs[auv_mcap_id],
|
||||||
|
Path(args.ssd_base),
|
||||||
|
min_mission_depth=args.min_mission_depth,
|
||||||
|
min_sustained_duration=args.min_sustained_duration,
|
||||||
|
min_near_bottom_pct=args.min_near_bottom_pct,
|
||||||
|
state_modes=state_modes,
|
||||||
|
require_armed=args.require_armed,
|
||||||
|
require_descent=args.require_descent,
|
||||||
|
first_submersion_depth=args.first_submersion_depth,
|
||||||
|
first_submersion_duration=args.first_submersion_duration,
|
||||||
|
)
|
||||||
|
auvs_out[physical_id] = result
|
||||||
|
all_runs_flat.extend(result["runs"])
|
||||||
|
all_rejected_flat.extend(result.get("runs_rejected", []))
|
||||||
|
|
||||||
|
# Trier tous les runs par start_epoch
|
||||||
|
all_runs_sorted = sorted(all_runs_flat, key=lambda r: r["start_epoch"])
|
||||||
|
all_rejected_sorted = sorted(all_rejected_flat, key=lambda r: r["start_epoch"])
|
||||||
|
|
||||||
|
# Construire output JSON
|
||||||
|
output = {
|
||||||
|
"mission": args.mission,
|
||||||
|
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"filter_params": {
|
||||||
|
"min_mission_depth_m": args.min_mission_depth,
|
||||||
|
"min_sustained_duration_s": args.min_sustained_duration,
|
||||||
|
"min_near_bottom_pct": args.min_near_bottom_pct,
|
||||||
|
"state_modes": sorted(state_modes),
|
||||||
|
"require_armed": args.require_armed,
|
||||||
|
"require_descent": args.require_descent,
|
||||||
|
"first_submersion_depth_m": args.first_submersion_depth,
|
||||||
|
"first_submersion_duration_s": args.first_submersion_duration,
|
||||||
|
},
|
||||||
|
"auvs": auvs_out,
|
||||||
|
"all_runs_sorted": all_runs_sorted,
|
||||||
|
"all_runs_rejected": all_rejected_sorted,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Summary
|
||||||
|
total_runs = len(all_runs_sorted)
|
||||||
|
total_rejected = len(all_rejected_sorted)
|
||||||
|
all_depths = [r["max_depth_m"] for r in all_runs_sorted]
|
||||||
|
global_max_depth = min(all_depths) if all_depths else 0.0
|
||||||
|
total_immersion = sum(r["duration_s"] for r in all_runs_sorted)
|
||||||
|
total_pre_water = sum(v.get("pre_water_rejected_count", 0) for v in auvs_out.values())
|
||||||
|
|
||||||
|
print(f"\n{'='*60}")
|
||||||
|
print(f"[stage02] SUMMARY — {args.mission}")
|
||||||
|
print(f"{'='*60}")
|
||||||
|
for phys_id, auv in auvs_out.items():
|
||||||
|
deep = [r for r in auv["runs"] if r["max_depth_m"] < -10.0]
|
||||||
|
rej = len(auv.get("runs_rejected", []))
|
||||||
|
method = auv.get("detection_method", "?")
|
||||||
|
fsub = auv.get("first_submersion_epoch")
|
||||||
|
fsub_str = (datetime.fromtimestamp(fsub, tz=timezone.utc).isoformat()
|
||||||
|
if fsub else "None")
|
||||||
|
pre_w = auv.get("pre_water_rejected_count", 0)
|
||||||
|
print(f" {phys_id}: {auv['total_immersion_s']}s immersion, "
|
||||||
|
f"{len(auv['runs'])} runs OK, {rej} rejetés, runs>10m: {len(deep)} [{method}]")
|
||||||
|
print(f" first_submersion: {fsub_str} | pre_water_rejected: {pre_w}")
|
||||||
|
print(f" Total runs OK: {total_runs}")
|
||||||
|
print(f" Total runs rejetés: {total_rejected}")
|
||||||
|
print(f" Total pre-water rejetés: {total_pre_water}")
|
||||||
|
print(f" Global max depth: {global_max_depth:.1f}m")
|
||||||
|
print(f" Total immersion: {total_immersion:.0f}s")
|
||||||
|
|
||||||
|
if total_runs == 0:
|
||||||
|
print("[WARN] Aucun run validé!")
|
||||||
|
else:
|
||||||
|
print(f"[QC OK] {total_runs} runs validés")
|
||||||
|
for r in all_runs_sorted:
|
||||||
|
print(f" {r['run_id']}: {r['duration_s']:.0f}s depth={r['max_depth_m']}m "
|
||||||
|
f"pct={r['pct_near_bottom']:.1f}% mode={r.get('dominant_mode','?')} "
|
||||||
|
f"method={r.get('detection_method','?')}")
|
||||||
|
|
||||||
|
if args.dry_run:
|
||||||
|
print(f"\n[dry-run] JSON non écrit.")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Écrire JSON
|
||||||
|
out_dir = Path(args.out) / args.mission
|
||||||
|
out_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
out_path = out_dir / "02_runs.json"
|
||||||
|
|
||||||
|
with open(out_path, "w") as f:
|
||||||
|
json.dump(output, f, indent=2)
|
||||||
|
|
||||||
|
print(f"[stage02] Written: {out_path}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Stage 02 — Detect mission underwater runs (v4 state-based)")
|
||||||
|
parser.add_argument("--mission", required=True, help="Mission folder, e.g. 20260508-sttropez")
|
||||||
|
parser.add_argument("--ssd-base", default="/mnt/ssd", help="SSD root path (READ-ONLY)")
|
||||||
|
parser.add_argument("--out", default="data/", help="Output base dir")
|
||||||
|
parser.add_argument("--dry-run", action="store_true", help="Print stats sans écrire fichier")
|
||||||
|
parser.add_argument(
|
||||||
|
"--min-mission-depth",
|
||||||
|
type=float,
|
||||||
|
default=DEFAULT_MIN_MISSION_DEPTH,
|
||||||
|
help=f"Profondeur seuil (m, négatif) pour valider un run (default: {DEFAULT_MIN_MISSION_DEPTH})",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--min-sustained-duration",
|
||||||
|
type=float,
|
||||||
|
default=DEFAULT_MIN_SUSTAINED_DURATION,
|
||||||
|
help=f"Durée min (s) consécutive sous min-mission-depth (default: {DEFAULT_MIN_SUSTAINED_DURATION})",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--min-near-bottom-pct",
|
||||||
|
type=float,
|
||||||
|
default=DEFAULT_MIN_NEAR_BOTTOM_PCT,
|
||||||
|
help=f"% min du run où rel_alt < min-mission-depth (0=désactivé, default: {DEFAULT_MIN_NEAR_BOTTOM_PCT})",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--state-modes",
|
||||||
|
type=str,
|
||||||
|
default=",".join(sorted(DEFAULT_STATE_MODES)),
|
||||||
|
help="CSV modes ArduSub considérés mission (ex: AUTO,GUIDED ou ALT_HOLD). "
|
||||||
|
f"Default: {','.join(sorted(DEFAULT_STATE_MODES))}",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--require-armed",
|
||||||
|
action="store_true",
|
||||||
|
default=True,
|
||||||
|
help="Exiger armed=True (défaut: True)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--no-require-armed",
|
||||||
|
action="store_false",
|
||||||
|
dest="require_armed",
|
||||||
|
help="Ne pas exiger armed=True",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--require-descent",
|
||||||
|
action="store_true",
|
||||||
|
default=True,
|
||||||
|
help="Exiger transition surface→fond dans fenêtre (défaut: True)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--no-require-descent",
|
||||||
|
action="store_false",
|
||||||
|
dest="require_descent",
|
||||||
|
help="Ne pas exiger transition surface→fond",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--first-submersion-depth",
|
||||||
|
type=float,
|
||||||
|
default=DEFAULT_FIRST_SUBMERSION_DEPTH,
|
||||||
|
help=f"Seuil rel_alt (m) pour détecter première submersion réelle "
|
||||||
|
f"(default: {DEFAULT_FIRST_SUBMERSION_DEPTH})",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--first-submersion-duration",
|
||||||
|
type=float,
|
||||||
|
default=DEFAULT_FIRST_SUBMERSION_DURATION,
|
||||||
|
help=f"Durée min continue (s) sous first-submersion-depth pour confirmer mise à l'eau "
|
||||||
|
f"(default: {DEFAULT_FIRST_SUBMERSION_DURATION})",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--force",
|
||||||
|
action="store_true",
|
||||||
|
help="Forcer recalcul même si fichier existe",
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.dry_run:
|
||||||
|
_run(args)
|
||||||
|
return
|
||||||
|
|
||||||
|
out_dir = Path(args.out) / args.mission
|
||||||
|
out_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
out_path = out_dir / "02_runs.json"
|
||||||
|
with track_stage("02_mission_run_detect", args.mission, out_dir,
|
||||||
|
output_files=[out_path]):
|
||||||
|
_run(args)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
352
pipeline/stages/02b_runs_diag.py
Executable file
352
pipeline/stages/02b_runs_diag.py
Executable file
@@ -0,0 +1,352 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Stage 02b — Diagnostic plots per run candidate.
|
||||||
|
|
||||||
|
Reads 02_runs.json + replays MCAP rel_alt + /mavros/state to produce
|
||||||
|
4-panel PNG per run (validated + rejected).
|
||||||
|
|
||||||
|
Output:
|
||||||
|
data/<MISSION>/02_runs_diag/<RUN_ID>.png
|
||||||
|
data/<MISSION>/02_runs_diag/index.json
|
||||||
|
data/<MISSION>/02_runs_diag/index.html
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python3 02b_runs_diag.py --mission 20260505-Lepradet \
|
||||||
|
[--ssd-base /mnt/ssd] [--out data/] [--padding-s 120]
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import matplotlib
|
||||||
|
matplotlib.use("Agg")
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
from matplotlib.patches import Rectangle
|
||||||
|
|
||||||
|
from mcap.reader import make_reader
|
||||||
|
from mcap_ros2.decoder import DecoderFactory
|
||||||
|
|
||||||
|
TOPIC_REL_ALT = "/mavros/global_position/rel_alt"
|
||||||
|
TOPIC_STATE = "/mavros/state"
|
||||||
|
TOPIC_HEADING = "/mavros/global_position/compass_hdg"
|
||||||
|
|
||||||
|
AUV_PHYSICAL_MAP = {"AUV010": "AUV210", "AUV012": "AUV212", "AUV013": "AUV213"}
|
||||||
|
PHYS_TO_MCAP = {v: k for k, v in AUV_PHYSICAL_MAP.items()}
|
||||||
|
|
||||||
|
|
||||||
|
def extract_auv_id(folder_name):
|
||||||
|
m = re.search(r"(AUV\d+)$", folder_name)
|
||||||
|
return m.group(1) if m else None
|
||||||
|
|
||||||
|
|
||||||
|
def gather_signals_for_auv(bags_root, auv_mcap_id, t_start, t_end):
|
||||||
|
"""Read rel_alt + state + heading from all bags for this AUV in [t_start,t_end]."""
|
||||||
|
alt = [] # (t, v)
|
||||||
|
state = [] # (t, armed, mode)
|
||||||
|
hdg = [] # (t, v_deg)
|
||||||
|
for d in sorted(bags_root.iterdir()):
|
||||||
|
if not d.is_dir():
|
||||||
|
continue
|
||||||
|
if extract_auv_id(d.name) != auv_mcap_id:
|
||||||
|
continue
|
||||||
|
for mcap_path in sorted(d.glob("*.mcap")):
|
||||||
|
try:
|
||||||
|
with open(mcap_path, "rb") as fp:
|
||||||
|
reader = make_reader(fp, decoder_factories=[DecoderFactory()])
|
||||||
|
for _schema, channel, message, ros_msg in reader.iter_decoded_messages(
|
||||||
|
topics=[TOPIC_REL_ALT, TOPIC_STATE, TOPIC_HEADING]):
|
||||||
|
ts = message.log_time / 1e9
|
||||||
|
if ts < t_start or ts > t_end:
|
||||||
|
continue
|
||||||
|
if channel.topic == TOPIC_REL_ALT:
|
||||||
|
alt.append((ts, float(ros_msg.data)))
|
||||||
|
elif channel.topic == TOPIC_STATE:
|
||||||
|
state.append((ts, bool(ros_msg.armed), str(ros_msg.mode)))
|
||||||
|
elif channel.topic == TOPIC_HEADING:
|
||||||
|
hdg.append((ts, float(ros_msg.data)))
|
||||||
|
except Exception as exc:
|
||||||
|
print(f" [WARN] skip {mcap_path.name}: {exc}", file=sys.stderr)
|
||||||
|
alt.sort()
|
||||||
|
state.sort()
|
||||||
|
hdg.sort()
|
||||||
|
return alt, state, hdg
|
||||||
|
|
||||||
|
|
||||||
|
def plot_run_diag(run, alt, state, hdg, filter_params, status, reject_reason, out_path):
|
||||||
|
"""4-panel diagnostic plot."""
|
||||||
|
start_e = run["start_epoch"]
|
||||||
|
end_e = run["end_epoch"]
|
||||||
|
dur = end_e - start_e
|
||||||
|
run_id = run["run_id"]
|
||||||
|
|
||||||
|
# Convert epoch → relative seconds from start_e for x-axis readability
|
||||||
|
def to_rel(t):
|
||||||
|
return t - start_e
|
||||||
|
|
||||||
|
fig, axes = plt.subplots(4, 1, figsize=(12, 11), gridspec_kw={"height_ratios": [3, 1, 2, 2]})
|
||||||
|
fig.suptitle(
|
||||||
|
f"{run_id} — {status} | start={datetime.fromtimestamp(start_e, tz=timezone.utc).strftime('%H:%M:%S UTC')} duration={dur:.0f}s",
|
||||||
|
fontsize=12, fontweight="bold",
|
||||||
|
color=("#16a34a" if status == "OK" else "#dc2626"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# === Panel A: rel_alt time series ===
|
||||||
|
ax = axes[0]
|
||||||
|
if alt:
|
||||||
|
t_a = [to_rel(t) for t, _ in alt]
|
||||||
|
v_a = [v for _, v in alt]
|
||||||
|
ax.plot(t_a, v_a, color="#1e40af", lw=1.0, label="rel_alt")
|
||||||
|
threshold = filter_params["min_mission_depth_m"]
|
||||||
|
ax.axhline(threshold, color="#dc2626", ls="--", lw=1.0,
|
||||||
|
label=f"threshold {threshold}m")
|
||||||
|
ax.axhline(0, color="#94a3b8", ls=":", lw=0.8, label="surface")
|
||||||
|
# Mark run window
|
||||||
|
ax.axvspan(0, dur, alpha=0.10, color="#16a34a" if status == "OK" else "#dc2626")
|
||||||
|
ax.set_ylabel("rel_alt (m)")
|
||||||
|
ax.set_title(
|
||||||
|
f"(a) Depth — max={run['max_depth_m']}m mean={run['mean_depth_m']}m "
|
||||||
|
f"sustained={run['sustained_duration_s']:.0f}s below {threshold}m "
|
||||||
|
f"pct_near_bottom={run['pct_near_bottom']:.1f}%"
|
||||||
|
)
|
||||||
|
ax.legend(loc="lower right", fontsize=8)
|
||||||
|
ax.grid(True, alpha=0.3)
|
||||||
|
ax.set_xlim(-30, dur + 30)
|
||||||
|
|
||||||
|
# === Panel B: state armed + mode ===
|
||||||
|
ax = axes[1]
|
||||||
|
if state:
|
||||||
|
t_s = [to_rel(t) for t, _, _ in state]
|
||||||
|
armed_y = [1 if a else 0 for _, a, _ in state]
|
||||||
|
ax.step(t_s, armed_y, where="post", color="#16a34a", lw=1.2, label="armed")
|
||||||
|
# Annotate dominant mode + mode transitions
|
||||||
|
prev_mode = None
|
||||||
|
for t, _, m in state:
|
||||||
|
if m != prev_mode:
|
||||||
|
ax.axvline(to_rel(t), color="#7c3aed", lw=0.5, alpha=0.4)
|
||||||
|
ax.text(to_rel(t), 0.5, m, rotation=90, fontsize=7,
|
||||||
|
color="#7c3aed", va="center", alpha=0.7)
|
||||||
|
prev_mode = m
|
||||||
|
ax.set_ylim(-0.2, 1.3)
|
||||||
|
ax.set_yticks([0, 1])
|
||||||
|
ax.set_yticklabels(["disarmed", "armed"])
|
||||||
|
ax.set_title(f"(b) MAVROS state — dominant mode: {run.get('dominant_mode','?')}")
|
||||||
|
ax.grid(True, alpha=0.3)
|
||||||
|
ax.set_xlim(-30, dur + 30)
|
||||||
|
|
||||||
|
# === Panel C: depth distribution + cumulative time below threshold ===
|
||||||
|
ax = axes[2]
|
||||||
|
# filter alt to run window
|
||||||
|
run_vals = [v for t, v in alt if start_e <= t <= end_e]
|
||||||
|
if run_vals:
|
||||||
|
ax.hist(run_vals, bins=40, color="#3b82f6", alpha=0.7, edgecolor="white")
|
||||||
|
ax.axvline(threshold, color="#dc2626", ls="--", lw=1.0,
|
||||||
|
label=f"threshold {threshold}m")
|
||||||
|
n_total = len(run_vals)
|
||||||
|
n_below = sum(1 for v in run_vals if v < threshold)
|
||||||
|
ax.set_title(
|
||||||
|
f"(c) Depth distribution within run | {n_below}/{n_total} samples below threshold "
|
||||||
|
f"= {100*n_below/n_total:.1f}%"
|
||||||
|
)
|
||||||
|
ax.legend(loc="upper right", fontsize=8)
|
||||||
|
ax.set_xlabel("rel_alt (m)")
|
||||||
|
ax.set_ylabel("samples")
|
||||||
|
ax.grid(True, alpha=0.3)
|
||||||
|
|
||||||
|
# === Panel D: verdict box ===
|
||||||
|
ax = axes[3]
|
||||||
|
ax.axis("off")
|
||||||
|
color = "#16a34a" if status == "OK" else "#dc2626"
|
||||||
|
verdict_text = f"VERDICT: {status}"
|
||||||
|
ax.text(0.02, 0.85, verdict_text, fontsize=18, fontweight="bold",
|
||||||
|
color=color, transform=ax.transAxes)
|
||||||
|
|
||||||
|
# Build criteria summary
|
||||||
|
crit_lines = []
|
||||||
|
min_dur = filter_params["min_sustained_duration_s"]
|
||||||
|
min_pct = filter_params["min_near_bottom_pct"]
|
||||||
|
min_depth = filter_params["min_mission_depth_m"]
|
||||||
|
|
||||||
|
pass_dur = run["sustained_duration_s"] >= min_dur
|
||||||
|
pass_pct = run["pct_near_bottom"] >= min_pct
|
||||||
|
pass_depth = run["max_depth_m"] < min_depth
|
||||||
|
|
||||||
|
crit_lines.append(
|
||||||
|
f"{'OK' if pass_depth else 'KO':3s} max_depth {run['max_depth_m']}m < {min_depth}m"
|
||||||
|
)
|
||||||
|
crit_lines.append(
|
||||||
|
f"{'OK' if pass_dur else 'KO':3s} sustained_duration {run['sustained_duration_s']:.0f}s >= {min_dur:.0f}s"
|
||||||
|
)
|
||||||
|
crit_lines.append(
|
||||||
|
f"{'OK' if pass_pct else 'KO':3s} pct_near_bottom {run['pct_near_bottom']:.1f}% >= {min_pct:.0f}%"
|
||||||
|
)
|
||||||
|
crit_lines.append(
|
||||||
|
f" duration {run['duration_s']:.0f}s | mode {run.get('dominant_mode','?')} | method {run.get('detection_method','?')}"
|
||||||
|
)
|
||||||
|
|
||||||
|
for i, line in enumerate(crit_lines):
|
||||||
|
ax.text(0.02, 0.65 - i * 0.13, line, fontsize=10, family="monospace",
|
||||||
|
transform=ax.transAxes)
|
||||||
|
|
||||||
|
if reject_reason:
|
||||||
|
ax.text(0.02, 0.05, f"REASON: {reject_reason}", fontsize=9,
|
||||||
|
color="#dc2626", transform=ax.transAxes, family="monospace")
|
||||||
|
|
||||||
|
axes[-2].set_xlabel("time since run start (s)")
|
||||||
|
plt.tight_layout()
|
||||||
|
plt.savefig(out_path, dpi=90, bbox_inches="tight")
|
||||||
|
plt.close(fig)
|
||||||
|
|
||||||
|
|
||||||
|
HTML_TEMPLATE = """<!doctype html>
|
||||||
|
<html lang="fr">
|
||||||
|
<head>
|
||||||
|
<meta charset="utf-8">
|
||||||
|
<title>02 Runs Diagnostic — {mission}</title>
|
||||||
|
<style>
|
||||||
|
body {{ font-family: -apple-system, system-ui, sans-serif; background: #0f172a; color: #e2e8f0; margin: 0; padding: 20px; }}
|
||||||
|
h1 {{ color: #38bdf8; margin: 0 0 4px 0; }}
|
||||||
|
.subtitle {{ color: #94a3b8; margin-bottom: 20px; font-size: 14px; }}
|
||||||
|
.params {{ background: #1e293b; padding: 10px 14px; border-radius: 6px; margin-bottom: 24px; font-family: monospace; font-size: 12px; color: #cbd5e1; }}
|
||||||
|
.grid {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(560px, 1fr)); gap: 20px; }}
|
||||||
|
.card {{ background: #1e293b; border-radius: 8px; padding: 12px; border-left: 6px solid #475569; }}
|
||||||
|
.card.ok {{ border-left-color: #16a34a; }}
|
||||||
|
.card.ko {{ border-left-color: #dc2626; }}
|
||||||
|
.card h3 {{ margin: 0 0 6px 0; font-size: 16px; }}
|
||||||
|
.card h3.ok {{ color: #4ade80; }}
|
||||||
|
.card h3.ko {{ color: #f87171; }}
|
||||||
|
.card img {{ width: 100%; height: auto; border-radius: 4px; }}
|
||||||
|
.meta {{ color: #94a3b8; font-size: 12px; margin: 4px 0 8px 0; font-family: monospace; }}
|
||||||
|
.reason {{ color: #fca5a5; font-size: 12px; margin-top: 6px; font-family: monospace; }}
|
||||||
|
.summary {{ display: flex; gap: 16px; margin-bottom: 20px; }}
|
||||||
|
.pill {{ background: #1e293b; padding: 8px 14px; border-radius: 999px; font-size: 13px; }}
|
||||||
|
.pill.ok {{ color: #4ade80; }}
|
||||||
|
.pill.ko {{ color: #f87171; }}
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<h1>02 Runs Diagnostic — {mission}</h1>
|
||||||
|
<div class="subtitle">Generated {generated_at} · filter v5 strict</div>
|
||||||
|
<div class="summary">
|
||||||
|
<div class="pill ok">OK: {n_ok}</div>
|
||||||
|
<div class="pill ko">Rejected: {n_ko}</div>
|
||||||
|
<div class="pill">Total candidates: {n_total}</div>
|
||||||
|
</div>
|
||||||
|
<div class="params">{params}</div>
|
||||||
|
<div class="grid">
|
||||||
|
{cards}
|
||||||
|
</div>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
CARD_TEMPLATE = """<div class="card {cls}">
|
||||||
|
<h3 class="{cls}">{run_id} — {status}</h3>
|
||||||
|
<div class="meta">duration={duration}s | max_depth={max_depth}m | sustained={sustained}s | pct_near={pct}% | mode={mode}</div>
|
||||||
|
<img src="{png}" alt="{run_id}">
|
||||||
|
{reason_block}
|
||||||
|
</div>"""
|
||||||
|
|
||||||
|
|
||||||
|
def build_html(mission, filter_params, generated_at, ok_runs, ko_runs, out_dir):
|
||||||
|
cards = []
|
||||||
|
for run, status, reason in [(r, "OK", None) for r in ok_runs] + [(r, "REJECTED", r.get("rejected_reason")) for r in ko_runs]:
|
||||||
|
cls = "ok" if status == "OK" else "ko"
|
||||||
|
reason_block = f'<div class="reason">REASON: {reason}</div>' if reason else ""
|
||||||
|
cards.append(CARD_TEMPLATE.format(
|
||||||
|
cls=cls, run_id=run["run_id"], status=status,
|
||||||
|
duration=f"{run['duration_s']:.0f}",
|
||||||
|
max_depth=run["max_depth_m"],
|
||||||
|
sustained=f"{run['sustained_duration_s']:.0f}",
|
||||||
|
pct=f"{run['pct_near_bottom']:.1f}",
|
||||||
|
mode=run.get("dominant_mode", "?"),
|
||||||
|
png=f"{run['run_id']}.png",
|
||||||
|
reason_block=reason_block,
|
||||||
|
))
|
||||||
|
html = HTML_TEMPLATE.format(
|
||||||
|
mission=mission,
|
||||||
|
generated_at=generated_at,
|
||||||
|
params=json.dumps(filter_params, indent=2),
|
||||||
|
n_ok=len(ok_runs),
|
||||||
|
n_ko=len(ko_runs),
|
||||||
|
n_total=len(ok_runs) + len(ko_runs),
|
||||||
|
cards="\n".join(cards),
|
||||||
|
)
|
||||||
|
(out_dir / "index.html").write_text(html, encoding="utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Stage 02b — runs diagnostic plots")
|
||||||
|
parser.add_argument("--mission", required=True)
|
||||||
|
parser.add_argument("--ssd-base", default="/mnt/ssd")
|
||||||
|
parser.add_argument("--out", default="data/")
|
||||||
|
parser.add_argument("--padding-s", type=float, default=120.0,
|
||||||
|
help="padding (s) avant/après run pour contexte")
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
mission_dir = Path(args.out) / args.mission
|
||||||
|
runs_json = mission_dir / "02_runs.json"
|
||||||
|
if not runs_json.exists():
|
||||||
|
print(f"[ERROR] {runs_json} missing — run 02_mission_run_detect first", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
bags_root = Path(args.ssd_base) / args.mission / "raw_data" / "logs" / "SUB" / "bag"
|
||||||
|
if not bags_root.exists():
|
||||||
|
print(f"[ERROR] bags dir not found: {bags_root}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
data = json.loads(runs_json.read_text())
|
||||||
|
filter_params = data["filter_params"]
|
||||||
|
out_dir = mission_dir / "02_runs_diag"
|
||||||
|
out_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
ok_runs = data["all_runs_sorted"]
|
||||||
|
ko_runs = data["all_runs_rejected"]
|
||||||
|
print(f"[stage02b] {len(ok_runs)} OK + {len(ko_runs)} rejected = {len(ok_runs)+len(ko_runs)} runs to plot")
|
||||||
|
|
||||||
|
# Group by AUV → 1 read per AUV bag set covering all runs
|
||||||
|
runs_by_auv = {}
|
||||||
|
for run in ok_runs + ko_runs:
|
||||||
|
# run_id "AUV210_run_00" → AUV210
|
||||||
|
phys = run["run_id"].split("_")[0]
|
||||||
|
runs_by_auv.setdefault(phys, []).append(run)
|
||||||
|
|
||||||
|
pad = args.padding_s
|
||||||
|
for phys_id, runs in runs_by_auv.items():
|
||||||
|
mcap_id = PHYS_TO_MCAP.get(phys_id, phys_id)
|
||||||
|
t_start = min(r["start_epoch"] for r in runs) - pad
|
||||||
|
t_end = max(r["end_epoch"] for r in runs) + pad
|
||||||
|
print(f" [{phys_id}] reading {mcap_id} bags [{t_start:.0f}..{t_end:.0f}] for {len(runs)} runs")
|
||||||
|
alt, state, hdg = gather_signals_for_auv(bags_root, mcap_id, t_start, t_end)
|
||||||
|
print(f" {len(alt)} alt pts, {len(state)} state pts, {len(hdg)} hdg pts")
|
||||||
|
|
||||||
|
for run in runs:
|
||||||
|
r_start = run["start_epoch"] - pad
|
||||||
|
r_end = run["end_epoch"] + pad
|
||||||
|
alt_r = [(t, v) for t, v in alt if r_start <= t <= r_end]
|
||||||
|
state_r = [(t, a, m) for t, a, m in state if r_start <= t <= r_end]
|
||||||
|
hdg_r = [(t, v) for t, v in hdg if r_start <= t <= r_end]
|
||||||
|
status = "REJECTED" if run in ko_runs else "OK"
|
||||||
|
reason = run.get("rejected_reason") if status == "REJECTED" else None
|
||||||
|
out_png = out_dir / f"{run['run_id']}.png"
|
||||||
|
plot_run_diag(run, alt_r, state_r, hdg_r, filter_params, status, reason, out_png)
|
||||||
|
print(f" [{status}] {run['run_id']}.png written")
|
||||||
|
|
||||||
|
# Build index.html + index.json
|
||||||
|
generated_at = datetime.now(timezone.utc).isoformat()
|
||||||
|
index = {
|
||||||
|
"mission": args.mission,
|
||||||
|
"generated_at": generated_at,
|
||||||
|
"filter_params": filter_params,
|
||||||
|
"ok_runs": [r["run_id"] for r in ok_runs],
|
||||||
|
"rejected_runs": [r["run_id"] for r in ko_runs],
|
||||||
|
}
|
||||||
|
(out_dir / "index.json").write_text(json.dumps(index, indent=2))
|
||||||
|
build_html(args.mission, filter_params, generated_at, ok_runs, ko_runs, out_dir)
|
||||||
|
print(f"[stage02b] Done: {out_dir}/index.html")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
228
pipeline/stages/03b_trim_runs.py
Executable file
228
pipeline/stages/03b_trim_runs.py
Executable file
@@ -0,0 +1,228 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Stage 03b - Trim videos per run (LRV proxies + -c copy, fast).
|
||||||
|
|
||||||
|
Inputs:
|
||||||
|
data/<MISSION>/02_runs.json
|
||||||
|
data/<MISSION>/03_video_index.json
|
||||||
|
|
||||||
|
Strategy:
|
||||||
|
- Use GoPro LRV proxy files (768x432 H.264 ~720 kbps) instead of 4K HEVC originals.
|
||||||
|
- ffmpeg -c copy per chapter (keyframe-aligned cut) + concat demuxer.
|
||||||
|
- Output: per-run .mp4 + ours_<gp>.mp4 (concat of per-run).
|
||||||
|
|
||||||
|
Falls back to MP4 source if matching LRV is missing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
from collections import defaultdict
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
OUT_ROOT = Path("/mnt/ssd/cosma-qc-out/03b_trim_runs")
|
||||||
|
|
||||||
|
|
||||||
|
def run_ff(cmd: list[str]) -> None:
|
||||||
|
r = subprocess.run(cmd, capture_output=True, text=True)
|
||||||
|
if r.returncode != 0:
|
||||||
|
sys.stderr.write(" ".join(cmd) + "\n")
|
||||||
|
sys.stderr.write(r.stderr[-3000:] + "\n")
|
||||||
|
raise RuntimeError(f"ffmpeg failed rc={r.returncode}")
|
||||||
|
|
||||||
|
|
||||||
|
def lrv_for_chapter(mp4_path: Path) -> Path | None:
|
||||||
|
"""Return matching .LRV path if it exists (GoPro low-res proxy)."""
|
||||||
|
name = mp4_path.name
|
||||||
|
if not name.startswith("GX") or not name.upper().endswith(".MP4"):
|
||||||
|
return None
|
||||||
|
lrv_name = "GL" + name[2:-4] + ".LRV"
|
||||||
|
p = mp4_path.parent / lrv_name
|
||||||
|
return p if p.exists() else None
|
||||||
|
|
||||||
|
|
||||||
|
def overlap_clips(run_start: float, run_end: float, chapters: list[dict]) -> list[tuple[dict, float, float]]:
|
||||||
|
"""Return [(chapter, start_off_s, duration_s)] for chapters overlapping the run."""
|
||||||
|
out = []
|
||||||
|
for ch in sorted(chapters, key=lambda c: c["start_epoch"]):
|
||||||
|
a = max(run_start, ch["start_epoch"])
|
||||||
|
b = min(run_end, ch["end_epoch"])
|
||||||
|
if b - a <= 1.0:
|
||||||
|
continue
|
||||||
|
start_off = max(0.0, run_start - ch["start_epoch"])
|
||||||
|
dur = b - a
|
||||||
|
out.append((ch, start_off, dur))
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def cut_clip(src: Path, start_off: float, duration: float, dst: Path) -> None:
|
||||||
|
"""Cut [start_off, start_off+duration] from src using -c copy (keyframe-aligned)."""
|
||||||
|
cmd = [
|
||||||
|
"ffmpeg", "-y", "-loglevel", "error",
|
||||||
|
"-ss", f"{start_off:.3f}",
|
||||||
|
"-i", str(src),
|
||||||
|
"-t", f"{duration:.3f}",
|
||||||
|
"-c", "copy",
|
||||||
|
"-avoid_negative_ts", "make_zero",
|
||||||
|
"-an",
|
||||||
|
str(dst),
|
||||||
|
]
|
||||||
|
run_ff(cmd)
|
||||||
|
|
||||||
|
|
||||||
|
def concat_demux(parts: list[Path], dst: Path) -> None:
|
||||||
|
"""Concat parts with ffmpeg concat demuxer (-c copy)."""
|
||||||
|
if not parts:
|
||||||
|
return
|
||||||
|
if len(parts) == 1:
|
||||||
|
shutil.copy2(parts[0], dst)
|
||||||
|
return
|
||||||
|
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
|
||||||
|
for p in parts:
|
||||||
|
f.write(f"file '{p.resolve()}'\n")
|
||||||
|
listfile = f.name
|
||||||
|
try:
|
||||||
|
cmd = [
|
||||||
|
"ffmpeg", "-y", "-loglevel", "error",
|
||||||
|
"-f", "concat", "-safe", "0",
|
||||||
|
"-i", listfile,
|
||||||
|
"-c", "copy",
|
||||||
|
"-movflags", "+faststart",
|
||||||
|
"-an",
|
||||||
|
str(dst),
|
||||||
|
]
|
||||||
|
run_ff(cmd)
|
||||||
|
finally:
|
||||||
|
os.unlink(listfile)
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
ap = argparse.ArgumentParser()
|
||||||
|
ap.add_argument("--mission", required=True)
|
||||||
|
ap.add_argument("--data-root", default="/home/cosma/cosma-qc/data")
|
||||||
|
ap.add_argument("--skip-existing", action="store_true")
|
||||||
|
ap.add_argument("--prefer-source", choices=["lrv", "mp4"], default="lrv",
|
||||||
|
help="lrv = use .LRV proxy (default, fast); mp4 = use 4K originals")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
mission = args.mission
|
||||||
|
data_dir = Path(args.data_root) / mission
|
||||||
|
|
||||||
|
runs = json.loads((data_dir / "02_runs.json").read_text())["runs"]
|
||||||
|
vidx = json.loads((data_dir / "03_video_index.json").read_text())
|
||||||
|
videos = vidx["videos"]
|
||||||
|
|
||||||
|
by_auv_gp: dict[tuple[str, str], list[dict]] = defaultdict(list)
|
||||||
|
for v in videos:
|
||||||
|
by_auv_gp[(v["auv"], v["gp"])].append(v)
|
||||||
|
all_gps = sorted({v["gp"] for v in videos})
|
||||||
|
|
||||||
|
out_dir = OUT_ROOT / mission
|
||||||
|
out_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
tmp_dir = out_dir / "_tmp"
|
||||||
|
tmp_dir.mkdir(exist_ok=True)
|
||||||
|
|
||||||
|
link = data_dir / "03b_trim_runs"
|
||||||
|
if link.is_symlink():
|
||||||
|
link.unlink()
|
||||||
|
elif link.exists():
|
||||||
|
shutil.rmtree(link)
|
||||||
|
link.symlink_to(out_dir)
|
||||||
|
|
||||||
|
manifest = {
|
||||||
|
"mission": mission,
|
||||||
|
"generated_at": datetime.now(timezone.utc).isoformat(),
|
||||||
|
"output_root": str(out_dir),
|
||||||
|
"source": args.prefer_source,
|
||||||
|
"runs": [],
|
||||||
|
"ours": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
runs_by_chrono = sorted(runs, key=lambda r: r["start_epoch"])
|
||||||
|
ours_parts: dict[str, list[tuple[float, Path, str]]] = defaultdict(list)
|
||||||
|
|
||||||
|
for run in runs_by_chrono:
|
||||||
|
run_id = run["run_id"]
|
||||||
|
auv = run["auv"]
|
||||||
|
r_start = run["start_epoch"]
|
||||||
|
r_end = run["end_epoch"]
|
||||||
|
run_entry = {"run_id": run_id, "auv": auv, "duration_s": run["duration_s"], "outputs": []}
|
||||||
|
|
||||||
|
for gp in all_gps:
|
||||||
|
chapters = by_auv_gp.get((auv, gp), [])
|
||||||
|
if not chapters:
|
||||||
|
continue
|
||||||
|
clips = overlap_clips(r_start, r_end, chapters)
|
||||||
|
if not clips:
|
||||||
|
continue
|
||||||
|
|
||||||
|
out_name = f"{run_id}_{auv}_{gp}.mp4"
|
||||||
|
out_path = out_dir / out_name
|
||||||
|
if args.skip_existing and out_path.exists() and out_path.stat().st_size > 0:
|
||||||
|
print(f"[skip] {out_name}", flush=True)
|
||||||
|
else:
|
||||||
|
# Pick source per chapter
|
||||||
|
resolved: list[tuple[Path, float, float, str]] = []
|
||||||
|
src_tags: list[str] = []
|
||||||
|
for ch, soff, dur in clips:
|
||||||
|
mp4 = Path(ch["filepath"])
|
||||||
|
src = mp4
|
||||||
|
tag = "mp4"
|
||||||
|
if args.prefer_source == "lrv":
|
||||||
|
lrv = lrv_for_chapter(mp4)
|
||||||
|
if lrv:
|
||||||
|
src = lrv
|
||||||
|
tag = "lrv"
|
||||||
|
resolved.append((src, soff, dur, tag))
|
||||||
|
src_tags.append(tag)
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"[cut ] {out_name} chapters={len(resolved)} src={','.join(src_tags)}",
|
||||||
|
flush=True,
|
||||||
|
)
|
||||||
|
tmp_parts: list[Path] = []
|
||||||
|
for i, (src, soff, dur, _) in enumerate(resolved):
|
||||||
|
tp = tmp_dir / f"{run_id}_{auv}_{gp}_p{i:02d}.mp4"
|
||||||
|
cut_clip(src, soff, dur, tp)
|
||||||
|
tmp_parts.append(tp)
|
||||||
|
concat_demux(tmp_parts, out_path)
|
||||||
|
for p in tmp_parts:
|
||||||
|
p.unlink(missing_ok=True)
|
||||||
|
|
||||||
|
sz_mb = round(out_path.stat().st_size / 1024 / 1024, 1)
|
||||||
|
run_entry["outputs"].append({"gp": gp, "file": out_name, "size_mb": sz_mb})
|
||||||
|
ours_parts[gp].append((r_start, out_path, f"{run_id} {auv}"))
|
||||||
|
|
||||||
|
manifest["runs"].append(run_entry)
|
||||||
|
|
||||||
|
for gp, parts in ours_parts.items():
|
||||||
|
parts.sort()
|
||||||
|
ordered_paths = [p for _, p, _ in parts]
|
||||||
|
ours_path = out_dir / f"ours_{gp}.mp4"
|
||||||
|
print(f"[ours] {ours_path.name} <- {len(ordered_paths)} clip(s)", flush=True)
|
||||||
|
concat_demux(ordered_paths, ours_path)
|
||||||
|
sz_mb = round(ours_path.stat().st_size / 1024 / 1024, 1)
|
||||||
|
manifest["ours"][gp] = {
|
||||||
|
"file": ours_path.name,
|
||||||
|
"size_mb": sz_mb,
|
||||||
|
"segments": [lbl for _, _, lbl in parts],
|
||||||
|
}
|
||||||
|
|
||||||
|
(data_dir / "03b_trim_runs.json").write_text(json.dumps(manifest, indent=2))
|
||||||
|
print(f"\n[done] manifest: {data_dir / '03b_trim_runs.json'}", flush=True)
|
||||||
|
print(f"[done] outputs: {out_dir}", flush=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
tmp_dir.rmdir()
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -13,11 +13,12 @@ Workers:
|
|||||||
Auto: pick by lowest GPU memory usage (nvidia-smi via SSH).
|
Auto: pick by lowest GPU memory usage (nvidia-smi via SSH).
|
||||||
|
|
||||||
Flow:
|
Flow:
|
||||||
1. rsync frames .83 → worker /root/cosma-frames-tmp/ (or /home/floppyrj45/)
|
1. Kill any stale demo.py on worker before starting
|
||||||
2. SSH launch demo.py with windowed mode (window=64, overlap=16)
|
2. rsync frames .83 → worker /root/cosma-frames-tmp/
|
||||||
3. Retrieve PLY + NPZ → .83 ~/cosma-pipeline/data/<mission>/ply/<AUV>/<segment>.{ply,npz}
|
3. SSH launch demo.py in background; poll for PLY file; kill viser server once PLY done
|
||||||
4. Cleanup worker temp dir
|
4. Retrieve PLY + NPZ → .83 ~/cosma-pipeline/data/<mission>/ply/<AUV>/<segment>.{ply,npz}
|
||||||
5. Log to SQLite: duration, GPU peak mem, nb points in PLY
|
5. Cleanup worker temp dir
|
||||||
|
6. Log to SQLite: duration, GPU peak mem, nb points in PLY
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
python3 05_inference.py --frames-dir ~/cosma-pipeline/data/20260505-Lepradet/frames/AUV210/GX019837 --worker auto --mission 20260505-Lepradet
|
python3 05_inference.py --frames-dir ~/cosma-pipeline/data/20260505-Lepradet/frames/AUV210/GX019837 --worker auto --mission 20260505-Lepradet
|
||||||
@@ -83,6 +84,21 @@ def get_gpu_mem_used(worker_key: str) -> int:
|
|||||||
return 99999
|
return 99999
|
||||||
|
|
||||||
|
|
||||||
|
def kill_stale_demo_py(worker_key: str) -> None:
|
||||||
|
"""Kill any lingering demo.py processes on worker before starting new inference."""
|
||||||
|
w = WORKERS[worker_key]
|
||||||
|
ssh_target = f"{w['user']}@{w['host']}"
|
||||||
|
try:
|
||||||
|
subprocess.run(
|
||||||
|
["ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10",
|
||||||
|
ssh_target, "pkill -9 -f demo.py 2>/dev/null; sleep 1; echo stale_killed"],
|
||||||
|
capture_output=True, text=True, timeout=15,
|
||||||
|
)
|
||||||
|
print(f" [05] Stale demo.py killed on {worker_key}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f" [05] Warning: kill_stale failed on {worker_key}: {e}")
|
||||||
|
|
||||||
|
|
||||||
def pick_worker() -> str:
|
def pick_worker() -> str:
|
||||||
"""Auto-select worker with lowest GPU memory usage."""
|
"""Auto-select worker with lowest GPU memory usage."""
|
||||||
best = None
|
best = None
|
||||||
@@ -140,6 +156,9 @@ def run_inference(frames_dir: Path, worker_key: str, mission_name: str,
|
|||||||
"status": "ok",
|
"status": "ok",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Step 0: kill any stale demo.py on worker
|
||||||
|
kill_stale_demo_py(worker_key)
|
||||||
|
|
||||||
# Step 1: create remote temp dir + rsync frames
|
# Step 1: create remote temp dir + rsync frames
|
||||||
print(f" [05] rsync {frames_dir} → {ssh_target}:{worker_frames}...")
|
print(f" [05] rsync {frames_dir} → {ssh_target}:{worker_frames}...")
|
||||||
subprocess.run(
|
subprocess.run(
|
||||||
@@ -165,6 +184,9 @@ def run_inference(frames_dir: Path, worker_key: str, mission_name: str,
|
|||||||
conf_thr = _INF_CFG.get("ply_conf_threshold", 1.5)
|
conf_thr = _INF_CFG.get("ply_conf_threshold", 1.5)
|
||||||
kf_interval = _INF_CFG.get("keyframe_interval", 1)
|
kf_interval = _INF_CFG.get("keyframe_interval", 1)
|
||||||
max_frames = _INF_CFG.get("max_frame_num", 1024)
|
max_frames = _INF_CFG.get("max_frame_num", 1024)
|
||||||
|
use_offload = _INF_CFG.get("offload_to_cpu", False)
|
||||||
|
offload_flag = "--offload_to_cpu" if use_offload else "--no-offload_to_cpu"
|
||||||
|
|
||||||
if inf_mode == "windowed":
|
if inf_mode == "windowed":
|
||||||
window_size = _INF_CFG.get("window_size", 64)
|
window_size = _INF_CFG.get("window_size", 64)
|
||||||
overlap_size = _INF_CFG.get("overlap_size", 16)
|
overlap_size = _INF_CFG.get("overlap_size", 16)
|
||||||
@@ -179,39 +201,67 @@ def run_inference(frames_dir: Path, worker_key: str, mission_name: str,
|
|||||||
f"--keyframe_interval {kf_interval} "
|
f"--keyframe_interval {kf_interval} "
|
||||||
f"--max_frame_num {max_frames} "
|
f"--max_frame_num {max_frames} "
|
||||||
)
|
)
|
||||||
demo_cmd = (
|
|
||||||
f"cd {w['ai_dir']} && "
|
|
||||||
f"{w['venv']} demo.py "
|
|
||||||
f"--model_path {checkpoint} "
|
|
||||||
f"--image_folder {worker_frames} "
|
|
||||||
f"{mode_flags}"
|
|
||||||
f"--ply_conf_threshold {conf_thr} "
|
|
||||||
f"--save_ply {ply_remote} "
|
|
||||||
f"--save_poses {npz_remote} "
|
|
||||||
f"--use_sdpa "
|
|
||||||
f"--offload_to_cpu "
|
|
||||||
f"2>&1"
|
|
||||||
)
|
|
||||||
|
|
||||||
print(f" [05] Launching inference on {host}...")
|
inf_timeout = int(_INF_CFG.get("inference_timeout_s", 10800))
|
||||||
|
|
||||||
|
# Remote script: launch demo.py in background, poll for PLY, kill viser when done
|
||||||
|
# This avoids the SSH blocking on the viser server that starts after inference
|
||||||
|
remote_script = f"""#!/bin/bash
|
||||||
|
set -e
|
||||||
|
PLY={ply_remote}
|
||||||
|
LOG=/tmp/cosma_demo_{segment}.log
|
||||||
|
# Launch demo.py in background
|
||||||
|
nohup {w['venv']} {w['ai_dir']}/demo.py \\
|
||||||
|
--model_path {checkpoint} \\
|
||||||
|
--image_folder {worker_frames} \\
|
||||||
|
{mode_flags}--ply_conf_threshold {conf_thr} \\
|
||||||
|
--save_ply \\
|
||||||
|
--save_poses {npz_remote} \\
|
||||||
|
--use_sdpa {offload_flag} \\
|
||||||
|
> 2>&1 &
|
||||||
|
DEMO_PID=
|
||||||
|
echo "demo.py PID=" >&2
|
||||||
|
# Poll for PLY file (check every 30s)
|
||||||
|
WAITED=0
|
||||||
|
while [ -lt {inf_timeout} ]; do
|
||||||
|
if [ -f "" ] && [ $(wc -c < "") -gt 100 ]; then
|
||||||
|
sleep 10 # let write finish
|
||||||
|
echo "PLY_DONE size=$(wc -c < )" >&2
|
||||||
|
kill 2>/dev/null || true
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
# Check if process died with error
|
||||||
|
if ! kill -0 2>/dev/null; then
|
||||||
|
echo "Process died early" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
sleep 30
|
||||||
|
WAITED=30
|
||||||
|
done
|
||||||
|
echo "TIMEOUT after {inf_timeout}s" >&2
|
||||||
|
kill -9 2>/dev/null || true
|
||||||
|
exit 2
|
||||||
|
"""
|
||||||
|
|
||||||
|
print(f" [05] Launching inference on {host} (background+poll, timeout={inf_timeout}s)...")
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
r = subprocess.run(
|
r = subprocess.run(
|
||||||
["ssh", "-o", "StrictHostKeyChecking=no", ssh_target, demo_cmd],
|
["ssh", "-o", "StrictHostKeyChecking=no", ssh_target,
|
||||||
capture_output=True, text=True, timeout=7200, # 2h max
|
"bash -s"],
|
||||||
|
input=remote_script,
|
||||||
|
capture_output=True, text=True, timeout=inf_timeout + 60,
|
||||||
)
|
)
|
||||||
elapsed = time.time() - t0
|
elapsed = time.time() - t0
|
||||||
metrics["inference_s"] = round(elapsed, 1)
|
metrics["inference_s"] = round(elapsed, 1)
|
||||||
|
|
||||||
if r.returncode != 0:
|
if r.returncode != 0:
|
||||||
metrics["status"] = "error"
|
metrics["status"] = "error"
|
||||||
metrics["error"] = r.stdout[-500:] + r.stderr[-200:]
|
metrics["error"] = (r.stdout + r.stderr)[-500:]
|
||||||
print(f" [05] inference error: {metrics['error'][-200:]}")
|
print(f" [05] inference error: {metrics['error'][-200:]}")
|
||||||
return metrics
|
return metrics
|
||||||
|
|
||||||
print(f" [05] Inference done in {elapsed:.1f}s")
|
print(f" [05] Inference done in {elapsed:.1f}s (returncode={r.returncode})")
|
||||||
|
|
||||||
# Step 3: GPU peak mem from nvidia-smi log (best-effort parse)
|
|
||||||
gpu_mem_line = [l for l in r.stdout.split("\n") if "MiB" in l]
|
|
||||||
metrics["gpu_peak_mb"] = get_gpu_mem_used(worker_key)
|
metrics["gpu_peak_mb"] = get_gpu_mem_used(worker_key)
|
||||||
|
|
||||||
# Step 4: rsync PLY + NPZ back
|
# Step 4: rsync PLY + NPZ back
|
||||||
@@ -242,17 +292,14 @@ def run_inference(frames_dir: Path, worker_key: str, mission_name: str,
|
|||||||
|
|
||||||
def process_frames_dir(frames_dir: Path, worker_key: str, mission_name: str) -> list[dict]:
|
def process_frames_dir(frames_dir: Path, worker_key: str, mission_name: str) -> list[dict]:
|
||||||
"""Process a directory of frames (single segment or AUV tree)."""
|
"""Process a directory of frames (single segment or AUV tree)."""
|
||||||
# Detect if frames_dir contains frame_*.jpg directly or subdirs
|
|
||||||
direct_frames = list(frames_dir.glob("frame_*.jpg"))
|
direct_frames = list(frames_dir.glob("frame_*.jpg"))
|
||||||
|
|
||||||
if direct_frames:
|
if direct_frames:
|
||||||
# Single segment
|
|
||||||
parts = frames_dir.parts
|
parts = frames_dir.parts
|
||||||
auv_id = frames_dir.parent.name if len(parts) >= 2 else "UNKNOWN"
|
auv_id = frames_dir.parent.name if len(parts) >= 2 else "UNKNOWN"
|
||||||
segment = frames_dir.name
|
segment = frames_dir.name
|
||||||
return [run_inference(frames_dir, worker_key, mission_name, auv_id, segment)]
|
return [run_inference(frames_dir, worker_key, mission_name, auv_id, segment)]
|
||||||
|
|
||||||
# Tree: frames_dir/<AUV>/<segment>/frame_*.jpg
|
|
||||||
all_metrics = []
|
all_metrics = []
|
||||||
for auv_dir in sorted(frames_dir.iterdir()):
|
for auv_dir in sorted(frames_dir.iterdir()):
|
||||||
if not auv_dir.is_dir():
|
if not auv_dir.is_dir():
|
||||||
@@ -265,6 +312,19 @@ def process_frames_dir(frames_dir: Path, worker_key: str, mission_name: str) ->
|
|||||||
if not frames:
|
if not frames:
|
||||||
continue
|
continue
|
||||||
print(f"\n[05] === {auv_id}/{seg_dir.name}: {len(frames)} frames ===")
|
print(f"\n[05] === {auv_id}/{seg_dir.name}: {len(frames)} frames ===")
|
||||||
|
# Guard: min frames required for model (RoPE/attention)
|
||||||
|
min_frames = int(_INF_CFG.get("min_frames_for_inference", 32))
|
||||||
|
if len(frames) < min_frames:
|
||||||
|
print(f" [05] SKIP {auv_id}/{seg_dir.name}: {len(frames)} frames < {min_frames} min")
|
||||||
|
init_db()
|
||||||
|
with get_conn() as conn_mf:
|
||||||
|
mr = conn_mf.execute("SELECT id FROM missions WHERE name=?", (mission_name,)).fetchone()
|
||||||
|
if mr:
|
||||||
|
upsert_job(conn_mf, mr["id"], auv_id, seg_dir.name, "05_inference",
|
||||||
|
status="skipped",
|
||||||
|
error_msg=f"frames_too_few={len(frames)}<{min_frames}")
|
||||||
|
continue
|
||||||
|
|
||||||
m = run_inference(seg_dir, worker_key, mission_name, auv_id, seg_dir.name)
|
m = run_inference(seg_dir, worker_key, mission_name, auv_id, seg_dir.name)
|
||||||
all_metrics.append(m)
|
all_metrics.append(m)
|
||||||
|
|
||||||
@@ -291,12 +351,9 @@ def process_frames_dir(frames_dir: Path, worker_key: str, mission_name: str) ->
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
ap = argparse.ArgumentParser(description="Stage 05 — lingbot-map inference")
|
ap = argparse.ArgumentParser(description="Stage 05 — lingbot-map inference")
|
||||||
ap.add_argument("--frames-dir", type=Path, required=True,
|
ap.add_argument("--frames-dir", type=Path, required=True)
|
||||||
help="Frames dir (single segment or AUV tree)")
|
ap.add_argument("--worker", type=str, default="auto", choices=["auto", ".84", ".87"])
|
||||||
ap.add_argument("--worker", type=str, default="auto",
|
ap.add_argument("--mission", type=str, required=True)
|
||||||
choices=["auto", ".84", ".87"])
|
|
||||||
ap.add_argument("--mission", type=str, required=True,
|
|
||||||
help="Mission name (e.g. 20260505-Lepradet)")
|
|
||||||
args = ap.parse_args()
|
args = ap.parse_args()
|
||||||
|
|
||||||
worker = args.worker
|
worker = args.worker
|
||||||
|
|||||||
Reference in New Issue
Block a user