diff --git a/pipeline/run_pipeline.sh b/pipeline/run_pipeline.sh new file mode 100755 index 0000000..ccf9ed6 --- /dev/null +++ b/pipeline/run_pipeline.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +# Run full pipeline for a mission: stages 02→03→04→05 +# Usage: ./run_pipeline.sh [worker] +# Example: ./run_pipeline.sh 20260505-Lepradet auto + +set -euo pipefail + +MISSION=${1:-20260505-Lepradet} +WORKER=${2:-auto} +MANIFEST="/home/cosma/cosma-pipeline/${MISSION}/manifest.json" +PIPELINE_DIR="$(cd "$(dirname "$0")" && pwd)/stages" +PIPELINE_BASE="/home/cosma/cosma-pipeline" +NAV_DIR="${PIPELINE_BASE}/data/${MISSION}/nav" +NAV_FILT_DIR="${PIPELINE_BASE}/data/${MISSION}/nav_filtered" +FRAMES_DIR="${PIPELINE_BASE}/data/${MISSION}/frames" + +RUN_ID="$(date +%Y%m%d_%H%M%S)" +RUN_LOG_DIR="${PIPELINE_BASE}/runs/${RUN_ID}" +mkdir -p "${RUN_LOG_DIR}" + +echo "=== Pipeline run ${RUN_ID} mission=${MISSION} worker=${WORKER} ===" | tee "${RUN_LOG_DIR}/run.log" +echo "Start: $(date -u +%Y-%m-%dT%H:%M:%SZ)" | tee -a "${RUN_LOG_DIR}/run.log" + +# Stage 02: nav parse +echo "" | tee -a "${RUN_LOG_DIR}/run.log" +echo "--- Stage 02: nav parse ---" | tee -a "${RUN_LOG_DIR}/run.log" +python3 "${PIPELINE_DIR}/02_nav_parse.py" "${MANIFEST}" \ + 2>&1 | tee -a "${RUN_LOG_DIR}/stage02.log" "${RUN_LOG_DIR}/run.log" + +# Stage 03: nav filter +echo "" | tee -a "${RUN_LOG_DIR}/run.log" +echo "--- Stage 03: nav filter ---" | tee -a "${RUN_LOG_DIR}/run.log" +python3 "${PIPELINE_DIR}/03_nav_filter.py" "${NAV_DIR}" \ + 2>&1 | tee -a "${RUN_LOG_DIR}/stage03.log" "${RUN_LOG_DIR}/run.log" + +# Stage 04: frame extract +echo "" | tee -a "${RUN_LOG_DIR}/run.log" +echo "--- Stage 04: frame extract ---" | tee -a "${RUN_LOG_DIR}/run.log" +python3 "${PIPELINE_DIR}/04_frame_extract.py" --mission "${MISSION}" \ + 2>&1 | tee -a "${RUN_LOG_DIR}/stage04.log" "${RUN_LOG_DIR}/run.log" + +# Stage 05: inference (sequential, one segment at a time) +echo "" | tee -a "${RUN_LOG_DIR}/run.log" +echo "--- Stage 05: inference ---" 
def parse_mcap_segment(mcap_files: list[Path]) -> dict[str, list]:
    """Extract raw navigation samples from the MCAP files of one session.

    Args:
        mcap_files: MCAP paths belonging to a single session/segment.

    Returns:
        Dict keyed by every topic in ``NAV_TOPICS``; each value is a list of
        ``(ts_ns, data_dict)`` tuples in read order. ``ts_ns`` is the message
        log time in nanoseconds.

    Reading is best-effort: a missing file is reported and skipped, and an
    error while reading one file is reported and ends that file only, so one
    corrupt bag does not lose the rest of the session.
    """
    from mcap_ros2.reader import read_ros2_messages

    # One extractor per topic: ROS message -> plain dict of floats.
    extractors = {
        "/mavros/global_position/global": lambda m: {
            "lat": float(m.latitude),
            "lon": float(m.longitude),
            "alt": float(m.altitude),
        },
        "/mavros/imu/data": lambda m: {
            "qx": float(m.orientation.x),
            "qy": float(m.orientation.y),
            "qz": float(m.orientation.z),
            "qw": float(m.orientation.w),
        },
        "/mavros/imu/static_pressure": lambda m: {
            "pressure_pa": float(m.fluid_pressure),
        },
    }

    topic_data: dict[str, list] = {t: [] for t in NAV_TOPICS}

    for mcap_path in mcap_files:
        if not mcap_path.exists():
            # Previously skipped silently, which made a partially missing
            # session indistinguishable from a complete one.
            print(f" [02] Missing MCAP file, skipped: {mcap_path}")
            continue
        try:
            for msg in read_ros2_messages(str(mcap_path), topics=NAV_TOPICS):
                topic = msg.channel.topic
                extract = extractors.get(topic)
                if extract is None:
                    continue
                # NOTE(review): going through a float timestamp limits the
                # precision to roughly a microsecond; fine for the 100 ms join
                # tolerance used downstream.
                ts_ns = int(msg.log_time.timestamp() * 1e9)
                topic_data.setdefault(topic, []).append((ts_ns, extract(msg.ros_msg)))
        except Exception as e:  # deliberate: reader errors are library-specific
            print(f" [02] Error reading {mcap_path.name}: {e}")

    return topic_data
