diff --git a/pipeline/stages/05_inference.py b/pipeline/stages/05_inference.py index 05b6369..06ce623 100644 --- a/pipeline/stages/05_inference.py +++ b/pipeline/stages/05_inference.py @@ -13,11 +13,12 @@ Workers: Auto: pick by lowest GPU memory usage (nvidia-smi via SSH). Flow: -1. rsync frames .83 → worker /root/cosma-frames-tmp/ (or /home/floppyrj45/) -2. SSH launch demo.py with windowed mode (window=64, overlap=16) -3. Retrieve PLY + NPZ → .83 ~/cosma-pipeline/data//ply//.{ply,npz} -4. Cleanup worker temp dir -5. Log to SQLite: duration, GPU peak mem, nb points in PLY +1. Kill any stale demo.py on worker before starting +2. rsync frames .83 → worker /root/cosma-frames-tmp/ +3. SSH launch demo.py in background; poll for PLY file; kill viser server once PLY done +4. Retrieve PLY + NPZ → .83 ~/cosma-pipeline/data//ply//.{ply,npz} +5. Cleanup worker temp dir +6. Log to SQLite: duration, GPU peak mem, nb points in PLY Usage: python3 05_inference.py --frames-dir ~/cosma-pipeline/data/20260505-Lepradet/frames/AUV210/GX019837 --worker auto --mission 20260505-Lepradet @@ -83,6 +84,21 @@ def get_gpu_mem_used(worker_key: str) -> int: return 99999 +def kill_stale_demo_py(worker_key: str) -> None: + """Kill any lingering demo.py processes on worker before starting new inference.""" + w = WORKERS[worker_key] + ssh_target = f"{w['user']}@{w['host']}" + try: + subprocess.run( + ["ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", + ssh_target, "pkill -9 -f demo.py 2>/dev/null; sleep 1; echo stale_killed"], + capture_output=True, text=True, timeout=15, + ) + print(f" [05] Stale demo.py killed on {worker_key}") + except Exception as e: + print(f" [05] Warning: kill_stale failed on {worker_key}: {e}") + + def pick_worker() -> str: """Auto-select worker with lowest GPU memory usage.""" best = None @@ -140,6 +156,9 @@ def run_inference(frames_dir: Path, worker_key: str, mission_name: str, "status": "ok", } + # Step 0: kill any stale demo.py on worker + kill_stale_demo_py(worker_key) + # Step 1: create remote temp dir + rsync frames print(f" [05] rsync {frames_dir} → {ssh_target}:{worker_frames}...") subprocess.run( @@ -165,6 +184,9 @@ def run_inference(frames_dir: Path, worker_key: str, mission_name: str, conf_thr = _INF_CFG.get("ply_conf_threshold", 1.5) kf_interval = _INF_CFG.get("keyframe_interval", 1) max_frames = _INF_CFG.get("max_frame_num", 1024) + use_offload = _INF_CFG.get("offload_to_cpu", False) + offload_flag = "--offload_to_cpu" if use_offload else "--no-offload_to_cpu" + if inf_mode == "windowed": window_size = _INF_CFG.get("window_size", 64) overlap_size = _INF_CFG.get("overlap_size", 16) @@ -179,39 +201,67 @@ def run_inference(frames_dir: Path, worker_key: str, mission_name: str, f"--keyframe_interval {kf_interval} " f"--max_frame_num {max_frames} " ) - demo_cmd = ( - f"cd {w['ai_dir']} && " - f"{w['venv']} demo.py " - f"--model_path {checkpoint} " - f"--image_folder {worker_frames} " - f"{mode_flags}" - f"--ply_conf_threshold {conf_thr} " - f"--save_ply {ply_remote} " - f"--save_poses {npz_remote} " - f"--use_sdpa " - f"--offload_to_cpu " - f"2>&1" - ) - print(f" [05] Launching inference on {host}...") + inf_timeout = int(_INF_CFG.get("inference_timeout_s", 10800)) + + # Remote script: launch demo.py in background, poll for PLY, kill viser when done + # This avoids the SSH blocking on the viser server that starts after inference + remote_script = f"""#!/bin/bash +set -e +PLY={ply_remote} +LOG=/tmp/cosma_demo_{segment}.log +# Launch demo.py in background +nohup {w['venv']} {w['ai_dir']}/demo.py \\ + --model_path {checkpoint} \\ + --image_folder {worker_frames} \\ + {mode_flags}--ply_conf_threshold {conf_thr} \\ + --save_ply \\ + --save_poses {npz_remote} \\ + --use_sdpa {offload_flag} \\ + > 2>&1 & +DEMO_PID= +echo "demo.py PID=" >&2 +# Poll for PLY file (check every 30s) +WAITED=0 +while [ -lt {inf_timeout} ]; do + if [ -f "" ] && [ $(wc -c < "") -gt 100 ]; then + sleep 10 # let write finish + echo "PLY_DONE size=$(wc -c < )" >&2 + kill 2>/dev/null || true + exit 0 + fi + # Check if process died with error + if ! kill -0 2>/dev/null; then + echo "Process died early" >&2 + exit 1 + fi + sleep 30 + WAITED=30 +done +echo "TIMEOUT after {inf_timeout}s" >&2 +kill -9 2>/dev/null || true +exit 2 +""" + + print(f" [05] Launching inference on {host} (background+poll, timeout={inf_timeout}s)...") t0 = time.time() r = subprocess.run( - ["ssh", "-o", "StrictHostKeyChecking=no", ssh_target, demo_cmd], - capture_output=True, text=True, timeout=7200, # 2h max + ["ssh", "-o", "StrictHostKeyChecking=no", ssh_target, + "bash -s"], + input=remote_script, + capture_output=True, text=True, timeout=inf_timeout + 60, ) elapsed = time.time() - t0 metrics["inference_s"] = round(elapsed, 1) if r.returncode != 0: metrics["status"] = "error" - metrics["error"] = r.stdout[-500:] + r.stderr[-200:] + metrics["error"] = (r.stdout + r.stderr)[-500:] print(f" [05] inference error: {metrics['error'][-200:]}") return metrics - print(f" [05] Inference done in {elapsed:.1f}s") + print(f" [05] Inference done in {elapsed:.1f}s (returncode={r.returncode})") - # Step 3: GPU peak mem from nvidia-smi log (best-effort parse) - gpu_mem_line = [l for l in r.stdout.split("\n") if "MiB" in l] metrics["gpu_peak_mb"] = get_gpu_mem_used(worker_key) # Step 4: rsync PLY + NPZ back @@ -242,17 +292,14 @@ def run_inference(frames_dir: Path, worker_key: str, mission_name: str, def process_frames_dir(frames_dir: Path, worker_key: str, mission_name: str) -> list[dict]: """Process a directory of frames (single segment or AUV tree).""" - # Detect if frames_dir contains frame_*.jpg directly or subdirs direct_frames = list(frames_dir.glob("frame_*.jpg")) if direct_frames: - # Single segment parts = frames_dir.parts auv_id = frames_dir.parent.name if len(parts) >= 2 else "UNKNOWN" segment = frames_dir.name return [run_inference(frames_dir, worker_key, mission_name, auv_id, segment)] - # Tree: frames_dir///frame_*.jpg all_metrics = [] for auv_dir in sorted(frames_dir.iterdir()): if not auv_dir.is_dir(): @@ -265,6 +312,19 @@ def process_frames_dir(frames_dir: Path, worker_key: str, mission_name: str) -> if not frames: continue print(f"\n[05] === {auv_id}/{seg_dir.name}: {len(frames)} frames ===") + # Guard: min frames required for model (RoPE/attention) + min_frames = int(_INF_CFG.get("min_frames_for_inference", 32)) + if len(frames) < min_frames: + print(f" [05] SKIP {auv_id}/{seg_dir.name}: {len(frames)} frames < {min_frames} min") + init_db() + with get_conn() as conn_mf: + mr = conn_mf.execute("SELECT id FROM missions WHERE name=?", (mission_name,)).fetchone() + if mr: + upsert_job(conn_mf, mr["id"], auv_id, seg_dir.name, "05_inference", + status="skipped", + error_msg=f"frames_too_few={len(frames)}<{min_frames}") + continue + m = run_inference(seg_dir, worker_key, mission_name, auv_id, seg_dir.name) all_metrics.append(m) @@ -291,12 +351,9 @@ def process_frames_dir(frames_dir: Path, worker_key: str, mission_name: str) -> def main(): ap = argparse.ArgumentParser(description="Stage 05 — lingbot-map inference") - ap.add_argument("--frames-dir", type=Path, required=True, - help="Frames dir (single segment or AUV tree)") - ap.add_argument("--worker", type=str, default="auto", - choices=["auto", ".84", ".87"]) - ap.add_argument("--mission", type=str, required=True, - help="Mission name (e.g. 20260505-Lepradet)") + ap.add_argument("--frames-dir", type=Path, required=True) + ap.add_argument("--worker", type=str, default="auto", choices=["auto", ".84", ".87"]) + ap.add_argument("--mission", type=str, required=True) args = ap.parse_args() worker = args.worker