# Stage 01 (select mission): internal timestamp extraction and mission/AUV
# coverage windows.
#
# Reconstructed from a whitespace-collapsed copy of git patch
# 90621dea123fc455f897f2349ac398592976e870 (Fri, 15 May 2026), subject
# "stage01: internal timestamp extraction + mission/AUV windows", which
# targets pipeline/stages/01_select_mission.py.
#
# Timestamp sources, per file kind:
#   - GoPro MP4  : ffprobe, SMPTE timecode preferred, else creation_time
#   - ROS2 bag   : mcap.reader summary statistics
#   - ArduSub BIN: file mtime only (the commit message mentions pymavlink,
#                  but the code in the patch never calls it)
#   - USBL / MAG : first and last rows of the CSV
#   - Kogger KLF : date/time embedded in the file name
# plus a global mission window and per-AUV windows with gap detection.
#
# Only the definitions fully visible in the patch are present here; see the
# note at the end of this section for the parts that are not.

import argparse
import json
import logging
import re
import subprocess
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta
from pathlib import Path

logger = logging.getLogger(__name__)

AUV_PHYS_RE = re.compile(r"AUV(\d{3})", re.I)
# NOTE(review): in the copy this was reconstructed from, the named-group
# identifiers of the two patterns below were lost (they read "(?P\d+)", which
# is not a valid regex), so they are kept as comments only.  Restore them from
# the repository; nothing in this section uses them.
#   GP_FOLDER_RE = re.compile(r"^GP(?P\d+)[_-]AUV(?P\d{3})$", re.I)
#   BAG_AUV_RE = re.compile(r"_AUV(?P\d{3})(?:[_/]|$)", re.I)
USBL_FILE_RE = re.compile(r"usbl", re.I)
KLF_TS_RE = re.compile(r"(\d{8})_(\d{6})")

# KLF throughput estimate: ~5MB/min
KLF_BYTES_PER_SEC = 5 * 1024 * 1024 / 60

# Timestamp layouts accepted in the first CSV column (all interpreted as UTC).
_CSV_TS_FORMATS = (
    "%Y-%m-%d %H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S.%f",
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%dT%H:%M:%S",
    "%Y-%m-%d %H:%M:%S",
    "%Y%m%d %H:%M:%S.%f",  # MAG format, e.g. '20260508 07:52:28.456'
    "%Y%m%d %H:%M:%S",
)


def iso_utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string (second precision)."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def _to_utc(dt: datetime) -> datetime:
    """Return *dt* as an aware UTC datetime; naive values are taken as UTC."""
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def _coverage_entry(t_start: datetime, t_end: datetime,
                    duration_s: int, source: str) -> dict:
    """Build the coverage record shared by every extractor."""
    return {
        "t_start": t_start.isoformat(timespec="seconds"),
        "t_end": t_end.isoformat(timespec="seconds"),
        "duration_s": duration_s,
        "source": source,
    }


def mtime_fallback(path: Path) -> dict | None:
    """Describe *path* by its modification time only.

    Returns a zero-length record tagged ``"mtime_fallback"``, or ``None`` when
    the file cannot be stat'ed or its mtime cannot be converted.
    """
    try:
        stamp = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
    except (OSError, OverflowError, ValueError):
        return None
    return _coverage_entry(stamp, stamp, 0, "mtime_fallback")


def parse_smpte(smpte: str, fps: float = 25.0) -> float | None:
    """Convert an SMPTE timecode to seconds since midnight.

    Accepts ``HH:MM:SS:FF`` and the drop-frame spelling ``HH:MM:SS;FF``.
    The frame field is divided by *fps*; no drop-frame compensation is applied.
    Returns ``None`` for anything that is not four non-negative integers or
    when *fps* is not positive.
    """
    fields = smpte.strip().replace(";", ":").split(":")
    if len(fields) != 4 or not fps > 0:
        return None
    try:
        hours, minutes, seconds, frames = (int(f) for f in fields)
    except ValueError:
        return None
    if min(hours, minutes, seconds, frames) < 0:
        return None
    return hours * 3600 + minutes * 60 + seconds + frames / fps


def _run_ffprobe(path: Path, timeout_s: float = 10.0) -> dict:
    """Run ffprobe on *path* and return its JSON report ({} if no output)."""
    proc = subprocess.run(
        ["ffprobe", "-v", "quiet", "-print_format", "json",
         "-show_format", "-show_streams", str(path)],
        capture_output=True, text=True, timeout=timeout_s,
    )
    if not proc.stdout:
        return {}
    report = json.loads(proc.stdout)
    return report if isinstance(report, dict) else {}