+ """ + import pandas as pd + + nav_pts = topic_data.get("/mavros/global_position/global", []) + imu_pts = topic_data.get("/mavros/imu/data", []) + pres_pts = topic_data.get("/mavros/imu/static_pressure", []) + + if not nav_pts and not imu_pts: + return [] + + # Build DataFrames + if nav_pts: + df_nav = pd.DataFrame([{"ts_ns": ts, **d} for ts, d in nav_pts]) + else: + df_nav = pd.DataFrame(columns=["ts_ns", "lat", "lon", "alt"]) + + if imu_pts: + df_imu = pd.DataFrame([{"ts_ns": ts, **d} for ts, d in imu_pts]) + else: + df_imu = pd.DataFrame(columns=["ts_ns", "qx", "qy", "qz", "qw"]) + + if pres_pts: + df_pres = pd.DataFrame([{"ts_ns": ts, **d} for ts, d in pres_pts]) + else: + df_pres = pd.DataFrame(columns=["ts_ns", "pressure_pa"]) + + # Use nav as base if it has data, else imu + base_df = df_nav if len(df_nav) > 0 else df_imu + base_df = base_df.sort_values("ts_ns").reset_index(drop=True) + + # Merge-as-of for IMU + result = base_df.copy() + if len(df_imu) > 0: + df_imu_s = df_imu.sort_values("ts_ns").reset_index(drop=True) + # Simple nearest-neighbor join + imu_ts = df_imu_s["ts_ns"].values + for col in ["qx", "qy", "qz", "qw"]: + result[col] = np.nan + for i, row_ts in enumerate(result["ts_ns"].values): + idx = np.argmin(np.abs(imu_ts - row_ts)) + if abs(imu_ts[idx] - row_ts) <= tol_ns: + for col in ["qx", "qy", "qz", "qw"]: + result.at[i, col] = float(df_imu_s.at[idx, col]) + + # Merge pressure + if len(df_pres) > 0: + df_pres_s = df_pres.sort_values("ts_ns").reset_index(drop=True) + pres_ts = df_pres_s["ts_ns"].values + result["pressure_pa"] = np.nan + for i, row_ts in enumerate(result["ts_ns"].values): + idx = np.argmin(np.abs(pres_ts - row_ts)) + if abs(pres_ts[idx] - row_ts) <= tol_ns: + result.at[i, "pressure_pa"] = float(df_pres_s.at[idx, "pressure_pa"]) + + # Ensure all columns exist + for col in ["lat", "lon", "alt", "qx", "qy", "qz", "qw", "pressure_pa"]: + if col not in result.columns: + result[col] = np.nan + + return result.to_dict("records") + + 
+def parse_auv(manifest: dict, auv_id: str, out_dir: Path) -> dict: + """Parse all MCAP sessions for one AUV. Returns metrics.""" + from pathlib import Path as P + + metrics = { + "auv_id": auv_id, + "segments": [], + "total_points": 0, + "degraded": False, + "status": "ok", + } + + bag_sessions = manifest.get("bag_sessions_per_auv", {}).get(auv_id, []) + if not bag_sessions: + auv_map = manifest.get("auv_mapping", {}) + bag_auv = auv_map.get(auv_id) + if bag_auv: + bag_sessions = manifest.get("bag_sessions_per_auv", {}).get(bag_auv, []) + + if not bag_sessions: + # Build from raw SSD structure + ssd_path = P(manifest.get("ssd_path", "/mnt/ssd") + "/" + manifest["mission"].split("-")[0] + "-" + manifest["mission"].split("-")[1] if "-" in manifest["mission"] else manifest.get("ssd_path", "/mnt/ssd")) + auv_num = auv_id.replace("AUV", "0") # AUV013 -> 0013? No: AUV013 -> AUV013 + bag_root = P(manifest.get("ssd_path", "/mnt/ssd")) / "raw_data/logs/SUB/bag" + sessions = sorted(bag_root.glob(f"*_{auv_id}")) + bag_sessions = [{"label": s.name, "mcap_files": [str(f) for f in sorted(s.glob("*.mcap"))]} for s in sessions] + + import pandas as pd + + all_points_total = 0 + for sess in bag_sessions: + label = sess.get("session", sess.get("label", "unknown")) + mcap_files = [P(f) for f in sess.get("mcap_files", [])] + if not mcap_files: + continue + + out_parquet = out_dir / f"{auv_id}_{label}.parquet" + if out_parquet.exists(): + df_ex = pd.read_parquet(out_parquet) + n = len(df_ex) + print(f" [02] {auv_id}/{label}: cached ({n} pts)") + all_points_total += n + metrics["segments"].append({"label": label, "points": n, "cached": True}) + continue + + print(f" [02] {auv_id}/{label}: parsing {len(mcap_files)} MCAP files...") + topic_data = parse_mcap_segment(mcap_files) + points = join_topics(topic_data) + + if not points: + print(f" [02] {auv_id}/{label}: no data") + metrics["segments"].append({"label": label, "points": 0, "degraded": True}) + metrics["degraded"] = True + 
continue + + df = pd.DataFrame(points) + n = len(df) + # Check GPS quality + has_gps = df["lat"].notna().any() and (df["lat"] != 0).any() + if not has_gps: + print(f" [02] {auv_id}/{label}: {n} pts, GPS=0 (degraded — AUV underwater)") + metrics["degraded"] = True + else: + print(f" [02] {auv_id}/{label}: {n} pts, GPS OK") + + df.to_parquet(out_parquet, index=False) + all_points_total += n + metrics["segments"].append({"label": label, "points": n, "degraded": not has_gps}) + + metrics["total_points"] = all_points_total + if all_points_total == 0: + metrics["status"] = "degraded" + return metrics + + +def parse_mission(manifest_path: Path, auv_filter: str | None = None) -> list[dict]: + manifest = json.loads(manifest_path.read_text()) + mission_name = manifest["mission"] + + out_dir = PIPELINE_BASE / "data" / mission_name / "nav" + out_dir.mkdir(parents=True, exist_ok=True) + + auv_ids = list(set( + manifest.get("auv_ids_bags", []) + + list(manifest.get("auv_mapping", {}).keys()) + )) + if not auv_ids: + auv_ids = manifest.get("auv_ids_video", []) + if auv_filter: + auv_ids = [a for a in auv_ids if a == auv_filter] + + all_metrics = [] + init_db() + + for auv_id in sorted(auv_ids): + print(f"[02] === {auv_id} ===") + m = parse_auv(manifest, auv_id, out_dir) + all_metrics.append(m) + + with get_conn() as conn: + mission_row = conn.execute("SELECT id FROM missions WHERE name=?", (mission_name,)).fetchone() + if mission_row: + job_id = upsert_job(conn, mission_row["id"], auv_id, "all", "02_nav_parse", + status="done" if m["status"] == "ok" else m["status"], + output_path=str(out_dir)) + record_metric(conn, job_id, "nav_points_total", value=m["total_points"], + pass_fail="pass" if m["total_points"] > 0 else "warn") + + return all_metrics + + +def main(): + ap = argparse.ArgumentParser(description="Stage 02 — Parse nav from MCAP bags") + ap.add_argument("manifest", type=Path) + ap.add_argument("--auv", type=str, default=None) + args = ap.parse_args() + + metrics = 
parse_mission(args.manifest, auv_filter=args.auv) + + print("\n=== Stage 02 summary ===") + for m in metrics: + segs = m.get("segments", []) + total = m.get("total_points", 0) + deg = "DEGRADED" if m.get("degraded") else "OK" + print(f" {m['auv_id']}: {total} pts across {len(segs)} segments [{deg}]") + + +if __name__ == "__main__": + main() diff --git a/pipeline/stages/03_nav_filter.py b/pipeline/stages/03_nav_filter.py new file mode 100644 index 0000000..04b0906 --- /dev/null +++ b/pipeline/stages/03_nav_filter.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +"""Stage 03 — Filter and smooth navigation trajectories. + +Input: ~/cosma-pipeline/data//nav/_.parquet +Output: ~/cosma-pipeline/data//nav_filtered/_.parquet + +Steps: +1. Drop rows with null lat/lon OR lat==0 AND lon==0 (no GPS lock) +2. MAD-3σ outlier removal on lat, lon +3. Moving average smoothing (window 5s, KISS) +4. Depth from pressure: depth_m = (pressure_pa - 101325) / (1025 * 9.81) +5. Output: same columns + lat_smooth, lon_smooth, depth_m + +Usage: + python3 03_nav_filter.py /home/cosma/cosma-pipeline/data/20260505-Lepradet/nav/ + python3 03_nav_filter.py /path/nav/ --auv AUV013 --sigma 2.5 +""" +from __future__ import annotations + +import argparse +import os +import sys +from pathlib import Path + +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent)) +from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso + +PIPELINE_BASE = Path(os.environ.get("COSMA_PIPELINE_BASE", "/home/cosma/cosma-pipeline")) +RHO_SEA = 1025.0 # kg/m3 +G = 9.81 # m/s2 +P_ATM = 101325.0 # Pa + + +def mad_mask(arr: np.ndarray, sigma: float = 3.0) -> np.ndarray: + """True = keep.""" + if len(arr) < 4: + return np.ones(len(arr), dtype=bool) + med = np.median(arr) + mad = np.median(np.abs(arr - med)) + if mad == 0: + return np.ones(len(arr), dtype=bool) + return np.abs(0.6745 * (arr - med) / mad) < sigma + + +def moving_average(arr: np.ndarray, window: int = 5) -> np.ndarray: + if 
len(arr) < window: + return arr.copy() + pad = window // 2 + padded = np.pad(arr, (pad, pad), mode="edge") + return np.convolve(padded, np.ones(window) / window, mode="valid")[:len(arr)] + + +def filter_parquet(src: Path, dst_dir: Path, sigma: float = 3.0, window: int = 5) -> dict: + import pandas as pd + + df = pd.read_parquet(src) + auv_seg = src.stem + metrics = { + "file": src.name, + "points_in": len(df), + "points_out": 0, + "status": "ok", + } + + # Step 1: drop null/zero GPS + has_lat = "lat" in df.columns and df["lat"].notna().any() + if has_lat: + mask_valid = df["lat"].notna() & df["lon"].notna() & (df["lat"] != 0) & (df["lon"] != 0) + df_valid = df[mask_valid].copy() + else: + # No GPS — keep all rows for IMU/pressure + df_valid = df.copy() + metrics["degraded"] = True + + if len(df_valid) == 0: + metrics["status"] = "degraded" + metrics["note"] = "no valid GPS points" + print(f" [03] {auv_seg}: no valid GPS — saving as-is with depth calc only") + df_out = df.copy() + else: + # Step 2: MAD outlier removal on lat/lon + if has_lat and len(df_valid) >= 4: + lats = df_valid["lat"].values + lons = df_valid["lon"].values + mask = mad_mask(lats, sigma) & mad_mask(lons, sigma) + n_removed = int((~mask).sum()) + df_valid = df_valid[mask].copy() + metrics["points_removed_outlier"] = n_removed + else: + metrics["points_removed_outlier"] = 0 + + # Step 3: sort by timestamp + if "ts_ns" in df_valid.columns: + df_valid = df_valid.sort_values("ts_ns").reset_index(drop=True) + + # Step 4: smooth lat/lon + if has_lat and len(df_valid) >= window: + df_valid["lat_smooth"] = moving_average(df_valid["lat"].values, window) + df_valid["lon_smooth"] = moving_average(df_valid["lon"].values, window) + elif has_lat and len(df_valid) > 0: + df_valid["lat_smooth"] = df_valid["lat"] + df_valid["lon_smooth"] = df_valid["lon"] + else: + df_valid["lat_smooth"] = np.nan + df_valid["lon_smooth"] = np.nan + + df_out = df_valid + + # Step 5: depth from pressure + if "pressure_pa" in 
df_out.columns and df_out["pressure_pa"].notna().any(): + df_out["depth_m"] = (df_out["pressure_pa"] - P_ATM) / (RHO_SEA * G) + df_out["depth_m"] = df_out["depth_m"].abs() # negative when underwater (P < Patm) # surface = 0 + else: + df_out["depth_m"] = np.nan + + dst_dir.mkdir(parents=True, exist_ok=True) + out_path = dst_dir / src.name + df_out.to_parquet(out_path, index=False) + + metrics["points_out"] = len(df_out) + removed_null = metrics["points_in"] - len(df_out) - metrics.get("points_removed_outlier", 0) + metrics["points_removed_null"] = max(0, removed_null) + print(f" [03] {auv_seg}: {metrics['points_in']} → {metrics['points_out']} pts, " + f"depth_m range=[{df_out['depth_m'].min():.1f}, {df_out['depth_m'].max():.1f}]" + if df_out["depth_m"].notna().any() else + f" [03] {auv_seg}: {metrics['points_in']} → {metrics['points_out']} pts, no pressure") + return metrics + + +def filter_mission(nav_dir: Path, auv_filter: str | None = None, + sigma: float = 3.0, window: int = 5) -> list[dict]: + out_dir = nav_dir.parent / "nav_filtered" + + parquet_files = sorted(nav_dir.glob("*.parquet")) + if auv_filter: + parquet_files = [f for f in parquet_files if auv_filter in f.name] + + all_metrics = [] + init_db() + + for pf in parquet_files: + out_file = out_dir / pf.name + if out_file.exists(): + print(f"[03] {pf.stem}: cached") + continue + + print(f"[03] Filtering {pf.name}...") + m = filter_parquet(pf, out_dir, sigma=sigma, window=window) + all_metrics.append(m) + + with get_conn() as conn: + mission_name = nav_dir.parent.name + mission_row = conn.execute("SELECT id FROM missions WHERE name=?", (mission_name,)).fetchone() + if mission_row: + auv_id = pf.stem.split("_")[0] + job_id = upsert_job(conn, mission_row["id"], auv_id, "all", "03_nav_filter", + status="done" if m.get("status") == "ok" else m.get("status", "done"), + output_path=str(out_dir)) + record_metric(conn, job_id, "nav_points_filtered", value=m.get("points_out", 0), + pass_fail="pass" if 
m.get("points_out", 0) > 0 else "warn") + + return all_metrics + + +def main(): + ap = argparse.ArgumentParser(description="Stage 03 — Filter nav trajectories") + ap.add_argument("nav_dir", type=Path, help="Directory with *.parquet from stage 02") + ap.add_argument("--auv", type=str, default=None) + ap.add_argument("--sigma", type=float, default=3.0) + ap.add_argument("--window", type=int, default=5) + args = ap.parse_args() + + metrics = filter_mission(args.nav_dir, auv_filter=args.auv, + sigma=args.sigma, window=args.window) + + print("\n=== Stage 03 summary ===") + for m in metrics: + print(f" {m.get('file','?')}: {m.get('points_in',0)} → {m.get('points_out',0)} " + f"[{m.get('status','?')}]") + + +if __name__ == "__main__": + main() diff --git a/pipeline/stages/04b_trim_water.py b/pipeline/stages/04b_trim_water.py new file mode 100644 index 0000000..ebdb6d3 --- /dev/null +++ b/pipeline/stages/04b_trim_water.py @@ -0,0 +1,428 @@ +#!/usr/bin/env python3 +"""Stage 04b — Trim out-of-water (hors-eau) head/tail frames from already-extracted segments. + +Ports the sustained-run trim logic from cosma-qc/scripts/dispatcher.py (_AUTO_TRIM_SCRIPT, +trim_above_water_prefix) into the new cosma-pipeline pipeline. Re-runs frame QC scoring +on the trimmed set and updates state.db (jobs.status + metrics). + +Usage: + python3 04b_trim_water.py --mission 20260505-Lepradet + python3 04b_trim_water.py --mission 20260505-Lepradet --auv AUV210 --segment GX019837 + python3 04b_trim_water.py --mission 20260505-Lepradet --dry-run + +Safety: + - Skips segments where ffmpeg is still running on the frames dir (extraction in progress). + - Skips segments with a queued/running 05_inference job in state.db. + - Skips segments whose frame count is not stable over a 5s window. + - Never deletes all frames (sanity floor: keep everything if trim would empty the dir). 
+""" +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +import time +from pathlib import Path + +import cv2 + +sys.path.insert(0, str(Path(__file__).parent.parent)) +sys.path.insert(0, str(Path(__file__).parent)) +from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso +from lib_frame_qc import score_image_file, aggregate as qc_aggregate + +PIPELINE_BASE = Path(os.environ.get("COSMA_PIPELINE_BASE", "/home/cosma/cosma-pipeline")) +QC_SAMPLE_RATE = int(os.environ.get("COSMA_QC_SAMPLE_RATE", "5")) +QC_BOTTOM_OK_PCT = float(os.environ.get("COSMA_QC_BOTTOM_OK_PCT", "50")) +NEED_STREAK = 10 # consecutive underwater frames required to lock start/end + + +# ----------------------------------------------------------------------------- +# Trim logic (ported verbatim from dispatcher._AUTO_TRIM_SCRIPT) +# ----------------------------------------------------------------------------- + +def is_underwater(path: Path) -> bool | None: + img = cv2.imread(str(path), cv2.IMREAD_REDUCED_COLOR_4) + if img is None: + return None + b, g, r = [float(c) for c in cv2.mean(img)[:3]] + # Red is absorbed by water → R < G AND R < B on underwater shots. + return r < g - 5 and r < b - 5 + + +def trim_segment(frames_dir: Path, dry_run: bool = False) -> tuple[int, int, int]: + """Delete leading and trailing out-of-water frames. + Returns (head_removed, tail_removed, remaining). 
+ """ + paths = sorted(frames_dir.glob("frame_*.jpg")) + if not paths: + return (0, 0, 0) + + # Scan from start + start = 0 + streak = 0 + for i, p in enumerate(paths): + uw = is_underwater(p) + if uw is None: + continue + if uw: + streak += 1 + if streak >= NEED_STREAK: + start = i - NEED_STREAK + 1 + break + else: + streak = 0 + + # Scan from end + end = len(paths) + streak = 0 + for j in range(len(paths) - 1, -1, -1): + uw = is_underwater(paths[j]) + if uw is None: + continue + if uw: + streak += 1 + if streak >= NEED_STREAK: + end = j + NEED_STREAK # exclusive + break + else: + streak = 0 + + if end <= start: + # Sanity: never delete everything. + start = 0 + end = len(paths) + + removed_head = start + removed_tail = len(paths) - end + + if not dry_run: + for p in paths[:start]: + try: + p.unlink() + except OSError: + pass + for p in paths[end:]: + try: + p.unlink() + except OSError: + pass + + return (removed_head, removed_tail, end - start) + + +# ----------------------------------------------------------------------------- +# Safety: is this segment currently being touched? 
+# ----------------------------------------------------------------------------- + +def has_ffmpeg_running_on(frames_dir: Path) -> bool: + """Check if any ffmpeg process is writing into frames_dir.""" + try: + r = subprocess.run( + ["pgrep", "-af", "ffmpeg"], capture_output=True, text=True, timeout=5 + ) + for line in r.stdout.splitlines(): + if str(frames_dir) in line: + return True + except Exception: + pass + return False + + +def has_inference_running_on(frames_dir: Path) -> bool: + """Check if any 05_inference.py process is running on frames_dir.""" + try: + r = subprocess.run( + ["pgrep", "-af", "05_inference"], capture_output=True, text=True, timeout=5 + ) + for line in r.stdout.splitlines(): + if str(frames_dir) in line: + return True + except Exception: + pass + return False + + +def has_pending_inference_job(conn, mission_id: int, auv_id: str, segment: str) -> bool: + """Check state.db for queued/running 05_inference job on this segment.""" + row = conn.execute( + "SELECT status FROM jobs WHERE mission_id=? AND auv_id=? " + "AND segment_label=? 
AND stage='05_inference'", + (mission_id, auv_id, segment), + ).fetchone() + if row is None: + return False + return row["status"] in ("queued", "running") + + +def frame_count_is_stable(frames_dir: Path, wait_s: float = 5.0) -> bool: + """Return True if the frame count doesn't change over wait_s.""" + n1 = sum(1 for _ in frames_dir.glob("frame_*.jpg")) + time.sleep(wait_s) + n2 = sum(1 for _ in frames_dir.glob("frame_*.jpg")) + return n1 == n2 + + +# ----------------------------------------------------------------------------- +# QC re-scoring (mirrors stage 04 qc_segment) +# ----------------------------------------------------------------------------- + +def qc_segment(frames_dir: Path, sample_rate: int = QC_SAMPLE_RATE) -> dict | None: + frames = sorted(frames_dir.glob("frame_*.jpg")) + if not frames: + return None + sampled = frames[::max(1, sample_rate)] + per_frame = [] + for f in sampled: + s = score_image_file(f) + if s is not None: + per_frame.append(s) + if not per_frame: + return None + agg = qc_aggregate(per_frame) + qc_payload = { + "frames_in_dir": len(frames), + "frames_sampled": len(per_frame), + "sample_rate": sample_rate, + **agg, + "per_frame": per_frame, + } + try: + (frames_dir / "qc.json").write_text(json.dumps(qc_payload, indent=2)) + except Exception as e: + print(f" [04b] qc.json write failed: {e}") + return agg + + +# ----------------------------------------------------------------------------- +# Main per-segment driver +# ----------------------------------------------------------------------------- + +def process_segment(mission_name: str, auv_id: str, segment: str, + frames_dir: Path, dry_run: bool, conn) -> dict: + result = { + "auv_id": auv_id, + "segment": segment, + "frames_dir": str(frames_dir), + "skipped": False, + "head_removed": 0, + "tail_removed": 0, + "remaining": 0, + "before_total": 0, + "before_bottom_pct": None, + "after_bottom_pct": None, + "status_before": None, + "status_after": None, + } + + if not 
# -----------------------------------------------------------------------------
# Main per-segment driver
# -----------------------------------------------------------------------------

