feat(pipeline): stage 04b port trim_above_water from dispatcher

This commit is contained in:
Poulpe
2026-05-11 14:08:30 +00:00
parent 82f71fcc96
commit e09ef7886b
5 changed files with 1226 additions and 0 deletions

53
pipeline/run_pipeline.sh Executable file
View File

@@ -0,0 +1,53 @@
#!/usr/bin/env bash
# Run full pipeline for a mission: stages 02→03→04→05
# Usage: ./run_pipeline.sh <mission> [worker]
# Example: ./run_pipeline.sh 20260505-Lepradet auto
set -euo pipefail
MISSION=${1:-20260505-Lepradet}
WORKER=${2:-auto}
MANIFEST="/home/cosma/cosma-pipeline/${MISSION}/manifest.json"
PIPELINE_DIR="$(cd "$(dirname "$0")" && pwd)/stages"
PIPELINE_BASE="/home/cosma/cosma-pipeline"
NAV_DIR="${PIPELINE_BASE}/data/${MISSION}/nav"
NAV_FILT_DIR="${PIPELINE_BASE}/data/${MISSION}/nav_filtered"
FRAMES_DIR="${PIPELINE_BASE}/data/${MISSION}/frames"
RUN_ID="$(date +%Y%m%d_%H%M%S)"
RUN_LOG_DIR="${PIPELINE_BASE}/runs/${RUN_ID}"
mkdir -p "${RUN_LOG_DIR}"
echo "=== Pipeline run ${RUN_ID} mission=${MISSION} worker=${WORKER} ===" | tee "${RUN_LOG_DIR}/run.log"
echo "Start: $(date -u +%Y-%m-%dT%H:%M:%SZ)" | tee -a "${RUN_LOG_DIR}/run.log"
# Stage 02: nav parse
echo "" | tee -a "${RUN_LOG_DIR}/run.log"
echo "--- Stage 02: nav parse ---" | tee -a "${RUN_LOG_DIR}/run.log"
python3 "${PIPELINE_DIR}/02_nav_parse.py" "${MANIFEST}" \
2>&1 | tee -a "${RUN_LOG_DIR}/stage02.log" "${RUN_LOG_DIR}/run.log"
# Stage 03: nav filter
echo "" | tee -a "${RUN_LOG_DIR}/run.log"
echo "--- Stage 03: nav filter ---" | tee -a "${RUN_LOG_DIR}/run.log"
python3 "${PIPELINE_DIR}/03_nav_filter.py" "${NAV_DIR}" \
2>&1 | tee -a "${RUN_LOG_DIR}/stage03.log" "${RUN_LOG_DIR}/run.log"
# Stage 04: frame extract
echo "" | tee -a "${RUN_LOG_DIR}/run.log"
echo "--- Stage 04: frame extract ---" | tee -a "${RUN_LOG_DIR}/run.log"
python3 "${PIPELINE_DIR}/04_frame_extract.py" --mission "${MISSION}" \
2>&1 | tee -a "${RUN_LOG_DIR}/stage04.log" "${RUN_LOG_DIR}/run.log"
# Stage 05: inference (sequential, one segment at a time)
echo "" | tee -a "${RUN_LOG_DIR}/run.log"
echo "--- Stage 05: inference ---" | tee -a "${RUN_LOG_DIR}/run.log"
python3 "${PIPELINE_DIR}/05_inference.py" \
--frames-dir "${FRAMES_DIR}" \
--worker "${WORKER}" \
--mission "${MISSION}" \
2>&1 | tee -a "${RUN_LOG_DIR}/stage05.log" "${RUN_LOG_DIR}/run.log"
echo "" | tee -a "${RUN_LOG_DIR}/run.log"
echo "=== Pipeline DONE $(date -u +%Y-%m-%dT%H:%M:%SZ) ===" | tee -a "${RUN_LOG_DIR}/run.log"
echo "Logs: ${RUN_LOG_DIR}/"

View File

