From 325e5feb5fff08af6addb820ca937467edabf41b Mon Sep 17 00:00:00 2001 From: Flag Date: Thu, 23 Apr 2026 22:03:37 +0000 Subject: [PATCH] fix: SQLite busy_timeout + z620 SSH via worker relay MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - db(): timeout=30 + PRAGMA busy_timeout=5000 pour éviter locked sous charge parallèle - _ssh_via(): helper routant SSH z620 via worker (.82→worker→z620) - _extract_on_remote_host(): toutes les commandes z620 passent via worker - resume probe: ffprobe z620 également via worker - reset jobs 11/17/18/20 en queued pour relancer --- scripts/dispatcher.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 6f7c49e..fcff56e 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -65,8 +65,9 @@ _reserved_vram = {w["host"]: 0 for w in WORKERS} def db() -> sqlite3.Connection: - conn = sqlite3.connect(DB_PATH, isolation_level=None) + conn = sqlite3.connect(DB_PATH, isolation_level=None, timeout=30) conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout = 5000") conn.row_factory = sqlite3.Row return conn @@ -367,6 +368,13 @@ def ensure_worker_video_source(video_path: str, worker: dict, worker_src: str, i + +def _ssh_via(worker: dict, host: str, cmd: str, timeout: int = 30) -> tuple[int, str, str]: + """Route SSH to host through worker as relay (z620 not directly reachable from .82).""" + relay = "ssh -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=accept-new " + host + " " + shlex.quote(cmd) + return ssh(worker["ssh_alias"], relay, timeout=timeout) + + def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: str, frames_dir: str, vid_num: int, n_videos: int, idx: int, vf: str) -> tuple[int, float]: @@ -377,7 +385,7 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: pattern = f"{remote_tmp}/frame_%06d.jpg" exit_file = f"/tmp/cosma-ffmpeg-remote-{job_id}-{vid_num}.exit" - _, dur_str, _ = ssh(src_host, + _, dur_str, _ = _ssh_via(worker, src_host, f"ffprobe -v error -show_entries format=duration -of csv=p=0 " f"{shlex.quote(src_path)} 2>/dev/null || echo 0", timeout=30) try: @@ -386,7 +394,7 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: dur = 0.0 frames_est = max(1, int(dur * FPS)) - ssh(src_host, f"mkdir -p {shlex.quote(remote_tmp)}") + _ssh_via(worker, src_host, f"mkdir -p {shlex.quote(remote_tmp)}") bg = ( f"rm -f {shlex.quote(exit_file)}; " f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(src_path)} " @@ -394,12 +402,12 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: f"/tmp/cosma-ffmpeg-remote-{job_id}.log 2>&1; " f"echo $? > {shlex.quote(exit_file)}" ) - ssh(src_host, f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &") + _ssh_via(worker, src_host, f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &") print(f" ffmpeg running on {src_host} for {Path(src_path).name}...") while True: - rc_done = ssh(src_host, f"test -s {shlex.quote(exit_file)}", timeout=10)[0] - _, count_str, _ = ssh(src_host, + rc_done = _ssh_via(worker, src_host, f"test -s {shlex.quote(exit_file)}", timeout=10)[0] + _, count_str, _ = _ssh_via(worker, src_host, f"ls {shlex.quote(remote_tmp)}/frame_*.jpg 2>/dev/null | wc -l", timeout=10) current_remote = int(count_str.strip()) if count_str.strip().isdigit() else 0 current_worker = count_frames(worker, frames_dir) @@ -411,10 +419,10 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: break time.sleep(5) - _, code_str, _ = ssh(src_host, f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1") + _, code_str, _ = _ssh_via(worker, src_host, f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1") rc = int(code_str.strip()) if code_str.strip().isdigit() else 1 if rc != 0: - _, err, _ = ssh(src_host, f"cat /tmp/cosma-ffmpeg-remote-{job_id}.log 2>/dev/null | tail -5 || echo ''") + _, err, _ = _ssh_via(worker, src_host, f"cat /tmp/cosma-ffmpeg-remote-{job_id}.log 2>/dev/null | tail -5 || echo ''") raise RuntimeError(f"ffmpeg on {src_host} failed: {err[:200]}") set_status(job_id, step=f"scp frames {vid_num}/{n_videos} -> {worker['host']}") @@ -428,7 +436,7 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: if rc2 != 0: raise RuntimeError(f"scp frames {src_host}->{worker['host']} failed: {err2[:200]}") - ssh(src_host, f"rm -rf {shlex.quote(remote_tmp)}", timeout=60) + _ssh_via(worker, src_host, f"rm -rf {shlex.quote(remote_tmp)}", timeout=60) new_idx = count_frames(worker, frames_dir) return new_idx, dur @@ -482,7 +490,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str: _vpath = v.split(':', 1)[1] if ':' in v and not v.startswith('/') else v _vhost = v.split(':', 1)[0] if ':' in v and not v.startswith('/') else worker['ssh_alias'] _dur_alias = _vhost if _vhost in (worker['ssh_alias'], worker['host']) else _vhost - _, _dur_str, _ = ssh(_dur_alias, + _, _dur_str, _ = _ssh_via(worker, _dur_alias, f"ffprobe -v error -show_entries format=duration -of csv=p=0 " f"{shlex.quote(_vpath)} 2>/dev/null || echo 0", timeout=30) _dur = float(_dur_str.strip()) if _dur_str.strip() else 0.0