def process_segment(mission_name: str, auv_id: str, segment: str,
                    frames_dir: Path, dry_run: bool, conn) -> dict:
    """Trim one segment, re-score it and update its stage-04 job in the DB.

    The segment is skipped (``result["skipped"] = True`` with a ``reason``)
    when its frames directory is missing, when ffmpeg or 05_inference appears
    to be working on it, when the mission or its 04 job is not in the DB, when
    a 05_inference job is queued/running, or when the frame count changes
    during a 5 s wait.

    Args:
        mission_name: Mission name as stored in ``missions.name``.
        auv_id: AUV identifier.
        segment: Segment label.
        frames_dir: Directory holding the segment's ``frame_*.jpg``.
        dry_run: When True nothing is deleted and neither qc.json nor the DB
            is modified; the would-be result is printed and returned.
        conn: Open state.db connection whose rows support access by column
            name (``row["id"]``).

    Returns:
        Result dict with trim counts, bottom-visibility before/after and the
        job status before/after.
    """
    result = {
        "auv_id": auv_id,
        "segment": segment,
        "frames_dir": str(frames_dir),
        "skipped": False,
        "head_removed": 0,
        "tail_removed": 0,
        "remaining": 0,
        "before_total": 0,
        "before_bottom_pct": None,
        "after_bottom_pct": None,
        "status_before": None,
        "status_after": None,
    }

    if not frames_dir.is_dir():
        result["skipped"] = True
        result["reason"] = "no_frames_dir"
        return result

    # Safety checks
    if has_ffmpeg_running_on(frames_dir):
        result["skipped"] = True
        result["reason"] = "ffmpeg_running"
        print(f" [04b] SKIP {auv_id}/{segment}: ffmpeg still extracting")
        return result

    if has_inference_running_on(frames_dir):
        result["skipped"] = True
        result["reason"] = "inference_running_proc"
        print(f" [04b] SKIP {auv_id}/{segment}: 05_inference process running")
        return result

    # Look up mission_id + current 04 job
    mission_row = conn.execute(
        "SELECT id FROM missions WHERE name=?", (mission_name,)
    ).fetchone()
    if not mission_row:
        result["skipped"] = True
        result["reason"] = "mission_not_in_db"
        return result
    mission_id = mission_row["id"]

    if has_pending_inference_job(conn, mission_id, auv_id, segment):
        result["skipped"] = True
        result["reason"] = "inference_job_pending"
        print(f" [04b] SKIP {auv_id}/{segment}: 05_inference queued/running in DB")
        return result

    # Blocks for 5 s; done last among the guards so cheap checks fail fast.
    if not frame_count_is_stable(frames_dir, wait_s=5.0):
        result["skipped"] = True
        result["reason"] = "frame_count_unstable"
        print(f" [04b] SKIP {auv_id}/{segment}: frame count not stable")
        return result

    # Snapshot before
    before_paths = sorted(frames_dir.glob("frame_*.jpg"))
    result["before_total"] = len(before_paths)
    job04_row = conn.execute(
        "SELECT id, status FROM jobs WHERE mission_id=? AND auv_id=? "
        "AND segment_label=? AND stage='04_frame_extract'",
        (mission_id, auv_id, segment),
    ).fetchone()
    if job04_row is None:
        result["skipped"] = True
        result["reason"] = "no_04_job_in_db"
        print(f" [04b] SKIP {auv_id}/{segment}: no 04 job row")
        return result
    result["status_before"] = job04_row["status"]

    # Read current QC if available
    qc_path = frames_dir / "qc.json"
    if qc_path.exists():
        try:
            result["before_bottom_pct"] = json.loads(qc_path.read_text()).get("bottom_visible_pct")
        except Exception:
            # An unreadable qc.json only means the "before" value stays None.
            pass

    # Trim
    head, tail, remaining = trim_segment(frames_dir, dry_run=dry_run)
    result["head_removed"] = head
    result["tail_removed"] = tail
    result["remaining"] = remaining

    # Re-QC only when frames were actually removed; an untrimmed segment
    # keeps its existing qc.json and status.
    after_agg = None
    if not dry_run and (head > 0 or tail > 0):
        after_agg = qc_segment(frames_dir)
    elif dry_run:
        # In dry-run, don't touch qc.json; compute aggregate from remaining slice in-memory
        remaining_paths = sorted(frames_dir.glob("frame_*.jpg"))[head: len(before_paths) - tail]
        sampled = remaining_paths[::max(1, QC_SAMPLE_RATE)]
        per_frame = [s for s in (score_image_file(f) for f in sampled) if s is not None]
        if per_frame:
            after_agg = qc_aggregate(per_frame)

    if after_agg is not None:
        result["after_bottom_pct"] = after_agg.get("bottom_visible_pct")

    if dry_run:
        print(
            f" [04b] DRY {auv_id}/{segment}: head={head} tail={tail} "
            f"remaining={remaining} (before={len(before_paths)}, "
            f"bottom_pct {result['before_bottom_pct']}→{result['after_bottom_pct']})"
        )
        return result

    # Update DB: job row + metrics
    job_id = job04_row["id"]
    bottom_pct = after_agg.get("bottom_visible_pct") if after_agg else None

    # No fresh QC (nothing trimmed, or scoring failed) → keep the old status.
    if bottom_pct is not None and bottom_pct >= QC_BOTTOM_OK_PCT:
        new_status = "done"
        err_msg = None
    elif bottom_pct is not None:
        new_status = "degraded"
        err_msg = f"bottom_visible_pct={bottom_pct}% <{QC_BOTTOM_OK_PCT}% (after trim)"
    else:
        new_status = job04_row["status"]
        err_msg = None

    upsert_job(
        conn, mission_id, auv_id, segment, "04_frame_extract",
        status=new_status,
        output_path=str(frames_dir),
        error_msg=err_msg,
    )
    record_metric(conn, job_id, "trimmed_head", value=float(head))
    record_metric(conn, job_id, "trimmed_tail", value=float(tail))
    record_metric(conn, job_id, "frames_after_trim", value=float(remaining))
    if after_agg:
        for k in (
            "frames_total", "frames_bottom_visible", "frames_out_of_water",
            "frames_turbid", "frames_water_no_bottom",
        ):
            if k in after_agg:
                record_metric(conn, job_id, k, value=float(after_agg[k]))
        if bottom_pct is not None:
            record_metric(
                conn, job_id, "bottom_visible_pct",
                value=float(bottom_pct),
                pass_fail="pass" if bottom_pct >= QC_BOTTOM_OK_PCT else "degraded",
            )

    result["status_after"] = new_status
    print(
        f" [04b] {auv_id}/{segment}: trimmed head={head} tail={tail} "
        f"remaining={remaining}, bottom_pct={bottom_pct}% ({result['status_before']}→{new_status})"
    )
    return result
