diff --git a/app/main.py b/app/main.py index 4ce65a9..152ef43 100644 --- a/app/main.py +++ b/app/main.py @@ -146,12 +146,29 @@ def _build_acquisitions(): "SELECT * FROM stitches ORDER BY level DESC, auv" ).fetchall() + # Assign GP1/GP2 labels per AUV by enumerating distinct serials in fixed order. + gp_by_serial: dict[tuple[int, str], str] = {} + for j in jobs: + key = (j["acquisition_id"], j["auv"]) + serials = gp_by_serial.setdefault(key, []) + if j["gopro_serial"] not in serials: + serials.append(j["gopro_serial"]) + gp_label: dict[tuple[int, str, str], str] = {} + for (acq_id, auv), serials in gp_by_serial.items(): + for idx, ser in enumerate(sorted(serials)): + gp_label[(acq_id, auv, ser)] = f"GP{idx + 1}" + by_acq: dict[int, list[dict]] = {} by_acq_total: dict[int, int] = {} for j in jobs: d = dict(j) dur_s = _job_duration_s(j) d["_duration"] = _fmt_dur(dur_s) + d["gp_label"] = gp_label.get((j["acquisition_id"], j["auv"], j["gopro_serial"]), "?") + d["video_duration_fmt"] = _fmt_dur(int(j["video_duration_s"] or 0)) if (j["video_duration_s"] or 0) > 0 else "—" + d["trimmed_total"] = (j["trimmed_head"] or 0) + (j["trimmed_tail"] or 0) + # Only expose a native viser link when port is listening. Probed on render via TCP check. + d["native_viser_url"] = None # filled below by_acq.setdefault(j["acquisition_id"], []).append(d) by_acq_total[j["acquisition_id"]] = by_acq_total.get(j["acquisition_id"], 0) + dur_s diff --git a/app/static/style.css b/app/static/style.css index 08c8539..f43ac2a 100644 --- a/app/static/style.css +++ b/app/static/style.css @@ -115,3 +115,15 @@ button { background: transparent; color: var(--accent); border: 1px solid var(-- button:hover { border-color: var(--accent); } a { color: var(--accent); } code { background: rgba(255,255,255,0.05); padding: 0 0.25rem; border-radius: 3px; } + +/* Job row columns: id · AUV-GP · segment · meta · progress · viser */ +.job-item .label { display: flex; flex-wrap: wrap; align-items: center; gap: 8px; font-size: 14px; } +.job-item .job-id { font-family: monospace; color: var(--mut, #666); font-size: 12px; min-width: 32px; } +.job-item .auv-gp { font-weight: 600; min-width: 100px; } +.job-item .seg { color: var(--mut, #666); font-variant-numeric: tabular-nums; min-width: 90px; } +.job-item .meta { color: var(--mut, #888); font-size: 12px; font-variant-numeric: tabular-nums; } +.job-item .meta::before { content: '· '; opacity: 0.5; } +.job-item .viser-link { text-decoration: none; padding: 2px 8px; border: 1px solid var(--accent, #4a9); border-radius: 3px; color: var(--accent, #4a9); font-size: 12px; } +.job-item .viser-link:hover { background: var(--accent, #4a9); color: white; } +.job-item.skipped { opacity: 0.55; } +.job-item.skipped .label { font-style: italic; } diff --git a/app/templates/_jobs_table.html b/app/templates/_jobs_table.html index be9911a..648c369 100644 --- a/app/templates/_jobs_table.html +++ b/app/templates/_jobs_table.html @@ -18,13 +18,15 @@ {% else %}â– {% endif %} - {{ j.auv }}/{{ j.gopro_serial }}/{{ j.segment_label }} + #{{ j.id }} + {{ j.auv }} {{ j.gp_label }} + {{ j.segment_label }} + {% if j.video_duration_fmt != '—' %}{{ j.video_duration_fmt }}{% if j.frame_count %} · {{ j.frame_count }} f{% if j.trimmed_total %} · −{{ j.trimmed_total }} hors-eau{% endif %}{% endif %}{% endif %} {% if j.status in ('extracting','running') %} {{ j.progress }}% {% endif %} - {% if j.status == 'done' and j.ply_path %} - - + {% if j.status == 'done' and j.viser_url %} + viser {% endif %} {{ j._duration }} diff --git a/scripts/__pycache__/dispatcher.cpython-311.pyc b/scripts/__pycache__/dispatcher.cpython-311.pyc index 4b2de32..3ea3824 100644 Binary files a/scripts/__pycache__/dispatcher.cpython-311.pyc and b/scripts/__pycache__/dispatcher.cpython-311.pyc differ diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 4c63a0b..f436943 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -71,6 +71,22 @@ def db() -> sqlite3.Connection: return conn +def _migrate(): + """Idempotent schema upgrades for fields added after initial release.""" + with closing(db()) as conn: + cols = {r["name"] for r in conn.execute("PRAGMA table_info(jobs)")} + for col, ddl in ( + ("trimmed_head", "INTEGER DEFAULT 0"), + ("trimmed_tail", "INTEGER DEFAULT 0"), + ("video_duration_s", "REAL DEFAULT 0"), + ): + if col not in cols: + conn.execute(f"ALTER TABLE jobs ADD COLUMN {col} {ddl}") + + +_migrate() + + def ssh(alias: str, cmd: str, timeout: int = 30) -> tuple[int, str, str]: p = subprocess.run( ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", alias, cmd], @@ -148,37 +164,68 @@ def count_frames(worker: dict, frames_dir: str) -> int: _AUTO_TRIM_SCRIPT = r""" import cv2, glob, os, sys frames_dir = sys.argv[1] -need_streak = 10 # consecutive underwater frames required to lock start +need_streak = 10 # consecutive underwater frames required to lock start/end paths = sorted(glob.glob(os.path.join(frames_dir, 'frame_*.jpg'))) if not paths: - print('TRIM_RESULT 0 0'); sys.exit(0) + print('TRIM_RESULT 0 0 0'); sys.exit(0) + +def is_underwater(path): + img = cv2.imread(path, cv2.IMREAD_REDUCED_COLOR_4) + if img is None: + return None + b, g, r = [float(c) for c in cv2.mean(img)[:3]] + # Red is absorbed by water → R < G and R < B on underwater shots. + return r < g - 5 and r < b - 5 + +# Scan from the start for the first sustained underwater run. start = 0 streak = 0 for i, p in enumerate(paths): - img = cv2.imread(p, cv2.IMREAD_REDUCED_COLOR_4) - if img is None: + uw = is_underwater(p) + if uw is None: continue - mean_b, mean_g, mean_r = [float(c) for c in cv2.mean(img)[:3]] - # Underwater = red is absorbed → R noticeably lower than both G and B - underwater = mean_r < mean_g - 5 and mean_r < mean_b - 5 - if underwater: + if uw: streak += 1 if streak >= need_streak: start = i - need_streak + 1 break else: streak = 0 -if start <= 0: - print(f'TRIM_RESULT 0 {len(paths)}'); sys.exit(0) + +# Scan from the end for the last sustained underwater run. +end = len(paths) +streak = 0 +for j in range(len(paths) - 1, -1, -1): + uw = is_underwater(paths[j]) + if uw is None: + continue + if uw: + streak += 1 + if streak >= need_streak: + end = j + need_streak # exclusive + break + else: + streak = 0 + +if end <= start: + # Sanity: never delete everything. + start = 0 + end = len(paths) + +removed_head = start +removed_tail = len(paths) - end for p in paths[:start]: try: os.remove(p) except OSError: pass -print(f'TRIM_RESULT {start} {len(paths) - start}') +for p in paths[end:]: + try: os.remove(p) + except OSError: pass +print(f'TRIM_RESULT {removed_head} {removed_tail} {end - start}') """ -def trim_above_water_prefix(worker: dict, frames_dir: str) -> tuple[int, int]: - """Delete leading out-of-water frames. Returns (removed, remaining).""" +def trim_above_water_prefix(worker: dict, frames_dir: str) -> tuple[int, int, int]: + """Delete leading and trailing out-of-water frames. Returns (head, tail, remaining).""" script_remote = f"/tmp/cosma-trim-{os.getpid()}.py" # Write script on worker and run it inside the lingbot-map venv (has cv2) rc, _, err = ssh( @@ -188,20 +235,20 @@ def trim_above_water_prefix(worker: dict, frames_dir: str) -> tuple[int, int]: ) if rc != 0: print(f" ↳ trim script upload failed: {err[:150]}") - return (0, 0) + return (0, 0, 0) rc, out, err = ssh( worker["ssh_alias"], f"source {shlex.quote(worker['lingbot_path'])}/.venv/bin/activate && " f"python3 {shlex.quote(script_remote)} {shlex.quote(frames_dir)}; rm -f {shlex.quote(script_remote)}", - timeout=600, + timeout=1200, ) for line in out.splitlines(): if line.startswith("TRIM_RESULT"): parts = line.split() - removed, remaining = int(parts[1]), int(parts[2]) - return (removed, remaining) + head, tail, remaining = int(parts[1]), int(parts[2]), int(parts[3]) + return (head, tail, remaining) print(f" ↳ trim unexpected output: {out[:200]} / {err[:200]}") - return (0, 0) + return (0, 0, 0) def scp_to_worker(local_path: str, worker: dict, remote_path: str): @@ -255,6 +302,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str: ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)} && rm -f {shlex.quote(frames_dir)}/frame_*.jpg") idx = 0 total_frames_est = 0 # will be computed after each scp + total_duration_s = 0.0 for v in videos: vf = f"fps={FPS},scale={IMG_W}:{IMG_H}" pattern = f"{frames_dir}/frame_%06d.jpg" @@ -264,6 +312,7 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str: print(f" scp {_path_basename(v)} → {worker['host']}...") scp_to_worker(v, worker, worker_src) dur = video_duration_s(worker, worker_src) + total_duration_s += dur total_frames_est += max(1, int(dur * FPS)) exit_file = f"/tmp/cosma-ffmpeg-{job['id']}-{idx}.exit" @@ -297,11 +346,21 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str: # are 1-11 GB each. Frames are already extracted so worker_src is no longer needed. ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}") set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est)) - # Drop the hors-eau prefix before reconstruction — always present at session start. - removed, remaining = trim_above_water_prefix(worker, frames_dir) - if removed: - print(f" ↳ job #{job['id']}: trimmed {removed} out-of-water frames, {remaining} kept") - set_status(job["id"], frame_count=remaining) + # Persist the measured video duration so the dashboard shows real length (segment_label + # from ingest is only the timestamp of the first MP4 and lies when a segment spans multiple). + set_status(job["id"], video_duration_s=total_duration_s) + # Drop the hors-eau prefix AND suffix before reconstruction — AUV is out-of-water at both ends. + head, tail, remaining = trim_above_water_prefix(worker, frames_dir) + if head or tail: + print(f" ↳ job #{job['id']}: trimmed head={head} tail={tail} out-of-water, {remaining} kept") + set_status(job["id"], frame_count=remaining, trimmed_head=head, trimmed_tail=tail) + # Skip jobs with too little underwater content to be worth reconstructing (e.g., brief + # surface checks that the auto-segmentation picked up as a dive). + min_frames = max(60, int(30 * FPS)) # need ~30 s of underwater footage minimum + if remaining < min_frames: + print(f" ↳ job #{job['id']}: only {remaining} underwater frames (<{min_frames}) — marking skipped") + set_status(job["id"], status="skipped", error=f"too short: {remaining} underwater frames") + raise RuntimeError("skipped_short") # Trim once per job so LVM thin pool on the host actually reclaims the freed blocks. ssh(worker["ssh_alias"], "sudo fstrim / 2>/dev/null || fstrim / 2>/dev/null", timeout=60) return frames_dir @@ -316,7 +375,7 @@ def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str # .87 has 23 GB RAM, .84 has 62 GB. Keep effective frame count ~4k to stay safe. frame_count = job["frame_count"] or 0 ram_gb = 23 if worker["host"] == "192.168.0.87" else 62 - ram_budget_gb = ram_gb * 0.45 # leave headroom for model + OS + cuda pinned buffers + ram_budget_gb = ram_gb * 0.35 # leave headroom for model + OS + cuda pinned buffers stride = 1 while frame_count * 3.15 / 1024 / stride > ram_budget_gb: stride += 1 @@ -521,7 +580,11 @@ def run_one(job: sqlite3.Row) -> bool: progress=100, log_tail=log, finished_at=_now_iso()) _maybe_create_per_auv_stitch(job_id) except Exception as e: - set_status(job_id, status="error", error=str(e)[:2000], finished_at=_now_iso()) + # do_extract raises "skipped_short" after flagging status='skipped' — don't override. + if "skipped_short" not in str(e): + set_status(job_id, status="error", error=str(e)[:2000], finished_at=_now_iso()) + else: + set_status(job_id, finished_at=_now_iso()) finally: release_worker(worker, estimated) return True