Files
cosma-qc/scripts/dispatcher.py
Flag 3eb568f14e dispatcher — rm worker_src apres extract + fstrim pour eviter thin pool full
Le cache src_*.MP4 sur les workers s'empile : 12 fichiers pour 82 GB au pire.
Le thin pool LVM sur le host Proxmox est trop petit (810 GB pour 1144 GB
thick-provisioned) et se remplit à 100 % en quelques heures de pipeline
-> I/O errors -> VMs auto-paused -> tout casse.

Fix: delete src_*.MP4 immédiatement après count_frames (les frames sont
déjà extraites), puis fstrim en fin de job pour que le thin pool reclaim
les blocks immédiatement via DISCARD/UNMAP.
2026-04-22 15:39:56 +00:00

546 lines
21 KiB
Python

#!/usr/bin/env python3
"""Dispatcher daemon: picks queued jobs/stitches and runs them on available workers.

Env:
    COSMA_QC_DB      : SQLite path (default /var/lib/cosma-qc/jobs.db)
    COSMA_QC_WORKERS : JSON list of workers [{host, ssh_alias, gpu, vram_mib,
                       frames_dir, lingbot_path, viser_port_base}]
    COSMA_QC_FPS     : extraction fps (default 3)
    COSMA_QC_IMG_H   : image height (default 294)
    COSMA_QC_IMG_W   : image width (default 518)
    COSMA_QC_POLL_S  : sleep between polls of the main loop when idle, in seconds (default 4)

Jobs lifecycle:
    queued → claimed → extracting → running → done → [triggers per_auv stitch]
                                            ↘ error

Stitch lifecycle:
    queued → claimed → running → done → [triggers cross_auv stitch if all per_auv done]
                               ↘ error
"""
from __future__ import annotations
import json
import os
import re
import shlex
import sqlite3
import subprocess
import sys
import threading
import time
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat(timespec="seconds")
# --- Runtime configuration; every value can be overridden via the environment ---
DB_PATH = Path(os.getenv("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
FPS = int(os.getenv("COSMA_QC_FPS", "3"))
IMG_H = int(os.getenv("COSMA_QC_IMG_H", "294"))
IMG_W = int(os.getenv("COSMA_QC_IMG_W", "518"))
POLL_S = int(os.getenv("COSMA_QC_POLL_S", "4"))

# stitch.py is expected to sit next to this file; it is copied to a worker before a stitch.
STITCH_SCRIPT = Path(__file__).parent / "stitch.py"

# Worker list used when COSMA_QC_WORKERS is not set.
DEFAULT_WORKERS = [
    {
        "host": "192.168.0.87",
        "ssh_alias": "gpu",
        "gpu": "RTX 3060 12GB",
        "vram_mib": 11913,
        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
        "viser_port_base": 8100,
    },
    {
        "host": "192.168.0.84",
        "ssh_alias": "cosma-vm",
        "gpu": "RTX 3090 24GB",
        "vram_mib": 24576,
        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
        "viser_port_base": 8100,
    },
]
WORKERS = json.loads(os.getenv("COSMA_QC_WORKERS", json.dumps(DEFAULT_WORKERS)))

# VRAM (MiB) reserved locally per worker host; every access goes through _worker_lock.
_worker_lock = threading.Lock()
_reserved_vram = dict.fromkeys((w["host"] for w in WORKERS), 0)
def db() -> sqlite3.Connection:
    """Open a new connection to the jobs database at DB_PATH.

    The connection runs in autocommit mode (``isolation_level=None``), has WAL
    journaling switched on, and returns rows as ``sqlite3.Row``. Callers are
    responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH, isolation_level=None)
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA journal_mode=WAL")
    return connection
def ssh(alias: str, cmd: str, timeout: int = 30) -> tuple[int, str, str]:
    """Run `cmd` on the host `alias` through the local ssh client.

    Returns ``(returncode, stdout, stderr)`` with both streams decoded as text.
    ``subprocess.TimeoutExpired`` propagates if the call exceeds `timeout` seconds.
    """
    argv = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", alias, cmd]
    proc = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
    return proc.returncode, proc.stdout, proc.stderr
def worker_free_vram_mib(worker: dict) -> int:
    """Return the free VRAM (MiB) that nvidia-smi reports on `worker`, or 0.

    Only the first line of the nvidia-smi output is read. Any failure to get a
    usable number (ssh hang, non-zero exit, empty or non-numeric output) yields
    0, so an unreachable worker is simply never selected instead of raising out
    of the caller.
    """
    query = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
    try:
        rc, out, _ = ssh(worker["ssh_alias"], query)
    except (subprocess.TimeoutExpired, OSError):
        # Hung or unlaunchable ssh: treat the worker as having no free VRAM.
        return 0
    if rc != 0:
        return 0
    try:
        return int(out.strip().splitlines()[0])
    except (ValueError, IndexError):
        return 0
def pick_worker(estimated_vram_mib: int) -> dict | None:
    """Choose the worker with the most effective free VRAM and reserve on it.

    Effective free VRAM is the value probed on the worker minus what this
    process has already reserved for it. On success `estimated_vram_mib` is
    added to the local reservation of the chosen host and the worker dict is
    returned; if no worker has enough headroom, returns None.
    """
    with _worker_lock:
        chosen = None
        chosen_headroom = 0
        for candidate in WORKERS:
            headroom = worker_free_vram_mib(candidate) - _reserved_vram.get(candidate["host"], 0)
            if headroom < estimated_vram_mib:
                continue
            # Strictly greater: on a tie the earlier worker in WORKERS wins.
            if chosen is None or headroom > chosen_headroom:
                chosen, chosen_headroom = candidate, headroom
        if chosen is None:
            return None
        host = chosen["host"]
        _reserved_vram[host] = _reserved_vram.get(host, 0) + estimated_vram_mib
        return chosen
def release_worker(worker: dict, estimated_vram_mib: int):
    """Give back `estimated_vram_mib` of local reservation on `worker`'s host.

    The reservation is clamped so it never drops below zero.
    """
    host = worker["host"]
    with _worker_lock:
        remaining = _reserved_vram.get(host, 0) - estimated_vram_mib
        _reserved_vram[host] = remaining if remaining > 0 else 0
def estimate_vram_mib(frame_count: int) -> int:
    """Return the VRAM budget (MiB) to reserve for one reconstruction job.

    `frame_count` is accepted but not used: the budget is a flat value.
    """
    # windowed mode + offload_to_cpu caps VRAM usage regardless of total frames.
    # Observed: ~3.5 GB model + ~1.5 GB working set for window_size=16. Safe budget: 6 GB.
    safe_budget_mib = 6000
    return safe_budget_mib
def set_status(job_id: int, **fields):
    """Update columns of the `jobs` row with id `job_id`.

    Each keyword argument names a column and carries its new value. Column
    names are interpolated into the SQL text, values are bound as parameters.
    """
    assignments = ", ".join(f"{column}=?" for column in fields)
    params = (*fields.values(), job_id)
    with closing(db()) as conn:
        conn.execute("UPDATE jobs SET " + assignments + " WHERE id=?", params)
def set_stitch_status(stitch_id: int, **fields):
    """Update columns of the `stitches` row with id `stitch_id`.

    Keyword names become column names (interpolated into the statement);
    their values, followed by the id, are bound as SQL parameters.
    """
    columns = list(fields)
    set_clause = ", ".join(f"{name}=?" for name in columns)
    bound = tuple(fields[name] for name in columns) + (stitch_id,)
    with closing(db()) as conn:
        conn.execute("UPDATE stitches SET " + set_clause + " WHERE id=?", bound)
def count_frames(worker: dict, frames_dir: str) -> int:
    """Return how many extracted frames (`frame_*.jpg`) exist in `frames_dir` on `worker`.

    Only regular files directly inside `frames_dir` whose name matches
    `frame_*.jpg` are counted, so a cached `src_*` video or a
    `reconstruction.ply` stored in the same directory no longer inflates the
    number. Returns 0 if the remote command fails or its output is not an
    integer.
    """
    remote_cmd = (
        f"find {shlex.quote(frames_dir)} -maxdepth 1 -type f -name 'frame_*.jpg' "
        "2>/dev/null | wc -l"
    )
    rc, out, _ = ssh(worker["ssh_alias"], remote_cmd)
    if rc != 0:
        return 0
    try:
        return int(out.strip())
    except ValueError:
        return 0
def scp_to_worker(local_path: str, worker: dict, remote_path: str):
    """Copy a file to the worker.

    `local_path` may be either:
      - a path on the dispatcher host (standard scp from here)
      - "host:abs_path" — pulled by the worker directly from `host`
        (avoids routing bytes through the dispatcher).

    Raises:
        RuntimeError: if scp exits with a non-zero status; the message carries
            the first 200 characters of its stderr.
    """
    if ":" in local_path and not local_path.startswith("/"):
        src_host, _, src_path = local_path.partition(":")
        # Pull from source host directly on the worker
        pull_cmd = (
            f"scp -o BatchMode=yes {shlex.quote(src_host)}:{shlex.quote(src_path)} "
            f"{shlex.quote(remote_path)}"
        )
        rc, _, err = ssh(worker["ssh_alias"], pull_cmd, timeout=7200)
        if rc != 0:
            raise RuntimeError(f"remote scp ({src_host} → {worker['host']}) failed: {err[:200]}")
        return
    r = subprocess.run(
        ["scp", "-o", "BatchMode=yes", local_path, f"{worker['ssh_alias']}:{remote_path}"],
        capture_output=True, timeout=1800,
    )
    if r.returncode != 0:
        # errors="replace": undecodable stderr bytes must not hide the scp failure itself.
        raise RuntimeError(f"scp failed: {r.stderr.decode(errors='replace')[:200]}")
def _path_basename(p: str) -> str:
    """Return the final path component of `p`.

    `p` is either a plain path or a "host:path" spec; for the latter the
    "host:" prefix is dropped first. A string starting with "/" is always
    treated as a plain path, even if it contains ":".
    """
    is_remote_spec = ":" in p and not p.startswith("/")
    file_part = p.split(":", 1)[1] if is_remote_spec else p
    return Path(file_part).name
def video_duration_s(worker: dict, worker_src: str) -> float:
    """Return the duration in seconds of the video `worker_src` on `worker`.

    The value comes from ffprobe run on the worker. If ffprobe fails, the
    remote command prints 0; output that cannot be parsed as a float gives 0.0.
    """
    probe = (
        "ffprobe -v error -show_entries format=duration "
        f"-of csv=p=0 {shlex.quote(worker_src)} 2>/dev/null || echo 0"
    )
    stdout = ssh(worker["ssh_alias"], probe)[1]
    try:
        return float(stdout.strip())
    except Exception:
        return 0.0
def do_extract(job: sqlite3.Row, worker: dict) -> str:
    """Copy the job's videos to `worker` and extract their frames with ffmpeg.

    For each path in the JSON list `job["video_paths"]`: copy it into the job's
    frames directory (skipped when the file is already there), run ffmpeg
    detached on the worker, poll every 5 s until it has written its exit code,
    updating `frame_count` / `progress` in the DB on each poll, then delete the
    copied video. After the last video a best-effort `fstrim /` is run on the
    worker.

    Returns:
        The frames directory on the worker (`<worker frames_dir>/job_<id>`).

    Raises:
        RuntimeError: if ffmpeg exits non-zero for a video (or scp fails).
    """
    alias = worker["ssh_alias"]
    job_id = job["id"]
    videos = json.loads(job["video_paths"])
    frames_dir = f"{worker['frames_dir']}/job_{job_id}"
    ssh(alias, f"mkdir -p {shlex.quote(frames_dir)}")
    vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
    pattern = f"{frames_dir}/frame_%06d.jpg"
    ffmpeg_log = f"/tmp/cosma-ffmpeg-{job_id}.log"
    idx = 0  # number of frames extracted so far == first frame number of the next video
    total_frames_est = 0  # grows by duration * FPS as each video becomes available
    for v in videos:
        worker_src = f"{frames_dir}/src_{_path_basename(v)}"
        if ssh(alias, f"test -f {shlex.quote(worker_src)}")[0] != 0:
            print(f" scp {_path_basename(v)} → {worker['host']}...")
            scp_to_worker(v, worker, worker_src)
        dur = video_duration_s(worker, worker_src)
        # max(1, ...) keeps the estimate non-zero so the progress division is always defined.
        total_frames_est += max(1, int(dur * FPS))
        exit_file = f"/tmp/cosma-ffmpeg-{job_id}-{idx}.exit"
        bg = (
            f"rm -f {shlex.quote(exit_file)}; "
            f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} "
            f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 {shlex.quote(pattern)} "
            f"</dev/null >{ffmpeg_log} 2>&1; "
            f"echo $? > {shlex.quote(exit_file)}"
        )
        # Detached so the ssh call returns at once; completion is detected via exit_file.
        ssh(alias, f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
        while True:
            # Use -s (file exists AND size > 0) to avoid race: setsid bash writes the exit code
            # AFTER ffmpeg finishes; a plain -f can match a zero-byte placeholder mid-write.
            rc_done, _, _ = ssh(alias, f"test -s {shlex.quote(exit_file)}")
            current = count_frames(worker, frames_dir)
            pct = min(99, current * 100 // total_frames_est)
            set_status(job_id, frame_count=current, progress=pct)
            if rc_done == 0:
                break
            time.sleep(5)
        _, code_str, _ = ssh(alias, f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1")
        rc = int(code_str.strip()) if code_str.strip().isdigit() else 1
        if rc != 0:
            _, err, _ = ssh(alias, f"cat {ffmpeg_log} 2>/dev/null | tail -5 || echo ''")
            raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
        # Free MP4 cache immediately: thin pool on Proxmox host is tight and src_*.MP4
        # are 1-11 GB each. Frames are already extracted so worker_src is no longer needed.
        # Done BEFORE recounting so the video file can never be counted as a frame,
        # which would shift -start_number of the next video and the stored frame_count.
        ssh(alias, f"rm -f {shlex.quote(worker_src)}")
        idx = count_frames(worker, frames_dir)
        set_status(job_id, frame_count=idx, progress=min(99, idx * 100 // total_frames_est))
    # Trim once per job so LVM thin pool on the host actually reclaims the freed blocks.
    # Best effort: a slow trim must not turn a successful extraction into a failed job.
    try:
        ssh(alias, "sudo fstrim / 2>/dev/null || fstrim / 2>/dev/null", timeout=60)
    except subprocess.TimeoutExpired:
        print(f" fstrim on {worker['host']} timed out, continuing", file=sys.stderr)
    return frames_dir
def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str, str, str]:
    """Run lingbot-map's demo.py on `frames_dir` and wait for the PLY to be saved.

    Returns:
        ``(viser_url, log_path, ply_path)``; the log and PLY paths are on the worker.

    Raises:
        RuntimeError: if no non-empty PLY file exists once the remote command
            has ended; the message includes the tail of the remote log.
    """
    port = worker["viser_port_base"] + job["id"]
    log = f"/tmp/cosma-qc-job-{job['id']}.log"
    ckpt = f"{worker['lingbot_path']}/checkpoints/lingbot-map/lingbot-map-long.pt"
    ply_path = f"{frames_dir}/reconstruction.ply"
    # Adaptive stride to fit CPU RAM: load_fn stacks full image tensor ~3.15 MB/frame @ 512x512x3 fp32.
    # .87 has 23 GB RAM, .84 has 62 GB. Keep effective frame count ~4k to stay safe.
    # `job` is the row snapshot taken when the job was claimed, i.e. before extraction,
    # so its frame_count can be stale or NULL; count the frames actually on disk first.
    frame_count = count_frames(worker, frames_dir) or job["frame_count"] or 0
    ram_gb = 23 if worker["host"] == "192.168.0.87" else 62
    ram_budget_gb = ram_gb * 0.55
    stride = 1
    while frame_count * 3.15 / 1024 / stride > ram_budget_gb:
        stride += 1
    # demo.py starts a viser web server after saving the PLY and never exits.
    # Wrap it: launch in bg, wait for "PLY saved" marker in the log, kill, exit 0.
    # Match on the unique job frames_dir to identify our demo.py among all children/threads.
    # The trailing "( |/)" requires the directory name to end there, so ".../job_1"
    # does not also match (and kill) the demo.py of ".../job_10" on the same worker.
    kill_pattern = shlex.quote(f"demo.py.*{frames_dir}( |/)")
    cmd = (
        f"cd {shlex.quote(worker['lingbot_path'])} && source .venv/bin/activate && "
        f"setsid python3 demo.py --model_path {shlex.quote(ckpt)} "
        f"--image_folder {shlex.quote(frames_dir)} --port {port} "
        f"--stride {stride} --use_sdpa --mode windowed --window_size 16 --overlap_size 2 --offload_to_cpu "
        f"--save_ply {shlex.quote(ply_path)} > {log} 2>&1 & "
        f"DEMO_PID=$!; "
        f"for i in $(seq 1 3600); do "
        f" if ! kill -0 $DEMO_PID 2>/dev/null; then wait $DEMO_PID; exit $?; fi; "
        f" if grep -q 'PLY saved:' {log} 2>/dev/null; then "
        f" sleep 2; "
        f" pkill -TERM -f {kill_pattern} 2>/dev/null; sleep 1; "
        f" pkill -KILL -f {kill_pattern} 2>/dev/null; "
        f" wait $DEMO_PID 2>/dev/null; exit 0; "
        f" fi; "
        f" sleep 3; "
        f"done; "
        f"pkill -KILL -f {kill_pattern} 2>/dev/null; exit 124"
    )
    rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3 * 3600)
    # Success is decided by the PLY file alone, not by rc: the PLY must exist with
    # non-zero size (kill -TERM may return non-zero).
    _, ply_size, _ = ssh(worker["ssh_alias"], f"stat -c %s {shlex.quote(ply_path)} 2>/dev/null || echo 0")
    try:
        ply_ok = int(ply_size.strip()) > 0
    except ValueError:
        ply_ok = False
    if not ply_ok:
        tail = ssh(worker["ssh_alias"], f"tail -30 {log}")[1]
        raise RuntimeError(f"demo.py failed (rc={rc}): {err[:200]}\n---\n{tail[:800]}")
    viser_url = f"http://{worker['host']}:{port}"
    return viser_url, log, ply_path
def _insert_per_auv_stitch_if_ready(conn: sqlite3.Connection, job_id: int):
    """Insert the per_auv stitch for `job_id`'s (acquisition, auv) if it is due.

    Must be called inside an open transaction on `conn`. Returns
    ``(auv, acquisition_id)`` when a row was inserted, otherwise None.
    """
    job = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
    if not job:
        return None
    acq_id, auv = job["acquisition_id"], job["auv"]
    total = conn.execute(
        "SELECT COUNT(*) FROM jobs WHERE acquisition_id=? AND auv=?", (acq_id, auv)
    ).fetchone()[0]
    done = conn.execute(
        "SELECT COUNT(*) FROM jobs WHERE acquisition_id=? AND auv=? AND status='done'", (acq_id, auv)
    ).fetchone()[0]
    if total == 0 or done < total:
        return None
    existing = conn.execute(
        "SELECT id FROM stitches WHERE acquisition_id=? AND level='per_auv' AND auv=?", (acq_id, auv)
    ).fetchone()
    if existing:
        return None
    job_ids = [r["id"] for r in conn.execute(
        "SELECT id FROM jobs WHERE acquisition_id=? AND auv=?", (acq_id, auv)
    ).fetchall()]
    conn.execute(
        "INSERT INTO stitches (acquisition_id, level, auv, input_job_ids) VALUES (?,?,?,?)",
        (acq_id, "per_auv", auv, json.dumps(job_ids))
    )
    return auv, acq_id
def _maybe_create_per_auv_stitch(job_id: int):
    """Queue a per_auv stitch once every job of the same acquisition/AUV is done.

    Nothing happens if the job is unknown, some sibling job is not 'done' yet,
    or a per_auv stitch already exists for that acquisition and AUV.

    The check and the insert run inside one BEGIN IMMEDIATE transaction: jobs
    finish on separate threads, and without the write lock two of them could
    both see "no stitch yet" and insert a duplicate.
    """
    with closing(db()) as conn:
        conn.execute("BEGIN IMMEDIATE")
        try:
            created_for = _insert_per_auv_stitch_if_ready(conn, job_id)
            conn.execute("COMMIT")
        except Exception:
            if conn.in_transaction:
                conn.execute("ROLLBACK")
            raise
    if created_for is not None:
        auv, acq_id = created_for
        print(f" → Stitch per_auv créé pour {auv} acq#{acq_id}")
def _maybe_create_cross_auv_stitch(stitch_id: int):
    """Queue the cross_auv stitch of an acquisition once its per_auv stitches are done.

    Does nothing when the stitch is unknown, the acquisition has jobs from fewer
    than two distinct AUVs, there are no per_auv stitches or fewer 'done' ones
    than AUVs, or a cross_auv stitch already exists for the acquisition.
    """
    with closing(db()) as conn:

        def scalar(sql: str, params: tuple):
            # First column of the first row of a single-value query.
            return conn.execute(sql, params).fetchone()[0]

        st = conn.execute("SELECT * FROM stitches WHERE id=?", (stitch_id,)).fetchone()
        if not st:
            return
        acq_id = st["acquisition_id"]
        by_acq = (acq_id,)
        n_auvs = scalar("SELECT COUNT(DISTINCT auv) FROM jobs WHERE acquisition_id=?", by_acq)
        if n_auvs < 2:
            return
        total_per_auv = scalar(
            "SELECT COUNT(*) FROM stitches WHERE acquisition_id=? AND level='per_auv'", by_acq
        )
        done_per_auv = scalar(
            "SELECT COUNT(*) FROM stitches WHERE acquisition_id=? AND level='per_auv' AND status='done'", by_acq
        )
        if total_per_auv == 0 or done_per_auv < n_auvs:
            return
        already_there = conn.execute(
            "SELECT id FROM stitches WHERE acquisition_id=? AND level='cross_auv'", by_acq
        ).fetchone()
        if already_there:
            return
        per_auv_rows = conn.execute(
            "SELECT id FROM stitches WHERE acquisition_id=? AND level='per_auv'", by_acq
        ).fetchall()
        stitch_ids = [row["id"] for row in per_auv_rows]
        conn.execute(
            "INSERT INTO stitches (acquisition_id, level, input_stitch_ids, input_job_ids) VALUES (?,?,?,?)",
            (acq_id, "cross_auv", json.dumps(stitch_ids), "[]")
        )
        print(f" → Stitch cross_auv créé pour acq#{acq_id}")
def deploy_stitch_script(worker: dict):
    """Copy the local stitch script to /tmp/cosma-stitch.py on `worker`.

    Best effort: a failed copy is reported on stderr but does not raise, so the
    caller proceeds as before. scp runs with BatchMode=yes, like the other
    ssh/scp calls in this file, so it cannot block on an interactive prompt.
    """
    r = subprocess.run(
        ["scp", "-o", "BatchMode=yes", str(STITCH_SCRIPT),
         f"{worker['ssh_alias']}:/tmp/cosma-stitch.py"],
        capture_output=True, timeout=30,
    )
    if r.returncode != 0:
        detail = r.stderr.decode(errors="replace")[:200]
        print(f" deploy of {STITCH_SCRIPT.name} to {worker['host']} failed: {detail}", file=sys.stderr)
def _execute_stitch(stitch: sqlite3.Row, worker: dict):
    """Collect the input PLYs of `stitch` and produce its output PLY on `worker`."""
    stitch_id = stitch["id"]
    with closing(db()) as conn:
        if stitch["level"] == "per_auv":
            job_ids = json.loads(stitch["input_job_ids"] or "[]")
            if job_ids:
                rows = conn.execute(
                    f"SELECT ply_path FROM jobs WHERE id IN ({','.join('?'*len(job_ids))})",
                    job_ids
                ).fetchall()
            else:
                rows = []
            ply_paths = [r["ply_path"] for r in rows if r["ply_path"]]
        else:
            stitch_ids = json.loads(stitch["input_stitch_ids"] or "[]")
            if stitch_ids:
                rows = conn.execute(
                    f"SELECT output_ply FROM stitches WHERE id IN ({','.join('?'*len(stitch_ids))})",
                    stitch_ids
                ).fetchall()
            else:
                rows = []
            ply_paths = [r["output_ply"] for r in rows if r["output_ply"]]
    if not ply_paths:
        set_stitch_status(stitch_id, status="error",
                          error="Aucun PLY disponible",
                          finished_at=_now_iso())
        return
    out_ply = f"{worker['frames_dir']}/stitch_{stitch_id}.ply"
    # Single PLY — no alignment needed, pass through directly.
    if len(ply_paths) == 1:
        rc, _, err = ssh(worker["ssh_alias"], f"cp {shlex.quote(ply_paths[0])} {shlex.quote(out_ply)}")
        if rc != 0:
            set_stitch_status(stitch_id, status="error", error=f"cp failed: {err[:200]}", finished_at=_now_iso())
            return
        set_stitch_status(stitch_id, status="done", output_ply=out_ply, finished_at=_now_iso())
        print(f" → stitch #{stitch_id} passthrough (1 PLY) → {out_ply}")
        _maybe_create_cross_auv_stitch(stitch_id)
        return
    cmd = (
        f"source {shlex.quote(worker['lingbot_path'])}/.venv/bin/activate && "
        f"python3 /tmp/cosma-stitch.py {shlex.quote(out_ply)} "
        + " ".join(shlex.quote(p) for p in ply_paths)
        + f" > /tmp/cosma-stitch-{stitch_id}.log 2>&1"
    )
    set_stitch_status(stitch_id, status="running", worker_host=worker["host"], started_at=_now_iso())
    deploy_stitch_script(worker)
    rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=4 * 3600)
    if rc == 0:
        set_stitch_status(stitch_id, status="done", output_ply=out_ply, finished_at=_now_iso())
        _maybe_create_cross_auv_stitch(stitch_id)
    else:
        tail = ssh(worker["ssh_alias"], f"tail -20 /tmp/cosma-stitch-{stitch_id}.log")[1]
        set_stitch_status(stitch_id, status="error",
                          error=f"{err[:200]}\n{tail[:600]}",
                          finished_at=_now_iso())
def run_one_stitch(stitch: sqlite3.Row):
    """Run one claimed stitch to completion and record the outcome in the DB.

    A worker with 2000 MiB of effective free VRAM is reserved when one exists;
    otherwise the first configured worker is used without a reservation. A
    single input PLY is copied through unchanged; several are merged by the
    stitch script on the worker. A finished stitch may queue the cross_auv
    stitch.

    The reservation is always released afterwards (otherwise every stitch would
    permanently shrink the VRAM that `pick_worker` considers available), and any
    exception is stored as the stitch's error instead of propagating to the
    caller.
    """
    stitch_vram_mib = 2000
    worker = pick_worker(stitch_vram_mib)
    reserved = worker is not None
    if not reserved:
        worker = WORKERS[0]
    try:
        _execute_stitch(stitch, worker)
    except Exception as e:
        set_stitch_status(stitch["id"], status="error", error=str(e)[:500], finished_at=_now_iso())
    finally:
        # Only give back what was actually reserved; the fallback worker reserved nothing.
        if reserved:
            release_worker(worker, stitch_vram_mib)
def run_one(job: sqlite3.Row) -> bool:
    """Run one claimed job: extract frames, reconstruct, record the result.

    Returns False (after putting the job back to 'queued') when no worker has
    enough free VRAM; otherwise True, whether the job ended as 'done' or
    'error'. Any exception raised during processing is stored in the job's
    `error` column. The VRAM reservation is released in all cases.
    """
    job_id = job["id"]
    vram_budget = estimate_vram_mib(job["frame_count"] or 150)
    worker = pick_worker(vram_budget)
    if not worker:
        # release claim so the job can be re-tried by main loop
        set_status(job_id, status="queued")
        return False
    set_status(job_id, status="extracting", worker_host=worker["host"], started_at=_now_iso())
    try:
        frames_dir = do_extract(job, worker)
        extracted = count_frames(worker, frames_dir)
        set_status(
            job_id,
            frames_dir=frames_dir,
            frame_count=extracted,
            status="running",
            progress=0,
        )
        viser_url, log, ply_path = do_reconstruct(job, worker, frames_dir)
        set_status(
            job_id,
            status="done",
            viser_url=viser_url,
            ply_path=ply_path,
            progress=100,
            log_tail=log,
            finished_at=_now_iso(),
        )
        _maybe_create_per_auv_stitch(job_id)
    except Exception as exc:
        set_status(job_id, status="error", error=str(exc)[:2000], finished_at=_now_iso())
    finally:
        release_worker(worker, vram_budget)
    return True
def pop_queued() -> sqlite3.Row | None:
    """Atomic claim: grab a queued job and mark it 'claimed' to prevent double-dispatch.

    Returns the oldest queued row (by `created_at`) as it was read, i.e. still
    showing status 'queued', or None when nothing is queued or the claim
    failed. A failure is reported on stderr.
    """
    with closing(db()) as conn:
        try:
            conn.execute("BEGIN IMMEDIATE")
            row = conn.execute(
                "SELECT * FROM jobs WHERE status='queued' ORDER BY created_at LIMIT 1"
            ).fetchone()
            if row:
                conn.execute("UPDATE jobs SET status='claimed' WHERE id=?", (row["id"],))
            conn.execute("COMMIT")
        except Exception as exc:
            # If BEGIN itself failed (e.g. database locked) there is nothing to roll
            # back, and an unconditional ROLLBACK would raise from inside this handler.
            if conn.in_transaction:
                conn.execute("ROLLBACK")
            print(f" pop_queued failed: {exc}", file=sys.stderr)
            return None
        return row
def pop_queued_stitch() -> sqlite3.Row | None:
    """Atomically claim the oldest queued stitch by marking it 'claimed'.

    Returns the row as it was read (still showing status 'queued'), or None
    when no stitch is queued or the claim failed. A failure is reported on
    stderr.
    """
    with closing(db()) as conn:
        try:
            conn.execute("BEGIN IMMEDIATE")
            row = conn.execute(
                "SELECT * FROM stitches WHERE status='queued' ORDER BY created_at LIMIT 1"
            ).fetchone()
            if row:
                conn.execute("UPDATE stitches SET status='claimed' WHERE id=?", (row["id"],))
            conn.execute("COMMIT")
        except Exception as exc:
            # If BEGIN itself failed (e.g. database locked) there is nothing to roll
            # back, and an unconditional ROLLBACK would raise from inside this handler.
            if conn.in_transaction:
                conn.execute("ROLLBACK")
            print(f" pop_queued_stitch failed: {exc}", file=sys.stderr)
            return None
        return row
def write_heartbeat():
    """Write the current UTC timestamp to `dispatcher.heartbeat` next to the DB file."""
    try:
        (DB_PATH.parent / "dispatcher.heartbeat").write_text(_now_iso())
    except Exception:
        # Deliberately ignored: a failed heartbeat write must not disturb the dispatcher.
        pass
# Set this event to make the heartbeat loop return.
_heartbeat_stop = threading.Event()
def _heartbeat_loop():
    """Refresh the heartbeat file about every 5 seconds until `_heartbeat_stop` is set."""
    stop = _heartbeat_stop
    while not stop.is_set():
        write_heartbeat()
        # Doubles as the sleep; returns early as soon as the event is set.
        stop.wait(5)
def _run_one_thread(job: sqlite3.Row):
    """Thread entry point for one job.

    Runs `run_one`; if no worker was available a note is printed. An exception
    escaping `run_one` is stored as the job's error.
    """
    try:
        started = run_one(job)
        if not started:
            print(f" ↳ job #{job['id']}: pas de worker dispo, remis en queued")
    except Exception as exc:
        reason = f"run_one thread crashed: {str(exc)[:1500]}"
        set_status(job["id"], status="error", error=reason, finished_at=_now_iso())
def main():
    """Dispatcher main loop; never returns.

    Starts the heartbeat thread, then loops forever: while fewer job threads
    are alive than there are workers (at least one), claim a queued job and run
    it in its own thread; otherwise, or when no job is queued, claim a queued
    stitch and run it synchronously in this thread; when there is nothing to
    do, sleep POLL_S seconds.
    """
    hosts = [w['host'] for w in WORKERS]
    print(f"cosma-qc dispatcher · DB={DB_PATH} · workers={hosts}")
    threading.Thread(target=_heartbeat_loop, daemon=True).start()
    active: set[threading.Thread] = set()
    max_parallel = max(1, len(WORKERS))
    while True:
        # drain finished threads
        active = {t for t in active if t.is_alive()}
        job = pop_queued() if len(active) < max_parallel else None
        if job:
            print(f"→ job #{job['id']} ({job['auv']}/{job['gopro_serial']}/{job['segment_label']}) [active={len(active) + 1}]")
            runner = threading.Thread(target=_run_one_thread, args=(job,), daemon=False)
            runner.start()
            active.add(runner)
            # brief pause so the thread can reserve its worker before we pop another
            time.sleep(0.5)
            continue
        stitch = pop_queued_stitch()
        if stitch:
            label = f"{stitch['level']} {stitch['auv'] or ''} acq#{stitch['acquisition_id']}"
            print(f"→ stitch #{stitch['id']} ({label})")
            run_one_stitch(stitch)
            continue
        time.sleep(POLL_S)
# Script entry point: run the dispatcher; Ctrl-C ends the process with exit status 0.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        raise SystemExit(0)