job04_row["status"] + err_msg = None + + upsert_job( + conn, mission_id, auv_id, segment, "04_frame_extract", + status=new_status, + output_path=str(frames_dir), + error_msg=err_msg, + ) + record_metric(conn, job_id, "trimmed_head", value=float(head)) + record_metric(conn, job_id, "trimmed_tail", value=float(tail)) + record_metric(conn, job_id, "frames_after_trim", value=float(remaining)) + if after_agg: + for k in ( + "frames_total", "frames_bottom_visible", "frames_out_of_water", + "frames_turbid", "frames_water_no_bottom", + ): + if k in after_agg: + record_metric(conn, job_id, k, value=float(after_agg[k])) + if bottom_pct is not None: + record_metric( + conn, job_id, "bottom_visible_pct", + value=float(bottom_pct), + pass_fail="pass" if bottom_pct >= QC_BOTTOM_OK_PCT else "degraded", + ) + + result["status_after"] = new_status + print( + f" [04b] {auv_id}/{segment}: trimmed head={head} tail={tail} " + f"remaining={remaining}, bottom_pct={bottom_pct}% ({result['status_before']}→{new_status})" + ) + return result + + +# ----------------------------------------------------------------------------- +# Discovery + CLI +# ----------------------------------------------------------------------------- + +def find_segments(mission_name: str, auv_filter: str | None, + segment_filter: str | None) -> list[tuple[str, str, Path]]: + base = PIPELINE_BASE / "data" / mission_name / "frames" + out: list[tuple[str, str, Path]] = [] + if not base.is_dir(): + return out + for auv_dir in sorted(base.iterdir()): + if not auv_dir.is_dir(): + continue + if auv_filter and auv_dir.name != auv_filter: + continue + for seg_dir in sorted(auv_dir.iterdir()): + if not seg_dir.is_dir(): + continue + if segment_filter and seg_dir.name != segment_filter: + continue + out.append((auv_dir.name, seg_dir.name, seg_dir)) + return out + + +def main(): + ap = argparse.ArgumentParser(description="Stage 04b — Trim hors-eau head/tail frames") + ap.add_argument("--mission", default="20260505-Lepradet") + 
ap.add_argument("--auv") + ap.add_argument("--segment") + ap.add_argument("--dry-run", action="store_true") + args = ap.parse_args() + + init_db() + + segments = find_segments(args.mission, args.auv, args.segment) + if not segments: + print(f"[04b] No segments found under {args.mission}") + sys.exit(1) + + print(f"[04b] Mission={args.mission} segments={len(segments)} dry_run={args.dry_run}") + results: list[dict] = [] + with get_conn() as conn: + for auv_id, segment, frames_dir in segments: + try: + r = process_segment(args.mission, auv_id, segment, frames_dir, + args.dry_run, conn) + except Exception as e: + r = {"auv_id": auv_id, "segment": segment, "error": str(e), + "skipped": True} + print(f" [04b] ERR {auv_id}/{segment}: {e}") + results.append(r) + + # Summary + print("\n=== Stage 04b summary ===") + upgraded = [r for r in results + if r.get("status_before") == "degraded" and r.get("status_after") == "done"] + still_degraded = [r for r in results + if r.get("status_after") == "degraded"] + skipped = [r for r in results if r.get("skipped")] + print(f"Upgraded degraded→done : {len(upgraded)}") + for r in upgraded: + print(f" + {r['auv_id']}/{r['segment']} " + f"({r['before_bottom_pct']}%→{r['after_bottom_pct']}%, " + f"trim head={r['head_removed']} tail={r['tail_removed']})") + print(f"Still degraded : {len(still_degraded)}") + print(f"Skipped : {len(skipped)}") + for r in skipped: + print(f" - {r['auv_id']}/{r['segment']}: {r.get('reason', 'unknown')}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/stages/05_inference.py b/pipeline/stages/05_inference.py new file mode 100644 index 0000000..274a1ab --- /dev/null +++ b/pipeline/stages/05_inference.py @@ -0,0 +1,288 @@ +#!/usr/bin/env python3 +"""Stage 05 — Run lingbot-map inference on extracted frames. 
+ +Args: + --frames-dir Directory with frame_*.jpg (or parent with AUV subdirs) + --worker GPU worker selection + --mission Mission name for output paths + +Workers: + .84: /root/ai-video/lingbot-map/.venv/bin/python demo.py ... + .87: /home/floppyrj45/ai-video/lingbot-map/.venv/bin/python demo.py ... + +Auto: pick by lowest GPU memory usage (nvidia-smi via SSH). + +Flow: +1. rsync frames .83 → worker /root/cosma-frames-tmp/ (or /home/floppyrj45/) +2. SSH launch demo.py with windowed mode (window=64, overlap=16) +3. Retrieve PLY + NPZ → .83 ~/cosma-pipeline/data//ply//.{ply,npz} +4. Cleanup worker temp dir +5. Log to SQLite: duration, GPU peak mem, nb points in PLY + +Usage: + python3 05_inference.py --frames-dir ~/cosma-pipeline/data/20260505-Lepradet/frames/AUV210/GX019837 --worker auto --mission 20260505-Lepradet +""" +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) +from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso + +PIPELINE_BASE = Path(os.environ.get("COSMA_PIPELINE_BASE", "/home/cosma/cosma-pipeline")) + +WORKERS = { + ".84": { + "host": "192.168.0.84", + "user": "root", + "ai_dir": "/root/ai-video/lingbot-map", + "venv": "/root/ai-video/lingbot-map/.venv/bin/python", + "tmp_dir": "/root/cosma-frames-tmp", + }, + ".87": { + "host": "192.168.0.87", + "user": "floppyrj45", + "ai_dir": "/home/floppyrj45/ai-video/lingbot-map", + "venv": "/home/floppyrj45/ai-video/lingbot-map/.venv/bin/python", + "tmp_dir": "/home/floppyrj45/cosma-frames-tmp", + }, +} + + +def get_gpu_mem_used(worker_key: str) -> int: + """Return GPU memory used in MB via SSH nvidia-smi. 
def pick_worker() -> str:
    """Auto-select the worker with the lowest GPU memory usage.

    Queries every entry of ``WORKERS`` with ``get_gpu_mem_used`` and prints
    the reading for each one.

    Returns:
        The key of the selected worker.

    Raises:
        RuntimeError: If no worker reports a usage below 99999, the value
            ``get_gpu_mem_used`` returns when a worker cannot be queried.
    """
    best = None
    best_mem = 99999  # strict "<" below means unreachable workers never win
    for key in WORKERS:
        mem = get_gpu_mem_used(key)
        print(f" [05] Worker {key}: GPU mem={mem}MB")
        if mem < best_mem:
            best_mem = mem
            best = key
    if best is None:
        raise RuntimeError("No GPU worker available")
    print(f" [05] Selected worker {best}")
    return best


