diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
index c2bb537..669f74c 100644
--- a/scripts/dispatcher.py
+++ b/scripts/dispatcher.py
@@ -105,6 +105,21 @@ def worker_free_vram_mib(worker: dict) -> int:
     return 0
 
 
+def _cleanup_stale_demo(worker: dict) -> bool:
+    """Kill the oldest demo.py on a worker with no active reconstruction to reclaim VRAM.
+    Returns True if a process was killed."""
+    rc, out, _ = ssh(worker["ssh_alias"], "pgrep -o -f 'python3.*demo\\.py' 2>/dev/null", timeout=10)
+    if rc != 0 or not out.strip():
+        return False
+    pid = out.strip().splitlines()[0].strip()
+    if not pid.isdigit():
+        return False
+    ssh(worker["ssh_alias"], f"kill {pid} 2>/dev/null", timeout=10)
+    print(f"  cleanup: killed stale demo.py pid={pid} on {worker['host']}", flush=True)
+    time.sleep(3)
+    return True
+
+
 _jobs_per_worker: dict[str, int] = {}
 
 
@@ -123,6 +138,10 @@ def pick_worker(estimated_vram_mib: int) -> dict | None:
             candidates.append(((load, -free, w["host"]), w))
     if not candidates:
         print(f"  pick_worker: no candidate for {estimated_vram_mib} MiB", flush=True)
+        # Free VRAM on idle workers by killing leftover demo.py (kept alive for viser).
+        for w in WORKERS:
+            if _jobs_per_worker.get(w["host"], 0) == 0:
+                _cleanup_stale_demo(w)
         return None
     candidates.sort(key=lambda c: c[0])
     w = candidates[0][1]
@@ -323,6 +342,30 @@ def video_duration_s(worker: dict, worker_src: str) -> float:
     return 0.0
 
 
+def resolve_worker_video_source(worker: dict, video_path: str, frames_dir: str) -> tuple[str, bool]:
+    """Return a path readable by the worker and whether it is ephemeral cache.
+
+    Preferred path: direct read from the shared/external storage if the worker can see it.
+    Fallback: stage a src_*.MP4 copy inside the job frames_dir on the worker.
+    """
+    if ":" in video_path and not video_path.startswith("/"):
+        src_host, src_path = video_path.split(":", 1)
+        if src_host == worker["ssh_alias"] or src_host == worker["host"]:
+            return src_path, False
+    return f"{frames_dir}/src_{_path_basename(video_path)}", True
+
+
+def ensure_worker_video_source(video_path: str, worker: dict, worker_src: str, is_ephemeral: bool, job_id) -> None:
+    if not is_ephemeral:
+        return
+    rc_check = ssh(worker["ssh_alias"], f"test -f {shlex.quote(worker_src)}")[0]
+    if rc_check == 0:
+        return
+    print(f"  scp {_path_basename(video_path)} → {worker['host']}...", flush=True)
+    set_status(job_id, step=f"scp: {_path_basename(video_path)}")
+    scp_to_worker(video_path, worker, worker_src)
+
+
 def do_extract(job: sqlite3.Row, worker: dict) -> str:
     videos = json.loads(job["video_paths"])
     frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
@@ -333,15 +376,11 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
     total_frames_est = 0  # will be computed after each scp
     total_duration_s = 0.0
     n_videos = len(videos)
-    for v in videos:
+    for vid_num, v in enumerate(videos, start=1):
         vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
         pattern = f"{frames_dir}/frame_%06d.jpg"
-        worker_src = f"{frames_dir}/src_{_path_basename(v)}"
-        rc_check = ssh(worker["ssh_alias"], f"test -f {shlex.quote(worker_src)}")[0]
-        if rc_check != 0:
-            print(f"  scp {_path_basename(v)} → {worker['host']}...")
-            set_status(job["id"], step=f"scp {idx // 1 + 1}/{n_videos}: {_path_basename(v)}")
-            scp_to_worker(v, worker, worker_src)
+        worker_src, is_ephemeral = resolve_worker_video_source(worker, v, frames_dir)
+        ensure_worker_video_source(v, worker, worker_src, is_ephemeral, job["id"])
         dur = video_duration_s(worker, worker_src)
         total_duration_s += dur
         total_frames_est += max(1, int(dur * FPS))
