From 311824f036dc6ee9b9ab94c9838963a8bcb4d759 Mon Sep 17 00:00:00 2001 From: Flag Date: Wed, 22 Apr 2026 20:31:30 +0000 Subject: [PATCH] =?UTF-8?q?dispatcher=20=E2=80=94=20trim=20auto=20prefix?= =?UTF-8?q?=20hors-eau=20avant=20reconstruct?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Les sessions record commencent systématiquement avec l'AUV sur le pont ou en surface. Les frames hors-eau polluent le model et bloquent l'alignment ICP du stitch. trim_above_water_prefix détecte underwater par absorption du canal rouge (mean_R < mean_G-5 ET mean_R < mean_B-5) et exige un streak consécutif de 10 frames underwater pour lock le start. Tout ce qui précède est supprimé avant demo.py. OpenCV charge les frames en REDUCED_COLOR_4 pour accélération. Exécuté dans le venv lingbot-map côté worker (cv2 dispo). --- scripts/dispatcher.py | 67 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index c4e1f36..4c63a0b 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -142,6 +142,68 @@ def count_frames(worker: dict, frames_dir: str) -> int: return 0 +# Record sessions always start with the AUV on deck or at the surface — these frames pollute +# reconstruction. Detect the first sustained underwater run (red channel absorbed by water, so +# mean_R is at least 5 levels below both mean_G and mean_B) and delete the out-of-water prefix before demo.py runs. 
# Frame-trimming helper executed on the worker; it is streamed to `python3 -` over SSH.
# It scans frame_*.jpg in sorted order, finds the first run of `need_streak` underwater
# frames (mean red below both mean green and mean blue by `margin`) and deletes every
# frame before that run. Its last stdout line is "TRIM_RESULT <removed> <remaining>".
_AUTO_TRIM_SCRIPT = r"""
import glob
import os
import sys


def find_underwater_start(flags, need_streak):
    # flags yields True (underwater), False (above water) or None (unreadable frame).
    # Returns the index of the first frame of the first run that accumulates
    # need_streak underwater frames, or 0 when there is no such run (nothing to trim).
    # Unreadable frames neither extend nor break a run, which is why the run start is
    # remembered explicitly instead of being derived from the current index.
    streak = 0
    run_start = 0
    for i, flag in enumerate(flags):
        if flag is None:
            continue
        if not flag:
            streak = 0
            continue
        if streak == 0:
            run_start = i
        streak += 1
        if streak >= need_streak:
            return run_start
    return 0


def main(argv):
    # Imported here so the pure helper above stays usable without OpenCV installed.
    import cv2

    frames_dir = argv[1]
    need_streak = max(1, int(argv[2])) if len(argv) > 2 else 10
    margin = float(argv[3]) if len(argv) > 3 else 5.0
    # Lexicographic order: assumes zero-padded frame numbers -- TODO confirm with the extractor.
    paths = sorted(glob.glob(os.path.join(frames_dir, 'frame_*.jpg')))

    def is_underwater(path):
        # Quarter-resolution decode is enough for channel means and much faster.
        img = cv2.imread(path, cv2.IMREAD_REDUCED_COLOR_4)
        if img is None:
            return None
        mean_b, mean_g, mean_r = cv2.mean(img)[:3]
        # Water absorbs red first: R noticeably lower than both G and B.
        return mean_r < mean_g - margin and mean_r < mean_b - margin

    # Generator keeps the scan lazy: decoding stops as soon as the run is locked.
    start = find_underwater_start((is_underwater(p) for p in paths), need_streak)

    removed = 0
    for p in paths[:start]:
        try:
            os.remove(p)
        except OSError as exc:
            print(f'trim: could not remove {p}: {exc}', file=sys.stderr)
        else:
            removed += 1
    # Report what was really deleted, not what was planned.
    print(f'TRIM_RESULT {removed} {len(paths) - removed}')


if __name__ == '__main__':
    main(sys.argv)
"""


def _parse_trim_result(out: str) -> "tuple[int, int] | None":
    """Return (removed, remaining) from the first well-formed TRIM_RESULT line, else None."""
    for line in out.splitlines():
        fields = line.split()
        if len(fields) == 3 and fields[0] == "TRIM_RESULT":
            try:
                return (int(fields[1]), int(fields[2]))
            except ValueError:
                continue
    return None


def trim_above_water_prefix(
    worker: dict,
    frames_dir: str,
    *,
    min_streak: int = 10,
    margin: float = 5.0,
) -> tuple[int, int]:
    """Delete leading out-of-water frames on the worker.

    Runs ``_AUTO_TRIM_SCRIPT`` on the worker after activating the virtualenv under
    ``worker['lingbot_path']/.venv`` (the script needs ``cv2``). The script is fed to
    ``python3 -`` through a quoted here-document, so nothing is written to the
    worker's filesystem and there is no temporary file to clean up or collide on.

    Args:
        worker: Worker description; ``ssh_alias`` and ``lingbot_path`` are read.
        frames_dir: Directory on the worker containing ``frame_*.jpg`` files.
        min_streak: Number of underwater frames required to lock the start of the
            kept sequence (values below 1 are treated as 1).
        margin: How far mean red must sit below both mean green and mean blue for a
            frame to count as underwater.

    Returns:
        ``(removed, remaining)`` as reported by the script, or ``(0, 0)`` when no
        valid ``TRIM_RESULT`` line came back. The visible caller, ``do_extract``,
        only acts on the result when ``removed`` is non-zero, in which case it
        records ``remaining`` as the job's frame count.
    """
    activate = f"{shlex.quote(worker['lingbot_path'])}/.venv/bin/activate"
    command = (
        f"source {activate} && "
        f"python3 - {shlex.quote(frames_dir)} {max(1, int(min_streak))} {float(margin)} "
        f"<< 'PYEOF'\n{_AUTO_TRIM_SCRIPT}\nPYEOF"
    )
    _rc, out, err = ssh(worker["ssh_alias"], command, timeout=600)
    result = _parse_trim_result(out)
    if result is None:
        print(f"  ↳ trim unexpected output: {out[:200]} / {err[:200]}")
        return (0, 0)
    return result