Wire up the stitch pipeline: DB + dispatcher + UI + Caddy subpath fix

- stitches table (per_auv + cross_auv) with a cancel/retry API (see the schema sketch after this list)
- Dispatcher: automatic PLY export (--save_ply), cascading stitch trigger
  once all of an AUV's jobs are done
- UI: live stitch section fed from the DB, with statuses/durations/buttons
- Fix: <base href="/cosma-qc/"> + relative paths for the Caddy subpath
- open3d 0.19.0 installed on gpu (.87)
- SSH key .82→.87 configured, gpu alias added on .82
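The stitches table itself is created outside this diff (the dispatcher only queries it). As a reference, here is a minimal sketch of that table and of the retry half of the cancel/retry API, inferred from the queries in dispatcher.py below; the column defaults, the 'cancelled' status and the retry_stitch helper are assumptions, not the actual migration code.

# Sketch of the stitches schema the dispatcher expects (assumption: the real
# migration lives in another changed file; defaults and the 'cancelled' status
# are illustrative, column names come from the dispatcher queries).
import sqlite3

STITCHES_SCHEMA = """
CREATE TABLE IF NOT EXISTS stitches (
    id               INTEGER PRIMARY KEY AUTOINCREMENT,
    acquisition_id   INTEGER NOT NULL,
    level            TEXT NOT NULL,              -- 'per_auv' or 'cross_auv'
    auv              TEXT,                       -- NULL for cross_auv stitches
    input_job_ids    TEXT DEFAULT '[]',          -- JSON list of jobs.id
    input_stitch_ids TEXT DEFAULT '[]',          -- JSON list of stitches.id
    status           TEXT DEFAULT 'queued',      -- queued|running|done|error|cancelled
    worker_host      TEXT,
    output_ply       TEXT,
    error            TEXT,
    started_at       TEXT,
    finished_at      TEXT,
    created_at       TEXT DEFAULT (datetime('now'))
);
"""

# The jobs table also gains a ply_path column in this commit (written by do_reconstruct).

def retry_stitch(conn: sqlite3.Connection, stitch_id: int) -> None:
    # Hypothetical retry handler: reset a failed/cancelled stitch so the
    # dispatcher's pop_queued_stitch() picks it up again on the next poll.
    conn.execute(
        "UPDATE stitches SET status='queued', error=NULL, worker_host=NULL, "
        "started_at=NULL, finished_at=NULL, output_ply=NULL "
        "WHERE id=? AND status IN ('error', 'cancelled')",
        (stitch_id,),
    )
    conn.commit()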

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Poulpe · 2026-04-21 10:32:05 +00:00
parent 3b005a4994 · commit 26e5bfc05b
5 changed files with 281 additions and 48 deletions


@@ -1,9 +1,5 @@
#!/usr/bin/env python3
"""Dispatcher daemon: picks queued jobs and runs them on available workers.
One-shot worker loop. Run as a systemd service (or manually). Handles both
extraction (ffmpeg on the worker) and reconstruction (lingbot-map on the
worker). Progress is written back to the DB.
"""Dispatcher daemon: picks queued jobs/stitches and runs them on available workers.
Env:
COSMA_QC_DB : SQLite path (default /var/lib/cosma-qc/jobs.db)
@@ -14,8 +10,11 @@ Env:
COSMA_QC_IMG_W : image width (default 518)
Jobs lifecycle:
-queued → extracting → running → done
+queued → extracting → running → done → [triggers per_auv stitch]
↘ error
+Stitch lifecycle:
+queued → running → done → [triggers cross_auv stitch if all per_auv done]
+↘ error
"""
from __future__ import annotations
@@ -40,6 +39,7 @@ FPS = int(os.environ.get("COSMA_QC_FPS", "3"))
IMG_H = int(os.environ.get("COSMA_QC_IMG_H", "294"))
IMG_W = int(os.environ.get("COSMA_QC_IMG_W", "518"))
POLL_S = int(os.environ.get("COSMA_QC_POLL_S", "4"))
STITCH_SCRIPT = Path(__file__).parent / "stitch.py"
DEFAULT_WORKERS = [
{
@@ -93,9 +93,7 @@ def pick_worker(estimated_vram_mib: int) -> dict | None:
def estimate_vram_mib(frame_count: int) -> int:
# Based on empirical: 300 frames peak ≈ 9.4 GiB, 600 frames OOM @ ~11 GiB.
# Linear extrapolation with headroom.
-return int(3500 + 13 * frame_count) # MiB
+return int(3500 + 13 * frame_count)
def set_status(job_id: int, **fields):
@@ -106,6 +104,14 @@ def set_status(job_id: int, **fields):
conn.execute(q, (*vals, job_id))
def set_stitch_status(stitch_id: int, **fields):
keys = list(fields.keys())
vals = [fields[k] for k in keys]
q = "UPDATE stitches SET " + ", ".join(f"{k}=?" for k in keys) + " WHERE id=?"
with closing(db()) as conn:
conn.execute(q, (*vals, stitch_id))
def count_frames(worker: dict, frames_dir: str) -> int:
rc, out, _ = ssh(worker["ssh_alias"], f"ls {shlex.quote(frames_dir)} 2>/dev/null | wc -l")
try:
@@ -115,7 +121,6 @@ def count_frames(worker: dict, frames_dir: str) -> int:
def do_extract(job: sqlite3.Row, worker: dict) -> str:
"""Run ffmpeg on the worker for each video in job.video_paths."""
videos = json.loads(job["video_paths"])
frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)}")
@@ -123,7 +128,6 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
for v in videos:
vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
pattern = f"{frames_dir}/frame_%06d.jpg"
# Prepend to idx to keep frame ordering across videos.
cmd = (
f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(v)} "
f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 "
@@ -132,21 +136,22 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3600)
if rc != 0:
raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
# Count frames now present to bump idx
idx = count_frames(worker, frames_dir)
set_status(job["id"], frame_count=idx)
return frames_dir
-def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str, str]:
+def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str, str, str]:
port = worker["viser_port_base"] + job["id"]
log = f"/tmp/cosma-qc-job-{job['id']}.log"
ckpt = f"{worker['lingbot_path']}/checkpoints/lingbot-map/lingbot-map-long.pt"
ply_path = f"{frames_dir}/reconstruction.ply"
cmd = (
f"cd {shlex.quote(worker['lingbot_path'])} && source .venv/bin/activate && "
f"python3 demo.py --model_path {shlex.quote(ckpt)} "
f"--image_folder {shlex.quote(frames_dir)} --port {port} "
f"--use_sdpa --mode windowed --window_size 16 --overlap_size 2 --offload_to_cpu "
f"--save_ply {shlex.quote(ply_path)} "
f"> {log} 2>&1"
)
rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3 * 3600)
@@ -154,7 +159,138 @@ def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str
tail = ssh(worker["ssh_alias"], f"tail -30 {log}")[1]
raise RuntimeError(f"demo.py failed: {err[:200]}\n---\n{tail[:800]}")
viser_url = f"http://{worker['host']}:{port}"
-return viser_url, log
+return viser_url, log, ply_path
def _maybe_create_per_auv_stitch(job_id: int):
with closing(db()) as conn:
job = conn.execute("SELECT * FROM jobs WHERE id=?", (job_id,)).fetchone()
if not job:
return
acq_id, auv = job["acquisition_id"], job["auv"]
total = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE acquisition_id=? AND auv=?", (acq_id, auv)
).fetchone()[0]
done = conn.execute(
"SELECT COUNT(*) FROM jobs WHERE acquisition_id=? AND auv=? AND status='done'", (acq_id, auv)
).fetchone()[0]
if total == 0 or done < total:
return
existing = conn.execute(
"SELECT id FROM stitches WHERE acquisition_id=? AND level='per_auv' AND auv=?", (acq_id, auv)
).fetchone()
if existing:
return
job_ids = [r["id"] for r in conn.execute(
"SELECT id FROM jobs WHERE acquisition_id=? AND auv=?", (acq_id, auv)
).fetchall()]
conn.execute(
"INSERT INTO stitches (acquisition_id, level, auv, input_job_ids) VALUES (?,?,?,?)",
(acq_id, "per_auv", auv, json.dumps(job_ids))
)
print(f" → Stitch per_auv créé pour {auv} acq#{acq_id}")
def _maybe_create_cross_auv_stitch(stitch_id: int):
with closing(db()) as conn:
st = conn.execute("SELECT * FROM stitches WHERE id=?", (stitch_id,)).fetchone()
if not st:
return
acq_id = st["acquisition_id"]
n_auvs = conn.execute(
"SELECT COUNT(DISTINCT auv) FROM jobs WHERE acquisition_id=?", (acq_id,)
).fetchone()[0]
if n_auvs < 2:
return
total_per_auv = conn.execute(
"SELECT COUNT(*) FROM stitches WHERE acquisition_id=? AND level='per_auv'", (acq_id,)
).fetchone()[0]
done_per_auv = conn.execute(
"SELECT COUNT(*) FROM stitches WHERE acquisition_id=? AND level='per_auv' AND status='done'", (acq_id,)
).fetchone()[0]
if total_per_auv == 0 or done_per_auv < n_auvs:
return
existing = conn.execute(
"SELECT id FROM stitches WHERE acquisition_id=? AND level='cross_auv'", (acq_id,)
).fetchone()
if existing:
return
stitch_ids = [r["id"] for r in conn.execute(
"SELECT id FROM stitches WHERE acquisition_id=? AND level='per_auv'", (acq_id,)
).fetchall()]
conn.execute(
"INSERT INTO stitches (acquisition_id, level, input_stitch_ids, input_job_ids) VALUES (?,?,?,?)",
(acq_id, "cross_auv", json.dumps(stitch_ids), "[]")
)
print(f" → Stitch cross_auv créé pour acq#{acq_id}")
def deploy_stitch_script(worker: dict):
subprocess.run(
["scp", str(STITCH_SCRIPT), f"{worker['ssh_alias']}:/tmp/cosma-stitch.py"],
capture_output=True, timeout=30
)
def run_one_stitch(stitch: sqlite3.Row):
stitch_id = stitch["id"]
worker = pick_worker(2000)
if not worker:
worker = WORKERS[0]
with closing(db()) as conn:
if stitch["level"] == "per_auv":
job_ids = json.loads(stitch["input_job_ids"] or "[]")
if job_ids:
rows = conn.execute(
f"SELECT ply_path FROM jobs WHERE id IN ({','.join('?'*len(job_ids))})",
job_ids
).fetchall()
else:
rows = []
ply_paths = [r["ply_path"] for r in rows if r["ply_path"]]
else:
stitch_ids = json.loads(stitch["input_stitch_ids"] or "[]")
if stitch_ids:
rows = conn.execute(
f"SELECT output_ply FROM stitches WHERE id IN ({','.join('?'*len(stitch_ids))})",
stitch_ids
).fetchall()
else:
rows = []
ply_paths = [r["output_ply"] for r in rows if r["output_ply"]]
if len(ply_paths) < 2:
set_stitch_status(stitch_id, status="error",
error=f"Pas assez de PLY disponibles ({len(ply_paths)})",
finished_at=_now_iso())
return
out_ply = f"{worker['frames_dir']}/stitch_{stitch_id}.ply"
deploy_stitch_script(worker)
cmd = (
f"source {shlex.quote(worker['lingbot_path'])}/.venv/bin/activate && "
f"python3 /tmp/cosma-stitch.py {shlex.quote(out_ply)} "
+ " ".join(shlex.quote(p) for p in ply_paths)
+ f" > /tmp/cosma-stitch-{stitch_id}.log 2>&1"
)
set_stitch_status(stitch_id, status="running", worker_host=worker["host"], started_at=_now_iso())
try:
rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=4 * 3600)
except Exception as e:
set_stitch_status(stitch_id, status="error", error=str(e)[:500], finished_at=_now_iso())
return
if rc == 0:
set_stitch_status(stitch_id, status="done", output_ply=out_ply, finished_at=_now_iso())
_maybe_create_cross_auv_stitch(stitch_id)
else:
tail = ssh(worker["ssh_alias"], f"tail -20 /tmp/cosma-stitch-{stitch_id}.log")[1]
set_stitch_status(stitch_id, status="error",
error=f"{err[:200]}\n{tail[:600]}",
finished_at=_now_iso())
def run_one(job: sqlite3.Row):
@@ -162,21 +298,19 @@ def run_one(job: sqlite3.Row):
estimated = estimate_vram_mib(job["frame_count"] or 400)
worker = pick_worker(estimated)
if not worker:
-return # retry later
-set_status(job_id, status="extracting", worker_host=worker["host"],
-started_at=_now_iso())
+return
+set_status(job_id, status="extracting", worker_host=worker["host"], started_at=_now_iso())
try:
frames_dir = do_extract(job, worker)
frame_count = count_frames(worker, frames_dir)
set_status(job_id, frames_dir=frames_dir, frame_count=frame_count,
status="running", progress=0)
-viser_url, log = do_reconstruct(job, worker, frames_dir)
-set_status(job_id, status="done", viser_url=viser_url, progress=100,
-log_tail=log,
-finished_at=_now_iso())
+viser_url, log, ply_path = do_reconstruct(job, worker, frames_dir)
+set_status(job_id, status="done", viser_url=viser_url, ply_path=ply_path,
+progress=100, log_tail=log, finished_at=_now_iso())
+_maybe_create_per_auv_stitch(job_id)
except Exception as e:
set_status(job_id, status="error", error=str(e)[:2000],
finished_at=_now_iso())
set_status(job_id, status="error", error=str(e)[:2000], finished_at=_now_iso())
def pop_queued() -> sqlite3.Row | None:
@@ -186,14 +320,28 @@ def pop_queued() -> sqlite3.Row | None:
).fetchone()
def pop_queued_stitch() -> sqlite3.Row | None:
with closing(db()) as conn:
return conn.execute(
"SELECT * FROM stitches WHERE status='queued' ORDER BY created_at LIMIT 1"
).fetchone()
def main():
print(f"cosma-qc dispatcher · DB={DB_PATH} · workers={[w['host'] for w in WORKERS]}")
while True:
job = pop_queued()
-if job is None:
-time.sleep(POLL_S); continue
-print(f"→ picking up job #{job['id']} ({job['auv']}/{job['gopro_serial']}/{job['segment_label']})")
-run_one(job)
+if job:
+print(f"→ job #{job['id']} ({job['auv']}/{job['gopro_serial']}/{job['segment_label']})")
+run_one(job)
+continue
+stitch = pop_queued_stitch()
+if stitch:
+label = f"{stitch['level']} {stitch['auv'] or ''} acq#{stitch['acquisition_id']}"
+print(f"→ stitch #{stitch['id']} ({label})")
+run_one_stitch(stitch)
+continue
+time.sleep(POLL_S)
if __name__ == "__main__":