def _video_fps(streams: list, default: float = 25.0) -> float:
    """Return the first usable video frame rate in *streams*, else *default*."""
    for stream in streams:
        if stream.get("codec_type") != "video":
            continue
        for key in ("r_frame_rate", "avg_frame_rate"):
            # ffprobe reports rates as a fraction string such as "30000/1001".
            num, _, den = str(stream.get(key, "")).partition("/")
            try:
                fps = float(num) / float(den or 1)
            except (ValueError, ZeroDivisionError):
                continue
            if 0 < fps < float("inf"):
                return fps
    return default


def _parse_creation_time(value: object) -> datetime | None:
    """Parse an ffprobe ``creation_time`` tag into an aware UTC datetime."""
    if not isinstance(value, str) or not value:
        return None
    try:
        return _to_utc(datetime.fromisoformat(value.replace("Z", "+00:00")))
    except ValueError:
        return None


def extract_mp4(path: Path) -> dict | None:
    """Extract the recording window of a GoPro MP4 via ffprobe.

    The start time is the container ``creation_time`` date combined with the
    SMPTE timecode found in any stream (source ``"smpte"``); when there is no
    usable timecode, ``creation_time`` itself is used (source
    ``"creation_time"``).  Without a ``creation_time``, or when ffprobe fails,
    the result of :func:`mtime_fallback` is returned.
    """
    try:
        report = _run_ffprobe(path)
        fmt = report.get("format", {})
        streams = report.get("streams", [])
        duration = float(fmt.get("duration", 0) or 0)
        creation = _parse_creation_time(
            fmt.get("tags", {}).get("creation_time", "")
        )
        if creation is None:
            # A timecode alone carries no date, so it cannot be used.
            return mtime_fallback(path)

        timecode = None
        for stream in streams:
            timecode = stream.get("tags", {}).get("timecode")
            if timecode:
                break

        t_start, source = creation, "creation_time"
        if timecode:
            # NOTE(review): assumes the timecode is a time of day on the same
            # clock as creation_time -- confirm against the camera settings.
            secs = parse_smpte(str(timecode), fps=_video_fps(streams))
            if secs is not None:
                midnight = creation.replace(
                    hour=0, minute=0, second=0, microsecond=0
                )
                t_start, source = midnight + timedelta(seconds=secs), "smpte"

        t_end = t_start + timedelta(seconds=duration)
        return _coverage_entry(t_start, t_end, int(duration), source)
    except (OSError, subprocess.SubprocessError, ValueError, TypeError,
            AttributeError, OverflowError):
        # Best effort: missing ffprobe, timeout, bad JSON or odd field values.
        logger.debug("ffprobe extraction failed for %s", path, exc_info=True)
    return mtime_fallback(path)


def extract_mcap(path: Path) -> dict | None:
    """Extract the message time range of an MCAP bag from its summary.

    Falls back to :func:`mtime_fallback` when the ``mcap`` package is missing,
    the file has no statistics, or reading fails.
    """
    try:
        from mcap.reader import make_reader  # optional third-party package
    except ImportError:
        logger.debug("mcap package unavailable; using mtime for %s", path)
        return mtime_fallback(path)
    try:
        with open(path, "rb") as f:
            summary = make_reader(f).get_summary()
            stats = summary.statistics if summary else None
            if stats and stats.message_start_time and stats.message_end_time:
                start_ns = stats.message_start_time
                end_ns = stats.message_end_time
                return _coverage_entry(
                    datetime.fromtimestamp(start_ns / 1e9, tz=timezone.utc),
                    datetime.fromtimestamp(end_ns / 1e9, tz=timezone.utc),
                    int((end_ns - start_ns) / 1e9),
                    "mcap_summary",
                )
    except Exception:
        # The reader raises library-specific errors; stay best effort.
        logger.debug("mcap extraction failed for %s", path, exc_info=True)
    return mtime_fallback(path)


def extract_bin(path: Path) -> dict | None:
    """BIN ArduSub: no absolute timestamp available (TimeUS = boot-relative).

    Always delegates to :func:`mtime_fallback`.
    """
    return mtime_fallback(path)


def _epoch_to_datetime(value: float) -> datetime | None:
    """Convert a Unix epoch in s, ms, us or ns to an aware UTC datetime."""
    magnitude = abs(value)
    # Seconds-based epochs stay below 1e11 until the year 5138, so larger
    # magnitudes can only be finer-grained units.
    for limit, scale in ((1e17, 1e9), (1e14, 1e6), (1e11, 1e3)):
        if magnitude >= limit:
            value /= scale
            break
    try:
        return datetime.fromtimestamp(value, tz=timezone.utc)
    except (ValueError, OSError, OverflowError):
        return None


