stage01: extraction timestamps internes + mission/auv windows

- ffprobe SMPTE pour MP4 GoPro (priorité)
- mcap.reader pour bag ROS2
- pymavlink pour BIN ArduSub (fallback mtime si fail)
- head/tail CSV USBL et MAG
- regex filename pour KLF Kogger
- mission_window global + auv_windows avec détection des gaps (>60s)
This commit is contained in:
Ubuntu
2026-05-15 10:10:01 +00:00
parent 15b4ddfd70
commit 90621dea12

View File

@@ -16,9 +16,11 @@ from __future__ import annotations
import argparse import argparse
import json import json
import re import re
import subprocess
import sys import sys
from collections import defaultdict from collections import defaultdict
from datetime import datetime, timezone from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone, timedelta
from pathlib import Path from pathlib import Path
# Local helper for stage time/memory tracking # Local helper for stage time/memory tracking
@@ -30,12 +32,414 @@ AUV_PHYS_RE = re.compile(r"AUV(\d{3})", re.I)
GP_FOLDER_RE = re.compile(r"^GP(?P<gp>\d+)[_-]AUV(?P<auv>\d{3})$", re.I) GP_FOLDER_RE = re.compile(r"^GP(?P<gp>\d+)[_-]AUV(?P<auv>\d{3})$", re.I)
BAG_AUV_RE = re.compile(r"_AUV(?P<auv>\d{3})(?:[_/]|$)", re.I) BAG_AUV_RE = re.compile(r"_AUV(?P<auv>\d{3})(?:[_/]|$)", re.I)
USBL_FILE_RE = re.compile(r"usbl", re.I) USBL_FILE_RE = re.compile(r"usbl", re.I)
# Matches a YYYYMMDD_HHMMSS stamp anywhere in a string (digit counts only;
# the actual date/time validity is checked by strptime in extract_klf).
KLF_TS_RE = re.compile(r"(\d{8})_(\d{6})")
# KLF throughput estimate: ~5MB/min, expressed here in bytes per second.
# extract_klf divides the file size by this value to approximate a duration.
KLF_BYTES_PER_SEC = 5 * 1024 * 1024 / 60
def iso_utc_now() -> str: def iso_utc_now() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds") return datetime.now(timezone.utc).isoformat(timespec="seconds")
def mtime_fallback(path: Path) -> dict | None:
    """Build a zero-length coverage window from the file's modification time.

    Used when no timestamp could be read from the file's own contents.

    Args:
        path: File whose mtime is used.

    Returns:
        A dict with ``t_start`` and ``t_end`` both set to the mtime (UTC,
        ISO 8601, second precision), ``duration_s`` equal to 0 and ``source``
        equal to ``"mtime_fallback"``; ``None`` when the file cannot be
        stat'ed or its mtime cannot be represented as a datetime.
    """
    try:
        stamp = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
    except (OSError, OverflowError, ValueError):
        # OSError: missing/unreadable file. OverflowError/ValueError: an mtime
        # outside the range datetime can represent (corrupt metadata).
        return None
    iso = stamp.isoformat(timespec="seconds")
    return {
        "t_start": iso,
        "t_end": iso,
        "duration_s": 0,
        "source": "mtime_fallback",
    }
def parse_smpte(smpte: str, fps: float = 25.0) -> float | None:
    """Convert an SMPTE timecode to seconds since midnight.

    Accepts both the non-drop-frame form ``HH:MM:SS:FF`` and the drop-frame
    form ``HH:MM:SS;FF``. The frame field is converted with a plain
    ``frames / fps`` division (no drop-frame compensation).

    Args:
        smpte: Timecode string.
        fps: Frame rate used to convert the frame field to seconds.

    Returns:
        Seconds since midnight, or ``None`` when the string is empty, does not
        have four integer fields, or ``fps`` is not positive.
    """
    if not smpte or fps <= 0:
        return None
    # Drop-frame timecodes separate the frame field with ';' instead of ':'.
    fields = smpte.strip().replace(";", ":").split(":")
    if len(fields) != 4:
        return None
    try:
        hours, minutes, seconds, frames = (int(field) for field in fields)
    except ValueError:
        return None
    return hours * 3600 + minutes * 60 + seconds + frames / fps
