diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index f3b0b4f..6f7c49e 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -716,31 +716,40 @@ def deploy_stitch_script(worker: dict): def run_one_stitch(stitch: sqlite3.Row): stitch_id = stitch["id"] - worker = pick_worker(2000) - if not worker: - worker = WORKERS[0] with closing(db()) as conn: if stitch["level"] == "per_auv": job_ids = json.loads(stitch["input_job_ids"] or "[]") if job_ids: rows = conn.execute( - f"SELECT ply_path FROM jobs WHERE id IN ({','.join('?'*len(job_ids))})", + f"SELECT ply_path, worker_host FROM jobs WHERE id IN ({','.join('?'*len(job_ids))})", job_ids ).fetchall() else: rows = [] ply_paths = [r["ply_path"] for r in rows if r["ply_path"]] + # Force stitch to run on the same worker as the PLY files. + hosts = list({r["worker_host"] for r in rows if r["worker_host"]}) else: stitch_ids = json.loads(stitch["input_stitch_ids"] or "[]") if stitch_ids: rows = conn.execute( - f"SELECT output_ply FROM stitches WHERE id IN ({','.join('?'*len(stitch_ids))})", + f"SELECT output_ply, worker_host FROM stitches WHERE id IN ({','.join('?'*len(stitch_ids))})", stitch_ids ).fetchall() else: rows = [] ply_paths = [r["output_ply"] for r in rows if r["output_ply"]] + hosts = list({r["worker_host"] for r in rows if r["worker_host"]}) + + # Pick worker that already holds the PLY files; fall back to any free worker. + worker = None + for w in WORKERS: + if w["host"] in hosts: + worker = w + break + if not worker: + worker = pick_worker(2000) or WORKERS[0] if len(ply_paths) == 0: set_stitch_status(stitch_id, status="error",