def count_ply_points(ply_path: Path) -> int:
    """Return the vertex count declared in a PLY file header.

    The header is scanned line by line until the ``element vertex <n>``
    declaration or ``end_header`` is reached, so headers with many comment or
    property lines are handled and the (possibly binary) body is never read.

    Args:
        ply_path: Path to the PLY file.

    Returns:
        The declared vertex count, or 0 when the file is missing, unreadable,
        or has no valid ``element vertex`` declaration in its header.
    """
    max_header_lines = 4096  # stop eventually on files that never say end_header
    max_line_bytes = 8192    # bound memory on binary files without newlines
    try:
        with open(ply_path, "rb") as f:
            for _ in range(max_header_lines):
                raw = f.readline(max_line_bytes)
                if not raw:
                    break  # EOF before the header ended
                tokens = raw.decode("ascii", errors="ignore").split()
                if tokens[:2] == ["element", "vertex"]:
                    return int(tokens[-1])
                if tokens == ["end_header"]:
                    break
    except (OSError, ValueError):
        # Missing/unreadable file or a non-numeric count: report "no points".
        pass
    return 0
def run_inference(frames_dir: Path, worker_key: str, mission_name: str,
                  auv_id: str, segment: str) -> dict:
    """Run lingbot-map on one segment and fetch its PLY/NPZ outputs.

    Steps: create the worker temp dir and rsync the frames to it, run
    ``demo.py`` in windowed mode over SSH, rsync the PLY and NPZ back, then
    remove the temporary data from the worker.

    Args:
        frames_dir: Local directory holding the segment's frames.
        worker_key: Key into ``WORKERS`` selecting the GPU worker.
        mission_name: Mission name, used in local and remote paths.
        auv_id: AUV identifier, used in local and remote paths.
        segment: Segment name, used in local and remote paths.

    Returns:
        A metrics dict. When a local PLY larger than 1000 bytes already
        exists, ``{"cached": True, "auv_id", "segment", "ply", "n_points"}``
        is returned without contacting the worker. Otherwise the dict holds
        ``auv_id``, ``segment``, ``worker`` and ``status`` ("ok" or "error"),
        plus ``error``, ``inference_s``, ``gpu_peak_mb``, ``n_points`` and
        ``ply`` as far as the run got. SSH/rsync failures and timeouts are
        reported through ``status``/``error`` instead of being raised.

    Raises:
        KeyError: If ``worker_key`` is not a key of ``WORKERS``.
    """
    import shlex  # function-scope: only needed to quote remote shell arguments

    w = WORKERS[worker_key]
    host = w["host"]
    ssh_target = f"{w['user']}@{host}"
    ssh_base = ["ssh", "-o", "StrictHostKeyChecking=no", ssh_target]
    remote_root = f"{w['tmp_dir']}/{mission_name}/{auv_id}"
    worker_frames = f"{remote_root}/{segment}"
    ply_remote = f"{remote_root}/{segment}.ply"
    npz_remote = f"{remote_root}/{segment}.npz"

    out_dir = PIPELINE_BASE / "data" / mission_name / "ply" / auv_id
    out_dir.mkdir(parents=True, exist_ok=True)
    out_ply = out_dir / f"{segment}.ply"
    out_npz = out_dir / f"{segment}.npz"

    if out_ply.exists() and out_ply.stat().st_size > 1000:
        n_pts = count_ply_points(out_ply)
        print(f" [05] {auv_id}/{segment}: cached PLY ({n_pts} pts)")
        # auv_id/segment are included so summaries can name cached segments.
        return {"cached": True, "auv_id": auv_id, "segment": segment,
                "ply": str(out_ply), "n_points": n_pts}

    metrics = {
        "auv_id": auv_id,
        "segment": segment,
        "worker": worker_key,
        "status": "ok",
    }
    # Remote paths removed at the end; an output that could not be retrieved
    # is taken off this list so the inference result is not lost.
    remote_cleanup = [worker_frames, ply_remote, npz_remote]

    try:
        # Step 1: create remote temp dir + rsync frames
        print(f" [05] rsync {frames_dir} → {ssh_target}:{worker_frames}...")
        subprocess.run(
            ssh_base + [f"mkdir -p {shlex.quote(worker_frames)}"],
            check=True, timeout=15,
        )
        r = subprocess.run(
            ["rsync", "-az", "--delete",
             str(frames_dir) + "/",
             f"{ssh_target}:{worker_frames}/"],
            capture_output=True, text=True, timeout=600,
        )
        if r.returncode != 0:
            metrics["status"] = "error"
            metrics["error"] = f"rsync failed: {r.stderr[-200:]}"
            return metrics
        print(f" [05] rsync done")

        # Step 2: build and launch the demo.py command. Every path is quoted
        # because the string is interpreted by the remote shell.
        checkpoint = f"{w['ai_dir']}/checkpoints/lingbot-map/lingbot-map.pt"
        demo_cmd = (
            f"cd {shlex.quote(w['ai_dir'])} && "
            f"{shlex.quote(w['venv'])} demo.py "
            f"--model_path {shlex.quote(checkpoint)} "
            f"--image_folder {shlex.quote(worker_frames)} "
            f"--mode windowed "
            f"--window_size 64 "
            f"--overlap_size 16 "
            f"--save_ply {shlex.quote(ply_remote)} "
            f"--save_poses {shlex.quote(npz_remote)} "
            f"--use_sdpa "
            f"2>&1"
        )

        print(f" [05] Launching inference on {host}...")
        t0 = time.time()
        r = subprocess.run(
            ssh_base + [demo_cmd],
            capture_output=True, text=True, timeout=7200,  # 2h max
        )
        elapsed = time.time() - t0
        metrics["inference_s"] = round(elapsed, 1)

        if r.returncode != 0:
            metrics["status"] = "error"
            metrics["error"] = r.stdout[-500:] + r.stderr[-200:]
            print(f" [05] inference error: {metrics['error'][-200:]}")
            return metrics

        print(f" [05] Inference done in {elapsed:.1f}s")

        # Step 3: GPU memory, best-effort. This is a single reading taken
        # after inference has finished, not a true peak.
        metrics["gpu_peak_mb"] = get_gpu_mem_used(worker_key)

        # Step 4: rsync PLY + NPZ back
        print(f" [05] Retrieving PLY + NPZ...")
        for remote, local in ((ply_remote, out_ply), (npz_remote, out_npz)):
            r2 = subprocess.run(
                ["rsync", "-az", f"{ssh_target}:{remote}", str(local)],
                capture_output=True, text=True, timeout=120,
            )
            if r2.returncode != 0:
                print(f" [05] Warning: rsync back failed for {remote}: {r2.stderr[-100:]}")
                remote_cleanup.remove(remote)

        if ply_remote not in remote_cleanup:
            # Without the PLY the segment has no usable result.
            metrics["status"] = "error"
            metrics["error"] = (
                f"PLY retrieval failed; output left at {ssh_target}:{ply_remote}"
            )

        # Count PLY points
        n_pts = count_ply_points(out_ply) if out_ply.exists() else 0
        metrics["n_points"] = n_pts
        metrics["ply"] = str(out_ply)
        print(f" [05] PLY: {n_pts} points → {out_ply}")
    except (subprocess.SubprocessError, OSError) as exc:
        # Timeouts, failed mkdir, missing ssh/rsync binary: report the failure
        # for this segment rather than aborting the whole batch.
        metrics["status"] = "error"
        metrics["error"] = f"{type(exc).__name__}: {exc}"
        print(f" [05] {auv_id}/{segment} failed: {metrics['error'][-200:]}")
    finally:
        # Step 5: cleanup worker (best-effort, also on the error paths)
        rm_cmd = "rm -rf " + " ".join(shlex.quote(p) for p in remote_cleanup)
        try:
            subprocess.run(ssh_base + [rm_cmd], timeout=30)
        except (subprocess.SubprocessError, OSError) as exc:
            print(f" [05] Warning: worker cleanup failed: {exc}")

    return metrics
