From 42ba218f09131ec122d9f5672fb06ae2c42dd690 Mon Sep 17 00:00:00 2001
From: Flag
Date: Thu, 23 Apr 2026 21:02:05 +0000
Subject: [PATCH] fix: extraction frames sur host distant (z620) sans SCP du MP4 entier

---
 scripts/dispatcher.py | 189 +++++++++++++++++++++++++++++++++---------
 1 file changed, 148 insertions(+), 41 deletions(-)

diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
index 669f74c..c38363c 100644
--- a/scripts/dispatcher.py
+++ b/scripts/dispatcher.py
@@ -366,6 +366,96 @@ def ensure_worker_video_source(video_path: str, worker: dict, worker_src: str, i
     scp_to_worker(video_path, worker, worker_src)
 
 
+
+def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: str,
+                            frames_dir: str, vid_num: int, n_videos: int,
+                            idx: int, vf: str) -> tuple[int, float]:
+    """Extract frames on src_host (e.g. z620) with ffmpeg, then pull frames to worker.
+
+    Avoids copying the full MP4 to the worker — only JPEGs travel over the network.
+
+    Args:
+        job_id: Job id, used for status updates and remote temp-file names.
+        worker: Worker record; ``ssh_alias`` and ``host`` are read here.
+        src_host: SSH host that holds the video; ffprobe/ffmpeg run there.
+        src_path: Path of the video on src_host.
+        frames_dir: Directory on the worker that receives the JPEG frames.
+        vid_num: Position of this video in the job (status text, temp names).
+        n_videos: Number of videos in the job (status text only).
+        idx: First frame number to write (passed as ffmpeg -start_number).
+        vf: ffmpeg -vf filter expression.
+
+    Returns:
+        (new_idx, dur): frame count in frames_dir after the pull, and the
+        ffprobe duration in seconds (0.0 when it cannot be parsed).
+
+    Raises:
+        RuntimeError: If ffmpeg exits non-zero or the frame copy fails.
+    """
+    remote_tmp = f"/tmp/cosma-frames-job{job_id}-v{vid_num}"
+    pattern = f"{remote_tmp}/frame_%06d.jpg"
+    exit_file = f"/tmp/cosma-ffmpeg-remote-{job_id}-{vid_num}.exit"
+
+    _, dur_str, _ = ssh(src_host,
+        f"ffprobe -v error -show_entries format=duration -of csv=p=0 "
+        f"{shlex.quote(src_path)} 2>/dev/null || echo 0", timeout=30)
+    try:
+        dur = float(dur_str.strip())
+    except Exception:
+        dur = 0.0
+    frames_est = max(1, int(dur * FPS))
+
+    # Start clean: a failed earlier attempt leaves frames and the exit file behind.
+    ssh(src_host, f"rm -rf {shlex.quote(remote_tmp)} {shlex.quote(exit_file)} && mkdir -p {shlex.quote(remote_tmp)}")
+    # Detach ffmpeg with setsid and record its exit status in exit_file so the poll
+    # loop below can detect completion across separate ssh calls.
+    bg = (
+        f"rm -f {shlex.quote(exit_file)}; "
+        f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(src_path)} "
+        f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 {shlex.quote(pattern)} "
+        f"</dev/null >/tmp/cosma-ffmpeg-remote-{job_id}.log 2>&1; "
+        f"echo $? > {shlex.quote(exit_file)}"
+    )
+    ssh(src_host, f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
+    print(f" ffmpeg running on {src_host} for {Path(src_path).name}...")
+
+    while True:
+        rc_done = ssh(src_host, f"test -s {shlex.quote(exit_file)}", timeout=10)[0]
+        _, count_str, _ = ssh(src_host,
+            f"ls {shlex.quote(remote_tmp)}/frame_*.jpg 2>/dev/null | wc -l", timeout=10)
+        current_remote = int(count_str.strip()) if count_str.strip().isdigit() else 0
+        current_worker = count_frames(worker, frames_dir)
+        total_current = current_worker + current_remote
+        pct = min(99, total_current * 100 // (frames_est + idx))
+        set_status(job_id, progress=pct,
+                   step=f"ffmpeg {vid_num}/{n_videos}: {total_current} frames (sur {src_host})")
+        if rc_done == 0:
+            break
+        time.sleep(5)
+
+    _, code_str, _ = ssh(src_host, f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1")
+    rc = int(code_str.strip()) if code_str.strip().isdigit() else 1
+    if rc != 0:
+        _, err, _ = ssh(src_host, f"cat /tmp/cosma-ffmpeg-remote-{job_id}.log 2>/dev/null | tail -5 || echo ''")
+        raise RuntimeError(f"ffmpeg on {src_host} failed: {err[:200]}")
+
+    set_status(job_id, step=f"scp frames {vid_num}/{n_videos} -> {worker['host']}")
+    print(f" scp frames {remote_tmp}/ -> {worker['host']}:{frames_dir}/")
+    # Copy the JPEGs themselves (remote glob) rather than "scp -r dir/", which would
+    # nest the frames in a subdirectory of frames_dir instead of placing them in it.
+    pull_cmd = (
+        f"scp -o BatchMode=yes "
+        f"{shlex.quote(src_host)}:{shlex.quote(remote_tmp + '/frame_*.jpg')} "
+        f"{shlex.quote(frames_dir)}/"
+    )
+    rc2, _, err2 = ssh(worker["ssh_alias"], pull_cmd, timeout=3600)
+    if rc2 != 0:
+        raise RuntimeError(f"scp frames {src_host}->{worker['host']} failed: {err2[:200]}")
+
+    ssh(src_host, f"rm -rf {shlex.quote(remote_tmp)} {shlex.quote(exit_file)}", timeout=60)
+    new_idx = count_frames(worker, frames_dir)
+    return new_idx, dur
+
 def do_extract(job: sqlite3.Row, worker: dict) -> str:
     videos = json.loads(job["video_paths"])
     frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
@@ -379,50 +469,67 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
     for vid_num, v in enumerate(videos, start=1):
         vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
         pattern = f"{frames_dir}/frame_%06d.jpg"
-        worker_src, is_ephemeral = resolve_worker_video_source(worker, v, frames_dir)
-        ensure_worker_video_source(v, worker, worker_src, is_ephemeral, job["id"])
-        dur = video_duration_s(worker, worker_src)
-        total_duration_s += dur
-        total_frames_est += max(1, int(dur * FPS))
+        # Detect third-host sources (e.g. z620:/path/video.mp4) — extract frames remotely, pull only JPEGs
+        _src_is_third_host = False
+        if ":" in v and not v.startswith("/"):
+            _src_host_v, _src_path_v = v.split(":", 1)
+            if _src_host_v not in (worker["ssh_alias"], worker["host"]):
+                _src_is_third_host = True
 
-        exit_file = f"/tmp/cosma-ffmpeg-{job['id']}-{idx}.exit"
-        bg = (
-            f"rm -f {shlex.quote(exit_file)}; "
-            f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} "
-            f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 {shlex.quote(pattern)} "
-            f"</dev/null >/tmp/cosma-ffmpeg-{job['id']}.log 2>&1; "
-            f"echo $? > {shlex.quote(exit_file)}"
-        )
-        ssh(worker["ssh_alias"], f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
+        if _src_is_third_host:
+            idx, dur = _extract_on_remote_host(
+                job["id"], worker, _src_host_v, _src_path_v,
+                frames_dir, vid_num, n_videos, idx, vf,
+            )
+            total_duration_s += dur
+            total_frames_est += max(1, int(dur * FPS))
+            set_status(job["id"], frame_count=idx,
+                       progress=min(99, idx * 100 // max(1, total_frames_est)))
+        else:
+            worker_src, is_ephemeral = resolve_worker_video_source(worker, v, frames_dir)
+            ensure_worker_video_source(v, worker, worker_src, is_ephemeral, job["id"])
+            dur = video_duration_s(worker, worker_src)
+            total_duration_s += dur
+            total_frames_est += max(1, int(dur * FPS))
 
-        thumb_refresh_counter = 0
-        while True:
-            # Use -s (file exists AND size > 0) to avoid race: setsid bash writes the exit code
-            # AFTER ffmpeg finishes; a plain -f can match a zero-byte placeholder mid-write.
-            rc_done, _, _ = ssh(worker["ssh_alias"], f"test -s {shlex.quote(exit_file)}")
-            current = count_frames(worker, frames_dir)
-            pct = min(99, current * 100 // total_frames_est)
-            set_status(job["id"], frame_count=current, progress=pct,
-                       step=f"ffmpeg {vid_num}/{n_videos}: {current} frames")
-            # Refresh the preview thumbnail every few polls so the dashboard reflects what the
-            # camera is seeing right now, not the very first (surface) frame.
-            thumb_refresh_counter += 1
-            if thumb_refresh_counter % 3 == 1 and current > 0:
-                _refresh_thumbnail(worker, frames_dir, job["id"])
-            if rc_done == 0:
-                break
-            time.sleep(5)
+            exit_file = f"/tmp/cosma-ffmpeg-{job['id']}-{idx}.exit"
+            bg = (
+                f"rm -f {shlex.quote(exit_file)}; "
+                f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} "
+                f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 {shlex.quote(pattern)} "
+                f"</dev/null >/tmp/cosma-ffmpeg-{job['id']}.log 2>&1; "
+                f"echo $? > {shlex.quote(exit_file)}"
+            )
+            ssh(worker["ssh_alias"], f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
 
-        _, code_str, _ = ssh(worker["ssh_alias"], f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1")
-        rc = int(code_str.strip()) if code_str.strip().isdigit() else 1
-        if rc != 0:
-            _, err, _ = ssh(worker["ssh_alias"], f"cat /tmp/cosma-ffmpeg-{job['id']}.log 2>/dev/null | tail -5 || echo ''")
-            raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
-        idx = count_frames(worker, frames_dir)
-        # Free MP4 cache immediately only when we had to stage a local copy.
-        if is_ephemeral:
-            ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}")
-        set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est))
+            thumb_refresh_counter = 0
+            while True:
+                # Use -s (file exists AND size > 0) to avoid race: setsid bash writes the exit code
+                # AFTER ffmpeg finishes; a plain -f can match a zero-byte placeholder mid-write.
+                rc_done, _, _ = ssh(worker["ssh_alias"], f"test -s {shlex.quote(exit_file)}")
+                current = count_frames(worker, frames_dir)
+                pct = min(99, current * 100 // total_frames_est)
+                set_status(job["id"], frame_count=current, progress=pct,
+                           step=f"ffmpeg {vid_num}/{n_videos}: {current} frames")
+                # Refresh the preview thumbnail every few polls so the dashboard reflects what the
+                # camera is seeing right now, not the very first (surface) frame.
+                thumb_refresh_counter += 1
+                if thumb_refresh_counter % 3 == 1 and current > 0:
+                    _refresh_thumbnail(worker, frames_dir, job["id"])
+                if rc_done == 0:
+                    break
+                time.sleep(5)
+
+            _, code_str, _ = ssh(worker["ssh_alias"], f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1")
+            rc = int(code_str.strip()) if code_str.strip().isdigit() else 1
+            if rc != 0:
+                _, err, _ = ssh(worker["ssh_alias"], f"cat /tmp/cosma-ffmpeg-{job['id']}.log 2>/dev/null | tail -5 || echo ''")
+                raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
+            idx = count_frames(worker, frames_dir)
+            # Free MP4 cache immediately only when we had to stage a local copy.
+            if is_ephemeral:
+                ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}")
+            set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est))
     # Persist the measured video duration so the dashboard shows real length (segment_label
     # from ingest is only the timestamp of the first MP4 and lies when a segment spans multiple).
     set_status(job["id"], video_duration_s=total_duration_s, step="trimming hors-eau")