@@ -356,9 +395,6 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
         )
         ssh(worker["ssh_alias"], f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
 
-        # 1-based index for humans. We cannot compute it from `idx` directly because idx is
-        # the running frame counter, so count the loop iterations via total_duration_s order.
-        vid_num = videos.index(v) + 1
         thumb_refresh_counter = 0
         while True:
             # Use -s (file exists AND size > 0) to avoid race: setsid bash writes the exit code
@@ -383,9 +419,9 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
             _, err, _ = ssh(worker["ssh_alias"], f"cat /tmp/cosma-ffmpeg-{job['id']}.log 2>/dev/null | tail -5 || echo ''")
             raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
         idx = count_frames(worker, frames_dir)
-        # Free MP4 cache immediately: thin pool on Proxmox host is tight and src_*.MP4
-        # are 1-11 GB each. Frames are already extracted so worker_src is no longer needed.
-        ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}")
+        # Free MP4 cache immediately only when we had to stage a local copy.
+        if is_ephemeral:
+            ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}")
         set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est))
         # Persist the measured video duration so the dashboard shows real length (segment_label
         # from ingest is only the timestamp of the first MP4 and lies when a segment spans multiple).
@@ -420,26 +456,27 @@ def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str
     log = f"/tmp/cosma-qc-job-{job['id']}.log"
     ckpt = f"{worker['lingbot_path']}/checkpoints/lingbot-map/lingbot-map-long.pt"
     ply_path = f"{frames_dir}/reconstruction.ply"
-    # Adaptive stride to fit CPU RAM: load_fn stacks full image tensor ~3.15 MB/frame @ 512x512x3 fp32.
-    # .87 has 23 GB RAM, .84 has 62 GB. Keep effective frame count ~4k to stay safe.
+    # More conservative RAM policy: recent runs die with rc=137 during image loading.
+    # Push stride earlier and keep windows smaller to trade speed for survival.
     frame_count = job["frame_count"] or 0
     ram_gb = 23 if worker["host"] == "192.168.0.87" else 62
-    ram_budget_gb = ram_gb * 0.35  # leave headroom for model + OS + cuda pinned buffers
+    ram_budget_gb = ram_gb * 0.22
     stride = 1
     while frame_count * 3.15 / 1024 / stride > ram_budget_gb:
         stride += 1
-    # demo.py starts a viser web server after saving the PLY and never exits.
-    # Wrap it: launch in bg, wait for "PLY saved" marker in the log, kill, exit 0.
-    # Match on the unique job frames_dir to identify our demo.py among all children/threads.
-    # Adapt window size to sequence length (lingbot-map README recommendation): bigger windows
-    # reduce overhead on long sequences. Effective frame count = frame_count / stride.
+    if frame_count > 4000:
+        stride = max(stride, 4)
+    elif frame_count > 2500:
+        stride = max(stride, 3)
+    elif frame_count > 1500:
+        stride = max(stride, 2)
     eff = frame_count // max(1, stride) if frame_count else 0
-    if eff > 3000:
-        window_size, overlap_size = 64, 16
-    elif eff > 320:
-        window_size, overlap_size = 32, 8
+    if eff > 2200:
+        window_size, overlap_size = 24, 6
+    elif eff > 900:
+        window_size, overlap_size = 16, 4
     else:
-        window_size, overlap_size = 16, 2
+        window_size, overlap_size = 12, 3
     marker = shlex.quote(frames_dir)
     cmd = (
         f"cd {shlex.quote(worker['lingbot_path'])} && source .venv/bin/activate && "