fix: extraction frames sur host distant (z620) sans SCP du MP4 entier

This commit is contained in:
Flag
2026-04-23 21:02:05 +00:00
parent 9e9eff6cc1
commit 42ba218f09

View File

@@ -366,6 +366,72 @@ def ensure_worker_video_source(video_path: str, worker: dict, worker_src: str, i
scp_to_worker(video_path, worker, worker_src) scp_to_worker(video_path, worker, worker_src)
def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: str,
frames_dir: str, vid_num: int, n_videos: int,
idx: int, vf: str) -> tuple[int, float]:
"""Extract frames on src_host (e.g. z620) with ffmpeg, then pull frames to worker.
Avoids copying the full MP4 to the worker — only JPEGs travel over the network.
"""
remote_tmp = f"/tmp/cosma-frames-job{job_id}-v{vid_num}"
pattern = f"{remote_tmp}/frame_%06d.jpg"
exit_file = f"/tmp/cosma-ffmpeg-remote-{job_id}-{vid_num}.exit"
_, dur_str, _ = ssh(src_host,
f"ffprobe -v error -show_entries format=duration -of csv=p=0 "
f"{shlex.quote(src_path)} 2>/dev/null || echo 0", timeout=30)
try:
dur = float(dur_str.strip())
except Exception:
dur = 0.0
frames_est = max(1, int(dur * FPS))
ssh(src_host, f"mkdir -p {shlex.quote(remote_tmp)}")
bg = (
f"rm -f {shlex.quote(exit_file)}; "
f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(src_path)} "
f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 {shlex.quote(pattern)} "
f"</dev/null >/tmp/cosma-ffmpeg-remote-{job_id}.log 2>&1; "
f"echo $? > {shlex.quote(exit_file)}"
)
ssh(src_host, f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
print(f" ffmpeg running on {src_host} for {Path(src_path).name}...")
while True:
rc_done = ssh(src_host, f"test -s {shlex.quote(exit_file)}", timeout=10)[0]
_, count_str, _ = ssh(src_host,
f"ls {shlex.quote(remote_tmp)}/frame_*.jpg 2>/dev/null | wc -l", timeout=10)
current_remote = int(count_str.strip()) if count_str.strip().isdigit() else 0
current_worker = count_frames(worker, frames_dir)
total_current = current_worker + current_remote
pct = min(99, total_current * 100 // (frames_est + idx))
set_status(job_id, progress=pct,
step=f"ffmpeg {vid_num}/{n_videos}: {total_current} frames (sur {src_host})")
if rc_done == 0:
break
time.sleep(5)
_, code_str, _ = ssh(src_host, f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1")
rc = int(code_str.strip()) if code_str.strip().isdigit() else 1
if rc != 0:
_, err, _ = ssh(src_host, f"cat /tmp/cosma-ffmpeg-remote-{job_id}.log 2>/dev/null | tail -5 || echo ''")
raise RuntimeError(f"ffmpeg on {src_host} failed: {err[:200]}")
set_status(job_id, step=f"scp frames {vid_num}/{n_videos} -> {worker['host']}")
print(f" scp frames {remote_tmp}/ -> {worker['host']}:{frames_dir}/")
pull_cmd = (
f"scp -r -o BatchMode=yes "
f"{shlex.quote(src_host)}:{shlex.quote(remote_tmp + '/')} "
f"{shlex.quote(frames_dir)}/"
)
rc2, _, err2 = ssh(worker["ssh_alias"], pull_cmd, timeout=3600)
if rc2 != 0:
raise RuntimeError(f"scp frames {src_host}->{worker['host']} failed: {err2[:200]}")
ssh(src_host, f"rm -rf {shlex.quote(remote_tmp)}", timeout=60)
new_idx = count_frames(worker, frames_dir)
return new_idx, dur
def do_extract(job: sqlite3.Row, worker: dict) -> str: def do_extract(job: sqlite3.Row, worker: dict) -> str:
videos = json.loads(job["video_paths"]) videos = json.loads(job["video_paths"])
frames_dir = f"{worker['frames_dir']}/job_{job['id']}" frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
@@ -379,50 +445,62 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
for vid_num, v in enumerate(videos, start=1): for vid_num, v in enumerate(videos, start=1):
vf = f"fps={FPS},scale={IMG_W}:{IMG_H}" vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
pattern = f"{frames_dir}/frame_%06d.jpg" pattern = f"{frames_dir}/frame_%06d.jpg"
worker_src, is_ephemeral = resolve_worker_video_source(worker, v, frames_dir) # Detect third-host sources (e.g. z620:) — extract frames remotely, pull only JPEGs
ensure_worker_video_source(v, worker, worker_src, is_ephemeral, job["id"]) _src_is_third_host = False
dur = video_duration_s(worker, worker_src) if ":" in v and not v.startswith("/"):
total_duration_s += dur _src_host_v, _src_path_v = v.split(":", 1)
total_frames_est += max(1, int(dur * FPS)) if _src_host_v not in (worker["ssh_alias"], worker["host"]):
_src_is_third_host = True
exit_file = f"/tmp/cosma-ffmpeg-{job['id']}-{idx}.exit" if _src_is_third_host:
bg = ( idx, dur = _extract_on_remote_host(
f"rm -f {shlex.quote(exit_file)}; " job["id"], worker, _src_host_v, _src_path_v,
f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} " frames_dir, vid_num, n_videos, idx, vf,
f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 {shlex.quote(pattern)} " )
f"</dev/null >/tmp/cosma-ffmpeg-{job['id']}.log 2>&1; " total_duration_s += dur
f"echo $? > {shlex.quote(exit_file)}" total_frames_est += max(1, int(dur * FPS))
) set_status(job["id"], frame_count=idx,
ssh(worker["ssh_alias"], f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &") progress=min(99, idx * 100 // max(1, total_frames_est)))
else:
worker_src, is_ephemeral = resolve_worker_video_source(worker, v, frames_dir)
ensure_worker_video_source(v, worker, worker_src, is_ephemeral, job["id"])
dur = video_duration_s(worker, worker_src)
total_duration_s += dur
total_frames_est += max(1, int(dur * FPS))
thumb_refresh_counter = 0 exit_file = f"/tmp/cosma-ffmpeg-{job['id']}-{idx}.exit"
while True: bg = (
# Use -s (file exists AND size > 0) to avoid race: setsid bash writes the exit code f"rm -f {shlex.quote(exit_file)}; "
# AFTER ffmpeg finishes; a plain -f can match a zero-byte placeholder mid-write. f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} "
rc_done, _, _ = ssh(worker["ssh_alias"], f"test -s {shlex.quote(exit_file)}") f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 {shlex.quote(pattern)} "
current = count_frames(worker, frames_dir) f"</dev/null >/tmp/cosma-ffmpeg-{job['id']}.log 2>&1; "
pct = min(99, current * 100 // total_frames_est) f"echo $? > {shlex.quote(exit_file)}"
set_status(job["id"], frame_count=current, progress=pct, )
step=f"ffmpeg {vid_num}/{n_videos}: {current} frames") ssh(worker["ssh_alias"], f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
# Refresh the preview thumbnail every few polls so the dashboard reflects what the
# camera is seeing right now, not the very first (surface) frame.
thumb_refresh_counter += 1
if thumb_refresh_counter % 3 == 1 and current > 0:
_refresh_thumbnail(worker, frames_dir, job["id"])
if rc_done == 0:
break
time.sleep(5)
_, code_str, _ = ssh(worker["ssh_alias"], f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1") thumb_refresh_counter = 0
rc = int(code_str.strip()) if code_str.strip().isdigit() else 1 while True:
if rc != 0: rc_done, _, _ = ssh(worker["ssh_alias"], f"test -s {shlex.quote(exit_file)}")
_, err, _ = ssh(worker["ssh_alias"], f"cat /tmp/cosma-ffmpeg-{job['id']}.log 2>/dev/null | tail -5 || echo ''") current = count_frames(worker, frames_dir)
raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}") pct = min(99, current * 100 // total_frames_est)
idx = count_frames(worker, frames_dir) set_status(job["id"], frame_count=current, progress=pct,
# Free MP4 cache immediately only when we had to stage a local copy. step=f"ffmpeg {vid_num}/{n_videos}: {current} frames")
if is_ephemeral: thumb_refresh_counter += 1
ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}") if thumb_refresh_counter % 3 == 1 and current > 0:
set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est)) _refresh_thumbnail(worker, frames_dir, job["id"])
if rc_done == 0:
break
time.sleep(5)
_, code_str, _ = ssh(worker["ssh_alias"], f"cat {shlex.quote(exit_file)} 2>/dev/null || echo 1")
rc = int(code_str.strip()) if code_str.strip().isdigit() else 1
if rc != 0:
_, err, _ = ssh(worker["ssh_alias"], f"cat /tmp/cosma-ffmpeg-{job['id']}.log 2>/dev/null | tail -5 || echo ''")
raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
idx = count_frames(worker, frames_dir)
if is_ephemeral:
ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}")
set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est))
# Persist the measured video duration so the dashboard shows real length (segment_label # Persist the measured video duration so the dashboard shows real length (segment_label
# from ingest is only the timestamp of the first MP4 and lies when a segment spans multiple). # from ingest is only the timestamp of the first MP4 and lies when a segment spans multiple).
set_status(job["id"], video_duration_s=total_duration_s, step="trimming hors-eau") set_status(job["id"], video_duration_s=total_duration_s, step="trimming hors-eau")