Files
cosma-qc/scripts/dispatcher.py
Poulpe 69eb547463 dispatcher+ingest — fixes pour test end-to-end réel
- dispatcher: scp du MP4 source vers le worker avant ffmpeg (les chemins .82 ne sont pas accessibles côté .87)
- dispatcher: wrapper shell autour de demo.py pour killer viser dès que le PLY est écrit (setsid + pkill -f frames_dir)
- dispatcher: PLY_ok fallback — accepte rc!=0 si le PLY existe et a une taille > 0
- dispatcher: fallback frame_count abaissé à 150 pour l'estimation VRAM
- ingest: strip du suffixe timezone (+00:00) des timestamps exiftool QuickTimeUTC=1

Testé bout-en-bout sur GX010001.MP4 (70 frames, 10.6M pts PLY, VRAM peak 9.4 GB, kill viser OK).
2026-04-21 12:49:09 +00:00

401 lines
15 KiB
Python

#!/usr/bin/env python3
"""Dispatcher daemon: picks queued jobs/stitches and runs them on available workers.
Env:
    COSMA_QC_DB      : SQLite path (default /var/lib/cosma-qc/jobs.db)
    COSMA_QC_WORKERS : JSON list of workers [{host, ssh_alias, gpu, vram_mib,
                       frames_dir, lingbot_path, viser_port_base}]
    COSMA_QC_FPS     : extraction fps (default 3)
    COSMA_QC_IMG_H   : image height (default 294)
    COSMA_QC_IMG_W   : image width (default 518)
    COSMA_QC_POLL_S  : idle poll interval in seconds (default 4)

Jobs lifecycle:
    queued → extracting → running → done → [triggers per_auv stitch]
                                  ↘ error

Stitch lifecycle:
    queued → running → done → [triggers cross_auv stitch if all per_auv done]
                     ↘ error
"""
from __future__ import annotations
import json
import os
import re
import shlex
import sqlite3
import subprocess
import sys
import time
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
# --- Runtime configuration, each value overridable through the environment ---
# SQLite database holding the `jobs` and `stitches` tables.
DB_PATH = Path(os.getenv("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
# Frame extraction rate and output frame size passed to ffmpeg.
FPS = int(os.getenv("COSMA_QC_FPS", "3"))
IMG_H = int(os.getenv("COSMA_QC_IMG_H", "294"))
IMG_W = int(os.getenv("COSMA_QC_IMG_W", "518"))
# Seconds to sleep when neither a job nor a stitch is queued.
POLL_S = int(os.getenv("COSMA_QC_POLL_S", "4"))
# Stitch script shipped next to this file; it is copied to the worker before use.
STITCH_SCRIPT = Path(__file__).parent / "stitch.py"

# Worker pool used when COSMA_QC_WORKERS is not set.
DEFAULT_WORKERS = [
    {
        "host": "192.168.0.87",
        "ssh_alias": "gpu",
        "gpu": "RTX 3060 12GB",
        "vram_mib": 11913,
        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
        "viser_port_base": 8100,
    },
    {
        "host": "192.168.0.84",
        "ssh_alias": "cosma-vm",
        "gpu": "RTX 3090 24GB",
        "vram_mib": 24576,
        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
        "viser_port_base": 8100,
    },
]

# Effective worker list: JSON from the environment, or the defaults above.
WORKERS = json.loads(os.getenv("COSMA_QC_WORKERS", json.dumps(DEFAULT_WORKERS)))
def db() -> sqlite3.Connection:
    """Open a fresh connection to the jobs database.

    The connection is opened with ``isolation_level=None`` (autocommit), is
    switched to WAL journaling, and yields ``sqlite3.Row`` rows so columns can
    be read by name. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH, isolation_level=None)
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA journal_mode=WAL")
    return connection
def ssh(alias: str, cmd: str, timeout: int = 30) -> tuple[int, str, str]:
    """Run *cmd* on the host *alias* through the ``ssh`` client.

    The client runs non-interactively (``BatchMode=yes``) with a 5 second
    connection timeout.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams decoded as text.

    Raises:
        subprocess.TimeoutExpired: if the command runs longer than *timeout*
            seconds.
    """
    argv = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", alias, cmd]
    proc = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
    return proc.returncode, proc.stdout, proc.stderr
def worker_free_vram_mib(worker: dict) -> int:
    """Return the free VRAM (MiB) reported by ``nvidia-smi`` on *worker*.

    Only the first line of the output is used, i.e. the first GPU listed.
    Any failure (unreachable host, ssh timeout, non-zero exit, unparsable
    output) yields ``0`` so that the worker is simply treated as unavailable.
    """
    query = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
    try:
        # The ssh call itself must be guarded: a hung worker raises
        # TimeoutExpired, which would otherwise escape through pick_worker()
        # into the main loop and stop the daemon.
        rc, out, _ = ssh(worker["ssh_alias"], query)
    except (subprocess.SubprocessError, OSError):
        return 0
    if rc != 0:
        return 0
    try:
        return int(out.strip().splitlines()[0])
    except (ValueError, IndexError):
        return 0
def pick_worker(estimated_vram_mib: int) -> dict | None:
    """Choose the worker with the most free VRAM that can fit the estimate.

    Every configured worker is probed. Workers with less free VRAM than
    *estimated_vram_mib* are skipped; on a tie the earliest worker in
    ``WORKERS`` wins. Returns ``None`` when no worker qualifies.
    """
    chosen = None
    chosen_free = -1
    for candidate in WORKERS:
        free_mib = worker_free_vram_mib(candidate)
        if free_mib < estimated_vram_mib:
            continue
        if chosen is None or free_mib > chosen_free:
            chosen, chosen_free = candidate, free_mib
    return chosen
def estimate_vram_mib(frame_count: int) -> int:
    """Estimate the VRAM (MiB) needed for a job with *frame_count* frames.

    Linear model: a fixed 3500 MiB plus 13 MiB per frame, truncated to int.
    """
    base_mib = 3500
    per_frame_mib = 13
    return int(base_mib + per_frame_mib * frame_count)
def set_status(job_id: int, **fields):
    """Update columns of the ``jobs`` row identified by *job_id*.

    Args:
        job_id: primary key of the job to update.
        **fields: ``column=value`` pairs to write. Values are bound as SQL
            parameters; column names are interpolated into the statement.

    Raises:
        ValueError: if a column name is not a plain identifier.

    Calling with no fields is a no-op (it would otherwise produce the invalid
    statement ``UPDATE jobs SET  WHERE id=?``).
    """
    if not fields:
        return
    # Column names cannot be bound as parameters, so refuse anything that is
    # not a bare identifier before it reaches the SQL text.
    invalid = [name for name in fields if not name.isidentifier()]
    if invalid:
        raise ValueError(f"invalid jobs column name(s): {invalid!r}")
    assignments = ", ".join(f"{name}=?" for name in fields)
    query = f"UPDATE jobs SET {assignments} WHERE id=?"
    with closing(db()) as conn:
        conn.execute(query, (*fields.values(), job_id))
def set_stitch_status(stitch_id: int, **fields):
    """Update columns of the ``stitches`` row identified by *stitch_id*.

    Args:
        stitch_id: primary key of the stitch to update.
        **fields: ``column=value`` pairs to write. Values are bound as SQL
            parameters; column names are interpolated into the statement.

    Raises:
        ValueError: if a column name is not a plain identifier.

    Calling with no fields is a no-op (it would otherwise produce the invalid
    statement ``UPDATE stitches SET  WHERE id=?``).
    """
    if not fields:
        return
    # Column names cannot be bound as parameters, so refuse anything that is
    # not a bare identifier before it reaches the SQL text.
    invalid = [name for name in fields if not name.isidentifier()]
    if invalid:
        raise ValueError(f"invalid stitches column name(s): {invalid!r}")
    assignments = ", ".join(f"{name}=?" for name in fields)
    query = f"UPDATE stitches SET {assignments} WHERE id=?"
    with closing(db()) as conn:
        conn.execute(query, (*fields.values(), stitch_id))
def count_frames(worker: dict, frames_dir: str) -> int:
    """Count the extracted ``frame_*.jpg`` files in *frames_dir* on *worker*.

    Only regular files directly inside the directory that match the frame
    naming pattern are counted. Other files stored alongside them (the
    ``src_*`` video copies, the reconstruction PLY) are ignored, so the result
    can be used both as the frame count and as the next ``-start_number``.

    Returns ``0`` when the remote command fails or its output is not a number.
    """
    cmd = (
        f"find {shlex.quote(frames_dir)} -maxdepth 1 -type f "
        f"-name 'frame_*.jpg' 2>/dev/null | wc -l"
    )
    rc, out, _ = ssh(worker["ssh_alias"], cmd)
    if rc != 0:
        return 0
    try:
        return int(out.strip())
    except ValueError:
        return 0
def scp_to_worker(local_path: str, worker: dict, remote_path: str):
    """Copy *local_path* to *remote_path* on *worker* with ``scp``.

    Raises:
        RuntimeError: if scp exits non-zero; the message carries the first
            200 characters of its stderr.
        subprocess.TimeoutExpired: if the copy takes longer than 30 minutes.
    """
    target = f"{worker['ssh_alias']}:{remote_path}"
    result = subprocess.run(
        ["scp", "-o", "BatchMode=yes", local_path, target],
        capture_output=True, timeout=1800,
    )
    if result.returncode != 0:
        # Decode leniently: a stray non-UTF-8 byte in stderr must not replace
        # the real scp failure with a UnicodeDecodeError.
        detail = result.stderr.decode(errors="replace")[:200]
        raise RuntimeError(f"scp failed: {detail}")
def do_extract(job: sqlite3.Row, worker: dict) -> str:
    """Extract frames from every video of *job* on *worker*.

    For each path in the job's ``video_paths`` JSON list, the source file is
    copied into the job's frames directory on the worker (unless a file with
    that name is already there) and ffmpeg writes ``frame_%06d.jpg`` images at
    ``FPS`` frames per second, scaled to ``IMG_W`` x ``IMG_H``. Frame numbering
    continues across videos.

    Returns:
        The job's frames directory on the worker.

    Raises:
        RuntimeError: if the directory cannot be created, or scp/ffmpeg fails.
    """
    alias = worker["ssh_alias"]
    videos = json.loads(job["video_paths"])
    frames_dir = f"{worker['frames_dir']}/job_{job['id']}"

    rc, _, err = ssh(alias, f"mkdir -p {shlex.quote(frames_dir)}")
    if rc != 0:
        raise RuntimeError(f"mkdir failed for {frames_dir}: {err[:200]}")

    # Identical for every video of the job, so built once.
    vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
    pattern = f"{frames_dir}/frame_%06d.jpg"

    idx = 0
    for v in videos:
        name = Path(v).name
        # The source is copied next to the frames because the dispatcher's
        # local paths are not reachable from the worker.
        worker_src = f"{frames_dir}/src_{name}"
        already_there = ssh(alias, f"test -f {shlex.quote(worker_src)}")[0] == 0
        if not already_there:
            print(f" scp {name} → {worker['host']}...")
            scp_to_worker(v, worker, worker_src)
        cmd = (
            f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} "
            f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 "
            f"{shlex.quote(pattern)}"
        )
        rc, _, err = ssh(alias, cmd, timeout=3600)
        if rc != 0:
            raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
        # The running total is both the next start number and the frame count.
        idx = count_frames(worker, frames_dir)
        set_status(job["id"], frame_count=idx)
    return frames_dir
def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str, str, str]:
    """Run lingbot-map's ``demo.py`` on *frames_dir* and wait for its PLY.

    demo.py starts a viser web server after saving the PLY and never exits, so
    it is launched in the background by a shell wrapper that polls the log for
    the ``PLY saved:`` marker, then kills the process. Success is decided by
    the PLY file existing with a non-zero size, not by the exit code (the kill
    can make the wrapper return non-zero).

    Returns:
        ``(viser_url, log_path, ply_path)``; the two paths are on the worker.

    Raises:
        RuntimeError: if no non-empty PLY exists afterwards; the message
            carries the exit code, stderr and the tail of the remote log.
        subprocess.TimeoutExpired: if an ssh call exceeds its timeout.
    """
    alias = worker["ssh_alias"]
    port = worker["viser_port_base"] + job["id"]
    log = f"/tmp/cosma-qc-job-{job['id']}.log"
    ckpt = f"{worker['lingbot_path']}/checkpoints/lingbot-map/lingbot-map-long.pt"
    ply_path = f"{frames_dir}/reconstruction.ply"

    # Remote polling budget. The local ssh timeout must be LONGER than this,
    # otherwise ssh is cut first and the remote cleanup (pkill + exit 124)
    # never runs, leaving demo.py alive on the GPU.
    poll_iterations = 3600
    poll_sleep_s = 3
    ssh_timeout = poll_iterations * poll_sleep_s + 600

    cmd = (
        # Drop leftovers of a previous attempt of this job: a stale log would
        # trigger the marker immediately, and a stale PLY would make a failed
        # run pass the size check below.
        f"rm -f {shlex.quote(log)} {shlex.quote(ply_path)}; "
        f"cd {shlex.quote(worker['lingbot_path'])} && source .venv/bin/activate && "
        f"setsid python3 demo.py --model_path {shlex.quote(ckpt)} "
        f"--image_folder {shlex.quote(frames_dir)} --port {port} "
        f"--use_sdpa --mode windowed --window_size 16 --overlap_size 2 --offload_to_cpu "
        f"--save_ply {shlex.quote(ply_path)} > {log} 2>&1 & "
        f"DEMO_PID=$!; "
        f"for i in $(seq 1 {poll_iterations}); do "
        # Process already gone: propagate its exit status.
        f" if ! kill -0 $DEMO_PID 2>/dev/null; then wait $DEMO_PID; exit $?; fi; "
        f" if grep -q 'PLY saved:' {log} 2>/dev/null; then "
        f" sleep 2; "
        # Match on the job's unique frames_dir to target only our demo.py.
        f" pkill -TERM -f \"demo.py.*{frames_dir}\" 2>/dev/null; sleep 1; "
        f" pkill -KILL -f \"demo.py.*{frames_dir}\" 2>/dev/null; "
        f" wait $DEMO_PID 2>/dev/null; exit 0; "
        f" fi; "
        f" sleep {poll_sleep_s}; "
        f"done; "
        f"pkill -KILL -f \"demo.py.*{frames_dir}\" 2>/dev/null; exit 124"
    )
    rc, _, err = ssh(alias, cmd, timeout=ssh_timeout)

    # Accept rc==0 OR a PLY with non-zero size (kill -TERM may return non-zero).
    size_out = ssh(alias, f"stat -c %s {shlex.quote(ply_path)} 2>/dev/null || echo 0")[1]
    try:
        ply_ok = int(size_out.strip()) > 0
    except ValueError:
        ply_ok = False
    if not ply_ok:
        tail = ssh(alias, f"tail -30 {log}")[1]
        raise RuntimeError(f"demo.py failed (rc={rc}): {err[:200]}\n---\n{tail[:800]}")

    # NOTE(review): the wrapper kills demo.py (and its viser server) once the
    # PLY is saved, so nothing is expected to listen on this URL afterwards
    # unless it is relaunched elsewhere — confirm.
    viser_url = f"http://{worker['host']}:{port}"
    return viser_url, log, ply_path
def _maybe_create_per_auv_stitch(job_id: int):
    """Queue a ``per_auv`` stitch once all jobs of the job's AUV are done.

    Nothing happens when the job does not exist, when some job of the same
    (acquisition, AUV) pair is not ``done`` yet, or when a ``per_auv`` stitch
    already exists for that pair. Otherwise a row listing all job ids of the
    pair is inserted into ``stitches``.
    """
    with closing(db()) as conn:
        job = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
        if not job:
            return
        acq_id = job["acquisition_id"]
        auv = job["auv"]
        scope = (acq_id, auv)

        total = conn.execute(
            "SELECT COUNT(*) FROM jobs WHERE acquisition_id=? AND auv=?", scope
        ).fetchone()[0]
        done = conn.execute(
            "SELECT COUNT(*) FROM jobs WHERE acquisition_id=? AND auv=? AND status='done'", scope
        ).fetchone()[0]
        all_done = total != 0 and done >= total
        if not all_done:
            return

        already_queued = conn.execute(
            "SELECT id FROM stitches WHERE acquisition_id=? AND level='per_auv' AND auv=?", scope
        ).fetchone()
        if already_queued:
            return

        id_rows = conn.execute(
            "SELECT id FROM jobs WHERE acquisition_id=? AND auv=?", scope
        ).fetchall()
        job_ids = [row["id"] for row in id_rows]
        conn.execute(
            "INSERT INTO stitches (acquisition_id, level, auv, input_job_ids) VALUES (?,?,?,?)",
            (acq_id, "per_auv", auv, json.dumps(job_ids)),
        )
        print(f" → Stitch per_auv créé pour {auv} acq#{acq_id}")
def _maybe_create_cross_auv_stitch(stitch_id: int):
    """Queue a ``cross_auv`` stitch once every AUV's ``per_auv`` stitch is done.

    Nothing happens when the stitch does not exist, when the acquisition has
    fewer than two distinct AUVs, when there are no ``per_auv`` stitches or
    fewer ``done`` ones than AUVs, or when a ``cross_auv`` stitch already
    exists for the acquisition. Otherwise a row listing all ``per_auv`` stitch
    ids is inserted into ``stitches``.
    """
    with closing(db()) as conn:
        st = conn.execute("SELECT * FROM stitches WHERE id=?", (stitch_id,)).fetchone()
        if not st:
            return
        acq_id = st["acquisition_id"]
        scope = (acq_id,)

        n_auvs = conn.execute(
            "SELECT COUNT(DISTINCT auv) FROM jobs WHERE acquisition_id=?", scope
        ).fetchone()[0]
        if n_auvs < 2:
            # A single AUV has nothing to be merged with.
            return

        total_per_auv = conn.execute(
            "SELECT COUNT(*) FROM stitches WHERE acquisition_id=? AND level='per_auv'", scope
        ).fetchone()[0]
        done_per_auv = conn.execute(
            "SELECT COUNT(*) FROM stitches WHERE acquisition_id=? AND level='per_auv' AND status='done'", scope
        ).fetchone()[0]
        ready = total_per_auv != 0 and done_per_auv >= n_auvs
        if not ready:
            return

        already_queued = conn.execute(
            "SELECT id FROM stitches WHERE acquisition_id=? AND level='cross_auv'", scope
        ).fetchone()
        if already_queued:
            return

        id_rows = conn.execute(
            "SELECT id FROM stitches WHERE acquisition_id=? AND level='per_auv'", scope
        ).fetchall()
        stitch_ids = [row["id"] for row in id_rows]
        conn.execute(
            "INSERT INTO stitches (acquisition_id, level, input_stitch_ids, input_job_ids) VALUES (?,?,?,?)",
            (acq_id, "cross_auv", json.dumps(stitch_ids), "[]"),
        )
        print(f" → Stitch cross_auv créé pour acq#{acq_id}")
def deploy_stitch_script(worker: dict):
    """Copy the local stitch script to ``/tmp/cosma-stitch.py`` on *worker*.

    Deployment stays best-effort: it never raises for an scp failure or
    timeout, but the failure is printed instead of being silently discarded.
    scp runs with ``BatchMode=yes``, like the other remote calls in this
    file, so it cannot block on an interactive prompt.
    """
    alias = worker["ssh_alias"]
    target = f"{alias}:/tmp/cosma-stitch.py"
    try:
        result = subprocess.run(
            ["scp", "-o", "BatchMode=yes", str(STITCH_SCRIPT), target],
            capture_output=True, timeout=30,
        )
    except (subprocess.TimeoutExpired, OSError) as exc:
        print(f" ! stitch script deploy to {alias} failed: {exc}")
        return
    if result.returncode != 0:
        detail = result.stderr.decode(errors="replace")[:200]
        print(f" ! stitch script deploy to {alias} failed: {detail}")
def _stitch_input_plys(stitch: sqlite3.Row) -> list[str]:
    """Return the non-empty input PLY paths recorded for *stitch*.

    A ``per_auv`` stitch reads ``jobs.ply_path`` of its input jobs; any other
    level reads ``stitches.output_ply`` of its input stitches.
    """
    if stitch["level"] == "per_auv":
        ids = json.loads(stitch["input_job_ids"] or "[]")
        table, column = "jobs", "ply_path"
    else:
        ids = json.loads(stitch["input_stitch_ids"] or "[]")
        table, column = "stitches", "output_ply"
    if not ids:
        return []
    placeholders = ",".join("?" * len(ids))
    with closing(db()) as conn:
        rows = conn.execute(
            f"SELECT {column} FROM {table} WHERE id IN ({placeholders})", ids
        ).fetchall()
    return [row[column] for row in rows if row[column]]


def run_one_stitch(stitch: sqlite3.Row):
    """Run one queued stitch on a worker and record the outcome.

    The worker is the one ``pick_worker(2000)`` returns, falling back to the
    first configured worker. At least two input PLY files are required;
    otherwise the stitch is marked ``error``. On success the output PLY path is
    stored and a ``cross_auv`` stitch may be queued. Every failure after the
    stitch is marked ``running`` is written to the row, so one bad stitch
    cannot stop the daemon or stay in ``running`` forever.
    """
    stitch_id = stitch["id"]
    worker = pick_worker(2000) or WORKERS[0]
    alias = worker["ssh_alias"]

    ply_paths = _stitch_input_plys(stitch)
    if len(ply_paths) < 2:
        set_stitch_status(stitch_id, status="error",
                          error=f"Pas assez de PLY disponibles ({len(ply_paths)})",
                          finished_at=_now_iso())
        return

    # NOTE(review): the input paths are passed verbatim to the chosen worker;
    # they were recorded on whichever worker produced them. Presumably this
    # relies on a single worker or shared storage — confirm.
    out_ply = f"{worker['frames_dir']}/stitch_{stitch_id}.ply"
    log = f"/tmp/cosma-stitch-{stitch_id}.log"
    cmd = (
        f"source {shlex.quote(worker['lingbot_path'])}/.venv/bin/activate && "
        f"python3 /tmp/cosma-stitch.py {shlex.quote(out_ply)} "
        + " ".join(shlex.quote(p) for p in ply_paths)
        + f" > {log} 2>&1"
    )
    set_stitch_status(stitch_id, status="running", worker_host=worker["host"], started_at=_now_iso())
    try:
        # Deployment is inside the guard so a failure here is recorded too.
        deploy_stitch_script(worker)
        rc, _, err = ssh(alias, cmd, timeout=4 * 3600)
    except Exception as e:
        set_stitch_status(stitch_id, status="error", error=str(e)[:500], finished_at=_now_iso())
        return

    if rc == 0:
        set_stitch_status(stitch_id, status="done", output_ply=out_ply, finished_at=_now_iso())
        _maybe_create_cross_auv_stitch(stitch_id)
        return

    # Fetching the log tail is only diagnostic; never let it mask the failure.
    try:
        tail = ssh(alias, f"tail -20 {log}")[1]
    except (subprocess.SubprocessError, OSError):
        tail = ""
    set_stitch_status(stitch_id, status="error",
                      error=f"{err[:200]}\n{tail[:600]}",
                      finished_at=_now_iso())
def run_one(job: sqlite3.Row) -> bool:
    """Process one job end to end: frame extraction, then reconstruction.

    The VRAM need is estimated from the job's ``frame_count`` (150 frames are
    assumed when it is unset). Any exception raised while processing is
    stored on the job row as an ``error`` status instead of propagating.

    Returns:
        ``False`` when no worker has enough free VRAM (the job is left
        untouched), ``True`` once a worker was picked, whatever the outcome.
    """
    job_id = job["id"]
    needed_mib = estimate_vram_mib(job["frame_count"] or 150)
    worker = pick_worker(needed_mib)
    if not worker:
        return False

    set_status(job_id, status="extracting", worker_host=worker["host"], started_at=_now_iso())
    try:
        # Phase 1: extract frames on the worker.
        frames_dir = do_extract(job, worker)
        n_frames = count_frames(worker, frames_dir)
        set_status(job_id, frames_dir=frames_dir, frame_count=n_frames,
                   status="running", progress=0)
        # Phase 2: reconstruct the point cloud.
        viser_url, log_path, ply_path = do_reconstruct(job, worker, frames_dir)
        set_status(job_id, status="done", viser_url=viser_url, ply_path=ply_path,
                   progress=100, log_tail=log_path, finished_at=_now_iso())
        _maybe_create_per_auv_stitch(job_id)
    except Exception as exc:
        set_status(job_id, status="error", error=str(exc)[:2000], finished_at=_now_iso())
    return True
def pop_queued() -> sqlite3.Row | None:
    """Return the oldest job with status ``queued``, or ``None`` if there is none.

    Despite the name, the row is only read; its status is not changed here.
    """
    query = "SELECT * FROM jobs WHERE status='queued' ORDER BY created_at LIMIT 1"
    with closing(db()) as conn:
        cursor = conn.execute(query)
        return cursor.fetchone()
def pop_queued_stitch() -> sqlite3.Row | None:
    """Return the oldest stitch with status ``queued``, or ``None`` if there is none.

    Despite the name, the row is only read; its status is not changed here.
    """
    query = "SELECT * FROM stitches WHERE status='queued' ORDER BY created_at LIMIT 1"
    with closing(db()) as conn:
        cursor = conn.execute(query)
        return cursor.fetchone()
def write_heartbeat():
    """Write the current UTC timestamp to ``dispatcher.heartbeat`` beside the DB.

    Best-effort: a failure to write the file is ignored.
    """
    heartbeat_file = DB_PATH.parent / "dispatcher.heartbeat"
    try:
        heartbeat_file.write_text(_now_iso())
    except Exception:
        # The heartbeat is only informational; never let it stop the loop.
        return
def main():
    """Run the dispatcher loop forever.

    Each iteration refreshes the heartbeat, then handles at most one item:
    the oldest queued job if there is one, otherwise the oldest queued stitch.
    When a job finds no worker the loop waits 30 seconds before retrying;
    when nothing is queued it sleeps ``POLL_S`` seconds.
    """
    hosts = [w['host'] for w in WORKERS]
    print(f"cosma-qc dispatcher · DB={DB_PATH} · workers={hosts}")
    while True:
        write_heartbeat()

        # Jobs take priority over stitches.
        job = pop_queued()
        if job:
            print(f"→ job #{job['id']} ({job['auv']}/{job['gopro_serial']}/{job['segment_label']})")
            started = run_one(job)
            if not started:
                print(" ↳ pas de worker dispo, retry dans 30s")
                time.sleep(30)
            continue

        stitch = pop_queued_stitch()
        if not stitch:
            # Nothing to do: idle until the next poll.
            time.sleep(POLL_S)
            continue

        label = f"{stitch['level']} {stitch['auv'] or ''} acq#{stitch['acquisition_id']}"
        print(f"→ stitch #{stitch['id']} ({label})")
        run_one_stitch(stitch)
# Script entry point: run the loop and exit cleanly (status 0) on Ctrl-C.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        raise SystemExit(0)