dispatcher — trim auto prefix hors-eau avant reconstruct

Les sessions record commencent systematiquement avec l AUV sur le pont
ou en surface. Les frames hors-eau polluent le model et bloquent l alignment
ICP du stitch.

trim_above_water_prefix detecte underwater par absorption du canal rouge
(mean_R < mean_G-5 ET mean_R < mean_B-5) et exige un streak consecutif
de 10 frames underwater pour lock le start. Tout ce qui precede est
supprime avant demo.py.

Opencv charge les frames en REDUCED_COLOR_4 pour acceleration.
Execute dans le venv lingbot-map cote worker (cv2 dispo).
This commit is contained in:
Flag
2026-04-22 20:31:30 +00:00
parent e8955aa406
commit 311824f036

View File

@@ -142,6 +142,68 @@ def count_frames(worker: dict, frames_dir: str) -> int:
return 0
# Record sessions always start with the AUV on deck or at the surface — these frames pollute
# reconstruction. Detect the first sustained underwater run (red channel absorbed by water, so
# mean_R < mean_G and mean_R < mean_B) and delete the hors-eau prefix before demo.py runs.
_AUTO_TRIM_SCRIPT = r"""
import cv2, glob, os, sys
frames_dir = sys.argv[1]
need_streak = 10 # consecutive underwater frames required to lock start
paths = sorted(glob.glob(os.path.join(frames_dir, 'frame_*.jpg')))
if not paths:
print('TRIM_RESULT 0 0'); sys.exit(0)
start = 0
streak = 0
for i, p in enumerate(paths):
img = cv2.imread(p, cv2.IMREAD_REDUCED_COLOR_4)
if img is None:
continue
mean_b, mean_g, mean_r = [float(c) for c in cv2.mean(img)[:3]]
# Underwater = red is absorbed → R noticeably lower than both G and B
underwater = mean_r < mean_g - 5 and mean_r < mean_b - 5
if underwater:
streak += 1
if streak >= need_streak:
start = i - need_streak + 1
break
else:
streak = 0
if start <= 0:
print(f'TRIM_RESULT 0 {len(paths)}'); sys.exit(0)
for p in paths[:start]:
try: os.remove(p)
except OSError: pass
print(f'TRIM_RESULT {start} {len(paths) - start}')
"""
def trim_above_water_prefix(worker: dict, frames_dir: str) -> tuple[int, int]:
"""Delete leading out-of-water frames. Returns (removed, remaining)."""
script_remote = f"/tmp/cosma-trim-{os.getpid()}.py"
# Write script on worker and run it inside the lingbot-map venv (has cv2)
rc, _, err = ssh(
worker["ssh_alias"],
f"cat > {shlex.quote(script_remote)} << 'PYEOF'\n{_AUTO_TRIM_SCRIPT}\nPYEOF",
timeout=15,
)
if rc != 0:
print(f" ↳ trim script upload failed: {err[:150]}")
return (0, 0)
rc, out, err = ssh(
worker["ssh_alias"],
f"source {shlex.quote(worker['lingbot_path'])}/.venv/bin/activate && "
f"python3 {shlex.quote(script_remote)} {shlex.quote(frames_dir)}; rm -f {shlex.quote(script_remote)}",
timeout=600,
)
for line in out.splitlines():
if line.startswith("TRIM_RESULT"):
parts = line.split()
removed, remaining = int(parts[1]), int(parts[2])
return (removed, remaining)
print(f" ↳ trim unexpected output: {out[:200]} / {err[:200]}")
return (0, 0)
def scp_to_worker(local_path: str, worker: dict, remote_path: str):
"""Copy a file to the worker.
@@ -235,6 +297,11 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
# are 1-11 GB each. Frames are already extracted so worker_src is no longer needed.
ssh(worker["ssh_alias"], f"rm -f {shlex.quote(worker_src)}")
set_status(job["id"], frame_count=idx, progress=min(99, idx * 100 // total_frames_est))
# Drop the hors-eau prefix before reconstruction — always present at session start.
removed, remaining = trim_above_water_prefix(worker, frames_dir)
if removed:
print(f" ↳ job #{job['id']}: trimmed {removed} out-of-water frames, {remaining} kept")
set_status(job["id"], frame_count=remaining)
# Trim once per job so LVM thin pool on the host actually reclaims the freed blocks.
ssh(worker["ssh_alias"], "sudo fstrim / 2>/dev/null || fstrim / 2>/dev/null", timeout=60)
return frames_dir