def _parse_csv_timestamp(line: str) -> datetime | None:
    """Parse the first column of a CSV line as a UTC timestamp.

    Tries the layouts in ``_CSV_TS_FORMATS`` and then a numeric Unix epoch.
    Returns ``None`` when nothing matches.
    """
    col = line.split(",")[0].lstrip("\ufeff").strip().strip('"')
    for fmt in _CSV_TS_FORMATS:
        try:
            return datetime.strptime(col, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    try:
        epoch = float(col)
    except ValueError:
        return None
    return _epoch_to_datetime(epoch)


def _read_edge_lines(path: Path, head: int = 5, tail: int = 3,
                     chunk_size: int = 65536) -> tuple[list[str], list[str]]:
    """Return the first *head* and last *tail* non-blank lines of *path*.

    At most two chunks of *chunk_size* bytes are read, so the cost does not
    depend on the file size.
    """
    with open(path, "rb") as fh:
        first = fh.read(chunk_size)
        size = fh.seek(0, 2)
        offset = max(0, size - chunk_size)
        fh.seek(offset)
        last = fh.read()
    head_lines = first.decode("utf-8", errors="replace").splitlines()
    tail_lines = last.decode("utf-8", errors="replace").splitlines()
    if offset > 0:
        # The chunk most likely starts in the middle of a line.
        tail_lines = tail_lines[1:]
    head_lines = [ln for ln in head_lines if ln.strip()]
    tail_lines = [ln for ln in tail_lines if ln.strip()]
    return head_lines[:head], tail_lines[-tail:]


def _first_timestamp(lines) -> datetime | None:
    """Return the first parseable timestamp in *lines*, skipping headers."""
    for line in lines:
        col = line.split(",")[0].strip()
        # A first column starting with a letter is treated as a header.
        if not col or col[0].isalpha():
            continue
        stamp = _parse_csv_timestamp(line)
        if stamp is not None:
            return stamp
    return None


def extract_csv(path: Path) -> dict | None:
    """Extract the time range of a CSV log (USBL/MAG) from its edge rows.

    The start comes from the first rows and the end from the last rows, read
    directly from the file.  Returns :func:`mtime_fallback` when either end
    cannot be parsed or the file cannot be read.
    """
    try:
        head_lines, tail_lines = _read_edge_lines(path)
    except OSError:
        return mtime_fallback(path)
    t_start = _first_timestamp(head_lines)
    t_end = _first_timestamp(reversed(tail_lines))
    if t_start is None or t_end is None:
        return mtime_fallback(path)
    elapsed = int((t_end - t_start).total_seconds())
    return _coverage_entry(t_start, t_end, max(0, elapsed), "csv_inline")


def extract_klf(path: Path) -> dict | None:
    """Extract timestamp from KLF Kogger filename: kogger_sss_YYYYMMDD_HHMMSS.klf

    The file-name time is taken as UTC and the duration is estimated from the
    file size using ``KLF_BYTES_PER_SEC``.
    """
    match = KLF_TS_RE.search(path.name)
    if not match:
        return mtime_fallback(path)
    try:
        t_start = datetime.strptime(
            match.group(1) + match.group(2), "%Y%m%d%H%M%S"
        ).replace(tzinfo=timezone.utc)
    except ValueError:
        return mtime_fallback(path)
    try:
        dur = int(path.stat().st_size / KLF_BYTES_PER_SEC)
    except OSError:
        dur = 0
    try:
        t_end = t_start + timedelta(seconds=dur)
    except OverflowError:
        return mtime_fallback(path)
    return _coverage_entry(t_start, t_end, dur, "filename_parse")


_EXTRACTORS = {
    "mp4": extract_mp4,
    "mcap": extract_mcap,
    "bin": extract_bin,
    "usbl": extract_csv,
    "mag": extract_csv,
    "csv": extract_csv,
    "klf": extract_klf,
}


def extract_timestamps(path: Path, kind: str) -> dict | None:
    """Dispatch timestamp extraction by file kind.

    Unknown kinds and unexpected extractor failures both yield
    :func:`mtime_fallback`; this function does not raise.
    """
    extractor = _EXTRACTORS.get(kind, mtime_fallback)
    try:
        return extractor(path)
    except Exception:
        # Last line of defence: this runs in worker threads.
        logger.debug("extractor %r failed for %s", kind, path, exc_info=True)
        return mtime_fallback(path)


def _pick_video_folder(folders: list, fname: str,
                       auv_id: object, gp_key: object) -> Path | None:
    """Choose the folder in *folders* that holds *fname* for this AUV/GoPro.

    NOTE(review): file names can repeat across cameras, so folders whose name
    mentions *auv_id* / *gp_key* are preferred.  That the folder name encodes
    both is an assumption -- verify against the manifest builder.  When no
    folder name matches, the first folder containing the file is used.
    """
    candidates = [folder for folder in folders if (folder / fname).exists()]
    if not candidates:
        return None
    auv_tag = str(auv_id).upper()
    gp_tag = str(gp_key).upper()

    def score(folder: Path) -> int:
        name = folder.name.upper()
        return 4 * (name == gp_tag) + 2 * (auv_tag in name) + (gp_tag in name)

    # max() keeps the first candidate on ties, i.e. sorted-name order.
    return max(candidates, key=score)


def build_coverage(manifest: dict, ssd_base: Path,
                   max_workers: int = 8) -> tuple[dict, dict]:
    """Build coverage dict and per-AUV file lists for window computation.

    Args:
        manifest: stage manifest; reads ``mission``, ``videos``, ``mcap_bags``,
            ``bin_files``, ``usbl_logs``, ``mag_files`` and ``sss_files``.
        ssd_base: directory containing ``<mission>/raw_data``.
        max_workers: number of extraction threads.

    Returns:
        ``(coverage, auv_file_windows)``.  ``coverage`` maps a path relative
        to ``raw_data`` to its record, in manifest order.
        ``auv_file_windows`` maps an AUV id to ``(t_start, t_end, rel)``
        tuples; records that only have an mtime are left out of it.
    """
    mission = manifest["mission"]
    ssd_path = ssd_base / mission / "raw_data"
    tasks: list[tuple[str, Path, str, str | None]] = []

    # Videos: list the camera folders once instead of once per file.
    vroot = ssd_path / "medias" / "videos"
    video_folders = (
        sorted(p for p in vroot.iterdir() if p.is_dir())
        if vroot.is_dir() else []
    )
    for auv_id, gps in manifest["videos"].items():
        for gp_key, files in gps.items():
            for fname in files:
                folder = _pick_video_folder(
                    video_folders, fname, auv_id, gp_key
                )
                if folder is not None:
                    p = folder / fname
                    tasks.append(
                        (str(p.relative_to(ssd_path)), p, "mp4", auv_id)
                    )

    # MCAP bags
    bag_root = ssd_path / "logs" / "SUB" / "bag"
    for auv_id, bag_dirs in manifest["mcap_bags"].items():
        for bag_dir in bag_dirs:
            bag_path = bag_root / bag_dir
            if not bag_path.exists():
                continue
            for mcap_file in sorted(bag_path.glob("*.mcap")):
                rel = str(mcap_file.relative_to(ssd_path))
                tasks.append((rel, mcap_file, "mcap", auv_id))

    # BIN files: first regular file with that name anywhere under logs/SUB.
    sub_root = ssd_path / "logs" / "SUB"
    for auv_id, bins in manifest["bin_files"].items():
        for bname in bins:
            if not bname:
                continue  # rglob rejects an empty pattern
            found = next(
                (p for p in sub_root.rglob(bname) if p.is_file()), None
            )
            if found is not None:
                rel = str(found.relative_to(ssd_path))
                tasks.append((rel, found, "bin", auv_id))

    # USBL logs: the AUV id is inferred from the path when it contains one.
    for rel_str in manifest["usbl_logs"]:
        p = ssd_path / rel_str
        if p.exists():
            m = AUV_PHYS_RE.search(rel_str)
            auv_id = f"AUV{int(m.group(1)):03d}" if m else None
            tasks.append((rel_str, p, "usbl", auv_id))

    # MAG files (not attributed to an AUV)
    for files in manifest["mag_files"].values():
        for rel_str in files:
            p = ssd_path / rel_str
            if p.exists():
                tasks.append((rel_str, p, "mag", None))

    # SSS KLF files (not attributed to an AUV)
    for rel_str in manifest["sss_files"].get("klf", []):
        p = ssd_path / rel_str
        if p.exists():
            tasks.append((rel_str, p, "klf", None))

    def _extract(task: tuple) -> dict | None:
        _rel, path, kind, _auv = task
        return extract_timestamps(path, kind)

    # Executor.map yields results in task order, so the output is the same
    # from run to run.
    with ThreadPoolExecutor(max_workers=max(1, max_workers)) as pool:
        records = list(pool.map(_extract, tasks))

    coverage: dict[str, dict] = {}
    auv_file_windows: dict[str, list] = defaultdict(list)
    for (rel, _path, _kind, auv_id), cov in zip(tasks, records):
        if not cov:
            continue
        coverage[rel] = cov
        if auv_id and cov.get("source") != "mtime_fallback":
            auv_file_windows[auv_id].append(
                (cov["t_start"], cov["t_end"], rel)
            )

    return coverage, dict(auv_file_windows)


def compute_mission_window(coverage: dict) -> dict | None:
    """Global mission window: min t_start, max t_end over all files.

    Records tagged ``"mtime_fallback"`` and records whose times cannot be
    parsed are ignored.  Returns ``None`` when nothing usable remains.
    """
    starts = []
    ends = []
    for cov in coverage.values():
        if cov.get("source") == "mtime_fallback":
            continue
        try:
            start = _to_utc(datetime.fromisoformat(cov["t_start"]))
            end = _to_utc(datetime.fromisoformat(cov["t_end"]))
        except (KeyError, TypeError, ValueError):
            continue
        # Append both or neither so the two lists stay in step.
        starts.append(start)
        ends.append(end)
    if not starts:
        return None
    t_start = min(starts)
    t_end = max(ends)
    return {
        "t_start": t_start.isoformat(timespec="seconds"),
        "t_end": t_end.isoformat(timespec="seconds"),
        "duration_s": int((t_end - t_start).total_seconds()),
    }


def _source_category(rel: str) -> str:
    """Classify a relative path into the category names used in AUV windows."""
    lowered = rel.lower()
    if lowered.endswith(".mp4"):
        return "videos"
    if lowered.endswith(".mcap"):
        return "mcap"
    if lowered.endswith(".bin"):
        return "bin"
    if "usbl" in lowered:
        return "usbl"
    return "other"


def compute_auv_windows(auv_file_windows: dict,
                        gap_threshold_s: float = 60.0) -> dict:
    """Per-AUV windows with gap detection between covered intervals.

    Args:
        auv_file_windows: AUV id -> list of ``(t_start, t_end, label)`` with
            ISO-8601 times, as returned by :func:`build_coverage`.
        gap_threshold_s: uncovered stretches longer than this are reported.

    Returns:
        AUV id -> ``{"t_start", "t_end", "duration_s", "sources", "gaps"}``.
        Each gap names, in ``"between"``, the file whose end opens the gap
        and the file whose start closes it.  AUVs without any parseable
        window are omitted.
    """
    threshold = timedelta(seconds=gap_threshold_s)
    result = {}
    for auv_id, windows in sorted(auv_file_windows.items()):
        intervals = []
        for raw_start, raw_end, label in windows:
            try:
                intervals.append((
                    _to_utc(datetime.fromisoformat(raw_start)),
                    _to_utc(datetime.fromisoformat(raw_end)),
                    label,
                ))
            except (TypeError, ValueError):
                continue
        if not intervals:
            continue
        intervals.sort()

        t_start = intervals[0][0]
        t_end = max(end for _, end, _ in intervals)

        # Sweep in start order, tracking the furthest end seen so far and the
        # file that reaches it.
        gaps = []
        _, cur_end, cur_label = intervals[0]
        for i_start, i_end, i_label in intervals[1:]:
            if i_start - cur_end > threshold:
                gaps.append({
                    "from": cur_end.isoformat(timespec="seconds"),
                    "to": i_start.isoformat(timespec="seconds"),
                    "duration_s": int((i_start - cur_end).total_seconds()),
                    "between": [cur_label, i_label],
                })
            if i_end > cur_end:
                cur_end, cur_label = i_end, i_label

        result[auv_id] = {
            "t_start": t_start.isoformat(timespec="seconds"),
            "t_end": t_end.isoformat(timespec="seconds"),
            "duration_s": int((t_end - t_start).total_seconds()),
            "sources": sorted({_source_category(w[2]) for w in intervals}),
            "gaps": gaps,
        }
    return result


# Not reproduced here: safe_listdir, collect_audio_logs, collect_sss_files,
# collect_mag_files, build_manifest and main are only partly visible in the
# patch.  What the patch shows of their integration with the code above:
#   - build_manifest() builds the manifest dict, then calls
#     build_coverage(manifest, ssd_base), compute_mission_window(coverage)
#     and compute_auv_windows(auv_file_windows); it prints a one-line
#     coverage summary (file count, mtime-fallback share, per-source counts)
#     and stores the results under the "coverage", "mission_window" and
#     "auv_windows" keys before returning the manifest.
#   - main() prints the mission window when the manifest has one.