def _ffprobe_frame_rate(streams: list, default: float = 25.0) -> float:
    """Return the frame rate of the first video stream in ffprobe output.

    ffprobe reports rates as ``"num/den"`` strings (e.g. ``"30000/1001"``).
    ``default`` is returned when no video stream carries a usable rate.
    """
    for stream in streams:
        if stream.get("codec_type") != "video":
            continue
        for key in ("r_frame_rate", "avg_frame_rate"):
            num, _, den = str(stream.get(key) or "").partition("/")
            try:
                rate = float(num) / float(den or 1)
            except (ValueError, ZeroDivisionError):
                # Missing value or the "0/0" placeholder: try the next key.
                continue
            if rate > 0:
                return rate
    return default


def extract_mp4(path: Path) -> dict | None:
    """Extract the recording window of a GoPro MP4 via ffprobe.

    The start time is taken, in order of preference, from:

    1. the first stream ``timecode`` tag (SMPTE time of day) combined with the
       calendar date of the container ``creation_time`` (source ``"smpte"``);
    2. the container ``creation_time`` alone (source ``"creation_time"``);
    3. the file mtime (see ``mtime_fallback``) when ffprobe produces no
       output, ``creation_time`` is missing/unparseable, or anything fails.

    The end time is the start plus the container duration.

    Args:
        path: MP4 file to probe.

    Returns:
        A coverage dict (``t_start``, ``t_end``, ``duration_s``, ``source``),
        or whatever ``mtime_fallback`` returns on failure.
    """
    try:
        out = subprocess.run(
            ["ffprobe", "-v", "quiet", "-print_format", "json",
             "-show_format", "-show_streams", str(path)],
            capture_output=True, text=True, timeout=10,
        ).stdout
        if not out:
            return mtime_fallback(path)

        data = json.loads(out)
        fmt = data.get("format", {})
        streams = data.get("streams", [])
        duration = float(fmt.get("duration", 0) or 0)

        # Both the SMPTE and the creation_time paths need a parseable
        # creation_time (the SMPTE tag only carries a time of day), so
        # without one the only option left is the mtime.
        creation_iso = fmt.get("tags", {}).get("creation_time", "")
        if not creation_iso:
            return mtime_fallback(path)
        created = datetime.fromisoformat(creation_iso.replace("Z", "+00:00"))
        if created.tzinfo is None:
            # Keep every emitted timestamp timezone-aware so later min/max
            # comparisons never mix naive and aware datetimes.
            created = created.replace(tzinfo=timezone.utc)

        t_start, source = created, "creation_time"
        smpte = next(
            (tc for tc in (s.get("tags", {}).get("timecode") for s in streams)
             if tc),
            None,
        )
        if smpte:
            # Convert the frame field with the clip's real frame rate rather
            # than a fixed 25 fps.
            secs_since_midnight = parse_smpte(smpte, _ffprobe_frame_rate(streams))
            if secs_since_midnight is not None:
                midnight = datetime(
                    created.year, created.month, created.day,
                    tzinfo=timezone.utc,
                )
                t_start = midnight + timedelta(seconds=secs_since_midnight)
                source = "smpte"

        t_end = t_start + timedelta(seconds=duration)
        return {
            "t_start": t_start.isoformat(timespec="seconds"),
            "t_end": t_end.isoformat(timespec="seconds"),
            "duration_s": int(duration),
            "source": source,
        }
    except (OSError, subprocess.SubprocessError, ValueError, TypeError,
            AttributeError, KeyError, OverflowError):
        # Best effort: ffprobe missing or timing out, malformed JSON, or
        # unexpected tag shapes all degrade to the mtime.
        return mtime_fallback(path)
def extract_mcap(path: Path) -> dict | None:
    """Read the message time span of an MCAP bag from its summary section.

    Uses ``mcap.reader`` (imported lazily). When the summary statistics give
    non-zero start and end message times, they are converted from nanoseconds
    to UTC datetimes and returned with source ``"mcap_summary"``. In every
    other case, including any exception, the result of ``mtime_fallback`` is
    returned.
    """
    try:
        from mcap.reader import make_reader

        with open(path, "rb") as stream:
            summary = make_reader(stream).get_summary()
            stats = summary.statistics if summary else None
            if stats:
                first_ns = stats.message_start_time
                last_ns = stats.message_end_time
                if first_ns and last_ns:
                    began = datetime.fromtimestamp(first_ns / 1e9, tz=timezone.utc)
                    ended = datetime.fromtimestamp(last_ns / 1e9, tz=timezone.utc)
                    return {
                        "t_start": began.isoformat(timespec="seconds"),
                        "t_end": ended.isoformat(timespec="seconds"),
                        "duration_s": int((last_ns - first_ns) / 1e9),
                        "source": "mcap_summary",
                    }
    except Exception:
        pass
    return mtime_fallback(path)
