dispatcher — load balance 2 GPUs (lower-load d'abord) + fps=2 + debug pick_worker

pick_worker trie les candidats par :
  1. nombre de jobs déjà assignés sur le worker (le moins chargé d'abord)
  2. VRAM libre (la plus grande d'abord)
  3. hostname (tiebreaker, sans comparer les dicts)

Avant: le worker avec le plus de VRAM gagnait toujours (ex: .84 24GB vs
.87 12GB) donc tous les jobs empilaient sur .84 pendant que .87 idle.

fps=3 -> fps=2 via COSMA_QC_FPS dans dispatcher.env (cf user: 1 kt +
reconstruction SfM -> 2 fps pas stride 6).

Logs pick_worker ajoutés pour le debug quand il n'y a aucun candidat ("no candidate").
This commit is contained in:
Flag
2026-04-22 22:02:20 +00:00
parent 033abc41c5
commit e90d775dfd

View File

@@ -103,25 +103,37 @@ def worker_free_vram_mib(worker: dict) -> int:
return 0 return 0
# host -> number of jobs currently assigned to that worker (guarded by _worker_lock).
_jobs_per_worker: dict[str, int] = {}


def pick_worker(estimated_vram_mib: int) -> dict | None:
    """Pick a worker that (a) has enough free VRAM AND (b) is currently the least busy —
    otherwise the beefiest GPU always wins and the second worker sits idle.

    Reserves ``estimated_vram_mib`` on the chosen worker and increments its job
    count before returning; callers must pair this with ``release_worker()``.

    Args:
        estimated_vram_mib: VRAM the job is expected to need, in MiB.

    Returns:
        The chosen worker dict, or ``None`` when no worker has enough free VRAM.
    """
    with _worker_lock:
        candidates = []
        for w in WORKERS:
            # Effective free VRAM = measured free minus what we've already promised locally.
            free = worker_free_vram_mib(w) - _reserved_vram.get(w["host"], 0)
            print(f" pick_worker: {w['host']} free={free} reserved={_reserved_vram.get(w['host'], 0)} load={_jobs_per_worker.get(w['host'], 0)}", flush=True)
            if free < estimated_vram_mib:
                continue
            load = _jobs_per_worker.get(w["host"], 0)
            # Sort key must avoid comparing dicts: use host as tiebreaker.
            candidates.append(((load, -free, w["host"]), w))
        if not candidates:
            print(f" pick_worker: no candidate for {estimated_vram_mib} MiB", flush=True)
            return None
        # Least loaded first, then most free VRAM, then hostname.
        candidates.sort(key=lambda c: c[0])
        w = candidates[0][1]
        _reserved_vram[w["host"]] = _reserved_vram.get(w["host"], 0) + estimated_vram_mib
        _jobs_per_worker[w["host"]] = _jobs_per_worker.get(w["host"], 0) + 1
        return w
def release_worker(worker: dict, estimated_vram_mib: int):
    """Release the VRAM reservation and job-count slot taken by ``pick_worker``.

    Args:
        worker: The worker dict previously returned by ``pick_worker``.
        estimated_vram_mib: The same VRAM estimate that was reserved, in MiB.
    """
    with _worker_lock:
        h = worker["host"]
        # Clamp at 0 so a double release (or restart) can never go negative.
        _reserved_vram[h] = max(0, _reserved_vram.get(h, 0) - estimated_vram_mib)
        _jobs_per_worker[h] = max(0, _jobs_per_worker.get(h, 0) - 1)
def estimate_vram_mib(frame_count: int) -> int: def estimate_vram_mib(frame_count: int) -> int: