From 754f3c7272f9a555f2aa21805e2653853df54b3b Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 15 May 2026 10:48:22 +0000 Subject: [PATCH] stage01: filtre old/, fusion AUV0xx<->AUV2xx, focus date mission - _is_old_path skip tout path contenant /old/ - _normalize_auv_id: AUV010->AUV210, AUV012->AUV212, AUV013->AUV213 - _parse_mission_date: regex YYYYMMDD du nom mission - _mission_date_window: filtre coverage hors [date-2h, date+26h] UTC - manifest: mission_date + mission_date_window_utc + n_filtered_old + n_filtered_out_of_date --- pipeline/stages/01_select_mission.py | 203 +++++++++++++++++++++------ 1 file changed, 161 insertions(+), 42 deletions(-) diff --git a/pipeline/stages/01_select_mission.py b/pipeline/stages/01_select_mission.py index 0642c75..f49915f 100755 --- a/pipeline/stages/01_select_mission.py +++ b/pipeline/stages/01_select_mission.py @@ -33,6 +33,7 @@ GP_FOLDER_RE = re.compile(r"^GP(?P\d+)[_-]AUV(?P\d{3})$", re.I) BAG_AUV_RE = re.compile(r"_AUV(?P\d{3})(?:[_/]|$)", re.I) USBL_FILE_RE = re.compile(r"usbl", re.I) KLF_TS_RE = re.compile(r"(\d{8})_(\d{6})") +MISSION_DATE_RE = re.compile(r"^(\d{8})-") # KLF throughput estimate: ~5MB/min KLF_BYTES_PER_SEC = 5 * 1024 * 1024 / 60 @@ -42,6 +43,42 @@ def iso_utc_now() -> str: return datetime.now(timezone.utc).isoformat(timespec="seconds") +def _is_old_path(path: Path) -> bool: + """Return True if path contains an 'old' component (skip these files).""" + return "old" in path.parts + + +def _normalize_auv_id(auv_str: str) -> str: + """AUV0xx -> AUV2xx; AUV2xx unchanged; others unchanged.""" + m = re.fullmatch(r"AUV(\d{3})", auv_str, re.I) + if not m: + return auv_str + n = int(m.group(1)) + if 0 <= n < 100: + return f"AUV2{n:02d}" # AUV010 -> AUV210 + return f"AUV{n:03d}" + + +def _parse_mission_date(mission: str) -> "datetime | None": + """Parse YYYYMMDD from mission folder name. Returns UTC midnight or None.""" + m = MISSION_DATE_RE.match(mission) + if not m: + print(f" [warn] mission_date: cannot parse date from '{mission}', no date filter") + return None + try: + return datetime.strptime(m.group(1), "%Y%m%d").replace(tzinfo=timezone.utc) + except ValueError: + print(f" [warn] mission_date: invalid date '{m.group(1)}' in '{mission}'") + return None + + +def _mission_date_window(mission_date: datetime) -> "tuple[datetime, datetime]": + """Return [date-2h, date+26h] UTC window with +/-2h timezone tolerance.""" + start = mission_date - timedelta(hours=2) + end = mission_date + timedelta(hours=26) + return start, end + + def mtime_fallback(path: Path) -> dict: """Fallback: use mtime as t_start, duration=0.""" try: @@ -58,7 +95,7 @@ def mtime_fallback(path: Path) -> dict: def parse_smpte(smpte: str, fps: float = 25.0) -> float: - """SMPTE HH:MM:SS:FF → seconds since midnight.""" + """SMPTE HH:MM:SS:FF -> seconds since midnight.""" parts = smpte.split(":") if len(parts) != 4: return None @@ -66,7 +103,7 @@ def parse_smpte(smpte: str, fps: float = 25.0) -> float: return h * 3600 + m * 60 + s + f / fps -def extract_mp4(path: Path) -> dict | None: +def extract_mp4(path: Path) -> "dict | None": """Extract timestamps from GoPro MP4 via ffprobe.""" try: out = subprocess.run( @@ -131,7 +168,7 @@ def extract_mp4(path: Path) -> dict | None: return mtime_fallback(path) -def extract_mcap(path: Path) -> dict | None: +def extract_mcap(path: Path) -> "dict | None": """Extract timestamps from MCAP bag via mcap.reader.""" try: from mcap.reader import make_reader @@ -156,14 +193,14 @@ def extract_mcap(path: Path) -> dict | None: return mtime_fallback(path) -def extract_bin(path: Path) -> dict | None: +def extract_bin(path: Path) -> "dict | None": """BIN ArduSub: no absolute timestamp available (TimeUS = boot-relative). Use mtime fallback.""" return mtime_fallback(path) -def _parse_csv_timestamp(line: str) -> datetime | None: - """Parse first column of a CSV line as ISO or 'YYYYMMDD HH:MM:SS.mmm'.""" +def _parse_csv_timestamp(line: str) -> "datetime | None": + """Parse first column of a CSV line as ISO or YYYYMMDD HH:MM:SS.mmm.""" col = line.split(",")[0].strip().strip('"') # Try ISO format (2026-05-08 05:46:29.384209) for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S.%f", @@ -187,7 +224,7 @@ def _parse_csv_timestamp(line: str) -> datetime | None: return None -def extract_csv(path: Path) -> dict | None: +def extract_csv(path: Path) -> "dict | None": """Extract timestamps from CSV (USBL/MAG) via head+tail.""" try: result = subprocess.run( @@ -234,7 +271,7 @@ def extract_csv(path: Path) -> dict | None: return mtime_fallback(path) -def extract_klf(path: Path) -> dict | None: +def extract_klf(path: Path) -> "dict | None": """Extract timestamp from KLF Kogger filename: kogger_sss_YYYYMMDD_HHMMSS.klf""" try: m = KLF_TS_RE.search(path.name) @@ -259,7 +296,7 @@ def extract_klf(path: Path) -> dict | None: return mtime_fallback(path) -def extract_timestamps(path: Path, kind: str) -> dict | None: +def extract_timestamps(path: Path, kind: str) -> "dict | None": """Dispatch timestamp extraction by file kind.""" try: if kind == "mp4": @@ -278,15 +315,15 @@ def extract_timestamps(path: Path, kind: str) -> dict | None: return mtime_fallback(path) -def build_coverage(manifest: dict, ssd_base: Path) -> tuple[dict, dict]: +def build_coverage(manifest: dict, ssd_base: Path) -> "tuple[dict, dict]": """Build coverage dict and per-AUV file lists for window computation.""" mission = manifest["mission"] ssd_path = ssd_base / mission / "raw_data" - coverage: dict[str, dict] = {} + coverage: "dict[str, dict]" = {} # auv_file_windows[auv_id] = list of (t_start, t_end, source_label) - auv_file_windows: dict[str, list] = defaultdict(list) + auv_file_windows: "dict[str, list]" = defaultdict(list) - tasks: list[tuple[str, Path, str, str | None]] = [] # (rel, path, kind, auv_id) + tasks: "list[tuple[str, Path, str, str | None]]" = [] # (rel, path, kind, auv_id) # Videos vroot = ssd_path / "medias" / "videos" @@ -330,7 +367,7 @@ def build_coverage(manifest: dict, ssd_base: Path) -> tuple[dict, dict]: if p.exists(): # try to infer AUV from path m = AUV_PHYS_RE.search(rel_str) - auv_id = f"AUV{int(m.group(1)):03d}" if m else None + auv_id = _normalize_auv_id(f"AUV{int(m.group(1)):03d}") if m else None tasks.append((rel_str, p, "usbl", auv_id)) # MAG files @@ -366,7 +403,7 @@ def build_coverage(manifest: dict, ssd_base: Path) -> tuple[dict, dict]: return coverage, dict(auv_file_windows) -def compute_mission_window(coverage: dict) -> dict | None: +def compute_mission_window(coverage: dict) -> "dict | None": """Global mission window: min t_start, max t_end over all files.""" starts = [] ends = [] @@ -440,7 +477,7 @@ def compute_auv_windows(auv_file_windows: dict) -> dict: return result -def safe_listdir(p: Path) -> list[Path]: +def safe_listdir(p: Path) -> "list[Path]": if not p.exists() or not p.is_dir(): return [] try: @@ -449,9 +486,9 @@ def safe_listdir(p: Path) -> list[Path]: return [] -def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int, float]: +def collect_videos(ssd_path: Path) -> "tuple[dict, int, float]": """videos[AUV][GP1|GP2] -> sorted list of MP4 filenames. Two layouts tried.""" - videos: dict[str, dict[str, list[str]]] = defaultdict(lambda: defaultdict(list)) + videos = defaultdict(lambda: defaultdict(list)) total_n = 0 total_bytes = 0 @@ -460,12 +497,16 @@ def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int for sub in safe_listdir(vroot_a): if not sub.is_dir(): continue + if _is_old_path(sub): + continue m = GP_FOLDER_RE.match(sub.name) if not m: continue gp_key = f"GP{int(m.group('gp'))}" - auv_id = f"AUV{int(m.group('auv')):03d}" + auv_id = _normalize_auv_id(f"AUV{int(m.group('auv')):03d}") for f in safe_listdir(sub): + if _is_old_path(f): + continue if f.is_file() and f.suffix.upper() == ".MP4": videos[auv_id][gp_key].append(f.name) total_n += 1 @@ -474,22 +515,28 @@ def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int except OSError: pass - # Layout B: medias/videos/AUV{xxx}/GP{n}/*.MP4 (fallback if anything found) + # Layout B: medias/videos/AUV{xxx}/GP{n}/*.MP4 (fallback) for sub in safe_listdir(vroot_a): if not sub.is_dir(): continue + if _is_old_path(sub): + continue am = AUV_PHYS_RE.fullmatch(sub.name) if not am: continue - auv_id = f"AUV{int(am.group(1)):03d}" + auv_id = _normalize_auv_id(f"AUV{int(am.group(1)):03d}") for gp_sub in safe_listdir(sub): if not gp_sub.is_dir(): continue + if _is_old_path(gp_sub): + continue gm = re.fullmatch(r"GP(\d+)", gp_sub.name, re.I) if not gm: continue gp_key = f"GP{int(gm.group(1))}" for f in safe_listdir(gp_sub): + if _is_old_path(f): + continue if f.is_file() and f.suffix.upper() == ".MP4": videos[auv_id][gp_key].append(f.name) total_n += 1 @@ -499,7 +546,7 @@ def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int pass # Sort + dedup - out: dict[str, dict[str, list[str]]] = {} + out = {} for auv in sorted(videos.keys()): out[auv] = {} for gp in sorted(videos[auv].keys()): @@ -507,50 +554,58 @@ def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int return out, total_n, total_bytes / 1e9 -def collect_mcap_bags(ssd_path: Path) -> tuple[dict[str, list[str]], int]: +def collect_mcap_bags(ssd_path: Path) -> "tuple[dict, int]": """mcap_bags[AUV{nnn}] -> sorted list of bag-dir names.""" - bags: dict[str, list[str]] = defaultdict(list) + bags = defaultdict(list) n = 0 broot = ssd_path / "logs" / "SUB" / "bag" for sub in safe_listdir(broot): if not sub.is_dir(): continue + if _is_old_path(sub): + continue m = BAG_AUV_RE.search(sub.name) if not m: continue - auv_id = f"AUV{int(m.group('auv')):03d}" + auv_id = _normalize_auv_id(f"AUV{int(m.group('auv')):03d}") bags[auv_id].append(sub.name) n += 1 return {k: sorted(set(v)) for k, v in sorted(bags.items())}, n -def collect_bin_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]: +def collect_bin_files(ssd_path: Path) -> "tuple[dict, int]": """bin_files[AUV{nnn}] -> sorted .BIN filenames.""" - bins: dict[str, list[str]] = defaultdict(list) + bins = defaultdict(list) n = 0 # Layout: logs/SUB/AUV{xxx}/*.BIN sub_root = ssd_path / "logs" / "SUB" for sub in safe_listdir(sub_root): if not sub.is_dir(): continue + if _is_old_path(sub): + continue am = AUV_PHYS_RE.fullmatch(sub.name) if not am: continue - auv_id = f"AUV{int(am.group(1)):03d}" + auv_id = _normalize_auv_id(f"AUV{int(am.group(1)):03d}") for f in safe_listdir(sub): + if _is_old_path(f): + continue if f.is_file() and f.suffix.upper() == ".BIN": bins[auv_id].append(f.name) n += 1 # Also try logs/SUB/bin/*.BIN (flat layout) flat = ssd_path / "logs" / "SUB" / "bin" for f in safe_listdir(flat): + if _is_old_path(f): + continue if f.is_file() and f.suffix.upper() == ".BIN": bins.setdefault("AUV000", []).append(f.name) n += 1 return {k: sorted(set(v)) for k, v in sorted(bins.items())}, n -def collect_usbl_logs(ssd_path: Path) -> list[str]: +def collect_usbl_logs(ssd_path: Path) -> "list[str]": """Return list of USBL csv paths relative to ssd_path.""" out = [] logs_root = ssd_path / "logs" @@ -562,6 +617,8 @@ def collect_usbl_logs(ssd_path: Path) -> list[str]: continue except OSError: continue + if _is_old_path(p): + continue name_l = p.name.lower() if "usbl" in name_l and name_l.endswith(".csv"): try: @@ -571,34 +628,40 @@ def collect_usbl_logs(ssd_path: Path) -> list[str]: return sorted(set(out)) -def collect_audio_logs(ssd_path: Path) -> list[str]: +def collect_audio_logs(ssd_path: Path) -> "list[str]": out = [] aud = ssd_path / "audios" for p in aud.rglob("*") if aud.exists() else []: try: - if p.is_file(): - out.append(str(p.relative_to(ssd_path))) + if not p.is_file(): + continue except (OSError, ValueError): continue + if _is_old_path(p): + continue + try: + out.append(str(p.relative_to(ssd_path))) + except ValueError: + continue return sorted(out) -def collect_sss_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]: +def collect_sss_files(ssd_path: Path) -> "tuple[dict, int]": """klf + bin under ssd_path/sss/, recursive.""" sss_root = ssd_path / "sss" - klf: list[str] = [] - bin_: list[str] = [] + klf = [] + bin_ = [] if sss_root.exists(): try: for p in sss_root.rglob("*.klf"): try: - if p.is_file(): + if p.is_file() and not _is_old_path(p): klf.append(str(p.relative_to(ssd_path))) except (OSError, ValueError): continue for p in sss_root.rglob("*.bin"): try: - if p.is_file(): + if p.is_file() and not _is_old_path(p): bin_.append(str(p.relative_to(ssd_path))) except (OSError, ValueError): continue @@ -608,16 +671,18 @@ def collect_sss_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]: return result, len(result["klf"]) + len(result["bin"]) -def collect_mag_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]: +def collect_mag_files(ssd_path: Path) -> "tuple[dict, int]": """csv under ssd_path/mag/, categorised ar/av/side/other.""" mag_root = ssd_path / "mag" - out: dict[str, list[str]] = {"ar": [], "av": [], "side": [], "other": []} + out = {"ar": [], "av": [], "side": [], "other": []} if mag_root.exists(): try: for p in mag_root.rglob("*.csv"): try: if not p.is_file(): continue + if _is_old_path(p): + continue rel = str(p.relative_to(ssd_path)) parts = p.parts if "ar" in parts: @@ -650,10 +715,27 @@ def build_manifest(mission: str, ssd_base: Path) -> dict: sss_files, n_sss = collect_sss_files(ssd_path) mag_files, n_mag = collect_mag_files(ssd_path) + # Parse mission date for date-focus filter + mission_date = _parse_mission_date(mission) + if mission_date: + win_start, win_end = _mission_date_window(mission_date) + mission_date_str = mission_date.strftime("%Y-%m-%d") + mission_date_window_utc = { + "start": win_start.isoformat(timespec="seconds"), + "end": win_end.isoformat(timespec="seconds"), + } + print(f" mission_date: {mission_date_str} | window: {win_start.isoformat(timespec='seconds')} -> {win_end.isoformat(timespec='seconds')}") + else: + mission_date_str = None + mission_date_window_utc = None + win_start = win_end = None + manifest = { "mission": mission, "generated_at": iso_utc_now(), "ssd_path": str(ssd_path), + "mission_date": mission_date_str, + "mission_date_window_utc": mission_date_window_utc, "videos": videos, "mcap_bags": mcap_bags, "bin_files": bin_files, @@ -675,6 +757,40 @@ def build_manifest(mission: str, ssd_base: Path) -> dict: # --- Coverage extraction --- coverage, auv_file_windows = build_coverage(manifest, ssd_base) + + # --- Date-focus filter on coverage --- + n_filtered_old = 0 # already handled at collect time via _is_old_path + n_filtered_out_of_date = 0 + if win_start and win_end: + filtered_coverage = {} + for rel, cov in coverage.items(): + if cov.get("source") == "mtime_fallback": + filtered_coverage[rel] = cov + continue + try: + t_s = datetime.fromisoformat(cov["t_start"]) + # Ensure tz-aware + if t_s.tzinfo is None: + t_s = t_s.replace(tzinfo=timezone.utc) + if win_start <= t_s <= win_end: + filtered_coverage[rel] = cov + else: + n_filtered_out_of_date += 1 + except Exception: + filtered_coverage[rel] = cov + if n_filtered_out_of_date: + print(f" date_filter: removed {n_filtered_out_of_date} entries outside mission date window") + coverage = filtered_coverage + + # Rebuild auv_file_windows from filtered coverage + kept_rels = set(coverage.keys()) + auv_file_windows_filtered = defaultdict(list) + for auv_id, wins in auv_file_windows.items(): + for w in wins: + if w[2] in kept_rels: + auv_file_windows_filtered[auv_id].append(w) + auv_file_windows = dict(auv_file_windows_filtered) + mission_window = compute_mission_window(coverage) auv_windows = compute_auv_windows(auv_file_windows) @@ -690,12 +806,14 @@ def build_manifest(mission: str, ssd_base: Path) -> dict: manifest["coverage"] = coverage manifest["mission_window"] = mission_window manifest["auv_windows"] = auv_windows + manifest["totals"]["n_filtered_old"] = n_filtered_old + manifest["totals"]["n_filtered_out_of_date"] = n_filtered_out_of_date return manifest -def main(argv: list[str] | None = None) -> int: - ap = argparse.ArgumentParser(description="Stage 01 — select mission, produce raw manifest.") +def main(argv=None): + ap = argparse.ArgumentParser(description="Stage 01 -- select mission, produce raw manifest.") ap.add_argument("--mission", required=True, help="Mission folder name (e.g. 20260505-Lepradet)") ap.add_argument("--ssd-base", default="/mnt/ssd", help="SSD base path (default /mnt/ssd)") ap.add_argument("--out", default="/home/cosma/cosma-qc/data", @@ -721,9 +839,10 @@ def main(argv: list[str] | None = None) -> int: f"bags={t['n_mcap_bags']} bins={t['n_bin_files']} " f"usbl={t['n_usbl_logs']} audio={t['n_audio_logs']} " f"sss={t['n_sss_files']} mag={t['n_mag_files']}") + print(f" filtered: old={t.get('n_filtered_old', 0)} out_of_date={t.get('n_filtered_out_of_date', 0)}") if manifest.get("mission_window"): mw = manifest["mission_window"] - print(f" mission_window: {mw['t_start']} → {mw['t_end']} ({mw['duration_s']}s)") + print(f" mission_window: {mw['t_start']} -> {mw['t_end']} ({mw['duration_s']}s)") return 0