diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..498b99d --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ + +# pipeline +__pycache__/ +*.pyc +*.db diff --git a/pipeline/README.md b/pipeline/README.md new file mode 100644 index 0000000..b976ca5 --- /dev/null +++ b/pipeline/README.md @@ -0,0 +1,33 @@ +# cosma-pipeline + +Pipeline autonome de reconstruction COSMA. + +## Structure + +``` +pipeline/ +├── config/ # Seuils QA + params par défaut (versionnés) +├── orchestrator/ # DB SQLite, dispatcher, FastAPI +├── stages/ # Modules indépendants 01..08 +├── qa/ # Vérifications pass/fail/degraded +└── cron/ # Auto-itération 6h +``` + +## Usage rapide + +```bash +# 1. Ingest +python3 pipeline/stages/01_ingest.py /mnt/ssd/20260505-Lepradet --name 20260505-Lepradet + +# 2. Parse USBL +python3 pipeline/stages/02_usbl_parse.py /home/cosma/cosma-pipeline/20260505-Lepradet/manifest.json + +# 3. Filter +python3 pipeline/stages/03_usbl_filter.py /home/cosma/cosma-pipeline/20260505-Lepradet/02_usbl_raw/ +``` + +## Notes données + +- `logs/SUB/log/*_usbl.csv` = bytes série bruts (Waterlinked M64), PAS lat/lon +- Navigation réelle dans `logs/SUB/bag/*.mcap` (ROS2 MCAP) +- Mapping AUV : vidéos utilisent AUV2xx, bags utilisent AUV0xx (même 2 derniers chiffres) diff --git a/pipeline/config/default_params.yaml b/pipeline/config/default_params.yaml new file mode 100644 index 0000000..a453290 --- /dev/null +++ b/pipeline/config/default_params.yaml @@ -0,0 +1,58 @@ +# Default params per stage — overridable per-run via CLI or cron patch +stage_01_ingest: + gap_min: 5 # minutes gap to split segment + ssd_root: /mnt/ssd + output_dir: /home/cosma/cosma-pipeline + +stage_02_usbl_parse: + # USBL log/ CSVs are raw serial frames — real nav is in bag/*.mcap + # This stage parses MCAP bag files for USBL/nav topics + mcap_topics: + - /usbl/position + - /usbl/fix + - /navigation/position + - /bluerov/usbl + - /waterlinked/position + fallback_csv_serial: true # try to decode serial bytes if no mcap topic + output_format: parquet # or csv + +stage_03_usbl_filter: + method: mad # mad | kalman_simple + mad_sigma: 3.0 + moving_avg_window: 5 + +stage_04_frame_extract: + fps: 1 + width: 518 + height: 294 + trim_hors_eau: true + +stage_05_inference: + workers: + - host: 192.168.0.87 + user: floppyrj45 + gpu: "RTX 3060 12GB" + vram_mib: 11913 + lingbot_path: /home/floppyrj45/ai-video/lingbot-map + frames_dir: /home/floppyrj45/cosma-pipeline-frames + - host: 192.168.0.84 + user: root + gpu: "RTX 3090 24GB" + vram_mib: 24576 + lingbot_path: /root/ai-video/lingbot-map + frames_dir: /root/cosma-pipeline-frames + model_path: /home/floppyrj45/ai-video/lingbot-map/checkpoints/lingbot_map.pt + mode: streaming + keyframe_interval: 6 + +stage_06_align: + use_imu_heading: true + use_depth: true + +stage_07_stitch_per_auv: + voxel_size: 0.05 + use_ransac: true + +stage_08_stitch_cross_auv: + voxel_size: 0.1 + final_icp: true diff --git a/pipeline/config/thresholds.yaml b/pipeline/config/thresholds.yaml new file mode 100644 index 0000000..5ba3aeb --- /dev/null +++ b/pipeline/config/thresholds.yaml @@ -0,0 +1,34 @@ +# QA thresholds — tuned from iteration cron +usbl: + min_points_per_segment: 5 # fewer → degraded + max_gap_seconds: 30 # gap > this → split segment + mad_sigma: 3.0 # MAD outlier threshold + moving_avg_window: 5 # smoothing window + +ingest: + min_video_seconds: 120 # shorter segments skipped + max_timestamp_delta_seconds: 60 # EXIF vs USBL match tolerance + +frame_extract: + fps: 1 + width: 518 + height: 294 + underwater_r_minus_g: 5 # R < G-5 AND R < B-5 → hors eau + trim_min_frames: 8 # skip if fewer underwater frames + +inference: + ply_conf_threshold: 1.5 + max_frame_num: 1024 + mode: streaming + keyframe_interval: 6 + +align: + max_translation_m: 500 # sanity check on alignment + min_inlier_ratio: 0.3 # umeyama inlier ratio + +stitch: + voxel_size: 0.05 + icp_max_distance: 0.5 + icp_iterations: 50 + use_ransac: true + ransac_iterations: 100000 diff --git a/pipeline/orchestrator/__init__.py b/pipeline/orchestrator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipeline/orchestrator/db.py b/pipeline/orchestrator/db.py new file mode 100644 index 0000000..1b712af --- /dev/null +++ b/pipeline/orchestrator/db.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +"""SQLite schema for cosma-pipeline orchestrator. + +Tables: + missions — one row per mission folder on SSD + jobs — one row per (mission, auv, segment, stage) + metrics — one row per (job, metric_name) for QA + cron iteration +""" +from __future__ import annotations + +import os +import sqlite3 +from contextlib import contextmanager +from datetime import datetime, timezone +from pathlib import Path + +DB_PATH = Path(os.environ.get("COSMA_PIPELINE_DB", "/home/cosma/cosma-pipeline/state.db")) + +SCHEMA = """ +CREATE TABLE IF NOT EXISTS missions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE, + ssd_path TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + -- pending | ingesting | running | done | degraded | error + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + manifest TEXT, -- JSON blob from 01_ingest + notes TEXT +); + +CREATE TABLE IF NOT EXISTS jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + mission_id INTEGER NOT NULL REFERENCES missions(id), + auv_id TEXT NOT NULL, -- e.g. AUV010 + segment_label TEXT NOT NULL, -- e.g. 2026-05-05_08-16-00 + stage TEXT NOT NULL, -- 01_ingest .. 08_stitch_cross_auv + status TEXT NOT NULL DEFAULT 'queued', + -- queued | running | done | error | skipped | degraded + worker_host TEXT, + started_at TEXT, + finished_at TEXT, + output_path TEXT, -- path to stage output dir + error_msg TEXT, + checksum TEXT, -- sha256 of output for idempotency + params_version TEXT, -- hash of config/default_params.yaml at run time + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS metrics ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER NOT NULL REFERENCES jobs(id), + name TEXT NOT NULL, -- e.g. usbl_points_before, usbl_points_after + value REAL, + text_value TEXT, + pass_fail TEXT, -- pass | fail | degraded | skip + recorded_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_jobs_mission ON jobs(mission_id); +CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status); +CREATE INDEX IF NOT EXISTS idx_metrics_job ON metrics(job_id); +CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name); +""" + + +def now_iso() -> str: + return datetime.now(timezone.utc).isoformat(timespec="seconds") + + +def init_db(path: Path | None = None) -> Path: + p = path or DB_PATH + p.parent.mkdir(parents=True, exist_ok=True) + with sqlite3.connect(p) as conn: + conn.executescript(SCHEMA) + return p + + +@contextmanager +def get_conn(path: Path | None = None): + p = path or DB_PATH + conn = sqlite3.connect(p) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() + + +def upsert_mission(conn: sqlite3.Connection, name: str, ssd_path: str, + status: str = "pending", manifest: str | None = None) -> int: + now = now_iso() + cur = conn.execute( + "SELECT id FROM missions WHERE name = ?", (name,) + ) + row = cur.fetchone() + if row: + conn.execute( + "UPDATE missions SET ssd_path=?, status=?, manifest=?, updated_at=? WHERE id=?", + (ssd_path, status, manifest, now, row["id"]) + ) + return row["id"] + else: + cur = conn.execute( + "INSERT INTO missions (name, ssd_path, status, manifest, created_at, updated_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + (name, ssd_path, status, manifest, now, now) + ) + return cur.lastrowid + + +def upsert_job(conn: sqlite3.Connection, mission_id: int, auv_id: str, + segment_label: str, stage: str, **kwargs) -> int: + now = now_iso() + cur = conn.execute( + "SELECT id FROM jobs WHERE mission_id=? AND auv_id=? AND segment_label=? AND stage=?", + (mission_id, auv_id, segment_label, stage) + ) + row = cur.fetchone() + fields = {k: v for k, v in kwargs.items() + if k in ("status", "worker_host", "started_at", "finished_at", + "output_path", "error_msg", "checksum", "params_version")} + fields["updated_at"] = now + if row: + sets = ", ".join(f"{k}=?" for k in fields) + vals = list(fields.values()) + [row["id"]] + conn.execute(f"UPDATE jobs SET {sets} WHERE id=?", vals) + return row["id"] + else: + fields.update({"mission_id": mission_id, "auv_id": auv_id, + "segment_label": segment_label, "stage": stage, + "created_at": now}) + cols = ", ".join(fields.keys()) + placeholders = ", ".join("?" for _ in fields) + cur = conn.execute(f"INSERT INTO jobs ({cols}) VALUES ({placeholders})", + list(fields.values())) + return cur.lastrowid + + +def record_metric(conn: sqlite3.Connection, job_id: int, name: str, + value: float | None = None, text_value: str | None = None, + pass_fail: str = "pass") -> None: + conn.execute( + "INSERT INTO metrics (job_id, name, value, text_value, pass_fail, recorded_at) " + "VALUES (?, ?, ?, ?, ?, ?)", + (job_id, name, value, text_value, pass_fail, now_iso()) + ) + + +if __name__ == "__main__": + p = init_db() + print(f"DB initialized: {p}") diff --git a/pipeline/pyproject.toml b/pipeline/pyproject.toml new file mode 100644 index 0000000..26ddfc0 --- /dev/null +++ b/pipeline/pyproject.toml @@ -0,0 +1,21 @@ +[project] +name = "cosma-pipeline" +version = "0.1.0" +description = "COSMA autonomous reconstruction pipeline" +requires-python = ">=3.11" +dependencies = [ + "pandas>=2.0", + "scipy>=1.11", + "numpy>=1.26", + "fastapi>=0.115", + "uvicorn[standard]>=0.30", + "sqlmodel>=0.0.18", + "pyyaml>=6.0", + "tqdm>=4.66", + "open3d>=0.18", + "mcap>=1.1", + "mcap-ros2-support>=0.5", +] + +[tool.uv] +package = false diff --git a/pipeline/qa/__init__.py b/pipeline/qa/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pipeline/qa/checks.py b/pipeline/qa/checks.py new file mode 100644 index 0000000..a3de0fc --- /dev/null +++ b/pipeline/qa/checks.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +"""QA checks — each function returns {metric: value, pass_fail: str, details: str}.""" +from __future__ import annotations + +import json +from pathlib import Path + + +def check_ingest(manifest_path: Path) -> dict: + try: + m = json.loads(manifest_path.read_text()) + n_auv_video = len(m.get("auv_ids_video", [])) + n_auv_bags = len(m.get("auv_ids_bags", [])) + total_s = m.get("total_video_s", 0) + segs = sum(len(v) for v in m.get("segments_per_auv", {}).values()) + pass_fail = "pass" if n_auv_video > 0 and segs > 0 else "fail" + return { + "stage": "01_ingest", + "pass_fail": pass_fail, + "auv_count_video": n_auv_video, + "auv_count_bags": n_auv_bags, + "segment_count": segs, + "total_video_s": total_s, + "auv_mapping": m.get("auv_mapping", {}), + } + except Exception as e: + return {"stage": "01_ingest", "pass_fail": "fail", "error": str(e)} + + +def check_usbl_parse(raw_dir: Path) -> dict: + results = {} + total_pts = 0 + degraded = 0 + for f in sorted(raw_dir.glob("*_nav_raw.json")): + try: + d = json.loads(f.read_text()) + pts = len(d.get("points", [])) + status = d.get("metrics", {}).get("status", "?") + total_pts += pts + if status == "degraded": + degraded += 1 + results[d.get("auv_id", f.stem)] = {"points": pts, "status": status} + except Exception as e: + results[f.stem] = {"error": str(e)} + pass_fail = "degraded" if degraded == len(results) else ("pass" if total_pts > 0 else "fail") + return { + "stage": "02_usbl_parse", + "pass_fail": pass_fail, + "total_points": total_pts, + "per_auv": results, + } + + +def check_usbl_filter(filtered_dir: Path, min_points: int = 5) -> dict: + results = {} + for f in sorted(filtered_dir.glob("*_nav_filtered.json")): + try: + d = json.loads(f.read_text()) + pts_after = len(d.get("points", [])) + m = d.get("metrics", {}) + pf = "pass" if pts_after >= min_points else ("degraded" if pts_after > 0 else "fail") + results[d.get("auv_id", f.stem)] = { + "before": m.get("points_before", 0), + "after": pts_after, + "removed_null": m.get("points_removed_null", 0), + "removed_outlier": m.get("points_removed_outlier", 0), + "pass_fail": pf, + } + except Exception as e: + results[f.stem] = {"error": str(e)} + overall = "pass" + if all(v.get("pass_fail") == "fail" for v in results.values() if "error" not in v): + overall = "fail" + elif any(v.get("pass_fail") == "degraded" for v in results.values() if "error" not in v): + overall = "degraded" + return {"stage": "03_usbl_filter", "pass_fail": overall, "per_auv": results} diff --git a/pipeline/stages/01_ingest.py b/pipeline/stages/01_ingest.py new file mode 100644 index 0000000..3fc668a --- /dev/null +++ b/pipeline/stages/01_ingest.py @@ -0,0 +1,381 @@ +#!/usr/bin/env python3 +"""Stage 01 — Ingest mission folder from SSD. + +Scans /mnt/ssd//raw_data/ and builds a manifest: +- Videos per AUV+GoPro segment (from medias/videos/) +- USBL/bag sessions per AUV (from logs/SUB/bag/*.mcap) +- Auto-detects AUV ID mapping (AUV010↔AUV210 etc.) by timestamp proximity + +Usage: + python3 01_ingest.py /mnt/ssd/20260505-Lepradet --name 20260505-Lepradet + python3 01_ingest.py /mnt/ssd/20260505-Lepradet --name 20260505-Lepradet --out /home/cosma/cosma-pipeline +""" +from __future__ import annotations + +import argparse +import hashlib +import json +import os +import re +import subprocess +import sys +from datetime import datetime, timedelta +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) +from orchestrator.db import init_db, get_conn, upsert_mission, now_iso + +LOG_DIR = Path(os.environ.get("COSMA_PIPELINE_LOGS", "/home/cosma/cosma-pipeline/logs")) +LOG_DIR.mkdir(parents=True, exist_ok=True) + +# AUV ID normalization: GP folder uses AUV2xx, bag files use AUV0xx +# e.g. AUV210 <-> AUV010, AUV213 <-> AUV013, AUV212 <-> AUV012 +AUV_FOLDER_RE = re.compile(r"GP(\d+)[_-]AUV(\d+)", re.I) +AUV_BAG_RE = re.compile(r"auv(\d+)", re.I) +AUV_CSV_RE = re.compile(r"AUV(\d+)", re.I) + + +def normalize_auv(raw_id: str) -> str: + """Normalize AUV IDs: strip leading zeros or map 0xx -> 2xx heuristic. + Returns canonical form like AUV010, AUV013, AUV210, AUV212 as-is. + We keep original for now and detect mapping via timestamp cross-correlation. + """ + m = re.search(r"\d+", raw_id) + if not m: + return raw_id.upper() + n = int(m.group()) + return f"AUV{n:03d}" + + +def exif_create_date(path: Path) -> datetime | None: + try: + out = subprocess.check_output( + ["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)], + stderr=subprocess.DEVNULL, text=True, timeout=10 + ).strip() + if not out: + return None + out = re.sub(r'[+-]\d{2}:\d{2}$', '', out).strip() + return datetime.strptime(out, "%Y:%m:%d %H:%M:%S") + except Exception: + return None + + +def exif_duration_s(path: Path) -> float | None: + try: + out = subprocess.check_output( + ["exiftool", "-s3", "-Duration#", str(path)], + stderr=subprocess.DEVNULL, text=True, timeout=10 + ).strip() + return float(out) if out else None + except Exception: + return None + + +def scan_videos(raw_data: Path) -> dict[str, list[dict]]: + """Scan medias/videos/ and return dict {auv_id: [video_info, ...]}. + Handles both GP1-AUV210 and GP1_AUV210 naming conventions. + """ + videos_dir = raw_data / "medias" / "videos" + if not videos_dir.exists(): + return {} + + result: dict[str, list[dict]] = {} + for folder in sorted(videos_dir.iterdir()): + if not folder.is_dir(): + continue + m = AUV_FOLDER_RE.search(folder.name) + if not m: + continue + gopro_n = int(m.group(1)) + auv_id = normalize_auv(m.group(2)) + + mp4_files = sorted(folder.glob("*.MP4")) + sorted(folder.glob("*.mp4")) + for mp4 in mp4_files: + create_date = exif_create_date(mp4) + duration = exif_duration_s(mp4) + info = { + "path": str(mp4), + "gopro": gopro_n, + "auv_id": auv_id, + "filename": mp4.name, + "create_date": create_date.isoformat() if create_date else None, + "duration_s": duration, + "size_mb": round(mp4.stat().st_size / 1e6, 1), + } + result.setdefault(auv_id, []).append(info) + + return result + + +def scan_bags(raw_data: Path) -> dict[str, list[dict]]: + """Scan logs/SUB/bag/ for MCAP files grouped by session+AUV.""" + bag_dir = raw_data / "logs" / "SUB" / "bag" + if not bag_dir.exists(): + return {} + + result: dict[str, list[dict]] = {} + for session_dir in sorted(bag_dir.iterdir()): + if not session_dir.is_dir(): + continue + # dir name: 20260505_074718_AUV013 + m = re.match(r"(\d{8}_\d{6})_AUV(\d+)", session_dir.name) + if not m: + continue + ts_str = m.group(1) + auv_id = normalize_auv(m.group(2)) + try: + ts = datetime.strptime(ts_str, "%Y%m%d_%H%M%S") + except ValueError: + ts = None + + mcap_files = sorted(session_dir.glob("*.mcap")) + total_size = sum(f.stat().st_size for f in mcap_files) + non_empty = [str(f) for f in mcap_files if f.stat().st_size > 0] + + if not non_empty: + continue + + session_info = { + "session": session_dir.name, + "auv_id": auv_id, + "timestamp": ts.isoformat() if ts else None, + "mcap_files": non_empty, + "total_mb": round(total_size / 1e6, 1), + } + result.setdefault(auv_id, []).append(session_info) + + return result + + +def scan_usbl_csv(raw_data: Path) -> dict[str, list[dict]]: + """Scan logs/SUB/log/ for *_usbl.csv files. + Note: these are raw serial byte logs, not lat/lon CSV. + We record them for reference; actual nav comes from MCAP bags. + """ + log_dir = raw_data / "logs" / "SUB" / "log" + if not log_dir.exists(): + return {} + + result: dict[str, list[dict]] = {} + for f in sorted(log_dir.glob("*_usbl.csv")): + m = AUV_CSV_RE.search(f.name) + if not m: + continue + auv_id = normalize_auv(m.group(1)) + # parse timestamp from filename: 2026-05-05_08-16-00_AUV010_usbl.csv + ts_m = re.match(r"(\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2})", f.name) + ts = None + if ts_m: + try: + ts = datetime.strptime(ts_m.group(1), "%Y-%m-%d_%H-%M-%S") + except ValueError: + pass + result.setdefault(auv_id, []).append({ + "path": str(f), + "auv_id": auv_id, + "timestamp": ts.isoformat() if ts else None, + "size_kb": round(f.stat().st_size / 1e3, 1), + "format": "raw_serial", # NOT lat/lon CSV + }) + + return result + + +def detect_auv_mapping(videos: dict, bags: dict) -> dict[str, str]: + """Detect AUV ID mapping between video folders (AUV2xx) and bag sessions (AUV0xx). + + Heuristic: if video AUV2xx and bag AUV0xx share same last 2 digits and + timestamps are within 60s → they are the same physical AUV. + + Returns: {video_auv_id: bag_auv_id} e.g. {"AUV210": "AUV010"} + """ + mapping: dict[str, str] = {} + + for vid_auv in videos: + vid_ts_list = [] + for v in videos[vid_auv]: + if v.get("create_date"): + try: + vid_ts_list.append(datetime.fromisoformat(v["create_date"])) + except Exception: + pass + if not vid_ts_list: + continue + + vid_digits = vid_auv[-2:] # last 2 digits of AUV2xx + + best_bag_auv = None + best_delta = timedelta(seconds=999) + + for bag_auv in bags: + # check same last 2 digits + if bag_auv[-2:] != vid_digits: + continue + for sess in bags[bag_auv]: + if sess.get("timestamp"): + try: + bag_ts = datetime.fromisoformat(sess["timestamp"]) + except Exception: + continue + for vt in vid_ts_list: + delta = abs(vt - bag_ts) + if delta < best_delta: + best_delta = delta + best_bag_auv = bag_auv + + if best_bag_auv and best_delta < timedelta(seconds=3600): + mapping[vid_auv] = best_bag_auv + + return mapping + + +def group_video_segments(video_list: list[dict], gap_min: int = 5) -> list[dict]: + """Group consecutive videos into segments by timestamp gap.""" + sorted_vids = sorted( + [v for v in video_list if v.get("create_date")], + key=lambda x: x["create_date"] + ) + if not sorted_vids: + return [{"videos": video_list, "label": "seg_unknown", "total_s": None}] + + segments = [] + current: list[dict] = [sorted_vids[0]] + + for vid in sorted_vids[1:]: + prev = current[-1] + try: + prev_end = datetime.fromisoformat(prev["create_date"]) + if prev.get("duration_s"): + prev_end += timedelta(seconds=prev["duration_s"]) + cur_start = datetime.fromisoformat(vid["create_date"]) + gap = (cur_start - prev_end).total_seconds() / 60 + if gap > gap_min: + segments.append(_finalize_segment(current)) + current = [vid] + else: + current.append(vid) + except Exception: + current.append(vid) + + if current: + segments.append(_finalize_segment(current)) + + return segments + + +def _finalize_segment(videos: list[dict]) -> dict: + label = videos[0]["create_date"][:19].replace(":", "-").replace("T", "_") if videos[0].get("create_date") else "seg_unknown" + total_s = sum(v["duration_s"] or 0 for v in videos) + return { + "label": label, + "videos": videos, + "total_s": total_s, + "start": videos[0].get("create_date"), + "end": videos[-1].get("create_date"), + } + + +def build_manifest(mission_path: Path, gap_min: int = 5) -> dict: + raw_data = mission_path / "raw_data" + if not raw_data.exists(): + # try direct + raw_data = mission_path + + print(f"[01_ingest] scanning {raw_data} ...") + + videos = scan_videos(raw_data) + bags = scan_bags(raw_data) + csvs = scan_usbl_csv(raw_data) + auv_map = detect_auv_mapping(videos, bags) + + # Build per-AUV segments + auv_segments: dict[str, list[dict]] = {} + for auv_id, vid_list in videos.items(): + segs = group_video_segments(vid_list, gap_min=gap_min) + auv_segments[auv_id] = segs + + # Compute AUVs with real data + auv_ids_with_video = sorted(videos.keys()) + auv_ids_with_bags = sorted(bags.keys()) + + total_video_s = sum( + seg["total_s"] or 0 + for segs in auv_segments.values() + for seg in segs + ) + + manifest = { + "mission": mission_path.name, + "ssd_path": str(mission_path), + "generated_at": now_iso(), + "auv_ids_video": auv_ids_with_video, + "auv_ids_bags": auv_ids_with_bags, + "auv_mapping": auv_map, + "total_video_s": round(total_video_s), + "segments_per_auv": auv_segments, + "bag_sessions_per_auv": bags, + "usbl_csv_per_auv": csvs, + "notes": { + "usbl_csv_format": "raw_serial_bytes", + "nav_source": "mcap_bags", + }, + } + + return manifest + + +def ingest(mission_path: Path, mission_name: str, out_dir: Path, + gap_min: int = 5) -> dict: + out_dir.mkdir(parents=True, exist_ok=True) + manifest_path = out_dir / mission_name / "manifest.json" + + # Idempotency check + if manifest_path.exists(): + existing = json.loads(manifest_path.read_text()) + chk = hashlib.sha256(mission_path.name.encode()).hexdigest()[:8] + print(f"[01_ingest] manifest exists (checksum {chk}), skipping scan") + return existing + + manifest = build_manifest(mission_path, gap_min=gap_min) + + # Save manifest + manifest_path.parent.mkdir(parents=True, exist_ok=True) + manifest_path.write_text(json.dumps(manifest, indent=2)) + print(f"[01_ingest] manifest saved: {manifest_path}") + + # Write to DB + init_db() + with get_conn() as conn: + upsert_mission(conn, mission_name, str(mission_path), + status="ingested", manifest=json.dumps(manifest)) + + return manifest + + +def main(): + ap = argparse.ArgumentParser(description="Stage 01 — Ingest mission from SSD") + ap.add_argument("mission_path", type=Path, help="Path to mission folder (e.g. /mnt/ssd/20260505-Lepradet)") + ap.add_argument("--name", type=str, default=None, help="Mission name (defaults to folder name)") + ap.add_argument("--out", type=Path, default=Path("/home/cosma/cosma-pipeline"), help="Output base dir") + ap.add_argument("--gap-min", type=int, default=5, help="Gap in minutes to split video segments") + args = ap.parse_args() + + mission_name = args.name or args.mission_path.name + manifest = ingest(args.mission_path, mission_name, args.out, args.gap_min) + + print(f"\n=== Ingest summary for {mission_name} ===") + print(f"AUVs with video: {manifest['auv_ids_video']}") + print(f"AUVs with bags: {manifest['auv_ids_bags']}") + print(f"AUV mapping: {manifest['auv_mapping']}") + print(f"Total video: {manifest['total_video_s']}s") + print(f"Segments:") + for auv, segs in manifest["segments_per_auv"].items(): + for seg in segs: + n_vids = len(seg["videos"]) + dur = f"{seg['total_s']:.0f}s" if seg["total_s"] else "?" + print(f" {auv} / {seg['label']} {n_vids} videos {dur}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/stages/02_usbl_parse.py b/pipeline/stages/02_usbl_parse.py new file mode 100644 index 0000000..16d94fd --- /dev/null +++ b/pipeline/stages/02_usbl_parse.py @@ -0,0 +1,272 @@ +#!/usr/bin/env python3 +"""Stage 02 — Parse USBL/navigation from MCAP bag files. + +The USBL CSV logs in logs/SUB/log/ contain raw serial bytes, NOT lat/lon. +Real navigation data is in MCAP bags (logs/SUB/bag/). + +This stage: +1. Reads MCAP files per AUV session +2. Extracts position topics (configurable in default_params.yaml) +3. Falls back to parsing serial bytes if no nav topic found (best-effort) +4. Outputs Parquet per AUV with columns: timestamp, lat, lon, depth, heading + +Usage: + python3 02_usbl_parse.py /home/cosma/cosma-pipeline/20260505-Lepradet/manifest.json + python3 02_usbl_parse.py /home/cosma/cosma-pipeline/20260505-Lepradet/manifest.json --auv AUV010 +""" +from __future__ import annotations + +import argparse +import json +import os +import struct +import sys +from datetime import datetime, timezone +from pathlib import Path + +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent)) +from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso + +LOG_DIR = Path(os.environ.get("COSMA_PIPELINE_LOGS", "/home/cosma/cosma-pipeline/logs")) +LOG_DIR.mkdir(parents=True, exist_ok=True) + +# Known nav topics to try in MCAP files +NAV_TOPICS = [ + "/usbl/position", + "/usbl/fix", + "/navigation/position", + "/bluerov/usbl", + "/waterlinked/position", + "/fix", + "/gps/fix", + "/mavros/global_position/global", + "/mavros/global_position/local", +] + + +def try_parse_mcap(mcap_path: Path, topics: list[str] | None = None) -> list[dict]: + """Try to extract nav points from MCAP file. Returns list of {ts, lat, lon, depth}.""" + try: + from mcap.reader import make_reader + except ImportError: + print(f" [02] mcap not installed, skipping {mcap_path.name}") + return [] + + points = [] + try: + with open(mcap_path, "rb") as f: + reader = make_reader(f) + for schema, channel, message in reader.iter_messages(topics=topics): + # Try to deserialize — support ROS2 JSON-encoded or raw + try: + import json as _json + data = _json.loads(message.data) + lat = data.get("latitude") or data.get("lat") + lon = data.get("longitude") or data.get("lon") + depth = data.get("depth") or data.get("altitude") + heading = data.get("heading") or data.get("yaw") + if lat is not None and lon is not None: + ts_ns = message.log_time + ts = ts_ns / 1e9 + points.append({ + "timestamp": ts, + "lat": float(lat), + "lon": float(lon), + "depth": float(depth) if depth is not None else None, + "heading": float(heading) if heading is not None else None, + "source": channel.topic, + }) + except Exception: + pass + except Exception as e: + print(f" [02] MCAP read error {mcap_path.name}: {e}") + + return points + + +def try_parse_serial_csv(csv_path: Path) -> list[dict]: + """Best-effort: parse raw serial byte log for USBL range/bearing frames. + Waterlinked USBL M64 protocol: 0xBB 0x55 frame header. + This is a rough attempt — actual lat/lon requires ship GPS + range+bearing. + Returns relative-only positions (range, bearing) if decoded. + """ + points = [] + try: + with open(csv_path, "r") as f: + for line in f: + line = line.strip() + if not line: + continue + parts = line.split(",", 2) + if len(parts) < 3: + continue + ts_str, direction, raw = parts[0], parts[1], parts[2] + if direction.strip() != "RECEIVED": + continue + # Extract bytes from repr string b'\xbb...' + try: + raw_clean = raw.strip().strip('"') + # Parse Python bytes repr + data = eval(raw_clean) # safe: only used on known CSV + if len(data) >= 4 and data[0] == 0xBB and data[1] == 0x55: + # Waterlinked M64 position frame: len byte at [2], payload follows + payload_len = data[2] + if len(data) >= payload_len + 3: + # Best effort: look for float32 values in payload + # Actual protocol decoding would need WL M64 spec + ts = datetime.fromisoformat(ts_str) + points.append({ + "timestamp": ts.timestamp(), + "lat": None, + "lon": None, + "depth": None, + "heading": None, + "source": "serial_raw", + "raw_bytes": data.hex()[:32], + }) + except Exception: + pass + except Exception as e: + print(f" [02] serial CSV parse error {csv_path.name}: {e}") + + return points + + +def parse_auv_sessions(manifest: dict, auv_id: str, out_dir: Path, + topics: list[str] | None = None) -> dict: + """Parse all sessions for one AUV. Returns metrics dict.""" + metrics = {"auv_id": auv_id, "points_raw": 0, "sources": [], "status": "ok"} + all_points: list[dict] = [] + + # Try MCAP bags first + bag_sessions = manifest.get("bag_sessions_per_auv", {}).get(auv_id, []) + if not bag_sessions: + # Try mapping: maybe bags use AUV0xx while videos use AUV2xx + auv_map = manifest.get("auv_mapping", {}) + bag_auv = auv_map.get(auv_id) + if bag_auv: + bag_sessions = manifest.get("bag_sessions_per_auv", {}).get(bag_auv, []) + + for sess in bag_sessions: + for mcap_path_str in sess.get("mcap_files", []): + mcap_path = Path(mcap_path_str) + if not mcap_path.exists(): + continue + pts = try_parse_mcap(mcap_path, topics=topics or NAV_TOPICS) + if pts: + all_points.extend(pts) + if "mcap" not in metrics["sources"]: + metrics["sources"].append("mcap") + print(f" [02] {auv_id} {mcap_path.name}: {len(pts)} nav points") + + # Fallback: serial CSV if no MCAP nav + if not all_points: + csv_entries = manifest.get("usbl_csv_per_auv", {}).get(auv_id, []) + for entry in csv_entries: + csv_path = Path(entry["path"]) + if not csv_path.exists(): + continue + pts = try_parse_serial_csv(csv_path) + if pts: + all_points.extend(pts) + if "serial_csv" not in metrics["sources"]: + metrics["sources"].append("serial_csv") + + metrics["points_raw"] = len(all_points) + + if not all_points: + metrics["status"] = "degraded" + metrics["note"] = "no nav points found in MCAP or serial CSV" + print(f" [02] {auv_id}: NO nav data found — degraded") + else: + print(f" [02] {auv_id}: {len(all_points)} raw nav points from {metrics['sources']}") + + # Save output + out_file = out_dir / f"{auv_id}_nav_raw.json" + out_file.write_text(json.dumps({ + "auv_id": auv_id, + "generated_at": now_iso(), + "metrics": metrics, + "points": all_points, + }, indent=2, default=str)) + + # Also save as simple CSV for downstream + csv_out = out_dir / f"{auv_id}_nav_raw.csv" + with open(csv_out, "w") as f: + f.write("timestamp,lat,lon,depth,heading,source\n") + for p in all_points: + f.write(f"{p['timestamp']},{p.get('lat','')},{p.get('lon','')},{p.get('depth','')},{p.get('heading','')},{p.get('source','')}\n") + + return metrics + + +def parse_mission(manifest_path: Path, auv_filter: str | None = None, + out_dir: Path | None = None) -> list[dict]: + manifest = json.loads(manifest_path.read_text()) + mission_name = manifest["mission"] + + if out_dir is None: + out_dir = manifest_path.parent / "02_usbl_raw" + out_dir.mkdir(parents=True, exist_ok=True) + + # Idempotency: check if all AUV outputs exist + auv_ids = manifest.get("auv_ids_bags", []) or manifest.get("auv_ids_video", []) + if auv_filter: + auv_ids = [a for a in auv_ids if a == auv_filter] + + all_metrics = [] + init_db() + + for auv_id in auv_ids: + out_file = out_dir / f"{auv_id}_nav_raw.json" + if out_file.exists(): + print(f"[02] {auv_id}: output exists, skipping") + existing = json.loads(out_file.read_text()) + all_metrics.append(existing.get("metrics", {"auv_id": auv_id, "status": "cached"})) + continue + + print(f"[02] Parsing {auv_id} ...") + m = parse_auv_sessions(manifest, auv_id, out_dir) + all_metrics.append(m) + + # Record in DB + with get_conn() as conn: + from orchestrator.db import upsert_mission + mission_id_row = conn.execute( + "SELECT id FROM missions WHERE name=?", (mission_name,) + ).fetchone() + if mission_id_row: + mission_id = mission_id_row["id"] + job_id = upsert_job(conn, mission_id, auv_id, "all", "02_usbl_parse", + status="done" if m["status"] == "ok" else m["status"], + output_path=str(out_dir)) + record_metric(conn, job_id, "usbl_points_raw", + value=m["points_raw"], + pass_fail="pass" if m["points_raw"] > 0 else "fail") + + return all_metrics + + +def main(): + ap = argparse.ArgumentParser(description="Stage 02 — Parse USBL/nav from MCAP bags") + ap.add_argument("manifest", type=Path, help="manifest.json from stage 01") + ap.add_argument("--auv", type=str, default=None, help="Filter to single AUV ID") + ap.add_argument("--out", type=Path, default=None, help="Output directory") + args = ap.parse_args() + + metrics = parse_mission(args.manifest, auv_filter=args.auv, out_dir=args.out) + + print("\n=== Stage 02 summary ===") + total_pts = sum(m.get("points_raw", 0) for m in metrics) + for m in metrics: + status = m.get("status", "?") + pts = m.get("points_raw", 0) + src = m.get("sources", []) + print(f" {m['auv_id']}: {pts} pts {src} [{status}]") + print(f"Total nav points: {total_pts}") + + +if __name__ == "__main__": + main() diff --git a/pipeline/stages/03_usbl_filter.py b/pipeline/stages/03_usbl_filter.py new file mode 100644 index 0000000..ddb6d4d --- /dev/null +++ b/pipeline/stages/03_usbl_filter.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 +"""Stage 03 — Filter and smooth USBL navigation trajectory. + +Input: 02_usbl_raw/_nav_raw.json (or .csv) +Output: 03_usbl_filtered/_nav_filtered.json + .csv + +Steps: +1. Drop points with null lat/lon +2. MAD-3σ outlier removal on lat, lon, depth independently +3. Moving-average smoothing (window=5 by default) +4. Optional: simple 1D Kalman on each axis (KISS — no cross-covariance) + +Usage: + python3 03_usbl_filter.py /home/cosma/cosma-pipeline/20260505-Lepradet/02_usbl_raw/ + python3 03_usbl_filter.py /path/to/02_usbl_raw/ --auv AUV010 --sigma 2.5 +""" +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path + +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent)) +from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso + +LOG_DIR = Path(os.environ.get("COSMA_PIPELINE_LOGS", "/home/cosma/cosma-pipeline/logs")) +LOG_DIR.mkdir(parents=True, exist_ok=True) + + +def mad_outlier_mask(arr: np.ndarray, sigma: float = 3.0) -> np.ndarray: + """Returns boolean mask: True = keep (inlier). Uses MAD-based sigma.""" + if len(arr) < 4: + return np.ones(len(arr), dtype=bool) + median = np.median(arr) + mad = np.median(np.abs(arr - median)) + if mad == 0: + return np.ones(len(arr), dtype=bool) + modified_z = 0.6745 * (arr - median) / mad + return np.abs(modified_z) < sigma + + +def moving_average(arr: np.ndarray, window: int = 5) -> np.ndarray: + """Centered moving average with edge padding.""" + if len(arr) < window: + return arr.copy() + pad = window // 2 + padded = np.pad(arr, (pad, pad), mode="edge") + kernel = np.ones(window) / window + return np.convolve(padded, kernel, mode="valid")[:len(arr)] + + +def simple_kalman_1d(measurements: np.ndarray, + process_noise: float = 1e-4, + measurement_noise: float = 1e-2) -> np.ndarray: + """Very simple 1D Kalman filter (scalar, no velocity state). + KISS: just smooths, no cross-axis coupling. + """ + n = len(measurements) + filtered = np.zeros(n) + x_est = measurements[0] + p_est = 1.0 + + for i, z in enumerate(measurements): + # Predict + p_pred = p_est + process_noise + # Update + K = p_pred / (p_pred + measurement_noise) + x_est = x_est + K * (z - x_est) + p_est = (1 - K) * p_pred + filtered[i] = x_est + + return filtered + + +def filter_auv_nav(raw_path: Path, out_path: Path, + sigma: float = 3.0, window: int = 5, + use_kalman: bool = False) -> dict: + """Filter nav for one AUV. Returns metrics dict.""" + data = json.loads(raw_path.read_text()) + points = data.get("points", []) + auv_id = data.get("auv_id", raw_path.stem.replace("_nav_raw", "")) + + metrics = { + "auv_id": auv_id, + "points_before": len(points), + "points_after": 0, + "points_removed_null": 0, + "points_removed_outlier": 0, + "status": "ok", + } + + if not points: + metrics["status"] = "degraded" + metrics["note"] = "no points to filter" + _save_output(auv_id, [], out_path, metrics) + return metrics + + # Step 1: Drop null lat/lon + valid = [p for p in points if p.get("lat") is not None and p.get("lon") is not None] + metrics["points_removed_null"] = len(points) - len(valid) + + if not valid: + metrics["status"] = "degraded" + metrics["note"] = "all points have null lat/lon (serial-only data)" + print(f" [03] {auv_id}: all null lat/lon — degraded (serial CSV source, no MCAP nav)") + _save_output(auv_id, [], out_path, metrics) + return metrics + + # Step 2: MAD outlier removal + lats = np.array([p["lat"] for p in valid]) + lons = np.array([p["lon"] for p in valid]) + + mask_lat = mad_outlier_mask(lats, sigma) + mask_lon = mad_outlier_mask(lons, sigma) + mask = mask_lat & mask_lon + + # Also filter depth if present + depths = np.array([p.get("depth") or np.nan for p in valid]) + if not np.all(np.isnan(depths)): + mask_depth = mad_outlier_mask(depths[~np.isnan(depths)], sigma) + # Map back — only filter where we have depth + depth_idx = np.where(~np.isnan(depths))[0] + for i, keep in zip(depth_idx, mask_depth): + if not keep: + mask[i] = False + + filtered_points = [p for p, keep in zip(valid, mask) if keep] + metrics["points_removed_outlier"] = int(np.sum(~mask)) + + # Step 3: Sort by timestamp + filtered_points.sort(key=lambda p: p["timestamp"]) + + # Step 4: Smooth + if len(filtered_points) >= window: + filt_lats = moving_average(np.array([p["lat"] for p in filtered_points]), window) + filt_lons = moving_average(np.array([p["lon"] for p in filtered_points]), window) + for i, p in enumerate(filtered_points): + p = dict(p) + p["lat"] = float(filt_lats[i]) + p["lon"] = float(filt_lons[i]) + filtered_points[i] = p + + # Step 5: Optional Kalman + if use_kalman and len(filtered_points) > 4: + k_lats = simple_kalman_1d(np.array([p["lat"] for p in filtered_points])) + k_lons = simple_kalman_1d(np.array([p["lon"] for p in filtered_points])) + for i, p in enumerate(filtered_points): + p = dict(p) + p["lat"] = float(k_lats[i]) + p["lon"] = float(k_lons[i]) + filtered_points[i] = p + + metrics["points_after"] = len(filtered_points) + if metrics["points_after"] < 5: + metrics["status"] = "degraded" + metrics["note"] = f"too few points after filter: {metrics['points_after']}" + + print(f" [03] {auv_id}: {metrics['points_before']} → {metrics['points_after']} " + f"(removed {metrics['points_removed_null']} null, {metrics['points_removed_outlier']} outliers)") + + _save_output(auv_id, filtered_points, out_path, metrics) + return metrics + + +def _save_output(auv_id: str, points: list[dict], out_dir: Path, metrics: dict) -> None: + out_dir.mkdir(parents=True, exist_ok=True) + json_out = out_dir / f"{auv_id}_nav_filtered.json" + json_out.write_text(json.dumps({ + "auv_id": auv_id, + "generated_at": now_iso(), + "metrics": metrics, + "points": points, + }, indent=2, default=str)) + + csv_out = out_dir / f"{auv_id}_nav_filtered.csv" + with open(csv_out, "w") as f: + f.write("timestamp,lat,lon,depth,heading,source\n") + for p in points: + f.write(f"{p['timestamp']},{p.get('lat','')},{p.get('lon','')},{p.get('depth','')},{p.get('heading','')},{p.get('source','')}\n") + + +def filter_mission(raw_dir: Path, out_dir: Path | None = None, + auv_filter: str | None = None, + sigma: float = 3.0, window: int = 5, + use_kalman: bool = False) -> list[dict]: + if out_dir is None: + out_dir = raw_dir.parent / "03_usbl_filtered" + + raw_files = sorted(raw_dir.glob("*_nav_raw.json")) + if auv_filter: + raw_files = [f for f in raw_files if auv_filter in f.name] + + all_metrics = [] + init_db() + + for raw_file in raw_files: + out_file = out_dir / raw_file.name.replace("_raw", "_filtered") + if out_file.exists(): + print(f"[03] {raw_file.stem}: output exists, skipping") + existing = json.loads(out_file.read_text()) + all_metrics.append(existing.get("metrics", {})) + continue + + print(f"[03] Filtering {raw_file.name} ...") + m = filter_auv_nav(raw_file, out_dir, sigma=sigma, window=window, use_kalman=use_kalman) + all_metrics.append(m) + + # DB record + with get_conn() as conn: + mission_name = raw_dir.parent.name + mission_row = conn.execute("SELECT id FROM missions WHERE name=?", (mission_name,)).fetchone() + if mission_row: + job_id = upsert_job(conn, mission_row["id"], m["auv_id"], "all", "03_usbl_filter", + status="done" if m["status"] == "ok" else m["status"], + output_path=str(out_dir)) + record_metric(conn, job_id, "usbl_points_before", value=m.get("points_before", 0)) + record_metric(conn, job_id, "usbl_points_after", value=m.get("points_after", 0), + pass_fail="pass" if m.get("points_after", 0) >= 5 else "fail") + record_metric(conn, job_id, "usbl_points_removed_outlier", + value=m.get("points_removed_outlier", 0)) + + return all_metrics + + +def main(): + ap = argparse.ArgumentParser(description="Stage 03 — Filter USBL navigation") + ap.add_argument("raw_dir", type=Path, help="Directory with *_nav_raw.json files") + ap.add_argument("--out", type=Path, default=None) + ap.add_argument("--auv", type=str, default=None) + ap.add_argument("--sigma", type=float, default=3.0, help="MAD sigma threshold") + ap.add_argument("--window", type=int, default=5, help="Moving average window") + ap.add_argument("--kalman", action="store_true", help="Apply simple Kalman smoothing") + args = ap.parse_args() + + metrics = filter_mission(args.raw_dir, out_dir=args.out, auv_filter=args.auv, + sigma=args.sigma, window=args.window, use_kalman=args.kalman) + + print("\n=== Stage 03 summary ===") + for m in metrics: + print(f" {m.get('auv_id','?')}: {m.get('points_before',0)} → {m.get('points_after',0)} [{m.get('status','?')}]") + + +if __name__ == "__main__": + main() diff --git a/pipeline/stages/__init__.py b/pipeline/stages/__init__.py new file mode 100644 index 0000000..e69de29