dashboard thumb 48x27, step live, spin busy, live thumbnail refresh
- dispatcher: col step ajoutee (migration). set a chaque phase: scp N/M, ffmpeg N/M, trimming hors-eau, reconstruct demo.py - _refresh_thumbnail() scp la DERNIERE frame extraite toutes les ~15s pendant ffmpeg pour que le preview colle a la progression live - template: cache-bust thumbnail via ?t=mtime, step affiche sous progress bar, thumb revenue a 48x27 - CSS: .badge.busy -> animation spin (rotation infinie) au lieu de juste une couleur, .step-text italique mute
This commit is contained in:
Binary file not shown.
@@ -79,6 +79,8 @@ def _migrate():
|
||||
("trimmed_head", "INTEGER DEFAULT 0"),
|
||||
("trimmed_tail", "INTEGER DEFAULT 0"),
|
||||
("video_duration_s", "REAL DEFAULT 0"),
|
||||
# Human-readable phase so the dashboard can show "scp 2/3", "ffmpeg 45%", "reconstruct 12/113 windows"...
|
||||
("step", "TEXT"),
|
||||
):
|
||||
if col not in cols:
|
||||
conn.execute(f"ALTER TABLE jobs ADD COLUMN {col} {ddl}")
|
||||
@@ -236,6 +238,21 @@ print(f'TRIM_RESULT {removed_head} {removed_tail} {end - start}')
|
||||
"""
|
||||
|
||||
|
||||
def _refresh_thumbnail(worker: dict, frames_dir: str, job_id: int) -> None:
|
||||
"""Scp the latest extracted frame back to the dashboard host. Silent on failure."""
|
||||
thumb_dir = DB_PATH.parent / "thumbnails"
|
||||
thumb_dir.mkdir(exist_ok=True)
|
||||
thumb_local = thumb_dir / f"job_{job_id}.jpg"
|
||||
rc, out, _ = ssh(worker["ssh_alias"], f"ls -1 {shlex.quote(frames_dir)}/frame_*.jpg 2>/dev/null | tail -1")
|
||||
latest = out.strip()
|
||||
if not latest:
|
||||
return
|
||||
subprocess.run(
|
||||
["scp", "-o", "BatchMode=yes", f"{worker['ssh_alias']}:{latest}", str(thumb_local)],
|
||||
capture_output=True, timeout=15,
|
||||
)
|
||||
|
||||
|
||||
def trim_above_water_prefix(worker: dict, frames_dir: str) -> tuple[int, int, int]:
|
||||
"""Delete leading and trailing out-of-water frames. Returns (head, tail, remaining)."""
|
||||
script_remote = f"/tmp/cosma-trim-{os.getpid()}.py"
|
||||
@@ -315,6 +332,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
|
||||
idx = 0
|
||||
total_frames_est = 0 # will be computed after each scp
|
||||
total_duration_s = 0.0
|
||||
n_videos = len(videos)
|
||||
for v in videos:
|
||||
vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
|
||||
pattern = f"{frames_dir}/frame_%06d.jpg"
|
||||
@@ -322,6 +340,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
|
||||
rc_check = ssh(worker["ssh_alias"], f"test -f {shlex.quote(worker_src)}")[0]
|
||||
if rc_check != 0:
|
||||
print(f" scp {_path_basename(v)} → {worker['host']}...")
|
||||
set_status(job["id"], step=f"scp {idx // 1 + 1}/{n_videos}: {_path_basename(v)}")
|
||||
scp_to_worker(v, worker, worker_src)
|
||||
dur = video_duration_s(worker, worker_src)
|
||||
total_duration_s += dur
|
||||
@@ -337,13 +356,23 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
|
||||
)
|
||||
ssh(worker["ssh_alias"], f"setsid bash -c {shlex.quote(bg)} >/dev/null 2>&1 &")
|
||||
|
||||
# 1-based index for humans. We cannot compute it from `idx` directly because idx is
|
||||
# the running frame counter, so count the loop iterations via total_duration_s order.
|
||||
vid_num = videos.index(v) + 1
|
||||
thumb_refresh_counter = 0
|
||||
while True:
|
||||
# Use -s (file exists AND size > 0) to avoid race: setsid bash writes the exit code
|
||||
# AFTER ffmpeg finishes; a plain -f can match a zero-byte placeholder mid-write.
|
||||
rc_done, _, _ = ssh(worker["ssh_alias"], f"test -s {shlex.quote(exit_file)}")
|
||||
current = count_frames(worker, frames_dir)
|
||||
pct = min(99, current * 100 // total_frames_est)
|
||||
set_status(job["id"], frame_count=current, progress=pct)
|
||||
set_status(job["id"], frame_count=current, progress=pct,
|
||||
step=f"ffmpeg {vid_num}/{n_videos}: {current} frames")
|
||||
# Refresh the preview thumbnail every few polls so the dashboard reflects what the
|
||||
# camera is seeing right now, not the very first (surface) frame.
|
||||
thumb_refresh_counter += 1
|
||||
if thumb_refresh_counter % 3 == 1 and current > 0:
|
||||
_refresh_thumbnail(worker, frames_dir, job["id"])
|
||||
if rc_done == 0:
|
||||
break
|
||||
time.sleep(5)
|
||||
@@ -360,7 +389,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
|
||||
set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est))
|
||||
# Persist the measured video duration so the dashboard shows real length (segment_label
|
||||
# from ingest is only the timestamp of the first MP4 and lies when a segment spans multiple).
|
||||
set_status(job["id"], video_duration_s=total_duration_s)
|
||||
set_status(job["id"], video_duration_s=total_duration_s, step="trimming hors-eau")
|
||||
# Skip segments that are too short to contain a meaningful dive.
|
||||
min_video_s = int(os.environ.get("COSMA_QC_MIN_VIDEO_S", "480")) # 8 min default
|
||||
if total_duration_s < min_video_s:
|
||||
@@ -379,25 +408,8 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
|
||||
print(f" ↳ job #{job['id']}: only {remaining} underwater frames (<{min_frames}) — marking skipped")
|
||||
set_status(job["id"], status="skipped", error=f"too short: {remaining} underwater frames")
|
||||
raise RuntimeError("skipped_short")
|
||||
# Copy the first kept frame back to the dashboard host as a thumbnail so the UI can show
|
||||
# what the job actually sees. Mid-point would be better but first is cheap and on disk now.
|
||||
thumb_dir = DB_PATH.parent / "thumbnails"
|
||||
thumb_dir.mkdir(exist_ok=True)
|
||||
thumb_local = thumb_dir / f"job_{job['id']}.jpg"
|
||||
subprocess.run(
|
||||
["scp", "-o", "BatchMode=yes",
|
||||
f"{worker['ssh_alias']}:{frames_dir}/frame_*.jpg", str(thumb_local)],
|
||||
capture_output=True, timeout=30,
|
||||
) # globbing only picks 1 on the remote side via shell expansion
|
||||
# If glob scp is empty, try first explicit by listing
|
||||
if not thumb_local.exists() or thumb_local.stat().st_size == 0:
|
||||
rc, out, _ = ssh(worker["ssh_alias"], f"ls {shlex.quote(frames_dir)}/frame_*.jpg 2>/dev/null | head -1")
|
||||
first = out.strip()
|
||||
if first:
|
||||
subprocess.run(
|
||||
["scp", "-o", "BatchMode=yes", f"{worker['ssh_alias']}:{first}", str(thumb_local)],
|
||||
capture_output=True, timeout=30,
|
||||
)
|
||||
# Snapshot the latest post-trim frame so the dashboard preview matches what the demo.py will see.
|
||||
_refresh_thumbnail(worker, frames_dir, job["id"])
|
||||
# Trim once per job so LVM thin pool on the host actually reclaims the freed blocks.
|
||||
ssh(worker["ssh_alias"], "sudo fstrim / 2>/dev/null || fstrim / 2>/dev/null", timeout=60)
|
||||
return frames_dir
|
||||
@@ -449,6 +461,7 @@ def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str
|
||||
f"done; "
|
||||
f"pkill -KILL -f \"demo.py.*{frames_dir}\" 2>/dev/null; exit 124"
|
||||
)
|
||||
set_status(job["id"], step=f"reconstruct demo.py (windowed w{window_size}, stride {stride})")
|
||||
rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3 * 3600)
|
||||
# Accept rc==0 OR PLY file exists with non-zero size (kill -TERM may return non-zero)
|
||||
ply_rc, ply_size, _ = ssh(worker["ssh_alias"], f"stat -c %s {shlex.quote(ply_path)} 2>/dev/null || echo 0")
|
||||
|
||||
Reference in New Issue
Block a user