#!/usr/bin/env python3
"""Dispatcher daemon: picks queued jobs and runs them on available workers.

One-shot worker loop. Run as a systemd service (or manually). Handles both
extraction (ffmpeg on the worker) and reconstruction (lingbot-map on the
worker). Progress is written back to the DB.

Env:
  COSMA_QC_DB       : SQLite path (default /var/lib/cosma-qc/jobs.db)
  COSMA_QC_WORKERS  : JSON list of workers
                      [{host, ssh_alias, gpu, vram_mib, frames_dir,
                        lingbot_path, viser_port_base}]
                      (viser_port_base is optional, default 8100)
  COSMA_QC_FPS      : extraction fps (default 3)
  COSMA_QC_IMG_H    : image height (default 294)
  COSMA_QC_IMG_W    : image width (default 518)
  COSMA_QC_POLL_S   : queue polling interval in seconds (default 4)

Jobs lifecycle:
  queued → extracting → running → done
                 ↘         ↘
                   error (any failure during extraction/reconstruction)
"""
from __future__ import annotations

import json
import os
import re
import shlex
import sqlite3
import subprocess
import sys
import time
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (second precision)."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
FPS = int(os.environ.get("COSMA_QC_FPS", "3"))
IMG_H = int(os.environ.get("COSMA_QC_IMG_H", "294"))
IMG_W = int(os.environ.get("COSMA_QC_IMG_W", "518"))
POLL_S = int(os.environ.get("COSMA_QC_POLL_S", "4"))

# Fallback for workers whose config omits "viser_port_base"; matches the value
# used by every entry in DEFAULT_WORKERS.
_DEFAULT_VISER_PORT_BASE = 8100

# Built-in worker list, used when COSMA_QC_WORKERS is not set.
DEFAULT_WORKERS = [
    {
        "host": "192.168.0.87",
        "ssh_alias": "gpu",
        "gpu": "RTX 3060 12GB",
        "vram_mib": 11913,
        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
        "viser_port_base": 8100,
    },
    {
        "host": "192.168.0.84",
        "ssh_alias": "cosma-vm",
        "gpu": "RTX 3090 24GB",
        "vram_mib": 24576,
        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
        "viser_port_base": 8100,
    },
]
WORKERS = json.loads(os.environ.get("COSMA_QC_WORKERS", json.dumps(DEFAULT_WORKERS)))


def db() -> sqlite3.Connection:
    """Open a new autocommit connection to the jobs DB (WAL mode, Row factory).

    The caller owns the connection and must close it (e.g. via ``closing``).
    """
    conn = sqlite3.connect(DB_PATH, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.row_factory = sqlite3.Row
    return conn


def ssh(alias: str, cmd: str, timeout: int = 30) -> tuple[int, str, str]:
    """Run ``cmd`` on the host ``alias`` over ssh.

    Returns ``(returncode, stdout, stderr)``.

    Raises:
        subprocess.TimeoutExpired: the command did not finish within ``timeout`` seconds.
        OSError: the local ``ssh`` binary could not be executed.
    """
    p = subprocess.run(
        ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", alias, cmd],
        # Never let the remote command read (and swallow) the daemon's stdin
        # when the dispatcher is started manually from a terminal.
        stdin=subprocess.DEVNULL,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return p.returncode, p.stdout, p.stderr


def worker_free_vram_mib(worker: dict) -> int:
    """Return the free VRAM (MiB) reported by nvidia-smi on ``worker``.

    Only the first line of output (first GPU) is used. Any failure — ssh
    timeout, unreachable host, non-zero exit, unparsable output — yields 0,
    so an unhealthy worker is simply never selected instead of crashing the
    dispatcher.
    """
    try:
        rc, out, _ = ssh(
            worker["ssh_alias"],
            "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits",
        )
    except (subprocess.SubprocessError, OSError):
        return 0
    if rc != 0:
        return 0
    try:
        return int(out.strip().splitlines()[0])
    except (ValueError, IndexError):
        return 0


def pick_worker(estimated_vram_mib: int) -> dict | None:
    """Return the worker with the most free VRAM that fits the estimate, or None.

    On ties the worker listed first in WORKERS wins.
    """
    best = None
    for w in WORKERS:
        free = worker_free_vram_mib(w)
        if free >= estimated_vram_mib and (best is None or free > best[0]):
            best = (free, w)
    return best[1] if best else None


def estimate_vram_mib(frame_count: int) -> int:
    """Estimate the VRAM (MiB) needed to reconstruct ``frame_count`` frames."""
    # Based on empirical: 300 frames peak ≈ 9.4 GiB, 600 frames OOM @ ~11 GiB.
    # Linear extrapolation with headroom.
    # NOTE(review): this formula yields 7400 MiB for 300 frames, which is below
    # the 9.4 GiB peak quoted above — confirm the coefficients are still valid.
    return int(3500 + 13 * frame_count)  # MiB


def set_status(job_id: int, **fields) -> None:
    """Update the given columns of job ``job_id`` in the ``jobs`` table.

    Keyword names are column names, values are the new column values.
    Calling with no fields is a no-op.

    Raises:
        ValueError: a keyword is not a plain identifier (column names are
            interpolated into the SQL text, so they must be validated).
    """
    if not fields:
        # "UPDATE jobs SET  WHERE id=?" would be a syntax error.
        return
    keys = list(fields)
    for k in keys:
        if not k.isidentifier():
            raise ValueError(f"invalid column name: {k!r}")
    q = "UPDATE jobs SET " + ", ".join(f"{k}=?" for k in keys) + " WHERE id=?"
    with closing(db()) as conn:
        conn.execute(q, (*(fields[k] for k in keys), job_id))


def count_frames(worker: dict, frames_dir: str) -> int:
    """Return the number of entries in ``frames_dir`` on the worker.

    Returns 0 when the remote command fails or its output is not a number.
    An ssh timeout propagates as ``subprocess.TimeoutExpired``.
    """
    rc, out, _ = ssh(worker["ssh_alias"], f"ls {shlex.quote(frames_dir)} 2>/dev/null | wc -l")
    if rc != 0:
        return 0
    try:
        return int(out.strip())
    except ValueError:
        return 0


def do_extract(job: sqlite3.Row, worker: dict) -> str:
    """Run ffmpeg on the worker for each video in job.video_paths.

    Frames from all videos go into one directory with a continuous numbering.
    ``frame_count`` is written to the DB after each video.

    Returns the remote frames directory.

    Raises:
        RuntimeError: the directory could not be created, ffmpeg failed, or
            the frame count went backwards between two videos.
    """
    videos = json.loads(job["video_paths"])
    frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
    rc, _, err = ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)}")
    if rc != 0:
        raise RuntimeError(f"mkdir failed for {frames_dir}: {err[:200]}")

    # Loop-invariant ffmpeg arguments.
    vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
    pattern = f"{frames_dir}/frame_%06d.jpg"

    idx = 0
    for v in videos:
        # Start numbering at idx to keep frame ordering across videos.
        cmd = (
            f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(v)} "
            f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 "
            f"{shlex.quote(pattern)}"
        )
        rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3600)
        if rc != 0:
            raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
        # Count frames now present to bump idx.
        counted = count_frames(worker, frames_dir)
        if counted < idx:
            # count_frames() reports 0 on failure; continuing would make the
            # next video overwrite frames that were already extracted.
            raise RuntimeError(
                f"frame count went backwards after {v}: {counted} < {idx}"
            )
        idx = counted
        set_status(job["id"], frame_count=idx)
    return frames_dir


