stage01: filtre old/, fusion AUV0xx<->AUV2xx, focus date mission
- _is_old_path skip tout path contenant /old/ - _normalize_auv_id: AUV010->AUV210, AUV012->AUV212, AUV013->AUV213 - _parse_mission_date: regex YYYYMMDD du nom mission - _mission_date_window: filtre coverage hors [date-2h, date+26h] UTC - manifest: mission_date + mission_date_window_utc + n_filtered_old + n_filtered_out_of_date
This commit is contained in:
@@ -33,6 +33,7 @@ GP_FOLDER_RE = re.compile(r"^GP(?P<gp>\d+)[_-]AUV(?P<auv>\d{3})$", re.I)
|
||||
BAG_AUV_RE = re.compile(r"_AUV(?P<auv>\d{3})(?:[_/]|$)", re.I)
|
||||
USBL_FILE_RE = re.compile(r"usbl", re.I)
|
||||
KLF_TS_RE = re.compile(r"(\d{8})_(\d{6})")
|
||||
MISSION_DATE_RE = re.compile(r"^(\d{8})-")
|
||||
|
||||
# KLF throughput estimate: ~5MB/min
|
||||
KLF_BYTES_PER_SEC = 5 * 1024 * 1024 / 60
|
||||
@@ -42,6 +43,42 @@ def iso_utc_now() -> str:
|
||||
return datetime.now(timezone.utc).isoformat(timespec="seconds")
|
||||
|
||||
|
||||
def _is_old_path(path: Path) -> bool:
|
||||
"""Return True if path contains an 'old' component (skip these files)."""
|
||||
return "old" in path.parts
|
||||
|
||||
|
||||
def _normalize_auv_id(auv_str: str) -> str:
|
||||
"""AUV0xx -> AUV2xx; AUV2xx unchanged; others unchanged."""
|
||||
m = re.fullmatch(r"AUV(\d{3})", auv_str, re.I)
|
||||
if not m:
|
||||
return auv_str
|
||||
n = int(m.group(1))
|
||||
if 0 <= n < 100:
|
||||
return f"AUV2{n:02d}" # AUV010 -> AUV210
|
||||
return f"AUV{n:03d}"
|
||||
|
||||
|
||||
def _parse_mission_date(mission: str) -> "datetime | None":
|
||||
"""Parse YYYYMMDD from mission folder name. Returns UTC midnight or None."""
|
||||
m = MISSION_DATE_RE.match(mission)
|
||||
if not m:
|
||||
print(f" [warn] mission_date: cannot parse date from '{mission}', no date filter")
|
||||
return None
|
||||
try:
|
||||
return datetime.strptime(m.group(1), "%Y%m%d").replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
print(f" [warn] mission_date: invalid date '{m.group(1)}' in '{mission}'")
|
||||
return None
|
||||
|
||||
|
||||
def _mission_date_window(mission_date: datetime) -> "tuple[datetime, datetime]":
|
||||
"""Return [date-2h, date+26h] UTC window with +/-2h timezone tolerance."""
|
||||
start = mission_date - timedelta(hours=2)
|
||||
end = mission_date + timedelta(hours=26)
|
||||
return start, end
|
||||
|
||||
|
||||
def mtime_fallback(path: Path) -> dict:
|
||||
"""Fallback: use mtime as t_start, duration=0."""
|
||||
try:
|
||||
@@ -58,7 +95,7 @@ def mtime_fallback(path: Path) -> dict:
|
||||
|
||||
|
||||
def parse_smpte(smpte: str, fps: float = 25.0) -> float:
|
||||
"""SMPTE HH:MM:SS:FF → seconds since midnight."""
|
||||
"""SMPTE HH:MM:SS:FF -> seconds since midnight."""
|
||||
parts = smpte.split(":")
|
||||
if len(parts) != 4:
|
||||
return None
|
||||
@@ -66,7 +103,7 @@ def parse_smpte(smpte: str, fps: float = 25.0) -> float:
|
||||
return h * 3600 + m * 60 + s + f / fps
|
||||
|
||||
|
||||
def extract_mp4(path: Path) -> dict | None:
|
||||
def extract_mp4(path: Path) -> "dict | None":
|
||||
"""Extract timestamps from GoPro MP4 via ffprobe."""
|
||||
try:
|
||||
out = subprocess.run(
|
||||
@@ -131,7 +168,7 @@ def extract_mp4(path: Path) -> dict | None:
|
||||
return mtime_fallback(path)
|
||||
|
||||
|
||||
def extract_mcap(path: Path) -> dict | None:
|
||||
def extract_mcap(path: Path) -> "dict | None":
|
||||
"""Extract timestamps from MCAP bag via mcap.reader."""
|
||||
try:
|
||||
from mcap.reader import make_reader
|
||||
@@ -156,14 +193,14 @@ def extract_mcap(path: Path) -> dict | None:
|
||||
return mtime_fallback(path)
|
||||
|
||||
|
||||
def extract_bin(path: Path) -> dict | None:
|
||||
def extract_bin(path: Path) -> "dict | None":
|
||||
"""BIN ArduSub: no absolute timestamp available (TimeUS = boot-relative).
|
||||
Use mtime fallback."""
|
||||
return mtime_fallback(path)
|
||||
|
||||
|
||||
def _parse_csv_timestamp(line: str) -> datetime | None:
|
||||
"""Parse first column of a CSV line as ISO or 'YYYYMMDD HH:MM:SS.mmm'."""
|
||||
def _parse_csv_timestamp(line: str) -> "datetime | None":
|
||||
"""Parse first column of a CSV line as ISO or YYYYMMDD HH:MM:SS.mmm."""
|
||||
col = line.split(",")[0].strip().strip('"')
|
||||
# Try ISO format (2026-05-08 05:46:29.384209)
|
||||
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S.%f",
|
||||
@@ -187,7 +224,7 @@ def _parse_csv_timestamp(line: str) -> datetime | None:
|
||||
return None
|
||||
|
||||
|
||||
def extract_csv(path: Path) -> dict | None:
|
||||
def extract_csv(path: Path) -> "dict | None":
|
||||
"""Extract timestamps from CSV (USBL/MAG) via head+tail."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
@@ -234,7 +271,7 @@ def extract_csv(path: Path) -> dict | None:
|
||||
return mtime_fallback(path)
|
||||
|
||||
|
||||
def extract_klf(path: Path) -> dict | None:
|
||||
def extract_klf(path: Path) -> "dict | None":
|
||||
"""Extract timestamp from KLF Kogger filename: kogger_sss_YYYYMMDD_HHMMSS.klf"""
|
||||
try:
|
||||
m = KLF_TS_RE.search(path.name)
|
||||
@@ -259,7 +296,7 @@ def extract_klf(path: Path) -> dict | None:
|
||||
return mtime_fallback(path)
|
||||
|
||||
|
||||
def extract_timestamps(path: Path, kind: str) -> dict | None:
|
||||
def extract_timestamps(path: Path, kind: str) -> "dict | None":
|
||||
"""Dispatch timestamp extraction by file kind."""
|
||||
try:
|
||||
if kind == "mp4":
|
||||
@@ -278,15 +315,15 @@ def extract_timestamps(path: Path, kind: str) -> dict | None:
|
||||
return mtime_fallback(path)
|
||||
|
||||
|
||||
def build_coverage(manifest: dict, ssd_base: Path) -> tuple[dict, dict]:
|
||||
def build_coverage(manifest: dict, ssd_base: Path) -> "tuple[dict, dict]":
|
||||
"""Build coverage dict and per-AUV file lists for window computation."""
|
||||
mission = manifest["mission"]
|
||||
ssd_path = ssd_base / mission / "raw_data"
|
||||
coverage: dict[str, dict] = {}
|
||||
coverage: "dict[str, dict]" = {}
|
||||
# auv_file_windows[auv_id] = list of (t_start, t_end, source_label)
|
||||
auv_file_windows: dict[str, list] = defaultdict(list)
|
||||
auv_file_windows: "dict[str, list]" = defaultdict(list)
|
||||
|
||||
tasks: list[tuple[str, Path, str, str | None]] = [] # (rel, path, kind, auv_id)
|
||||
tasks: "list[tuple[str, Path, str, str | None]]" = [] # (rel, path, kind, auv_id)
|
||||
|
||||
# Videos
|
||||
vroot = ssd_path / "medias" / "videos"
|
||||
@@ -330,7 +367,7 @@ def build_coverage(manifest: dict, ssd_base: Path) -> tuple[dict, dict]:
|
||||
if p.exists():
|
||||
# try to infer AUV from path
|
||||
m = AUV_PHYS_RE.search(rel_str)
|
||||
auv_id = f"AUV{int(m.group(1)):03d}" if m else None
|
||||
auv_id = _normalize_auv_id(f"AUV{int(m.group(1)):03d}") if m else None
|
||||
tasks.append((rel_str, p, "usbl", auv_id))
|
||||
|
||||
# MAG files
|
||||
@@ -366,7 +403,7 @@ def build_coverage(manifest: dict, ssd_base: Path) -> tuple[dict, dict]:
|
||||
return coverage, dict(auv_file_windows)
|
||||
|
||||
|
||||
def compute_mission_window(coverage: dict) -> dict | None:
|
||||
def compute_mission_window(coverage: dict) -> "dict | None":
|
||||
"""Global mission window: min t_start, max t_end over all files."""
|
||||
starts = []
|
||||
ends = []
|
||||
@@ -440,7 +477,7 @@ def compute_auv_windows(auv_file_windows: dict) -> dict:
|
||||
return result
|
||||
|
||||
|
||||
def safe_listdir(p: Path) -> list[Path]:
|
||||
def safe_listdir(p: Path) -> "list[Path]":
|
||||
if not p.exists() or not p.is_dir():
|
||||
return []
|
||||
try:
|
||||
@@ -449,9 +486,9 @@ def safe_listdir(p: Path) -> list[Path]:
|
||||
return []
|
||||
|
||||
|
||||
def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int, float]:
|
||||
def collect_videos(ssd_path: Path) -> "tuple[dict, int, float]":
|
||||
"""videos[AUV][GP1|GP2] -> sorted list of MP4 filenames. Two layouts tried."""
|
||||
videos: dict[str, dict[str, list[str]]] = defaultdict(lambda: defaultdict(list))
|
||||
videos = defaultdict(lambda: defaultdict(list))
|
||||
total_n = 0
|
||||
total_bytes = 0
|
||||
|
||||
@@ -460,12 +497,16 @@ def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int
|
||||
for sub in safe_listdir(vroot_a):
|
||||
if not sub.is_dir():
|
||||
continue
|
||||
if _is_old_path(sub):
|
||||
continue
|
||||
m = GP_FOLDER_RE.match(sub.name)
|
||||
if not m:
|
||||
continue
|
||||
gp_key = f"GP{int(m.group('gp'))}"
|
||||
auv_id = f"AUV{int(m.group('auv')):03d}"
|
||||
auv_id = _normalize_auv_id(f"AUV{int(m.group('auv')):03d}")
|
||||
for f in safe_listdir(sub):
|
||||
if _is_old_path(f):
|
||||
continue
|
||||
if f.is_file() and f.suffix.upper() == ".MP4":
|
||||
videos[auv_id][gp_key].append(f.name)
|
||||
total_n += 1
|
||||
@@ -474,22 +515,28 @@ def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# Layout B: medias/videos/AUV{xxx}/GP{n}/*.MP4 (fallback if anything found)
|
||||
# Layout B: medias/videos/AUV{xxx}/GP{n}/*.MP4 (fallback)
|
||||
for sub in safe_listdir(vroot_a):
|
||||
if not sub.is_dir():
|
||||
continue
|
||||
if _is_old_path(sub):
|
||||
continue
|
||||
am = AUV_PHYS_RE.fullmatch(sub.name)
|
||||
if not am:
|
||||
continue
|
||||
auv_id = f"AUV{int(am.group(1)):03d}"
|
||||
auv_id = _normalize_auv_id(f"AUV{int(am.group(1)):03d}")
|
||||
for gp_sub in safe_listdir(sub):
|
||||
if not gp_sub.is_dir():
|
||||
continue
|
||||
if _is_old_path(gp_sub):
|
||||
continue
|
||||
gm = re.fullmatch(r"GP(\d+)", gp_sub.name, re.I)
|
||||
if not gm:
|
||||
continue
|
||||
gp_key = f"GP{int(gm.group(1))}"
|
||||
for f in safe_listdir(gp_sub):
|
||||
if _is_old_path(f):
|
||||
continue
|
||||
if f.is_file() and f.suffix.upper() == ".MP4":
|
||||
videos[auv_id][gp_key].append(f.name)
|
||||
total_n += 1
|
||||
@@ -499,7 +546,7 @@ def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int
|
||||
pass
|
||||
|
||||
# Sort + dedup
|
||||
out: dict[str, dict[str, list[str]]] = {}
|
||||
out = {}
|
||||
for auv in sorted(videos.keys()):
|
||||
out[auv] = {}
|
||||
for gp in sorted(videos[auv].keys()):
|
||||
@@ -507,50 +554,58 @@ def collect_videos(ssd_path: Path) -> tuple[dict[str, dict[str, list[str]]], int
|
||||
return out, total_n, total_bytes / 1e9
|
||||
|
||||
|
||||
def collect_mcap_bags(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
|
||||
def collect_mcap_bags(ssd_path: Path) -> "tuple[dict, int]":
|
||||
"""mcap_bags[AUV{nnn}] -> sorted list of bag-dir names."""
|
||||
bags: dict[str, list[str]] = defaultdict(list)
|
||||
bags = defaultdict(list)
|
||||
n = 0
|
||||
broot = ssd_path / "logs" / "SUB" / "bag"
|
||||
for sub in safe_listdir(broot):
|
||||
if not sub.is_dir():
|
||||
continue
|
||||
if _is_old_path(sub):
|
||||
continue
|
||||
m = BAG_AUV_RE.search(sub.name)
|
||||
if not m:
|
||||
continue
|
||||
auv_id = f"AUV{int(m.group('auv')):03d}"
|
||||
auv_id = _normalize_auv_id(f"AUV{int(m.group('auv')):03d}")
|
||||
bags[auv_id].append(sub.name)
|
||||
n += 1
|
||||
return {k: sorted(set(v)) for k, v in sorted(bags.items())}, n
|
||||
|
||||
|
||||
def collect_bin_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
|
||||
def collect_bin_files(ssd_path: Path) -> "tuple[dict, int]":
|
||||
"""bin_files[AUV{nnn}] -> sorted .BIN filenames."""
|
||||
bins: dict[str, list[str]] = defaultdict(list)
|
||||
bins = defaultdict(list)
|
||||
n = 0
|
||||
# Layout: logs/SUB/AUV{xxx}/*.BIN
|
||||
sub_root = ssd_path / "logs" / "SUB"
|
||||
for sub in safe_listdir(sub_root):
|
||||
if not sub.is_dir():
|
||||
continue
|
||||
if _is_old_path(sub):
|
||||
continue
|
||||
am = AUV_PHYS_RE.fullmatch(sub.name)
|
||||
if not am:
|
||||
continue
|
||||
auv_id = f"AUV{int(am.group(1)):03d}"
|
||||
auv_id = _normalize_auv_id(f"AUV{int(am.group(1)):03d}")
|
||||
for f in safe_listdir(sub):
|
||||
if _is_old_path(f):
|
||||
continue
|
||||
if f.is_file() and f.suffix.upper() == ".BIN":
|
||||
bins[auv_id].append(f.name)
|
||||
n += 1
|
||||
# Also try logs/SUB/bin/*.BIN (flat layout)
|
||||
flat = ssd_path / "logs" / "SUB" / "bin"
|
||||
for f in safe_listdir(flat):
|
||||
if _is_old_path(f):
|
||||
continue
|
||||
if f.is_file() and f.suffix.upper() == ".BIN":
|
||||
bins.setdefault("AUV000", []).append(f.name)
|
||||
n += 1
|
||||
return {k: sorted(set(v)) for k, v in sorted(bins.items())}, n
|
||||
|
||||
|
||||
def collect_usbl_logs(ssd_path: Path) -> list[str]:
|
||||
def collect_usbl_logs(ssd_path: Path) -> "list[str]":
|
||||
"""Return list of USBL csv paths relative to ssd_path."""
|
||||
out = []
|
||||
logs_root = ssd_path / "logs"
|
||||
@@ -562,6 +617,8 @@ def collect_usbl_logs(ssd_path: Path) -> list[str]:
|
||||
continue
|
||||
except OSError:
|
||||
continue
|
||||
if _is_old_path(p):
|
||||
continue
|
||||
name_l = p.name.lower()
|
||||
if "usbl" in name_l and name_l.endswith(".csv"):
|
||||
try:
|
||||
@@ -571,34 +628,40 @@ def collect_usbl_logs(ssd_path: Path) -> list[str]:
|
||||
return sorted(set(out))
|
||||
|
||||
|
||||
def collect_audio_logs(ssd_path: Path) -> list[str]:
|
||||
def collect_audio_logs(ssd_path: Path) -> "list[str]":
|
||||
out = []
|
||||
aud = ssd_path / "audios"
|
||||
for p in aud.rglob("*") if aud.exists() else []:
|
||||
try:
|
||||
if p.is_file():
|
||||
out.append(str(p.relative_to(ssd_path)))
|
||||
if not p.is_file():
|
||||
continue
|
||||
except (OSError, ValueError):
|
||||
continue
|
||||
if _is_old_path(p):
|
||||
continue
|
||||
try:
|
||||
out.append(str(p.relative_to(ssd_path)))
|
||||
except ValueError:
|
||||
continue
|
||||
return sorted(out)
|
||||
|
||||
|
||||
def collect_sss_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
|
||||
def collect_sss_files(ssd_path: Path) -> "tuple[dict, int]":
|
||||
"""klf + bin under ssd_path/sss/, recursive."""
|
||||
sss_root = ssd_path / "sss"
|
||||
klf: list[str] = []
|
||||
bin_: list[str] = []
|
||||
klf = []
|
||||
bin_ = []
|
||||
if sss_root.exists():
|
||||
try:
|
||||
for p in sss_root.rglob("*.klf"):
|
||||
try:
|
||||
if p.is_file():
|
||||
if p.is_file() and not _is_old_path(p):
|
||||
klf.append(str(p.relative_to(ssd_path)))
|
||||
except (OSError, ValueError):
|
||||
continue
|
||||
for p in sss_root.rglob("*.bin"):
|
||||
try:
|
||||
if p.is_file():
|
||||
if p.is_file() and not _is_old_path(p):
|
||||
bin_.append(str(p.relative_to(ssd_path)))
|
||||
except (OSError, ValueError):
|
||||
continue
|
||||
@@ -608,16 +671,18 @@ def collect_sss_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
|
||||
return result, len(result["klf"]) + len(result["bin"])
|
||||
|
||||
|
||||
def collect_mag_files(ssd_path: Path) -> tuple[dict[str, list[str]], int]:
|
||||
def collect_mag_files(ssd_path: Path) -> "tuple[dict, int]":
|
||||
"""csv under ssd_path/mag/, categorised ar/av/side/other."""
|
||||
mag_root = ssd_path / "mag"
|
||||
out: dict[str, list[str]] = {"ar": [], "av": [], "side": [], "other": []}
|
||||
out = {"ar": [], "av": [], "side": [], "other": []}
|
||||
if mag_root.exists():
|
||||
try:
|
||||
for p in mag_root.rglob("*.csv"):
|
||||
try:
|
||||
if not p.is_file():
|
||||
continue
|
||||
if _is_old_path(p):
|
||||
continue
|
||||
rel = str(p.relative_to(ssd_path))
|
||||
parts = p.parts
|
||||
if "ar" in parts:
|
||||
@@ -650,10 +715,27 @@ def build_manifest(mission: str, ssd_base: Path) -> dict:
|
||||
sss_files, n_sss = collect_sss_files(ssd_path)
|
||||
mag_files, n_mag = collect_mag_files(ssd_path)
|
||||
|
||||
# Parse mission date for date-focus filter
|
||||
mission_date = _parse_mission_date(mission)
|
||||
if mission_date:
|
||||
win_start, win_end = _mission_date_window(mission_date)
|
||||
mission_date_str = mission_date.strftime("%Y-%m-%d")
|
||||
mission_date_window_utc = {
|
||||
"start": win_start.isoformat(timespec="seconds"),
|
||||
"end": win_end.isoformat(timespec="seconds"),
|
||||
}
|
||||
print(f" mission_date: {mission_date_str} | window: {win_start.isoformat(timespec='seconds')} -> {win_end.isoformat(timespec='seconds')}")
|
||||
else:
|
||||
mission_date_str = None
|
||||
mission_date_window_utc = None
|
||||
win_start = win_end = None
|
||||
|
||||
manifest = {
|
||||
"mission": mission,
|
||||
"generated_at": iso_utc_now(),
|
||||
"ssd_path": str(ssd_path),
|
||||
"mission_date": mission_date_str,
|
||||
"mission_date_window_utc": mission_date_window_utc,
|
||||
"videos": videos,
|
||||
"mcap_bags": mcap_bags,
|
||||
"bin_files": bin_files,
|
||||
@@ -675,6 +757,40 @@ def build_manifest(mission: str, ssd_base: Path) -> dict:
|
||||
|
||||
# --- Coverage extraction ---
|
||||
coverage, auv_file_windows = build_coverage(manifest, ssd_base)
|
||||
|
||||
# --- Date-focus filter on coverage ---
|
||||
n_filtered_old = 0 # already handled at collect time via _is_old_path
|
||||
n_filtered_out_of_date = 0
|
||||
if win_start and win_end:
|
||||
filtered_coverage = {}
|
||||
for rel, cov in coverage.items():
|
||||
if cov.get("source") == "mtime_fallback":
|
||||
filtered_coverage[rel] = cov
|
||||
continue
|
||||
try:
|
||||
t_s = datetime.fromisoformat(cov["t_start"])
|
||||
# Ensure tz-aware
|
||||
if t_s.tzinfo is None:
|
||||
t_s = t_s.replace(tzinfo=timezone.utc)
|
||||
if win_start <= t_s <= win_end:
|
||||
filtered_coverage[rel] = cov
|
||||
else:
|
||||
n_filtered_out_of_date += 1
|
||||
except Exception:
|
||||
filtered_coverage[rel] = cov
|
||||
if n_filtered_out_of_date:
|
||||
print(f" date_filter: removed {n_filtered_out_of_date} entries outside mission date window")
|
||||
coverage = filtered_coverage
|
||||
|
||||
# Rebuild auv_file_windows from filtered coverage
|
||||
kept_rels = set(coverage.keys())
|
||||
auv_file_windows_filtered = defaultdict(list)
|
||||
for auv_id, wins in auv_file_windows.items():
|
||||
for w in wins:
|
||||
if w[2] in kept_rels:
|
||||
auv_file_windows_filtered[auv_id].append(w)
|
||||
auv_file_windows = dict(auv_file_windows_filtered)
|
||||
|
||||
mission_window = compute_mission_window(coverage)
|
||||
auv_windows = compute_auv_windows(auv_file_windows)
|
||||
|
||||
@@ -690,12 +806,14 @@ def build_manifest(mission: str, ssd_base: Path) -> dict:
|
||||
manifest["coverage"] = coverage
|
||||
manifest["mission_window"] = mission_window
|
||||
manifest["auv_windows"] = auv_windows
|
||||
manifest["totals"]["n_filtered_old"] = n_filtered_old
|
||||
manifest["totals"]["n_filtered_out_of_date"] = n_filtered_out_of_date
|
||||
|
||||
return manifest
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
ap = argparse.ArgumentParser(description="Stage 01 — select mission, produce raw manifest.")
|
||||
def main(argv=None):
|
||||
ap = argparse.ArgumentParser(description="Stage 01 -- select mission, produce raw manifest.")
|
||||
ap.add_argument("--mission", required=True, help="Mission folder name (e.g. 20260505-Lepradet)")
|
||||
ap.add_argument("--ssd-base", default="/mnt/ssd", help="SSD base path (default /mnt/ssd)")
|
||||
ap.add_argument("--out", default="/home/cosma/cosma-qc/data",
|
||||
@@ -721,9 +839,10 @@ def main(argv: list[str] | None = None) -> int:
|
||||
f"bags={t['n_mcap_bags']} bins={t['n_bin_files']} "
|
||||
f"usbl={t['n_usbl_logs']} audio={t['n_audio_logs']} "
|
||||
f"sss={t['n_sss_files']} mag={t['n_mag_files']}")
|
||||
print(f" filtered: old={t.get('n_filtered_old', 0)} out_of_date={t.get('n_filtered_out_of_date', 0)}")
|
||||
if manifest.get("mission_window"):
|
||||
mw = manifest["mission_window"]
|
||||
print(f" mission_window: {mw['t_start']} → {mw['t_end']} ({mw['duration_s']}s)")
|
||||
print(f" mission_window: {mw['t_start']} -> {mw['t_end']} ({mw['duration_s']}s)")
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user