diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index c38363c..f3b0b4f 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -432,19 +432,67 @@ def _extract_on_remote_host(job_id: int, worker: dict, src_host: str, src_path: new_idx = count_frames(worker, frames_dir) return new_idx, dur + +def _marker_path(frames_dir: str, vid_num: int) -> str: + return f"{frames_dir}/.video_{vid_num}.done" + + +def _read_marker(worker: dict, frames_dir: str, vid_num: int) -> int | None: + """Return frame count stored in the done-marker, or None if not present.""" + mp = _marker_path(frames_dir, vid_num) + rc, out, _ = ssh(worker["ssh_alias"], f"cat {shlex.quote(mp)} 2>/dev/null", timeout=10) + if rc == 0 and out.strip().isdigit(): + return int(out.strip()) + return None + + +def _write_marker(worker: dict, frames_dir: str, vid_num: int, frame_count: int) -> None: + mp = _marker_path(frames_dir, vid_num) + ssh(worker["ssh_alias"], + f"echo {frame_count} > {shlex.quote(mp)}", timeout=10) + + +def _drop_partial_frames(worker: dict, frames_dir: str, from_idx: int) -> None: + """Delete frame_*.jpg with sequence number >= from_idx (leftover from crashed extraction).""" + cmd = ( + f"ls {shlex.quote(frames_dir)}/frame_*.jpg 2>/dev/null " + f"| awk -F'frame_' '{{n=int($2); if (n >= {from_idx}) print}}' " + f"| xargs -r rm -f" + ) + ssh(worker["ssh_alias"], cmd, timeout=60) + def do_extract(job: sqlite3.Row, worker: dict) -> str: videos = json.loads(job["video_paths"]) frames_dir = f"{worker['frames_dir']}/job_{job['id']}" - # Clean any frame_*.jpg from a prior run so count_frames reflects this extraction only - # (retries/restarts otherwise inflate frame_count with duplicates). - ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)} && rm -f {shlex.quote(frames_dir)}/frame_*.jpg") + # Preserve existing frames — resume from last completed video on retry. 
+ ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)}") idx = 0 - total_frames_est = 0 # will be computed after each scp + total_frames_est = 0 total_duration_s = 0.0 n_videos = len(videos) for vid_num, v in enumerate(videos, start=1): vf = f"fps={FPS},scale={IMG_W}:{IMG_H}" pattern = f"{frames_dir}/frame_%06d.jpg" + # Resume: skip videos already fully extracted in a previous run. + done_frames = _read_marker(worker, frames_dir, vid_num) + if done_frames is not None: + print(f" resume: video {vid_num}/{n_videos} already done ({done_frames} frames), skipping") + idx += done_frames + # Still need duration for total_duration_s / total_frames_est + _vpath = v.split(':', 1)[1] if ':' in v and not v.startswith('/') else v + _vhost = v.split(':', 1)[0] if ':' in v and not v.startswith('/') else worker['ssh_alias'] + _dur_alias = worker['ssh_alias'] if _vhost in (worker['ssh_alias'], worker['host']) else _vhost + _, _dur_str, _ = ssh(_dur_alias, + f"ffprobe -v error -show_entries format=duration -of csv=p=0 " + f"{shlex.quote(_vpath)} 2>/dev/null || echo 0", timeout=30) + _dur = float(_dur_str.strip()) if _dur_str.strip() else 0.0 + total_duration_s += _dur + total_frames_est += max(1, int(_dur * FPS)) + set_status(job["id"], frame_count=idx, + progress=min(99, idx * 100 // max(1, total_frames_est))) + continue + # Drop any partial frames left by a previous crashed extraction of this video. + _drop_partial_frames(worker, frames_dir, idx) # Detect third-host sources (e.g.
z620:) — extract frames remotely, pull only JPEGs _src_is_third_host = False if ":" in v and not v.startswith("/"): @@ -453,15 +501,18 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str: _src_is_third_host = True if _src_is_third_host: + _frames_before = idx idx, dur = _extract_on_remote_host( job["id"], worker, _src_host_v, _src_path_v, frames_dir, vid_num, n_videos, idx, vf, ) total_duration_s += dur total_frames_est += max(1, int(dur * FPS)) + _write_marker(worker, frames_dir, vid_num, idx - _frames_before) set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // max(1, total_frames_est))) else: + _frames_before_local = idx worker_src, is_ephemeral = resolve_worker_video_source(worker, v, frames_dir) ensure_worker_video_source(v, worker, worker_src, is_ephemeral, job["id"]) dur = video_duration_s(worker, worker_src) @@ -498,6 +549,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str: _, err, _ = ssh(worker["ssh_alias"], f"cat /tmp/cosma-ffmpeg-{job['id']}.log 2>/dev/null | tail -5 || echo ''") raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}") idx = count_frames(worker, frames_dir) + _write_marker(worker, frames_dir, vid_num, idx - _frames_before_local) if is_ephemeral: ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}") set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est))