diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 75eca6d..01ed6a7 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -120,6 +120,15 @@ def count_frames(worker: dict, frames_dir: str) -> int: return 0 +def scp_to_worker(local_path: str, worker: dict, remote_path: str): + r = subprocess.run( + ["scp", "-o", "BatchMode=yes", local_path, f"{worker['ssh_alias']}:{remote_path}"], + capture_output=True, timeout=1800, + ) + if r.returncode != 0: + raise RuntimeError(f"scp failed: {r.stderr.decode()[:200]}") + + def do_extract(job: sqlite3.Row, worker: dict) -> str: videos = json.loads(job["video_paths"]) frames_dir = f"{worker['frames_dir']}/job_{job['id']}" @@ -128,8 +137,14 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str: for v in videos: vf = f"fps={FPS},scale={IMG_W}:{IMG_H}" pattern = f"{frames_dir}/frame_%06d.jpg" + # Copy video to worker if it doesn't exist there + worker_src = f"{frames_dir}/src_{Path(v).name}" + rc_check = ssh(worker["ssh_alias"], f"test -f {shlex.quote(worker_src)}")[0] + if rc_check != 0: + print(f" scp {Path(v).name} → {worker['host']}...") + scp_to_worker(v, worker, worker_src) cmd = ( - f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(v)} " + f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} " f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 " f"{shlex.quote(pattern)}" ) @@ -146,18 +161,39 @@ def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str log = f"/tmp/cosma-qc-job-{job['id']}.log" ckpt = f"{worker['lingbot_path']}/checkpoints/lingbot-map/lingbot-map-long.pt" ply_path = f"{frames_dir}/reconstruction.ply" + # demo.py starts a viser web server after saving the PLY and never exits. + # Wrap it: launch in bg, wait for "PLY saved" marker in the log, kill, exit 0. + # Match on the unique job frames_dir to identify our demo.py among all children/threads. + marker = shlex.quote(frames_dir) cmd = ( f"cd {shlex.quote(worker['lingbot_path'])} && source .venv/bin/activate && " - f"python3 demo.py --model_path {shlex.quote(ckpt)} " + f"setsid python3 demo.py --model_path {shlex.quote(ckpt)} " f"--image_folder {shlex.quote(frames_dir)} --port {port} " f"--use_sdpa --mode windowed --window_size 16 --overlap_size 2 --offload_to_cpu " - f"--save_ply {shlex.quote(ply_path)} " - f"> {log} 2>&1" + f"--save_ply {shlex.quote(ply_path)} > {log} 2>&1 & " + f"DEMO_PID=$!; " + f"for i in $(seq 1 3600); do " + f" if ! kill -0 $DEMO_PID 2>/dev/null; then wait $DEMO_PID; exit $?; fi; " + f" if grep -q 'PLY saved:' {log} 2>/dev/null; then " + f" sleep 2; " + f" pkill -TERM -f \"demo.py.*{frames_dir}\" 2>/dev/null; sleep 1; " + f" pkill -KILL -f \"demo.py.*{frames_dir}\" 2>/dev/null; " + f" wait $DEMO_PID 2>/dev/null; exit 0; " + f" fi; " + f" sleep 3; " + f"done; " + f"pkill -KILL -f \"demo.py.*{frames_dir}\" 2>/dev/null; exit 124" ) rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3 * 3600) - if rc != 0: + # Accept rc==0 OR PLY file exists with non-zero size (kill -TERM may return non-zero) + ply_rc, ply_size, _ = ssh(worker["ssh_alias"], f"stat -c %s {shlex.quote(ply_path)} 2>/dev/null || echo 0") + try: + ply_ok = int(ply_size.strip()) > 0 + except ValueError: + ply_ok = False + if not ply_ok: tail = ssh(worker["ssh_alias"], f"tail -30 {log}")[1] - raise RuntimeError(f"demo.py failed: {err[:200]}\n---\n{tail[:800]}") + raise RuntimeError(f"demo.py failed (rc={rc}): {err[:200]}\n---\n{tail[:800]}") viser_url = f"http://{worker['host']}:{port}" return viser_url, log, ply_path @@ -296,7 +332,7 @@ def run_one_stitch(stitch: sqlite3.Row): def run_one(job: sqlite3.Row) -> bool: """Returns True if a worker was picked and work started.""" job_id = job["id"] - estimated = estimate_vram_mib(job["frame_count"] or 400) + estimated = estimate_vram_mib(job["frame_count"] or 150) worker = pick_worker(estimated) if not worker: return False diff --git a/scripts/ingest.py b/scripts/ingest.py index 7f545c9..fe696e5 100644 --- a/scripts/ingest.py +++ b/scripts/ingest.py @@ -34,7 +34,12 @@ def exif_create_date(path: Path) -> datetime | None: ["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)], stderr=subprocess.DEVNULL, text=True, timeout=10, ).strip() - return datetime.strptime(out, "%Y:%m:%d %H:%M:%S") if out else None + if not out: + return None + # Strip timezone suffix (+HH:MM or -HH:MM) if present + import re as _re + out = _re.sub(r'[+-]\d{2}:\d{2}$', '', out).strip() + return datetime.strptime(out, "%Y:%m:%d %H:%M:%S") except Exception: return None