def _record_inference_job(mission_name: str, auv_id: str, segment: str,
                          m: dict) -> None:
    """Store one segment's inference result and metrics in the pipeline DB.

    Nothing is written for cached results or when ``mission_name`` has no row
    in the ``missions`` table.
    """
    init_db()
    with get_conn() as conn:
        mission_row = conn.execute(
            "SELECT id FROM missions WHERE name=?", (mission_name,)
        ).fetchone()
        if not mission_row or m.get("cached"):
            return
        n_points = m.get("n_points", 0)
        job_id = upsert_job(
            conn, mission_row["id"], auv_id, segment, "05_inference",
            status="done" if m.get("status") == "ok" else m.get("status", "error"),
            output_path=m.get("ply", ""),
        )
        record_metric(conn, job_id, "ply_points", value=n_points,
                      pass_fail="pass" if n_points > 100 else "fail")
        # Timing and GPU readings only exist when inference actually ran.
        for key in ("inference_s", "gpu_peak_mb"):
            if key in m:
                record_metric(conn, job_id, key, value=m[key])


def process_frames_dir(frames_dir: Path, worker_key: str, mission_name: str) -> list[dict]:
    """Run inference on a frames directory (single segment or AUV tree).

    If ``frames_dir`` directly contains ``frame_*.jpg`` files it is treated as
    one segment whose AUV id is the parent directory name. Otherwise it is
    walked as ``frames_dir/<auv_id>/<segment>/frame_*.jpg`` and every segment
    directory that contains frames is processed in sorted order. Each
    non-cached result is recorded in the pipeline database.

    Args:
        frames_dir: Segment directory or root of the AUV/segment tree.
        worker_key: Key into ``WORKERS`` selecting the GPU worker.
        mission_name: Mission name used for output paths and DB lookup.

    Returns:
        One metrics dict per processed segment, as returned by
        ``run_inference``.
    """
    # Detect if frames_dir contains frame_*.jpg directly or subdirs
    if any(frames_dir.glob("frame_*.jpg")):
        # Single segment. The parent name is empty for paths such as "/seg"
        # and "." for a bare relative name; fall back to a placeholder rather
        # than building paths with an empty AUV id.
        auv_id = frames_dir.parent.name or "UNKNOWN"
        segment = frames_dir.name
        m = run_inference(frames_dir, worker_key, mission_name, auv_id, segment)
        _record_inference_job(mission_name, auv_id, segment, m)
        return [m]

    # Tree: frames_dir/<auv_id>/<segment>/frame_*.jpg
    all_metrics = []
    for auv_dir in sorted(frames_dir.iterdir()):
        if not auv_dir.is_dir():
            continue
        auv_id = auv_dir.name
        for seg_dir in sorted(auv_dir.iterdir()):
            if not seg_dir.is_dir():
                continue
            n_frames = sum(1 for _ in seg_dir.glob("frame_*.jpg"))
            if not n_frames:
                continue
            print(f"\n[05] === {auv_id}/{seg_dir.name}: {n_frames} frames ===")
            m = run_inference(seg_dir, worker_key, mission_name, auv_id, seg_dir.name)
            all_metrics.append(m)
            _record_inference_job(mission_name, auv_id, seg_dir.name, m)

    return all_metrics
