dashboard + dispatcher — UX props, trim head+tail, cols, link direct

dashboard:
- job_id, AUV GP1/GP2 (serial en tooltip), segment_label, durée réelle,
  nb frames, nb hors-eau trimés
- lien viser en plain <a href> (plus de POST ni popup). Affiché uniquement
  si job.done ET viser_url persistée (demo.py kept alive)
- CSS minimal : flex row, séparateurs, skipped en italique atténué

dispatcher:
- trim head ET tail (AUV hors-eau en début + fin de session)
- migration DB : trimmed_head, trimmed_tail, video_duration_s
- do_extract persiste total_duration_s + trimmed counts via set_status
- run_one : RuntimeError(skipped_short) préserve le status=skipped
- min_frames underwater pour skipper les segments trop courts
- ram_budget 0.45 -> 0.35 (OOM rc=137 avec 8237 frames sur 62 GB de RAM)
This commit is contained in:
Flag
2026-04-22 21:28:06 +00:00
parent 311824f036
commit 9dd6a82d08
5 changed files with 123 additions and 29 deletions

View File

@@ -71,6 +71,22 @@ def db() -> sqlite3.Connection:
return conn
def _migrate():
"""Idempotent schema upgrades for fields added after initial release."""
with closing(db()) as conn:
cols = {r["name"] for r in conn.execute("PRAGMA table_info(jobs)")}
for col, ddl in (
("trimmed_head", "INTEGER DEFAULT 0"),
("trimmed_tail", "INTEGER DEFAULT 0"),
("video_duration_s", "REAL DEFAULT 0"),
):
if col not in cols:
conn.execute(f"ALTER TABLE jobs ADD COLUMN {col} {ddl}")
_migrate()
def ssh(alias: str, cmd: str, timeout: int = 30) -> tuple[int, str, str]:
p = subprocess.run(
["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", alias, cmd],
@@ -148,37 +164,68 @@ def count_frames(worker: dict, frames_dir: str) -> int:
_AUTO_TRIM_SCRIPT = r"""
import cv2, glob, os, sys
frames_dir = sys.argv[1]
need_streak = 10 # consecutive underwater frames required to lock start
need_streak = 10 # consecutive underwater frames required to lock start/end
paths = sorted(glob.glob(os.path.join(frames_dir, 'frame_*.jpg')))
if not paths:
print('TRIM_RESULT 0 0'); sys.exit(0)
print('TRIM_RESULT 0 0 0'); sys.exit(0)
def is_underwater(path):
img = cv2.imread(path, cv2.IMREAD_REDUCED_COLOR_4)
if img is None:
return None
b, g, r = [float(c) for c in cv2.mean(img)[:3]]
# Red is absorbed by water → R < G and R < B on underwater shots.
return r < g - 5 and r < b - 5
# Scan from the start for the first sustained underwater run.
start = 0
streak = 0
for i, p in enumerate(paths):
img = cv2.imread(p, cv2.IMREAD_REDUCED_COLOR_4)
if img is None:
uw = is_underwater(p)
if uw is None:
continue
mean_b, mean_g, mean_r = [float(c) for c in cv2.mean(img)[:3]]
# Underwater = red is absorbed → R noticeably lower than both G and B
underwater = mean_r < mean_g - 5 and mean_r < mean_b - 5
if underwater:
if uw:
streak += 1
if streak >= need_streak:
start = i - need_streak + 1
break
else:
streak = 0
if start <= 0:
print(f'TRIM_RESULT 0 {len(paths)}'); sys.exit(0)
# Scan from the end for the last sustained underwater run.
end = len(paths)
streak = 0
for j in range(len(paths) - 1, -1, -1):
uw = is_underwater(paths[j])
if uw is None:
continue
if uw:
streak += 1
if streak >= need_streak:
end = j + need_streak # exclusive
break
else:
streak = 0
if end <= start:
# Sanity: never delete everything.
start = 0
end = len(paths)
removed_head = start
removed_tail = len(paths) - end
for p in paths[:start]:
try: os.remove(p)
except OSError: pass
print(f'TRIM_RESULT {start} {len(paths) - start}')
for p in paths[end:]:
try: os.remove(p)
except OSError: pass
print(f'TRIM_RESULT {removed_head} {removed_tail} {end - start}')
"""
def trim_above_water_prefix(worker: dict, frames_dir: str) -> tuple[int, int]:
"""Delete leading out-of-water frames. Returns (removed, remaining)."""
def trim_above_water_prefix(worker: dict, frames_dir: str) -> tuple[int, int, int]:
"""Delete leading and trailing out-of-water frames. Returns (head, tail, remaining)."""
script_remote = f"/tmp/cosma-trim-{os.getpid()}.py"
# Write script on worker and run it inside the lingbot-map venv (has cv2)
rc, _, err = ssh(
@@ -188,20 +235,20 @@ def trim_above_water_prefix(worker: dict, frames_dir: str) -> tuple[int, int]:
)
if rc != 0:
print(f" ↳ trim script upload failed: {err[:150]}")
return (0, 0)
return (0, 0, 0)
rc, out, err = ssh(
worker["ssh_alias"],
f"source {shlex.quote(worker['lingbot_path'])}/.venv/bin/activate && "
f"python3 {shlex.quote(script_remote)} {shlex.quote(frames_dir)}; rm -f {shlex.quote(script_remote)}",
timeout=600,
timeout=1200,
)
for line in out.splitlines():
if line.startswith("TRIM_RESULT"):
parts = line.split()
removed, remaining = int(parts[1]), int(parts[2])
return (removed, remaining)
head, tail, remaining = int(parts[1]), int(parts[2]), int(parts[3])
return (head, tail, remaining)
print(f" ↳ trim unexpected output: {out[:200]} / {err[:200]}")
return (0, 0)
return (0, 0, 0)
def scp_to_worker(local_path: str, worker: dict, remote_path: str):
@@ -255,6 +302,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)} && rm -f {shlex.quote(frames_dir)}/frame_*.jpg")
idx = 0
total_frames_est = 0 # will be computed after each scp
total_duration_s = 0.0
for v in videos:
vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
pattern = f"{frames_dir}/frame_%06d.jpg"
@@ -264,6 +312,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
print(f" scp {_path_basename(v)}{worker['host']}...")
scp_to_worker(v, worker, worker_src)
dur = video_duration_s(worker, worker_src)
total_duration_s += dur
total_frames_est += max(1, int(dur * FPS))
exit_file = f"/tmp/cosma-ffmpeg-{job['id']}-{idx}.exit"
@@ -297,11 +346,21 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
# are 1-11 GB each. Frames are already extracted so worker_src is no longer needed.
ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}")
set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est))
# Drop the hors-eau prefix before reconstruction — always present at session start.
removed, remaining = trim_above_water_prefix(worker, frames_dir)
if removed:
print(f" ↳ job #{job['id']}: trimmed {removed} out-of-water frames, {remaining} kept")
set_status(job["id"], frame_count=remaining)
# Persist the measured video duration so the dashboard shows real length (segment_label
# from ingest is only the timestamp of the first MP4 and lies when a segment spans multiple).
set_status(job["id"], video_duration_s=total_duration_s)
# Drop the hors-eau prefix AND suffix before reconstruction — AUV is out-of-water at both ends.
head, tail, remaining = trim_above_water_prefix(worker, frames_dir)
if head or tail:
print(f" ↳ job #{job['id']}: trimmed head={head} tail={tail} out-of-water, {remaining} kept")
set_status(job["id"], frame_count=remaining, trimmed_head=head, trimmed_tail=tail)
# Skip jobs with too little underwater content to be worth reconstructing (e.g., brief
# surface checks that the auto-segmentation picked up as a dive).
min_frames = max(60, int(30 * FPS)) # need ~30 s of underwater footage minimum
if remaining < min_frames:
print(f" ↳ job #{job['id']}: only {remaining} underwater frames (<{min_frames}) — marking skipped")
set_status(job["id"], status="skipped", error=f"too short: {remaining} underwater frames")
raise RuntimeError("skipped_short")
# Trim once per job so LVM thin pool on the host actually reclaims the freed blocks.
ssh(worker["ssh_alias"], "sudo fstrim / 2>/dev/null || fstrim / 2>/dev/null", timeout=60)
return frames_dir
@@ -316,7 +375,7 @@ def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str
# .87 has 23 GB RAM, .84 has 62 GB. Keep effective frame count ~4k to stay safe.
frame_count = job["frame_count"] or 0
ram_gb = 23 if worker["host"] == "192.168.0.87" else 62
ram_budget_gb = ram_gb * 0.45 # leave headroom for model + OS + cuda pinned buffers
ram_budget_gb = ram_gb * 0.35 # leave headroom for model + OS + cuda pinned buffers
stride = 1
while frame_count * 3.15 / 1024 / stride > ram_budget_gb:
stride += 1
@@ -521,7 +580,11 @@ def run_one(job: sqlite3.Row) -> bool:
progress=100, log_tail=log, finished_at=_now_iso())
_maybe_create_per_auv_stitch(job_id)
except Exception as e:
set_status(job_id, status="error", error=str(e)[:2000], finished_at=_now_iso())
# do_extract raises "skipped_short" after flagging status='skipped' — don't override.
if "skipped_short" not in str(e):
set_status(job_id, status="error", error=str(e)[:2000], finished_at=_now_iso())
else:
set_status(job_id, finished_at=_now_iso())
finally:
release_worker(worker, estimated)
return True