fix: SQLite busy_timeout + z620 SSH via worker relay
- db(): timeout=30 + PRAGMA busy_timeout=5000 pour éviter l'erreur « database is locked » sous charge parallèle
- _ssh_via(): helper routant SSH z620 via worker (.82→worker→z620)
- _extract_on_remote_host(): toutes les commandes z620 passent via worker
- resume probe: ffprobe z620 également via worker
- reset jobs 11/17/18/20 en queued pour relancer
This commit is contained in:
@@ -65,8 +65,9 @@ _reserved_vram = {w["host"]: 0 for w in WORKERS}
|
|||||||
|
|
||||||
|
|
||||||
def db() -> sqlite3.Connection:
|
def db() -> sqlite3.Connection:
|
||||||
conn = sqlite3.connect(DB_PATH, isolation_level=None)
|
conn = sqlite3.connect(DB_PATH, isolation_level=None, timeout=30)
|
||||||
conn.execute("PRAGMA journal_mode=WAL")
|
conn.execute("PRAGMA journal_mode=WAL")
|
||||||
|
conn.execute("PRAGMA busy_timeout = 5000")
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
return conn
|
return conn
|
||||||
|
|
||||||
@@ -367,6 +368,13 @@ def ensure_worker_video_source(video_path: str, worker: dict, worker_src: str, i
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def _ssh_via(worker: dict, host: str, cmd: str, timeout: int = 30) -> tuple[int, str, str]:
    """Run ``cmd`` on ``host`` by hopping through ``worker`` as an SSH relay.

    The nested ``ssh`` invocation is executed on the worker, so the connection
    to ``host`` originates from the worker (z620 is not directly reachable
    from .82).

    Args:
        worker: Worker descriptor; only ``worker["ssh_alias"]`` is read here.
        host: SSH destination as seen from the worker.
        cmd: Shell command line to run on ``host``.
        timeout: Forwarded unchanged to ``ssh()`` for the outer hop.

    Returns:
        Whatever ``ssh()`` returns for the outer hop. Callers index element 0
        as the exit status and element 1 as captured output; the third element
        is presumably stderr -- confirm against ``ssh()``.
    """
    # BatchMode prevents the inner ssh from blocking on a password prompt,
    # since nobody is attached to the relay session to answer it.
    relay_opts = "-o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=accept-new"
    # Both host and cmd are interpreted by the worker's shell, so quote both:
    # an unquoted host containing shell metacharacters would be executed there.
    relay = f"ssh {relay_opts} {shlex.quote(host)} {shlex.quote(cmd)}"
    return ssh(worker["ssh_alias"], relay, timeout=timeout)
|
||||||
|
|
||||||
|
|
||||||
def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: str,
|
def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: str,
|
||||||
frames_dir: str, vid_num: int, n_videos: int,
|
frames_dir: str, vid_num: int, n_videos: int,
|
||||||
idx: int, vf: str) -> tuple[int, float]:
|
idx: int, vf: str) -> tuple[int, float]:
|
||||||
@@ -377,7 +385,7 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path:
|
|||||||
pattern = f"{remote_tmp}/frame_%06d.jpg"
|
pattern = f"{remote_tmp}/frame_%06d.jpg"
|
||||||
exit_file = f"/tmp/cosma-ffmpeg-remote-{job_id}-{vid_num}.exit"
|
exit_file = f"/tmp/cosma-ffmpeg-remote-{job_id}-{vid_num}.exit"
|
||||||
|
|
||||||
_, dur_str, _ = ssh(src_host,
|
_, dur_str, _ = _ssh_via(worker, src_host,
|
||||||
f"ffprobe -v error -show_entries format=duration -of csv=p=0 "
|
f"ffprobe -v error -show_entries format=duration -of csv=p=0 "
|
||||||
f"{shlex.quote(src_path)} 2>/dev/null || echo 0", timeout=30)
|
f"{shlex.quote(src_path)} 2>/dev/null || echo 0", timeout=30)
|
||||||
try:
|
try:
|
||||||
@@ -386,7 +394,7 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path:
|
|||||||
dur = 0.0
|
dur = 0.0
|
||||||
frames_est = max(1, int(dur * FPS))
|
frames_est = max(1, int(dur * FPS))
|
||||||
|
|
||||||
ssh(src_host, f"mkdir -p {shlex.quote(remote_tmp)}")
|
_ssh_via(worker, src_host, f"mkdir -p {shlex.quote(remote_tmp)}")
|
||||||
bg = (
|
bg = (
|
||||||
f"rm -f {shlex.quote(exit_file)}; "
|
f"rm -f {shlex.quote(exit_file)}; "
|
||||||
f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(src_path)} "
|
f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(src_path)} "
|
||||||
@@ -394,12 +402,12 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path:
|
|||||||
f"</dev/null >/tmp/cosma-ffmpeg-remote-{job_id}.log 2>&1; "
|
f"</dev/null >/tmp/cosma-ffmpeg-remote-{job_id}.log 2>&1; "
|
||||||
f"echo $? > {shlex.quote(exit_file)}"
|
f"echo $? > {shlex.quote(exit_file)}"
|
||||||
)
|
)
|
||||||
ssh(src_host, f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
|
_ssh_via(worker, src_host, f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
|
||||||
print(f" ffmpeg running on {src_host} for {Path(src_path).name}...")
|
print(f" ffmpeg running on {src_host} for {Path(src_path).name}...")
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
rc_done = ssh(src_host, f"test -s {shlex.quote(exit_file)}", timeout=10)[0]
|
rc_done = _ssh_via(worker, src_host, f"test -s {shlex.quote(exit_file)}", timeout=10)[0]
|
||||||
_, count_str, _ = ssh(src_host,
|
_, count_str, _ = _ssh_via(worker, src_host,
|
||||||
f"ls {shlex.quote(remote_tmp)}/frame_*.jpg 2>/dev/null | wc -l", timeout=10)
|
f"ls {shlex.quote(remote_tmp)}/frame_*.jpg 2>/dev/null | wc -l", timeout=10)
|
||||||
current_remote = int(count_str.strip()) if count_str.strip().isdigit() else 0
|
current_remote = int(count_str.strip()) if count_str.strip().isdigit() else 0
|
||||||
current_worker = count_frames(worker, frames_dir)
|
current_worker = count_frames(worker, frames_dir)
|
||||||
@@ -411,10 +419,10 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path:
|
|||||||
break
|
break
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
_, code_str, _ = ssh(src_host, f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1")
|
_, code_str, _ = _ssh_via(worker, src_host, f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1")
|
||||||
rc = int(code_str.strip()) if code_str.strip().isdigit() else 1
|
rc = int(code_str.strip()) if code_str.strip().isdigit() else 1
|
||||||
if rc != 0:
|
if rc != 0:
|
||||||
_, err, _ = ssh(src_host, f"cat /tmp/cosma-ffmpeg-remote-{job_id}.log 2>/dev/null | tail -5 || echo ''")
|
_, err, _ = _ssh_via(worker, src_host, f"cat /tmp/cosma-ffmpeg-remote-{job_id}.log 2>/dev/null | tail -5 || echo ''")
|
||||||
raise RuntimeError(f"ffmpeg on {src_host} failed: {err[:200]}")
|
raise RuntimeError(f"ffmpeg on {src_host} failed: {err[:200]}")
|
||||||
|
|
||||||
set_status(job_id, step=f"scp frames {vid_num}/{n_videos} -> {worker['host']}")
|
set_status(job_id, step=f"scp frames {vid_num}/{n_videos} -> {worker['host']}")
|
||||||
@@ -428,7 +436,7 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path:
|
|||||||
if rc2 != 0:
|
if rc2 != 0:
|
||||||
raise RuntimeError(f"scp frames {src_host}->{worker['host']} failed: {err2[:200]}")
|
raise RuntimeError(f"scp frames {src_host}->{worker['host']} failed: {err2[:200]}")
|
||||||
|
|
||||||
ssh(src_host, f"rm -rf {shlex.quote(remote_tmp)}", timeout=60)
|
_ssh_via(worker, src_host, f"rm -rf {shlex.quote(remote_tmp)}", timeout=60)
|
||||||
new_idx = count_frames(worker, frames_dir)
|
new_idx = count_frames(worker, frames_dir)
|
||||||
return new_idx, dur
|
return new_idx, dur
|
||||||
|
|
||||||
@@ -482,7 +490,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
|
|||||||
_vpath = v.split(':', 1)[1] if ':' in v and not v.startswith('/') else v
|
_vpath = v.split(':', 1)[1] if ':' in v and not v.startswith('/') else v
|
||||||
_vhost = v.split(':', 1)[0] if ':' in v and not v.startswith('/') else worker['ssh_alias']
|
_vhost = v.split(':', 1)[0] if ':' in v and not v.startswith('/') else worker['ssh_alias']
|
||||||
_dur_alias = _vhost if _vhost in (worker['ssh_alias'], worker['host']) else _vhost
|
_dur_alias = _vhost if _vhost in (worker['ssh_alias'], worker['host']) else _vhost
|
||||||
_, _dur_str, _ = ssh(_dur_alias,
|
_, _dur_str, _ = _ssh_via(worker, _dur_alias,
|
||||||
f"ffprobe -v error -show_entries format=duration -of csv=p=0 "
|
f"ffprobe -v error -show_entries format=duration -of csv=p=0 "
|
||||||
f"{shlex.quote(_vpath)} 2>/dev/null || echo 0", timeout=30)
|
f"{shlex.quote(_vpath)} 2>/dev/null || echo 0", timeout=30)
|
||||||
_dur = float(_dur_str.strip()) if _dur_str.strip() else 0.0
|
_dur = float(_dur_str.strip()) if _dur_str.strip() else 0.0
|
||||||
|
|||||||
Reference in New Issue
Block a user