@@ -0,0 +1,272 @@
#!/usr/bin/env python3
"""Stage 02 — Parse navigation from ROS2 MCAP bag files.
Extracts per-AUV trajectories from MCAP bags using mcap_ros2:
- /mavros/global_position/global → NavSatFix (lat, lon, alt)
- /mavros/imu/data → Imu (qx, qy, qz, qw)
- /mavros/imu/static_pressure → FluidPressure (pressure_pa)
Joins on nearest timestamp (tolerance 100ms).
Saves parquet: ~/cosma-pipeline/data/<mission>/nav/<AUV>_<segment>.parquet
Fallback: if no MCAP GPS data, marks as degraded=True (GPS=0 under water is normal).
Usage:
python3 02_nav_parse.py /home/cosma/cosma-pipeline/20260505-Lepradet/manifest.json
python3 02_nav_parse.py /path/manifest.json --auv AUV013
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
import numpy as np
sys.path.insert(0, str(Path(__file__).parent.parent))
from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso
PIPELINE_BASE = Path(os.environ.get("COSMA_PIPELINE_BASE", "/home/cosma/cosma-pipeline"))
NAV_TOPICS = [
"/mavros/global_position/global",
"/mavros/imu/data",
"/mavros/imu/static_pressure",
]
def parse_mcap_segment(mcap_files: list[Path]) -> dict[str, list]:
"""Extract raw topic data from a list of MCAP files (one session/segment).
Returns dict keyed by topic -> list of (ts_ns, data_dict).
"""
from mcap_ros2.reader import read_ros2_messages
topic_data: dict[str, list] = {t: [] for t in NAV_TOPICS}
for mcap_path in mcap_files:
if not mcap_path.exists():
continue
try:
for msg in read_ros2_messages(str(mcap_path), topics=NAV_TOPICS):
topic = msg.channel.topic
m = msg.ros_msg
ts_ns = int(msg.log_time.timestamp() * 1e9)
if topic == "/mavros/global_position/global":
topic_data[topic].append((ts_ns, {
"lat": float(m.latitude),
"lon": float(m.longitude),
"alt": float(m.altitude),
}))
elif topic == "/mavros/imu/data":
topic_data[topic].append((ts_ns, {
"qx": float(m.orientation.x),
"qy": float(m.orientation.y),
"qz": float(m.orientation.z),
"qw": float(m.orientation.w),
}))
elif topic == "/mavros/imu/static_pressure":
topic_data[topic].append((ts_ns, {
"pressure_pa": float(m.fluid_pressure),
}))
except Exception as e:
print(f" [02] Error reading {mcap_path.name}: {e}")
return topic_data
def join_topics(topic_data: dict[str, list], tol_ns: int = 100_000_000) -> list[dict]:
"""Join NavSatFix + Imu + FluidPressure on nearest timestamp (100ms tol).
Base timeline = NavSatFix if available, else Imu.
"""
import pandas as pd
nav_pts = topic_data.get("/mavros/global_position/global", [])
imu_pts = topic_data.get("/mavros/imu/data", [])
pres_pts = topic_data.get("/mavros/imu/static_pressure", [])
if not nav_pts and not imu_pts:
return []
# Build DataFrames
if nav_pts:
df_nav = pd.DataFrame([{"ts_ns": ts, **d} for ts, d in nav_pts])
else:
df_nav = pd.DataFrame(columns=["ts_ns", "lat", "lon", "alt"])
if imu_pts:
df_imu = pd.DataFrame([{"ts_ns": ts, **d} for ts, d in imu_pts])
else:
df_imu = pd.DataFrame(columns=["ts_ns", "qx", "qy", "qz", "qw"])
if pres_pts:
df_pres = pd.DataFrame([{"ts_ns": ts, **d} for ts, d in pres_pts])
else:
df_pres = pd.DataFrame(columns=["ts_ns", "pressure_pa"])
# Use nav as base if it has data, else imu
base_df = df_nav if len(df_nav) > 0 else df_imu
base_df = base_df.sort_values("ts_ns").reset_index(drop=True)
# Merge-as-of for IMU
result = base_df.copy()
if len(df_imu) > 0:
df_imu_s = df_imu.sort_values("ts_ns").reset_index(drop=True)
# Simple nearest-neighbor join
imu_ts = df_imu_s["ts_ns"].values
for col in ["qx", "qy", "qz", "qw"]:
result[col] = np.nan
for i, row_ts in enumerate(result["ts_ns"].values):
idx = np.argmin(np.abs(imu_ts - row_ts))
if abs(imu_ts[idx] - row_ts) <= tol_ns:
for col in ["qx", "qy", "qz", "qw"]:
result.at[i, col] = float(df_imu_s.at[idx, col])
# Merge pressure
if len(df_pres) > 0:
df_pres_s = df_pres.sort_values("ts_ns").reset_index(drop=True)
pres_ts = df_pres_s["ts_ns"].values
result["pressure_pa"] = np.nan
for i, row_ts in enumerate(result["ts_ns"].values):
idx = np.argmin(np.abs(pres_ts - row_ts))
if abs(pres_ts[idx] - row_ts) <= tol_ns:
result.at[i, "pressure_pa"] = float(df_pres_s.at[idx, "pressure_pa"])
# Ensure all columns exist
for col in ["lat", "lon", "alt", "qx", "qy", "qz", "qw", "pressure_pa"]:
if col not in result.columns:
result[col] = np.nan
return result.to_dict("records")
def parse_auv(manifest: dict, auv_id: str, out_dir: Path) -> dict:
"""Parse all MCAP sessions for one AUV. Returns metrics."""
from pathlib import Path as P
metrics = {
"auv_id": auv_id,
"segments": [],
"total_points": 0,
"degraded": False,
"status": "ok",
}
bag_sessions = manifest.get("bag_sessions_per_auv", {}).get(auv_id, [])
if not bag_sessions:
auv_map = manifest.get("auv_mapping", {})
bag_auv = auv_map.get(auv_id)
if bag_auv:
bag_sessions = manifest.get("bag_sessions_per_auv", {}).get(bag_auv, [])
if not bag_sessions:
# Build from raw SSD structure
ssd_path = P(manifest.get("ssd_path", "/mnt/ssd") + "/" + manifest["mission"].split("-")[0] + "-" + manifest["mission"].split("-")[1] if "-" in manifest["mission"] else manifest.get("ssd_path", "/mnt/ssd"))
auv_num = auv_id.replace("AUV", "0") # AUV013 -> 0013? No: AUV013 -> AUV013
bag_root = P(manifest.get("ssd_path", "/mnt/ssd")) / "raw_data/logs/SUB/bag"
sessions = sorted(bag_root.glob(f"*_{auv_id}"))
bag_sessions = [{"label": s.name, "mcap_files": [str(f) for f in sorted(s.glob("*.mcap"))]} for s in sessions]
import pandas as pd
all_points_total = 0
for sess in bag_sessions:
label = sess.get("session", sess.get("label", "unknown"))
mcap_files = [P(f) for f in sess.get("mcap_files", [])]
if not mcap_files:
continue
out_parquet = out_dir / f"{auv_id}_{label}.parquet"
if out_parquet.exists():
df_ex = pd.read_parquet(out_parquet)
n = len(df_ex)
print(f" [02] {auv_id}/{label}: cached ({n} pts)")
all_points_total += n
metrics["segments"].append({"label": label, "points": n, "cached": True})
continue
print(f" [02] {auv_id}/{label}: parsing {len(mcap_files)} MCAP files...")
topic_data = parse_mcap_segment(mcap_files)
points = join_topics(topic_data)
if not points:
print(f" [02] {auv_id}/{label}: no data")
metrics["segments"].append({"label": label, "points": 0, "degraded": True})
metrics["degraded"] = True
continue
df = pd.DataFrame(points)
n = len(df)
# Check GPS quality
has_gps = df["lat"].notna().any() and (df["lat"] != 0).any()
if not has_gps:
print(f" [02] {auv_id}/{label}: {n} pts, GPS=0 (degraded — AUV underwater)")
metrics["degraded"] = True
else:
print(f" [02] {auv_id}/{label}: {n} pts, GPS OK")
df.to_parquet(out_parquet, index=False)
all_points_total += n
metrics["segments"].append({"label": label, "points": n, "degraded": not has_gps})
metrics["total_points"] = all_points_total
if all_points_total == 0:
metrics["status"] = "degraded"
return metrics
def parse_mission(manifest_path: Path, auv_filter: str | None = None) -> list[dict]:
manifest = json.loads(manifest_path.read_text())
mission_name = manifest["mission"]
out_dir = PIPELINE_BASE / "data" / mission_name / "nav"
out_dir.mkdir(parents=True, exist_ok=True)
auv_ids = list(set(
manifest.get("auv_ids_bags", []) +
list(manifest.get("auv_mapping", {}).keys())
))
if not auv_ids:
auv_ids = manifest.get("auv_ids_video", [])
if auv_filter:
auv_ids = [a for a in auv_ids if a == auv_filter]
all_metrics = []
init_db()
for auv_id in sorted(auv_ids):
print(f"[02] === {auv_id} ===")
m = parse_auv(manifest, auv_id, out_dir)
all_metrics.append(m)
with get_conn() as conn:
mission_row = conn.execute("SELECT id FROM missions WHERE name=?", (mission_name,)).fetchone()
if mission_row:
job_id = upsert_job(conn, mission_row["id"], auv_id, "all", "02_nav_parse",
status="done" if m["status"] == "ok" else m["status"],
output_path=str(out_dir))
record_metric(conn, job_id, "nav_points_total", value=m["total_points"],
pass_fail="pass" if m["total_points"] > 0 else "warn")
return all_metrics
def main():
ap = argparse.ArgumentParser(description="Stage 02 — Parse nav from MCAP bags")
ap.add_argument("manifest", type=Path)
ap.add_argument("--auv", type=str, default=None)
args = ap.parse_args()
metrics = parse_mission(args.manifest, auv_filter=args.auv)
print("\n=== Stage 02 summary ===")
for m in metrics:
segs = m.get("segments", [])
total = m.get("total_points", 0)
deg = "DEGRADED" if m.get("degraded") else "OK"
print(f" {m['auv_id']}: {total} pts across {len(segs)} segments [{deg}]")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,185 @@
#!/usr/bin/env python3
"""Stage 03 — Filter and smooth navigation trajectories.
Input: ~/cosma-pipeline/data/<mission>/nav/<AUV>_<segment>.parquet
Output: ~/cosma-pipeline/data/<mission>/nav_filtered/<AUV>_<segment>.parquet
Steps:
1. Drop rows with null lat/lon OR lat==0 AND lon==0 (no GPS lock)
2. MAD-3σ outlier removal on lat, lon
3. Moving average smoothing (window 5s, KISS)
4. Depth from pressure: depth_m = (pressure_pa - 101325) / (1025 * 9.81)
5. Output: same columns + lat_smooth, lon_smooth, depth_m
Usage:
python3 03_nav_filter.py /home/cosma/cosma-pipeline/data/20260505-Lepradet/nav/
python3 03_nav_filter.py /path/nav/ --auv AUV013 --sigma 2.5
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
import numpy as np
sys.path.insert(0, str(Path(__file__).parent.parent))
from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso
PIPELINE_BASE = Path(os.environ.get("COSMA_PIPELINE_BASE", "/home/cosma/cosma-pipeline"))
RHO_SEA = 1025.0 # kg/m3
G = 9.81 # m/s2
P_ATM = 101325.0 # Pa
def mad_mask(arr: np.ndarray, sigma: float = 3.0) -> np.ndarray:
"""True = keep."""
if len(arr) < 4:
return np.ones(len(arr), dtype=bool)
med = np.median(arr)
mad = np.median(np.abs(arr - med))
if mad == 0:
return np.ones(len(arr), dtype=bool)
return np.abs(0.6745 * (arr - med) / mad) < sigma
def moving_average(arr: np.ndarray, window: int = 5) -> np.ndarray:
if len(arr) < window:
return arr.copy()
pad = window // 2
padded = np.pad(arr, (pad, pad), mode="edge")
return np.convolve(padded, np.ones(window) / window, mode="valid")[:len(arr)]
def filter_parquet(src: Path, dst_dir: Path, sigma: float = 3.0, window: int = 5) -> dict:
import pandas as pd
df = pd.read_parquet(src)
auv_seg = src.stem
metrics = {
"file": src.name,
"points_in": len(df),
"points_out": 0,
"status": "ok",
}
# Step 1: drop null/zero GPS
has_lat = "lat" in df.columns and df["lat"].notna().any()
if has_lat:
mask_valid = df["lat"].notna() & df["lon"].notna() & (df["lat"] != 0) & (df["lon"] != 0)
df_valid = df[mask_valid].copy()
else:
# No GPS — keep all rows for IMU/pressure
df_valid = df.copy()
metrics["degraded"] = True
if len(df_valid) == 0:
metrics["status"] = "degraded"
metrics["note"] = "no valid GPS points"
print(f" [03] {auv_seg}: no valid GPS — saving as-is with depth calc only")
df_out = df.copy()
else:
# Step 2: MAD outlier removal on lat/lon
if has_lat and len(df_valid) >= 4:
lats = df_valid["lat"].values
lons = df_valid["lon"].values
mask = mad_mask(lats, sigma) & mad_mask(lons, sigma)
n_removed = int((~mask).sum())
df_valid = df_valid[mask].copy()
metrics["points_removed_outlier"] = n_removed
else:
metrics["points_removed_outlier"] = 0
# Step 3: sort by timestamp
if "ts_ns" in df_valid.columns:
df_valid = df_valid.sort_values("ts_ns").reset_index(drop=True)
# Step 4: smooth lat/lon
if has_lat and len(df_valid) >= window:
df_valid["lat_smooth"] = moving_average(df_valid["lat"].values, window)
df_valid["lon_smooth"] = moving_average(df_valid["lon"].values, window)
elif has_lat and len(df_valid) > 0:
df_valid["lat_smooth"] = df_valid["lat"]
df_valid["lon_smooth"] = df_valid["lon"]
else:
df_valid["lat_smooth"] = np.nan
df_valid["lon_smooth"] = np.nan
df_out = df_valid
# Step 5: depth from pressure
if "pressure_pa" in df_out.columns and df_out["pressure_pa"].notna().any():
df_out["depth_m"] = (df_out["pressure_pa"] - P_ATM) / (RHO_SEA * G)
df_out["depth_m"] = df_out["depth_m"].abs() # negative when underwater (P < Patm) # surface = 0
else:
df_out["depth_m"] = np.nan
dst_dir.mkdir(parents=True, exist_ok=True)
out_path = dst_dir / src.name
df_out.to_parquet(out_path, index=False)
metrics["points_out"] = len(df_out)
removed_null = metrics["points_in"] - len(df_out) - metrics.get("points_removed_outlier", 0)
metrics["points_removed_null"] = max(0, removed_null)
print(f" [03] {auv_seg}: {metrics['points_in']}{metrics['points_out']} pts, "
f"depth_m range=[{df_out['depth_m'].min():.1f}, {df_out['depth_m'].max():.1f}]"
if df_out["depth_m"].notna().any() else
f" [03] {auv_seg}: {metrics['points_in']}{metrics['points_out']} pts, no pressure")
return metrics
def filter_mission(nav_dir: Path, auv_filter: str | None = None,
sigma: float = 3.0, window: int = 5) -> list[dict]:
out_dir = nav_dir.parent / "nav_filtered"
parquet_files = sorted(nav_dir.glob("*.parquet"))
if auv_filter:
parquet_files = [f for f in parquet_files if auv_filter in f.name]
all_metrics = []
init_db()
for pf in parquet_files:
out_file = out_dir / pf.name
if out_file.exists():
print(f"[03] {pf.stem}: cached")
continue
print(f"[03] Filtering {pf.name}...")
m = filter_parquet(pf, out_dir, sigma=sigma, window=window)
all_metrics.append(m)
with get_conn() as conn:
mission_name = nav_dir.parent.name
mission_row = conn.execute("SELECT id FROM missions WHERE name=?", (mission_name,)).fetchone()
if mission_row:
auv_id = pf.stem.split("_")[0]
job_id = upsert_job(conn, mission_row["id"], auv_id, "all", "03_nav_filter",
status="done" if m.get("status") == "ok" else m.get("status", "done"),
output_path=str(out_dir))
record_metric(conn, job_id, "nav_points_filtered", value=m.get("points_out", 0),
pass_fail="pass" if m.get("points_out", 0) > 0 else "warn")
return all_metrics
def main():
ap = argparse.ArgumentParser(description="Stage 03 — Filter nav trajectories")
ap.add_argument("nav_dir", type=Path, help="Directory with *.parquet from stage 02")
ap.add_argument("--auv", type=str, default=None)
ap.add_argument("--sigma", type=float, default=3.0)
ap.add_argument("--window", type=int, default=5)
args = ap.parse_args()
metrics = filter_mission(args.nav_dir, auv_filter=args.auv,
sigma=args.sigma, window=args.window)
print("\n=== Stage 03 summary ===")
for m in metrics:
print(f" {m.get('file','?')}: {m.get('points_in',0)}{m.get('points_out',0)} "
f"[{m.get('status','?')}]")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,428 @@
#!/usr/bin/env python3
"""Stage 04b — Trim out-of-water (hors-eau) head/tail frames from already-extracted segments.
Ports the sustained-run trim logic from cosma-qc/scripts/dispatcher.py (_AUTO_TRIM_SCRIPT,
trim_above_water_prefix) into the new cosma-pipeline pipeline. Re-runs frame QC scoring
on the trimmed set and updates state.db (jobs.status + metrics).
Usage:
python3 04b_trim_water.py --mission 20260505-Lepradet
python3 04b_trim_water.py --mission 20260505-Lepradet --auv AUV210 --segment GX019837
python3 04b_trim_water.py --mission 20260505-Lepradet --dry-run
Safety:
- Skips segments where ffmpeg is still running on the frames dir (extraction in progress).
- Skips segments with a queued/running 05_inference job in state.db.
- Skips segments whose frame count is not stable over a 5s window.
- Never deletes all frames (sanity floor: keep everything if trim would empty the dir).
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
import time
from pathlib import Path
import cv2
sys.path.insert(0, str(Path(__file__).parent.parent))
sys.path.insert(0, str(Path(__file__).parent))
from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso
from lib_frame_qc import score_image_file, aggregate as qc_aggregate
PIPELINE_BASE = Path(os.environ.get("COSMA_PIPELINE_BASE", "/home/cosma/cosma-pipeline"))
QC_SAMPLE_RATE = int(os.environ.get("COSMA_QC_SAMPLE_RATE", "5"))
QC_BOTTOM_OK_PCT = float(os.environ.get("COSMA_QC_BOTTOM_OK_PCT", "50"))
NEED_STREAK = 10 # consecutive underwater frames required to lock start/end
# -----------------------------------------------------------------------------
# Trim logic (ported verbatim from dispatcher._AUTO_TRIM_SCRIPT)
# -----------------------------------------------------------------------------
def is_underwater(path: Path) -> bool | None:
img = cv2.imread(str(path), cv2.IMREAD_REDUCED_COLOR_4)
if img is None:
return None
b, g, r = [float(c) for c in cv2.mean(img)[:3]]
# Red is absorbed by water → R < G AND R < B on underwater shots.
return r < g - 5 and r < b - 5
def trim_segment(frames_dir: Path, dry_run: bool = False) -> tuple[int, int, int]:
"""Delete leading and trailing out-of-water frames.
Returns (head_removed, tail_removed, remaining).
"""
paths = sorted(frames_dir.glob("frame_*.jpg"))
if not paths:
return (0, 0, 0)
# Scan from start
start = 0
streak = 0
for i, p in enumerate(paths):
uw = is_underwater(p)
if uw is None:
continue
if uw:
streak += 1
if streak >= NEED_STREAK:
start = i - NEED_STREAK + 1
break
else:
streak = 0
# Scan from end
end = len(paths)
streak = 0
for j in range(len(paths) - 1, -1, -1):
uw = is_underwater(paths[j])
if uw is None:
continue
if uw:
streak += 1
if streak >= NEED_STREAK:
end = j + NEED_STREAK # exclusive
break
else:
streak = 0
if end <= start:
# Sanity: never delete everything.
start = 0
end = len(paths)
removed_head = start
removed_tail = len(paths) - end
if not dry_run:
for p in paths[:start]:
try:
p.unlink()
except OSError:
pass
for p in paths[end:]:
try:
p.unlink()
except OSError:
pass
return (removed_head, removed_tail, end - start)
# -----------------------------------------------------------------------------
# Safety: is this segment currently being touched?
# -----------------------------------------------------------------------------
def has_ffmpeg_running_on(frames_dir: Path) -> bool:
"""Check if any ffmpeg process is writing into frames_dir."""
try:
r = subprocess.run(
["pgrep", "-af", "ffmpeg"], capture_output=True, text=True, timeout=5
)
for line in r.stdout.splitlines():
if str(frames_dir) in line:
return True
except Exception:
pass
return False
def has_inference_running_on(frames_dir: Path) -> bool:
"""Check if any 05_inference.py process is running on frames_dir."""
try:
r = subprocess.run(
["pgrep", "-af", "05_inference"], capture_output=True, text=True, timeout=5
)
for line in r.stdout.splitlines():
if str(frames_dir) in line:
return True
except Exception:
pass
return False
def has_pending_inference_job(conn, mission_id: int, auv_id: str, segment: str) -> bool:
"""Check state.db for queued/running 05_inference job on this segment."""
row = conn.execute(
"SELECT status FROM jobs WHERE mission_id=? AND auv_id=? "
"AND segment_label=? AND stage='05_inference'",
(mission_id, auv_id, segment),
).fetchone()
if row is None:
return False
return row["status"] in ("queued", "running")
def frame_count_is_stable(frames_dir: Path, wait_s: float = 5.0) -> bool:
"""Return True if the frame count doesn't change over wait_s."""
n1 = sum(1 for _ in frames_dir.glob("frame_*.jpg"))
time.sleep(wait_s)
n2 = sum(1 for _ in frames_dir.glob("frame_*.jpg"))
return n1 == n2
# -----------------------------------------------------------------------------
# QC re-scoring (mirrors stage 04 qc_segment)
# -----------------------------------------------------------------------------
def qc_segment(frames_dir: Path, sample_rate: int = QC_SAMPLE_RATE) -> dict | None:
frames = sorted(frames_dir.glob("frame_*.jpg"))
if not frames:
return None
sampled = frames[::max(1, sample_rate)]
per_frame = []
for f in sampled:
s = score_image_file(f)
if s is not None:
per_frame.append(s)
if not per_frame:
return None
agg = qc_aggregate(per_frame)
qc_payload = {
"frames_in_dir": len(frames),
"frames_sampled": len(per_frame),
"sample_rate": sample_rate,
**agg,
"per_frame": per_frame,
}
try:
(frames_dir / "qc.json").write_text(json.dumps(qc_payload, indent=2))
except Exception as e:
print(f" [04b] qc.json write failed: {e}")
return agg
# -----------------------------------------------------------------------------
# Main per-segment driver
# -----------------------------------------------------------------------------
def process_segment(mission_name: str, auv_id: str, segment: str,
frames_dir: Path, dry_run: bool, conn) -> dict:
result = {
"auv_id": auv_id,
"segment": segment,
"frames_dir": str(frames_dir),
"skipped": False,
"head_removed": 0,
"tail_removed": 0,
"remaining": 0,
"before_total": 0,
"before_bottom_pct": None,
"after_bottom_pct": None,
"status_before": None,
"status_after": None,
}
if not frames_dir.is_dir():
result["skipped"] = True
result["reason"] = "no_frames_dir"
return result
# Safety checks
if has_ffmpeg_running_on(frames_dir):
result["skipped"] = True
result["reason"] = "ffmpeg_running"
print(f" [04b] SKIP {auv_id}/{segment}: ffmpeg still extracting")
return result
if has_inference_running_on(frames_dir):
result["skipped"] = True
result["reason"] = "inference_running_proc"
print(f" [04b] SKIP {auv_id}/{segment}: 05_inference process running")
return result
# Look up mission_id + current 04 job
mission_row = conn.execute(
"SELECT id FROM missions WHERE name=?", (mission_name,)
).fetchone()
if not mission_row:
result["skipped"] = True
result["reason"] = "mission_not_in_db"
return result
mission_id = mission_row["id"]
if has_pending_inference_job(conn, mission_id, auv_id, segment):
result["skipped"] = True
result["reason"] = "inference_job_pending"
print(f" [04b] SKIP {auv_id}/{segment}: 05_inference queued/running in DB")
return result
if not frame_count_is_stable(frames_dir, wait_s=5.0):
result["skipped"] = True
result["reason"] = "frame_count_unstable"
print(f" [04b] SKIP {auv_id}/{segment}: frame count not stable")
return result
# Snapshot before
before_paths = sorted(frames_dir.glob("frame_*.jpg"))
result["before_total"] = len(before_paths)
job04_row = conn.execute(
"SELECT id, status FROM jobs WHERE mission_id=? AND auv_id=? "
"AND segment_label=? AND stage='04_frame_extract'",
(mission_id, auv_id, segment),
).fetchone()
if job04_row is None:
result["skipped"] = True
result["reason"] = "no_04_job_in_db"
print(f" [04b] SKIP {auv_id}/{segment}: no 04 job row")
return result
result["status_before"] = job04_row["status"]
# Read current QC if available
qc_path = frames_dir / "qc.json"
if qc_path.exists():
try:
result["before_bottom_pct"] = json.loads(qc_path.read_text()).get("bottom_visible_pct")
except Exception:
pass
# Trim
head, tail, remaining = trim_segment(frames_dir, dry_run=dry_run)
result["head_removed"] = head
result["tail_removed"] = tail
result["remaining"] = remaining
# Re-QC if not dry-run and something was trimmed (or always to keep metrics fresh)
after_agg = None
if not dry_run and (head > 0 or tail > 0):
after_agg = qc_segment(frames_dir)
elif dry_run:
# In dry-run, don't touch qc.json; compute aggregate from remaining slice in-memory
remaining_paths = sorted(frames_dir.glob("frame_*.jpg"))[head: len(before_paths) - tail]
sampled = remaining_paths[::max(1, QC_SAMPLE_RATE)]
per_frame = [s for s in (score_image_file(f) for f in sampled) if s is not None]
if per_frame:
after_agg = qc_aggregate(per_frame)
if after_agg is not None:
result["after_bottom_pct"] = after_agg.get("bottom_visible_pct")
if dry_run:
print(
f" [04b] DRY {auv_id}/{segment}: head={head} tail={tail} "
f"remaining={remaining} (before={len(before_paths)}, "
f"bottom_pct {result['before_bottom_pct']}{result['after_bottom_pct']})"
)
return result
# Update DB: job row + metrics
job_id = job04_row["id"]
bottom_pct = after_agg.get("bottom_visible_pct") if after_agg else None
if bottom_pct is not None and bottom_pct >= QC_BOTTOM_OK_PCT:
new_status = "done"
err_msg = None
elif bottom_pct is not None:
new_status = "degraded"
err_msg = f"bottom_visible_pct={bottom_pct}% <{QC_BOTTOM_OK_PCT}% (after trim)"
else:
new_status = job04_row["status"]
err_msg = None
upsert_job(
conn, mission_id, auv_id, segment, "04_frame_extract",
status=new_status,
output_path=str(frames_dir),
error_msg=err_msg,
)
record_metric(conn, job_id, "trimmed_head", value=float(head))
record_metric(conn, job_id, "trimmed_tail", value=float(tail))
record_metric(conn, job_id, "frames_after_trim", value=float(remaining))
if after_agg:
for k in (
"frames_total", "frames_bottom_visible", "frames_out_of_water",
"frames_turbid", "frames_water_no_bottom",
):
if k in after_agg:
record_metric(conn, job_id, k, value=float(after_agg[k]))
if bottom_pct is not None:
record_metric(
conn, job_id, "bottom_visible_pct",
value=float(bottom_pct),
pass_fail="pass" if bottom_pct >= QC_BOTTOM_OK_PCT else "degraded",
)
result["status_after"] = new_status
print(
f" [04b] {auv_id}/{segment}: trimmed head={head} tail={tail} "
f"remaining={remaining}, bottom_pct={bottom_pct}% ({result['status_before']}{new_status})"
)
return result
# -----------------------------------------------------------------------------
# Discovery + CLI
# -----------------------------------------------------------------------------
def find_segments(mission_name: str, auv_filter: str | None,
segment_filter: str | None) -> list[tuple[str, str, Path]]:
base = PIPELINE_BASE / "data" / mission_name / "frames"
out: list[tuple[str, str, Path]] = []
if not base.is_dir():
return out
for auv_dir in sorted(base.iterdir()):
if not auv_dir.is_dir():
continue
if auv_filter and auv_dir.name != auv_filter:
continue
for seg_dir in sorted(auv_dir.iterdir()):
if not seg_dir.is_dir():
continue
if segment_filter and seg_dir.name != segment_filter:
continue
out.append((auv_dir.name, seg_dir.name, seg_dir))
return out
def main():
ap = argparse.ArgumentParser(description="Stage 04b — Trim hors-eau head/tail frames")
ap.add_argument("--mission", default="20260505-Lepradet")
ap.add_argument("--auv")
ap.add_argument("--segment")
ap.add_argument("--dry-run", action="store_true")
args = ap.parse_args()
init_db()
segments = find_segments(args.mission, args.auv, args.segment)
if not segments:
print(f"[04b] No segments found under {args.mission}")
sys.exit(1)
print(f"[04b] Mission={args.mission} segments={len(segments)} dry_run={args.dry_run}")
results: list[dict] = []
with get_conn() as conn:
for auv_id, segment, frames_dir in segments:
try:
r = process_segment(args.mission, auv_id, segment, frames_dir,
args.dry_run, conn)
except Exception as e:
r = {"auv_id": auv_id, "segment": segment, "error": str(e),
"skipped": True}
print(f" [04b] ERR {auv_id}/{segment}: {e}")
results.append(r)
# Summary
print("\n=== Stage 04b summary ===")
upgraded = [r for r in results
if r.get("status_before") == "degraded" and r.get("status_after") == "done"]
still_degraded = [r for r in results
if r.get("status_after") == "degraded"]
skipped = [r for r in results if r.get("skipped")]
print(f"Upgraded degraded→done : {len(upgraded)}")
for r in upgraded:
print(f" + {r['auv_id']}/{r['segment']} "
f"({r['before_bottom_pct']}%→{r['after_bottom_pct']}%, "
f"trim head={r['head_removed']} tail={r['tail_removed']})")
print(f"Still degraded : {len(still_degraded)}")
print(f"Skipped : {len(skipped)}")
for r in skipped:
print(f" - {r['auv_id']}/{r['segment']}: {r.get('reason', 'unknown')}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,288 @@
#!/usr/bin/env python3
"""Stage 05 — Run lingbot-map inference on extracted frames.
Args:
--frames-dir <path> Directory with frame_*.jpg (or parent with AUV subdirs)
--worker <auto|.84|.87> GPU worker selection
--mission <name> Mission name for output paths
Workers:
.84: /root/ai-video/lingbot-map/.venv/bin/python demo.py ...
.87: /home/floppyrj45/ai-video/lingbot-map/.venv/bin/python demo.py ...
Auto: pick by lowest GPU memory usage (nvidia-smi via SSH).
Flow:
1. rsync frames .83 → worker /root/cosma-frames-tmp/ (or /home/floppyrj45/)
2. SSH launch demo.py with windowed mode (window=64, overlap=16)
3. Retrieve PLY + NPZ → .83 ~/cosma-pipeline/data/<mission>/ply/<AUV>/<segment>.{ply,npz}
4. Cleanup worker temp dir
5. Log to SQLite: duration, GPU peak mem, nb points in PLY
Usage:
python3 05_inference.py --frames-dir ~/cosma-pipeline/data/20260505-Lepradet/frames/AUV210/GX019837 --worker auto --mission 20260505-Lepradet
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso
PIPELINE_BASE = Path(os.environ.get("COSMA_PIPELINE_BASE", "/home/cosma/cosma-pipeline"))
WORKERS = {
".84": {
"host": "192.168.0.84",
"user": "root",
"ai_dir": "/root/ai-video/lingbot-map",
"venv": "/root/ai-video/lingbot-map/.venv/bin/python",
"tmp_dir": "/root/cosma-frames-tmp",
},
".87": {
"host": "192.168.0.87",
"user": "floppyrj45",
"ai_dir": "/home/floppyrj45/ai-video/lingbot-map",
"venv": "/home/floppyrj45/ai-video/lingbot-map/.venv/bin/python",
"tmp_dir": "/home/floppyrj45/cosma-frames-tmp",
},
}
def get_gpu_mem_used(worker_key: str) -> int:
"""Return GPU memory used in MB via SSH nvidia-smi. Returns 99999 on error."""
w = WORKERS[worker_key]
cmd = [
"ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=5",
f"{w['user']}@{w['host']}",
"nvidia-smi --query-gpu=memory.used --format=csv,noheader,nounits 2>/dev/null | head -1"
]
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
return int(r.stdout.strip())
except Exception:
return 99999
def pick_worker() -> str:
"""Auto-select worker with lowest GPU memory usage."""
best = None
best_mem = 99999
for key in WORKERS:
mem = get_gpu_mem_used(key)
print(f" [05] Worker {key}: GPU mem={mem}MB")
if mem < best_mem:
best_mem = mem
best = key
if best is None:
raise RuntimeError("No GPU worker available")
print(f" [05] Selected worker {best}")
return best
def count_ply_points(ply_path: Path) -> int:
"""Count vertex count in PLY file header."""
try:
with open(ply_path, "rb") as f:
for _ in range(30):
line = f.readline().decode("ascii", errors="ignore").strip()
if line.startswith("element vertex"):
return int(line.split()[-1])
except Exception:
pass
return 0
def run_inference(frames_dir: Path, worker_key: str, mission_name: str,
auv_id: str, segment: str) -> dict:
"""Run lingbot-map on one segment. Returns metrics."""
w = WORKERS[worker_key]
host = w["host"]
user = w["user"]
ssh_target = f"{user}@{host}"
worker_frames = f"{w['tmp_dir']}/{mission_name}/{auv_id}/{segment}"
ply_remote = f"{w['tmp_dir']}/{mission_name}/{auv_id}/{segment}.ply"
npz_remote = f"{w['tmp_dir']}/{mission_name}/{auv_id}/{segment}.npz"
out_dir = PIPELINE_BASE / "data" / mission_name / "ply" / auv_id
out_dir.mkdir(parents=True, exist_ok=True)
out_ply = out_dir / f"{segment}.ply"
out_npz = out_dir / f"{segment}.npz"
if out_ply.exists() and out_ply.stat().st_size > 1000:
n_pts = count_ply_points(out_ply)
print(f" [05] {auv_id}/{segment}: cached PLY ({n_pts} pts)")
return {"cached": True, "ply": str(out_ply), "n_points": n_pts}
metrics = {
"auv_id": auv_id,
"segment": segment,
"worker": worker_key,
"status": "ok",
}
# Step 1: create remote temp dir + rsync frames
print(f" [05] rsync {frames_dir}{ssh_target}:{worker_frames}...")
subprocess.run(
["ssh", "-o", "StrictHostKeyChecking=no", ssh_target,
f"mkdir -p {worker_frames}"],
check=True, timeout=15,
)
r = subprocess.run(
["rsync", "-az", "--delete",
str(frames_dir) + "/",
f"{ssh_target}:{worker_frames}/"],
capture_output=True, text=True, timeout=600,
)
if r.returncode != 0:
metrics["status"] = "error"
metrics["error"] = f"rsync failed: {r.stderr[-200:]}"
return metrics
print(f" [05] rsync done")
# Step 2: build demo.py command
checkpoint = f"{w['ai_dir']}/checkpoints/lingbot-map/lingbot-map.pt"
demo_cmd = (
f"cd {w['ai_dir']} && "
f"{w['venv']} demo.py "
f"--model_path {checkpoint} "
f"--image_folder {worker_frames} "
f"--mode windowed "
f"--window_size 64 "
f"--overlap_size 16 "
f"--save_ply {ply_remote} "
f"--save_poses {npz_remote} "
f"--use_sdpa "
f"2>&1"
)
print(f" [05] Launching inference on {host}...")
t0 = time.time()
r = subprocess.run(
["ssh", "-o", "StrictHostKeyChecking=no", ssh_target, demo_cmd],
capture_output=True, text=True, timeout=7200, # 2h max
)
elapsed = time.time() - t0
metrics["inference_s"] = round(elapsed, 1)
if r.returncode != 0:
metrics["status"] = "error"
metrics["error"] = r.stdout[-500:] + r.stderr[-200:]
print(f" [05] inference error: {metrics['error'][-200:]}")
return metrics
print(f" [05] Inference done in {elapsed:.1f}s")
# Step 3: GPU peak mem from nvidia-smi log (best-effort parse)
gpu_mem_line = [l for l in r.stdout.split("\n") if "MiB" in l]
metrics["gpu_peak_mb"] = get_gpu_mem_used(worker_key)
# Step 4: rsync PLY + NPZ back
print(f" [05] Retrieving PLY + NPZ...")
for remote, local in [(ply_remote, out_ply), (npz_remote, out_npz)]:
r2 = subprocess.run(
["rsync", "-az", f"{ssh_target}:{remote}", str(local)],
capture_output=True, text=True, timeout=120,
)
if r2.returncode != 0:
print(f" [05] Warning: rsync back failed for {remote}: {r2.stderr[-100:]}")
# Step 5: cleanup worker
subprocess.run(
["ssh", "-o", "StrictHostKeyChecking=no", ssh_target,
f"rm -rf {worker_frames} {ply_remote} {npz_remote}"],
timeout=30,
)
# Count PLY points
n_pts = count_ply_points(out_ply) if out_ply.exists() else 0
metrics["n_points"] = n_pts
metrics["ply"] = str(out_ply)
print(f" [05] PLY: {n_pts} points → {out_ply}")
return metrics
def process_frames_dir(frames_dir: Path, worker_key: str, mission_name: str) -> list[dict]:
"""Process a directory of frames (single segment or AUV tree)."""
# Detect if frames_dir contains frame_*.jpg directly or subdirs
direct_frames = list(frames_dir.glob("frame_*.jpg"))
if direct_frames:
# Single segment
parts = frames_dir.parts
auv_id = frames_dir.parent.name if len(parts) >= 2 else "UNKNOWN"
segment = frames_dir.name
return [run_inference(frames_dir, worker_key, mission_name, auv_id, segment)]
# Tree: frames_dir/<AUV>/<segment>/frame_*.jpg
all_metrics = []
for auv_dir in sorted(frames_dir.iterdir()):
if not auv_dir.is_dir():
continue
auv_id = auv_dir.name
for seg_dir in sorted(auv_dir.iterdir()):
if not seg_dir.is_dir():
continue
frames = list(seg_dir.glob("frame_*.jpg"))
if not frames:
continue
print(f"\n[05] === {auv_id}/{seg_dir.name}: {len(frames)} frames ===")
m = run_inference(seg_dir, worker_key, mission_name, auv_id, seg_dir.name)
all_metrics.append(m)
init_db()
with get_conn() as conn:
mission_row = conn.execute(
"SELECT id FROM missions WHERE name=?", (mission_name,)
).fetchone()
if mission_row and not m.get("cached"):
job_id = upsert_job(
conn, mission_row["id"], auv_id, seg_dir.name, "05_inference",
status="done" if m.get("status") == "ok" else m.get("status", "error"),
output_path=m.get("ply", ""),
)
record_metric(conn, job_id, "ply_points", value=m.get("n_points", 0),
pass_fail="pass" if m.get("n_points", 0) > 100 else "fail")
if "inference_s" in m:
record_metric(conn, job_id, "inference_s", value=m["inference_s"])
if "gpu_peak_mb" in m:
record_metric(conn, job_id, "gpu_peak_mb", value=m["gpu_peak_mb"])
return all_metrics
def main():
ap = argparse.ArgumentParser(description="Stage 05 — lingbot-map inference")
ap.add_argument("--frames-dir", type=Path, required=True,
help="Frames dir (single segment or AUV tree)")
ap.add_argument("--worker", type=str, default="auto",
choices=["auto", ".84", ".87"])
ap.add_argument("--mission", type=str, required=True,
help="Mission name (e.g. 20260505-Lepradet)")
args = ap.parse_args()
worker = args.worker
if worker == "auto":
worker = pick_worker()
metrics = process_frames_dir(args.frames_dir, worker, args.mission)
print("\n=== Stage 05 summary ===")
total_pts = sum(m.get("n_points", 0) for m in metrics)
ok = sum(1 for m in metrics if m.get("status") == "ok" or m.get("cached"))
print(f" Segments OK: {ok}/{len(metrics)}, total PLY points: {total_pts}")
for m in metrics:
print(f" {m.get('auv_id','?')}/{m.get('segment','?')}: "
f"{m.get('n_points',0)} pts "
f"[{m.get('status','cached' if m.get('cached') else '?')}]")
if __name__ == "__main__":
main()