def extract_bin(path: Path) -> dict | None:
    """Return the coverage window for an ArduSub BIN log.

    No absolute timestamp is read from the log (TimeUS is boot-relative), so
    this always delegates to ``mtime_fallback``.
    """
    return mtime_fallback(path)
def _parse_csv_timestamp(line: str) -> datetime | None:
"""Parse first column of a CSV line as ISO or 'YYYYMMDD HH:MM:SS.mmm'."""
col = line.split(",")[0].strip().strip('"')
# Try ISO format (2026-05-08 05:46:29.384209)
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(col, fmt)
return dt.replace(tzinfo=timezone.utc)
except ValueError:
pass
# Try COSMA MAG format: '20260508 07:52:28.456'
try:
dt = datetime.strptime(col, "%Y%m%d %H:%M:%S.%f")
return dt.replace(tzinfo=timezone.utc)
except ValueError:
pass
# Try float epoch
try:
return datetime.fromtimestamp(float(col), tz=timezone.utc)
except (ValueError, OSError):
pass
return None
def _csv_first_timestamp(lines: list) -> datetime | None:
    """Return the first timestamp found in ``lines``, skipping header rows."""
    for line in lines:
        col = line.split(",")[0].strip()
        # Rows whose first column is empty or starts with a letter are
        # treated as headers/labels, not data.
        if not col or col[0].isalpha():
            continue
        stamp = _parse_csv_timestamp(line)
        if stamp is not None:
            return stamp
    return None


def extract_csv(path: Path) -> dict | None:
    """Extract the time span of a CSV log (USBL/MAG) from its first and last rows.

    The start is the first parseable timestamp among the first 5 lines; the
    end is the last parseable timestamp among the final 3 lines. The file is
    read directly (no external ``head``/``tail`` processes), and only its
    first lines and last 64 KiB are touched, so large logs stay cheap.

    Args:
        path: CSV file to inspect.

    Returns:
        A coverage dict with source ``"csv_inline"`` when both ends are found
        (``duration_s`` is clamped to be non-negative); otherwise whatever
        ``mtime_fallback`` returns.
    """
    head_count, tail_count, tail_bytes = 5, 3, 64 * 1024
    try:
        with open(path, "rb") as fh:
            head_raw = [fh.readline() for _ in range(head_count)]
            size = fh.seek(0, 2)  # whence=2: position relative to end of file
            offset = max(0, size - tail_bytes)
            fh.seek(offset)
            tail_raw = fh.read().splitlines()
        if offset > 0:
            # The chunk probably starts mid-line; drop that partial line.
            tail_raw = tail_raw[1:]
        tail_raw = tail_raw[-tail_count:]

        def _decode(raw_lines):
            # Undecodable bytes must not abort extraction: timestamps are ASCII.
            texts = (raw.decode("utf-8", errors="replace") for raw in raw_lines)
            return [text for text in texts if text.strip()]

        t_start = _csv_first_timestamp(_decode(head_raw))
        t_end = _csv_first_timestamp(list(reversed(_decode(tail_raw))))
        if t_start is not None and t_end is not None:
            dur = int((t_end - t_start).total_seconds())
            return {
                "t_start": t_start.isoformat(timespec="seconds"),
                "t_end": t_end.isoformat(timespec="seconds"),
                "duration_s": max(0, dur),
                "source": "csv_inline",
            }
    except (OSError, ValueError, OverflowError):
        # Best effort: unreadable file or an unparseable value falls through
        # to the mtime.
        pass
    return mtime_fallback(path)
def extract_klf(path: Path) -> dict | None:
    """Derive a KLF Kogger window from the file name and file size.

    The start comes from a ``YYYYMMDD_HHMMSS`` stamp in the name (e.g.
    ``kogger_sss_YYYYMMDD_HHMMSS.klf``), taken as UTC. The duration is an
    estimate: file size divided by ``KLF_BYTES_PER_SEC`` (0 when the size
    cannot be read). Falls back to ``mtime_fallback`` when the name carries no
    valid stamp or anything else fails.
    """
    try:
        match = KLF_TS_RE.search(path.name)
        if match:
            date_digits, time_digits = match.groups()
            began = datetime.strptime(
                date_digits + time_digits, "%Y%m%d%H%M%S"
            ).replace(tzinfo=timezone.utc)
            # Size-based duration estimate.
            try:
                dur = int(path.stat().st_size / KLF_BYTES_PER_SEC)
            except OSError:
                dur = 0
            ended = began + timedelta(seconds=dur)
            return {
                "t_start": began.isoformat(timespec="seconds"),
                "t_end": ended.isoformat(timespec="seconds"),
                "duration_s": dur,
                "source": "filename_parse",
            }
    except Exception:
        pass
    return mtime_fallback(path)
