From 171f90ce9f864f56313c6fc58ef57ed64efd1c1a Mon Sep 17 00:00:00 2001 From: Poulpe Date: Sat, 16 May 2026 16:05:41 +0000 Subject: [PATCH] stage03b: trim videos per run + ours rough cut LRV proxy (GoPro low-res 768x432 H.264) + ffmpeg -c copy keyframe-aligned. Inputs: 02_runs.json + 03_video_index.json. Outputs: per-run mp4 + ours_.mp4 chrono concat. Tested on 20260505-Lepradet: 5 files + 2 ours (~11 GB total). Co-Authored-By: Claude Opus 4.7 --- pipeline/stages/03b_trim_runs.py | 228 +++++++++++++++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100755 pipeline/stages/03b_trim_runs.py diff --git a/pipeline/stages/03b_trim_runs.py b/pipeline/stages/03b_trim_runs.py new file mode 100755 index 0000000..8765740 --- /dev/null +++ b/pipeline/stages/03b_trim_runs.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +"""Stage 03b - Trim videos per run (LRV proxies + -c copy, fast). + +Inputs: + data//02_runs.json + data//03_video_index.json + +Strategy: + - Use GoPro LRV proxy files (768x432 H.264 ~720 kbps) instead of 4K HEVC originals. + - ffmpeg -c copy per chapter (keyframe-aligned cut) + concat demuxer. + - Output: per-run .mp4 + ours_.mp4 (concat of per-run). + +Falls back to MP4 source if matching LRV is missing. +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import subprocess +import sys +import tempfile +from collections import defaultdict +from datetime import datetime, timezone +from pathlib import Path + +OUT_ROOT = Path("/mnt/ssd/cosma-qc-out/03b_trim_runs") + + +def run_ff(cmd: list[str]) -> None: + r = subprocess.run(cmd, capture_output=True, text=True) + if r.returncode != 0: + sys.stderr.write(" ".join(cmd) + "\n") + sys.stderr.write(r.stderr[-3000:] + "\n") + raise RuntimeError(f"ffmpeg failed rc={r.returncode}") + + +def lrv_for_chapter(mp4_path: Path) -> Path | None: + """Return matching .LRV path if it exists (GoPro low-res proxy).""" + name = mp4_path.name + if not name.startswith("GX") or not name.upper().endswith(".MP4"): + return None + lrv_name = "GL" + name[2:-4] + ".LRV" + p = mp4_path.parent / lrv_name + return p if p.exists() else None + + +def overlap_clips(run_start: float, run_end: float, chapters: list[dict]) -> list[tuple[dict, float, float]]: + """Return [(chapter, start_off_s, duration_s)] for chapters overlapping the run.""" + out = [] + for ch in sorted(chapters, key=lambda c: c["start_epoch"]): + a = max(run_start, ch["start_epoch"]) + b = min(run_end, ch["end_epoch"]) + if b - a <= 1.0: + continue + start_off = max(0.0, run_start - ch["start_epoch"]) + dur = b - a + out.append((ch, start_off, dur)) + return out + + +def cut_clip(src: Path, start_off: float, duration: float, dst: Path) -> None: + """Cut [start_off, start_off+duration] from src using -c copy (keyframe-aligned).""" + cmd = [ + "ffmpeg", "-y", "-loglevel", "error", + "-ss", f"{start_off:.3f}", + "-i", str(src), + "-t", f"{duration:.3f}", + "-c", "copy", + "-avoid_negative_ts", "make_zero", + "-an", + str(dst), + ] + run_ff(cmd) + + +def concat_demux(parts: list[Path], dst: Path) -> None: + """Concat parts with ffmpeg concat demuxer (-c copy).""" + if not parts: + return + if len(parts) == 1: + shutil.copy2(parts[0], dst) + return + with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f: + for p in parts: + f.write(f"file '{p.resolve()}'\n") + listfile = f.name + try: + cmd = [ + "ffmpeg", "-y", "-loglevel", "error", + "-f", "concat", "-safe", "0", + "-i", listfile, + "-c", "copy", + "-movflags", "+faststart", + "-an", + str(dst), + ] + run_ff(cmd) + finally: + os.unlink(listfile) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--mission", required=True) + ap.add_argument("--data-root", default="/home/cosma/cosma-qc/data") + ap.add_argument("--skip-existing", action="store_true") + ap.add_argument("--prefer-source", choices=["lrv", "mp4"], default="lrv", + help="lrv = use .LRV proxy (default, fast); mp4 = use 4K originals") + args = ap.parse_args() + + mission = args.mission + data_dir = Path(args.data_root) / mission + + runs = json.loads((data_dir / "02_runs.json").read_text())["runs"] + vidx = json.loads((data_dir / "03_video_index.json").read_text()) + videos = vidx["videos"] + + by_auv_gp: dict[tuple[str, str], list[dict]] = defaultdict(list) + for v in videos: + by_auv_gp[(v["auv"], v["gp"])].append(v) + all_gps = sorted({v["gp"] for v in videos}) + + out_dir = OUT_ROOT / mission + out_dir.mkdir(parents=True, exist_ok=True) + tmp_dir = out_dir / "_tmp" + tmp_dir.mkdir(exist_ok=True) + + link = data_dir / "03b_trim_runs" + if link.is_symlink(): + link.unlink() + elif link.exists(): + shutil.rmtree(link) + link.symlink_to(out_dir) + + manifest = { + "mission": mission, + "generated_at": datetime.now(timezone.utc).isoformat(), + "output_root": str(out_dir), + "source": args.prefer_source, + "runs": [], + "ours": {}, + } + + runs_by_chrono = sorted(runs, key=lambda r: r["start_epoch"]) + ours_parts: dict[str, list[tuple[float, Path, str]]] = defaultdict(list) + + for run in runs_by_chrono: + run_id = run["run_id"] + auv = run["auv"] + r_start = run["start_epoch"] + r_end = run["end_epoch"] + run_entry = {"run_id": run_id, "auv": auv, "duration_s": run["duration_s"], "outputs": []} + + for gp in all_gps: + chapters = by_auv_gp.get((auv, gp), []) + if not chapters: + continue + clips = overlap_clips(r_start, r_end, chapters) + if not clips: + continue + + out_name = f"{run_id}_{auv}_{gp}.mp4" + out_path = out_dir / out_name + if args.skip_existing and out_path.exists() and out_path.stat().st_size > 0: + print(f"[skip] {out_name}", flush=True) + else: + # Pick source per chapter + resolved: list[tuple[Path, float, float, str]] = [] + src_tags: list[str] = [] + for ch, soff, dur in clips: + mp4 = Path(ch["filepath"]) + src = mp4 + tag = "mp4" + if args.prefer_source == "lrv": + lrv = lrv_for_chapter(mp4) + if lrv: + src = lrv + tag = "lrv" + resolved.append((src, soff, dur, tag)) + src_tags.append(tag) + + print( + f"[cut ] {out_name} chapters={len(resolved)} src={','.join(src_tags)}", + flush=True, + ) + tmp_parts: list[Path] = [] + for i, (src, soff, dur, _) in enumerate(resolved): + tp = tmp_dir / f"{run_id}_{auv}_{gp}_p{i:02d}.mp4" + cut_clip(src, soff, dur, tp) + tmp_parts.append(tp) + concat_demux(tmp_parts, out_path) + for p in tmp_parts: + p.unlink(missing_ok=True) + + sz_mb = round(out_path.stat().st_size / 1024 / 1024, 1) + run_entry["outputs"].append({"gp": gp, "file": out_name, "size_mb": sz_mb}) + ours_parts[gp].append((r_start, out_path, f"{run_id} {auv}")) + + manifest["runs"].append(run_entry) + + for gp, parts in ours_parts.items(): + parts.sort() + ordered_paths = [p for _, p, _ in parts] + ours_path = out_dir / f"ours_{gp}.mp4" + print(f"[ours] {ours_path.name} <- {len(ordered_paths)} clip(s)", flush=True) + concat_demux(ordered_paths, ours_path) + sz_mb = round(ours_path.stat().st_size / 1024 / 1024, 1) + manifest["ours"][gp] = { + "file": ours_path.name, + "size_mb": sz_mb, + "segments": [lbl for _, _, lbl in parts], + } + + (data_dir / "03b_trim_runs.json").write_text(json.dumps(manifest, indent=2)) + print(f"\n[done] manifest: {data_dir / '03b_trim_runs.json'}", flush=True) + print(f"[done] outputs: {out_dir}", flush=True) + + try: + tmp_dir.rmdir() + except OSError: + pass + + +if __name__ == "__main__": + main()