def main():
    """Command-line entry point for stage 05.

    Parses ``--frames-dir``, ``--worker`` and ``--mission``, resolves the
    "auto" worker with ``pick_worker``, runs inference on the frames
    directory, and prints a per-segment summary.
    """
    ap = argparse.ArgumentParser(description="Stage 05 — lingbot-map inference")
    ap.add_argument("--frames-dir", type=Path, required=True,
                    help="Frames dir (single segment or AUV tree)")
    # Derive the choices from WORKERS so a newly added worker is selectable
    # without touching the CLI definition.
    ap.add_argument("--worker", type=str, default="auto",
                    choices=["auto", *WORKERS])
    ap.add_argument("--mission", type=str, required=True,
                    help="Mission name (e.g. 20260505-Lepradet)")
    args = ap.parse_args()

    # Validate the input before probing workers over SSH; "~" is expanded
    # here because the shell does not expand it inside quoted arguments.
    frames_dir = args.frames_dir.expanduser()
    if not frames_dir.is_dir():
        ap.error(f"--frames-dir is not a directory: {frames_dir}")

    worker = args.worker
    if worker == "auto":
        worker = pick_worker()

    metrics = process_frames_dir(frames_dir, worker, args.mission)

    print("\n=== Stage 05 summary ===")
    total_pts = sum(m.get("n_points", 0) for m in metrics)
    ok = sum(1 for m in metrics if m.get("status") == "ok" or m.get("cached"))
    print(f" Segments OK: {ok}/{len(metrics)}, total PLY points: {total_pts}")
    for m in metrics:
        # Cached results carry no "status" key; show them as "cached".
        print(f" {m.get('auv_id','?')}/{m.get('segment','?')}: "
              f"{m.get('n_points',0)} pts "
              f"[{m.get('status','cached' if m.get('cached') else '?')}]")