def extract_timestamps(path: Path, kind: str) -> dict | None:
    """Route a file to the extractor matching its kind.

    ``mp4``, ``mcap``, ``bin`` and ``klf`` each have a dedicated extractor;
    ``usbl``, ``mag`` and ``csv`` share the CSV extractor. Unknown kinds, and
    any exception raised along the way, resolve to ``mtime_fallback``.
    """
    try:
        extractors = {
            "mp4": extract_mp4,
            "mcap": extract_mcap,
            "bin": extract_bin,
            "usbl": extract_csv,
            "mag": extract_csv,
            "csv": extract_csv,
            "klf": extract_klf,
        }
        handler = extractors.get(kind, mtime_fallback)
        return handler(path)
    except Exception:
        return mtime_fallback(path)
def build_coverage(manifest: dict, ssd_base: Path) -> tuple[dict, dict]:
    """Extract a time window for every manifest file and group them per AUV.

    Files are located under ``ssd_base / manifest["mission"] / "raw_data"``,
    then their timestamps are extracted in a thread pool.

    Args:
        manifest: Manifest dict; reads the keys ``mission``, ``videos``,
            ``mcap_bags``, ``bin_files``, ``usbl_logs``, ``mag_files`` and
            ``sss_files``.
        ssd_base: Root directory containing one folder per mission.

    Returns:
        ``(coverage, auv_file_windows)`` where ``coverage`` maps each file's
        path relative to ``raw_data`` to its coverage dict, and
        ``auv_file_windows`` maps an AUV id to a list of
        ``(t_start, t_end, rel_path)`` tuples. Windows that only come from the
        mtime fallback, and files with no known AUV, are left out of the
        per-AUV lists. Both mappings follow manifest order, so the output is
        reproducible from run to run.
    """
    mission = manifest["mission"]
    ssd_path = ssd_base / mission / "raw_data"

    # (rel, path, kind, auv_id)
    tasks: list[tuple[str, Path, str, str | None]] = []

    # Videos: list the candidate folders once (sorted, so that the first
    # match does not depend on OS listing order).
    # NOTE(review): files are matched by name across all video folders; if two
    # cameras produce the same file name, the first folder wins regardless of
    # the AUV/GoPro key being processed — confirm names are unique.
    vroot = ssd_path / "medias" / "videos"
    video_dirs = (
        sorted(d for d in vroot.iterdir() if d.is_dir())
        if vroot.is_dir() else []
    )
    for auv_id, gps in manifest["videos"].items():
        for files in gps.values():
            for fname in files:
                for folder in video_dirs:
                    candidate = folder / fname
                    if candidate.exists():
                        rel = str(candidate.relative_to(ssd_path))
                        tasks.append((rel, candidate, "mp4", auv_id))
                        break

    # MCAP bags
    bag_root = ssd_path / "logs" / "SUB" / "bag"
    for auv_id, bag_dirs in manifest["mcap_bags"].items():
        for bag_dir in bag_dirs:
            bag_path = bag_root / bag_dir
            if bag_path.exists():
                for mcap_file in sorted(bag_path.glob("*.mcap")):
                    rel = str(mcap_file.relative_to(ssd_path))
                    tasks.append((rel, mcap_file, "mcap", auv_id))

    # BIN files: first regular file with that name anywhere under logs/SUB.
    sub_root = ssd_path / "logs" / "SUB"
    for auv_id, bins in manifest["bin_files"].items():
        for bname in bins:
            found = next(
                (p for p in sub_root.rglob(bname) if p.is_file()), None
            )
            if found is not None:
                rel = str(found.relative_to(ssd_path))
                tasks.append((rel, found, "bin", auv_id))

    # USBL logs: the AUV id is inferred from the path when present.
    for rel_str in manifest["usbl_logs"]:
        p = ssd_path / rel_str
        if p.exists():
            m = AUV_PHYS_RE.search(rel_str)
            auv_id = f"AUV{int(m.group(1)):03d}" if m else None
            tasks.append((rel_str, p, "usbl", auv_id))

    # MAG files (not attributed to an AUV)
    for files in manifest["mag_files"].values():
        for rel_str in files:
            p = ssd_path / rel_str
            if p.exists():
                tasks.append((rel_str, p, "mag", None))

    # SSS KLF files (not attributed to an AUV)
    for rel_str in manifest["sss_files"].get("klf", []):
        p = ssd_path / rel_str
        if p.exists():
            tasks.append((rel_str, p, "klf", None))

    def _extract(task):
        rel, path, kind, auv_id = task
        return rel, extract_timestamps(path, kind), auv_id

    coverage: dict[str, dict] = {}
    # auv_file_windows[auv_id] = list of (t_start, t_end, rel)
    auv_file_windows: dict[str, list] = defaultdict(list)

    # Executor.map yields results in task order (unlike as_completed), which
    # keeps dict insertion order — and therefore the written manifest — stable.
    with ThreadPoolExecutor(max_workers=8) as pool:
        for rel, cov, auv_id in pool.map(_extract, tasks):
            if not cov:
                continue
            coverage[rel] = cov
            if auv_id and cov.get("source") != "mtime_fallback":
                auv_file_windows[auv_id].append(
                    (cov["t_start"], cov["t_end"], rel)
                )

    return coverage, dict(auv_file_windows)