def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str, str]:
    """Run lingbot-map's demo.py on the worker over the extracted frames.

    The remote command's stdout/stderr are redirected to a per-job log file
    on the worker. The viser port is ``viser_port_base + job id``.

    Returns ``(viser_url, remote_log_path)``.

    Raises:
        RuntimeError: demo.py exited non-zero (message includes the log tail).
    """
    port = worker.get("viser_port_base", _DEFAULT_VISER_PORT_BASE) + job["id"]
    log = f"/tmp/cosma-qc-job-{job['id']}.log"
    ckpt = f"{worker['lingbot_path']}/checkpoints/lingbot-map/lingbot-map-long.pt"
    cmd = (
        f"cd {shlex.quote(worker['lingbot_path'])} && source .venv/bin/activate && "
        f"python3 demo.py --model_path {shlex.quote(ckpt)} "
        f"--image_folder {shlex.quote(frames_dir)} --port {port} "
        f"--use_sdpa --mode windowed --window_size 16 --overlap_size 2 --offload_to_cpu "
        f"> {shlex.quote(log)} 2>&1"
    )
    rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3 * 3600)
    if rc != 0:
        try:
            tail = ssh(worker["ssh_alias"], f"tail -30 {shlex.quote(log)}")[1]
        except (subprocess.SubprocessError, OSError):
            # Fetching the log is best-effort; don't mask the real failure.
            tail = ""
        raise RuntimeError(f"demo.py failed: {err[:200]}\n---\n{tail[:800]}")
    viser_url = f"http://{worker['host']}:{port}"
    return viser_url, log


def run_one(job: sqlite3.Row) -> bool:
    """Run extraction then reconstruction for ``job`` on a suitable worker.

    Returns False when no worker currently has enough free VRAM (the job is
    left untouched so it can be retried later), True once the job has been
    processed — whether it ended in ``done`` or ``error``.
    """
    job_id = job["id"]
    # Frame count is unknown before extraction; assume 400 frames.
    estimated = estimate_vram_mib(job["frame_count"] or 400)
    worker = pick_worker(estimated)
    if not worker:
        return False  # retry later

    set_status(job_id, status="extracting", worker_host=worker["host"], started_at=_now_iso())
    try:
        frames_dir = do_extract(job, worker)
        frame_count = count_frames(worker, frames_dir)
        set_status(job_id, frames_dir=frames_dir, frame_count=frame_count,
                   status="running", progress=0)
        viser_url, log = do_reconstruct(job, worker, frames_dir)
        # log_tail receives the path of the remote log file.
        set_status(job_id, status="done", viser_url=viser_url, progress=100,
                   log_tail=log, finished_at=_now_iso())
    except Exception as e:
        # Top-level boundary for one job: record the failure and keep the
        # daemon alive for the next job.
        set_status(job_id, status="error", error=str(e)[:2000], finished_at=_now_iso())
    return True


def pop_queued() -> sqlite3.Row | None:
    """Return the oldest job with status ``queued``, or None if there is none.

    The row is only read; its status is changed later by ``run_one``.
    """
    with closing(db()) as conn:
        return conn.execute(
            "SELECT * FROM jobs WHERE status='queued' ORDER BY created_at LIMIT 1"
        ).fetchone()


def main() -> None:
    """Poll the queue forever, processing one job at a time."""
    print(
        f"cosma-qc dispatcher · DB={DB_PATH} · workers={[w['host'] for w in WORKERS]}",
        flush=True,
    )
    while True:
        job = pop_queued()
        if job is None:
            time.sleep(POLL_S)
            continue
        print(
            f"→ picking up job #{job['id']} ({job['auv']}/{job['gopro_serial']}/{job['segment_label']})",
            flush=True,
        )
        if not run_one(job):
            # No worker had capacity: wait before re-polling instead of
            # hammering the workers with back-to-back nvidia-smi probes.
            time.sleep(POLL_S)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(0)