feat: resume extraction — per-video done-markers, no rm at start, resume after a crash

This commit is contained in:
Flag
2026-04-23 21:07:09 +00:00
parent 42ba218f09
commit a505ec1bcd

View File

@@ -432,19 +432,67 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path:
new_idx = count_frames(worker, frames_dir) new_idx = count_frames(worker, frames_dir)
return new_idx, dur return new_idx, dur
def _marker_path(frames_dir: str, vid_num: int) -> str:
return f"{frames_dir}/.video_{vid_num}.done"
def _read_marker(worker: dict, frames_dir: str, vid_num: int) -> int | None:
    """Return frame count stored in the done-marker, or None if not present.

    Reads the marker file over SSH on the worker; a missing file, a failed
    command, or non-numeric content all yield None.
    """
    marker = _marker_path(frames_dir, vid_num)
    rc, out, _ = ssh(worker["ssh_alias"], f"cat {shlex.quote(marker)} 2>/dev/null", timeout=10)
    if rc != 0:
        return None
    content = out.strip()
    return int(content) if content.isdigit() else None
def _write_marker(worker: dict, frames_dir: str, vid_num: int, frame_count: int) -> None:
    """Record on the worker that video *vid_num* produced *frame_count* frames.

    Writes the count into the per-video done-marker file so a later run can
    skip this video entirely.
    """
    marker = _marker_path(frames_dir, vid_num)
    cmd = f"echo {frame_count} > {shlex.quote(marker)}"
    ssh(worker["ssh_alias"], cmd, timeout=10)
def _drop_partial_frames(worker: dict, frames_dir: str, from_idx: int) -> None:
    """Delete frame_*.jpg with sequence number >= from_idx (leftover from crashed extraction).

    Runs a shell pipeline on the worker: list candidate JPEGs, keep only
    those whose numeric suffix is at or past *from_idx* (awk coerces the
    text after 'frame_' to a number), then remove them. ``xargs -r`` makes
    the rm a no-op when nothing matched.
    """
    quoted_dir = shlex.quote(frames_dir)
    pipeline = " | ".join([
        f"ls {quoted_dir}/frame_*.jpg 2>/dev/null",
        f"awk -F'frame_' '{{n=int($2); if (n >= {from_idx}) print}}'",
        "xargs -r rm -f",
    ])
    ssh(worker["ssh_alias"], pipeline, timeout=60)
def do_extract(job: sqlite3.Row, worker: dict) -> str: def do_extract(job: sqlite3.Row, worker: dict) -> str:
videos = json.loads(job["video_paths"]) videos = json.loads(job["video_paths"])
frames_dir = f"{worker['frames_dir']}/job_{job['id']}" frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
# Clean any frame_*.jpg from a prior run so count_frames reflects this extraction only # Preserve existing frames — resume from last completed video on retry.
# (retries/restarts otherwise inflate frame_count with duplicates). ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)}")
ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)} && rm -f {shlex.quote(frames_dir)}/frame_*.jpg")
idx = 0 idx = 0
total_frames_est = 0 # will be computed after each scp total_frames_est = 0
total_duration_s = 0.0 total_duration_s = 0.0
n_videos = len(videos) n_videos = len(videos)
for vid_num, v in enumerate(videos, start=1): for vid_num, v in enumerate(videos, start=1):
vf = f"fps={FPS},scale={IMG_W}:{IMG_H}" vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
pattern = f"{frames_dir}/frame_%06d.jpg" pattern = f"{frames_dir}/frame_%06d.jpg"
# Resume: skip videos already fully extracted in a previous run.
done_frames = _read_marker(worker, frames_dir, vid_num)
if done_frames is not None:
print(f" resume: video {vid_num}/{n_videos} already done ({done_frames} frames), skipping")
idx += done_frames
# Still need duration for total_duration_s / total_frames_est
_vpath = v.split(':', 1)[1] if ':' in v and not v.startswith('/') else v
_vhost = v.split(':', 1)[0] if ':' in v and not v.startswith('/') else worker['ssh_alias']
_dur_alias = _vhost if _vhost in (worker['ssh_alias'], worker['host']) else _vhost
_, _dur_str, _ = ssh(_dur_alias,
f"ffprobe -v error -show_entries format=duration -of csv=p=0 "
f"{shlex.quote(_vpath)} 2>/dev/null || echo 0", timeout=30)
_dur = float(_dur_str.strip()) if _dur_str.strip() else 0.0
total_duration_s += _dur
total_frames_est += max(1, int(_dur * FPS))
set_status(job["id"], frame_count=idx,
progress=min(99, idx * 100 // max(1, total_frames_est)))
continue
# Drop any partial frames left by a previous crashed extraction of this video.
_drop_partial_frames(worker, frames_dir, idx)
# Detect third-host sources (e.g. z620:) — extract frames remotely, pull only JPEGs # Detect third-host sources (e.g. z620:) — extract frames remotely, pull only JPEGs
_src_is_third_host = False _src_is_third_host = False
if ":" in v and not v.startswith("/"): if ":" in v and not v.startswith("/"):
@@ -453,15 +501,18 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
_src_is_third_host = True _src_is_third_host = True
if _src_is_third_host: if _src_is_third_host:
_frames_before = idx
idx, dur = _extract_on_remote_host( idx, dur = _extract_on_remote_host(
job["id"], worker, _src_host_v, _src_path_v, job["id"], worker, _src_host_v, _src_path_v,
frames_dir, vid_num, n_videos, idx, vf, frames_dir, vid_num, n_videos, idx, vf,
) )
total_duration_s += dur total_duration_s += dur
total_frames_est += max(1, int(dur * FPS)) total_frames_est += max(1, int(dur * FPS))
_write_marker(worker, frames_dir, vid_num, idx - _frames_before)
set_status(job["id"], frame_count=idx, set_status(job["id"], frame_count=idx,
progress=min(99, idx * 100 // max(1, total_frames_est))) progress=min(99, idx * 100 // max(1, total_frames_est)))
else: else:
_frames_before_local = idx
worker_src, is_ephemeral = resolve_worker_video_source(worker, v, frames_dir) worker_src, is_ephemeral = resolve_worker_video_source(worker, v, frames_dir)
ensure_worker_video_source(v, worker, worker_src, is_ephemeral, job["id"]) ensure_worker_video_source(v, worker, worker_src, is_ephemeral, job["id"])
dur = video_duration_s(worker, worker_src) dur = video_duration_s(worker, worker_src)
@@ -498,6 +549,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
_, err, _ = ssh(worker["ssh_alias"], f"cat /tmp/cosma-ffmpeg-{job['id']}.log 2>/dev/null | tail -5 || echo ''") _, err, _ = ssh(worker["ssh_alias"], f"cat /tmp/cosma-ffmpeg-{job['id']}.log 2>/dev/null | tail -5 || echo ''")
raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}") raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
idx = count_frames(worker, frames_dir) idx = count_frames(worker, frames_dir)
_write_marker(worker, frames_dir, vid_num, idx - _frames_before_local)
if is_ephemeral: if is_ephemeral:
ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}") ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}")
set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est)) set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est))