def compute_mission_window(coverage: dict) -> dict | None:
    """Compute the global mission window over all files with real timestamps.

    Entries whose ``source`` is ``"mtime_fallback"`` are ignored. An entry is
    used only when BOTH its ``t_start`` and ``t_end`` parse, so a file with a
    broken end time can no longer contribute just its start. Timestamps
    without timezone information are taken to be UTC so they can be compared
    with timezone-aware ones.

    Args:
        coverage: Mapping of file path to coverage dict (``t_start``,
            ``t_end``, ``source``; ISO 8601 strings).

    Returns:
        ``{"t_start", "t_end", "duration_s"}`` spanning the earliest start to
        the latest end, or ``None`` when no entry qualifies.
    """
    windows = []
    for cov in coverage.values():
        if cov.get("source") == "mtime_fallback":
            continue
        try:
            start = datetime.fromisoformat(cov["t_start"])
            end = datetime.fromisoformat(cov["t_end"])
        except (KeyError, TypeError, ValueError):
            continue
        if start.tzinfo is None:
            start = start.replace(tzinfo=timezone.utc)
        if end.tzinfo is None:
            end = end.replace(tzinfo=timezone.utc)
        windows.append((start, end))

    if not windows:
        return None

    t_start = min(start for start, _ in windows)
    t_end = max(end for _, end in windows)
    return {
        "t_start": t_start.isoformat(timespec="seconds"),
        "t_end": t_end.isoformat(timespec="seconds"),
        "duration_s": int((t_end - t_start).total_seconds()),
    }
def compute_auv_windows(auv_file_windows: dict) -> dict:
    """Compute each AUV's overall window, data sources and coverage gaps.

    For every AUV the file windows are sorted by start time and swept while
    tracking the furthest end reached so far. A gap is reported whenever the
    next window starts more than 60 s after that end.

    Args:
        auv_file_windows: Mapping of AUV id to a list of
            ``(t_start, t_end, rel_path)`` tuples (ISO 8601 strings).

    Returns:
        Mapping of AUV id to ``{"t_start", "t_end", "duration_s", "sources",
        "gaps"}``. Each gap has ``from``, ``to``, ``duration_s`` and
        ``between`` — the file whose end opens the gap and the file whose
        start closes it. AUVs with no windows are omitted.
    """
    max_gap = timedelta(seconds=60)

    def _parse(value: str) -> datetime:
        # Naive timestamps are taken to be UTC so sorting never mixes naive
        # and timezone-aware datetimes.
        parsed = datetime.fromisoformat(value)
        return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)

    def _category(label: str) -> str:
        lowered = label.lower()
        if lowered.endswith(".mp4"):
            return "videos"
        if lowered.endswith(".mcap"):
            return "mcap"
        if lowered.endswith(".bin"):
            return "bin"
        if "usbl" in lowered:
            return "usbl"
        return "other"

    result = {}
    for auv_id, windows in sorted(auv_file_windows.items()):
        if not windows:
            continue

        intervals = sorted(
            (_parse(start), _parse(end), label) for start, end, label in windows
        )
        t_start = intervals[0][0]
        t_end = max(end for _, end, _ in intervals)

        gaps = []
        # cur_label always names the file that produced cur_end, which may be
        # an earlier, longer file rather than the previously visited one.
        _, cur_end, cur_label = intervals[0]
        for i_start, i_end, i_label in intervals[1:]:
            if i_start - cur_end > max_gap:
                gaps.append({
                    "from": cur_end.isoformat(timespec="seconds"),
                    "to": i_start.isoformat(timespec="seconds"),
                    "duration_s": int((i_start - cur_end).total_seconds()),
                    "between": [cur_label, i_label],
                })
            if i_end > cur_end:
                cur_end, cur_label = i_end, i_label

        result[auv_id] = {
            "t_start": t_start.isoformat(timespec="seconds"),
            "t_end": t_end.isoformat(timespec="seconds"),
            "duration_s": int((t_end - t_start).total_seconds()),
            "sources": sorted({_category(label) for _, _, label in intervals}),
            "gaps": gaps,
        }
    return result
def safe_listdir(p: Path) -> list[Path]: def safe_listdir(p: Path) -> list[Path]:
if not p.exists() or not p.is_dir(): if not p.exists() or not p.is_dir():
return [] return []
@@ -179,7 +583,6 @@ def collect_audio_logs(ssd_path: Path) -> list[str]:
return sorted(out) return sorted(out)
def collect_sss_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]: def collect_sss_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
"""klf + bin under ssd_path/sss/, recursive.""" """klf + bin under ssd_path/sss/, recursive."""
sss_root = ssd_path / "sss" sss_root = ssd_path / "sss"
@@ -233,6 +636,7 @@ def collect_mag_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
total = sum(len(v) for v in result.values()) total = sum(len(v) for v in result.values())
return result, total return result, total
def build_manifest(mission: str, ssd_base: Path) -> dict: def build_manifest(mission: str, ssd_base: Path) -> dict:
ssd_path = ssd_base / mission / "raw_data" ssd_path = ssd_base / mission / "raw_data"
if not ssd_path.exists(): if not ssd_path.exists():
@@ -246,7 +650,7 @@ def build_manifest(mission: str, ssd_base: Path) -> dict:
sss_files, n_sss = collect_sss_files(ssd_path) sss_files, n_sss = collect_sss_files(ssd_path)
mag_files, n_mag = collect_mag_files(ssd_path) mag_files, n_mag = collect_mag_files(ssd_path)
return { manifest = {
"mission": mission, "mission": mission,
"generated_at": iso_utc_now(), "generated_at": iso_utc_now(),
"ssd_path": str(ssd_path), "ssd_path": str(ssd_path),
@@ -269,6 +673,26 @@ def build_manifest(mission: str, ssd_base: Path) -> dict:
}, },
} }
# --- Coverage extraction ---
coverage, auv_file_windows = build_coverage(manifest, ssd_base)
mission_window = compute_mission_window(coverage)
auv_windows = compute_auv_windows(auv_file_windows)
# Coverage stats
import collections
source_counts = collections.Counter(v["source"] for v in coverage.values())
n_fallback = source_counts.get("mtime_fallback", 0)
n_total_cov = len(coverage)
if n_total_cov:
print(f" coverage: {n_total_cov} files | fallback={n_fallback} "
f"({100*n_fallback//n_total_cov}%) | sources={dict(source_counts)}")
manifest["coverage"] = coverage
manifest["mission_window"] = mission_window
manifest["auv_windows"] = auv_windows
return manifest
def main(argv: list[str] | None = None) -> int: def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="Stage 01 — select mission, produce raw manifest.") ap = argparse.ArgumentParser(description="Stage 01 — select mission, produce raw manifest.")
@@ -297,6 +721,9 @@ def main(argv: list[str] | None = None) -> int:
f"bags={t['n_mcap_bags']} bins={t['n_bin_files']} " f"bags={t['n_mcap_bags']} bins={t['n_bin_files']} "
f"usbl={t['n_usbl_logs']} audio={t['n_audio_logs']} " f"usbl={t['n_usbl_logs']} audio={t['n_audio_logs']} "
f"sss={t['n_sss_files']} mag={t['n_mag_files']}") f"sss={t['n_sss_files']} mag={t['n_mag_files']}")
if manifest.get("mission_window"):
mw = manifest["mission_window"]
print(f" mission_window: {mw['t_start']}{mw['t_end']} ({mw['duration_s']}s)")
return 0 return 0