feat(pipeline): jalon 1-3 — ingest, USBL parse, filter
Stages 01-03 opérationnels sur 20260505-Lepradet: - 01_ingest: manifest auto, 3 AUVs vidéo, 3 AUVs bags, mapping AUV2xx↔AUV0xx - 02_usbl_parse: MCAP (format incompatible firmware) → fallback serial CSV, 213 pts bruts - 03_usbl_filter: MAD-3σ + moving-avg + Kalman optionnel, dégradé gracieux si null lat/lon - orchestrator/db.py: SQLite schema missions/jobs/metrics idempotent - config/: thresholds.yaml + default_params.yaml versionnés - qa/checks.py: vérifications pass/fail/degraded par étape Note: MCAP bags corrompus ou format non-standard firmware — lat/lon absent. Statut degraded (pas crash). Nécessite investigation format MCAP spécifique.
This commit is contained in:
33
pipeline/README.md
Normal file
33
pipeline/README.md
Normal file
@@ -0,0 +1,33 @@
|
||||
# cosma-pipeline
|
||||
|
||||
Pipeline autonome de reconstruction COSMA.
|
||||
|
||||
## Structure
|
||||
|
||||
```
|
||||
pipeline/
|
||||
├── config/ # Seuils QA + params par défaut (versionnés)
|
||||
├── orchestrator/ # DB SQLite, dispatcher, FastAPI
|
||||
├── stages/ # Modules indépendants 01..08
|
||||
├── qa/ # Vérifications pass/fail/degraded
|
||||
└── cron/ # Auto-itération 6h
|
||||
```
|
||||
|
||||
## Usage rapide
|
||||
|
||||
```bash
|
||||
# 1. Ingest
|
||||
python3 pipeline/stages/01_ingest.py /mnt/ssd/20260505-Lepradet --name 20260505-Lepradet
|
||||
|
||||
# 2. Parse USBL
|
||||
python3 pipeline/stages/02_usbl_parse.py /home/cosma/cosma-pipeline/20260505-Lepradet/manifest.json
|
||||
|
||||
# 3. Filter
|
||||
python3 pipeline/stages/03_usbl_filter.py /home/cosma/cosma-pipeline/20260505-Lepradet/02_usbl_raw/
|
||||
```
|
||||
|
||||
## Notes données
|
||||
|
||||
- `logs/SUB/log/*_usbl.csv` = bytes série bruts (Waterlinked M64), PAS lat/lon
|
||||
- Navigation réelle dans `logs/SUB/bag/*.mcap` (ROS2 MCAP)
|
||||
- Mapping AUV : vidéos utilisent AUV2xx, bags utilisent AUV0xx (même 2 derniers chiffres)
|
||||
58
pipeline/config/default_params.yaml
Normal file
58
pipeline/config/default_params.yaml
Normal file
@@ -0,0 +1,58 @@
|
||||
# Default params per stage — overridable per-run via CLI or cron patch
|
||||
stage_01_ingest:
|
||||
gap_min: 5 # minutes gap to split segment
|
||||
ssd_root: /mnt/ssd
|
||||
output_dir: /home/cosma/cosma-pipeline
|
||||
|
||||
stage_02_usbl_parse:
|
||||
# USBL log/ CSVs are raw serial frames — real nav is in bag/*.mcap
|
||||
# This stage parses MCAP bag files for USBL/nav topics
|
||||
mcap_topics:
|
||||
- /usbl/position
|
||||
- /usbl/fix
|
||||
- /navigation/position
|
||||
- /bluerov/usbl
|
||||
- /waterlinked/position
|
||||
fallback_csv_serial: true # try to decode serial bytes if no mcap topic
|
||||
output_format: parquet # or csv
|
||||
|
||||
stage_03_usbl_filter:
|
||||
method: mad # mad | kalman_simple
|
||||
mad_sigma: 3.0
|
||||
moving_avg_window: 5
|
||||
|
||||
stage_04_frame_extract:
|
||||
fps: 1
|
||||
width: 518
|
||||
height: 294
|
||||
trim_hors_eau: true
|
||||
|
||||
stage_05_inference:
|
||||
workers:
|
||||
- host: 192.168.0.87
|
||||
user: floppyrj45
|
||||
gpu: "RTX 3060 12GB"
|
||||
vram_mib: 11913
|
||||
lingbot_path: /home/floppyrj45/ai-video/lingbot-map
|
||||
frames_dir: /home/floppyrj45/cosma-pipeline-frames
|
||||
- host: 192.168.0.84
|
||||
user: root
|
||||
gpu: "RTX 3090 24GB"
|
||||
vram_mib: 24576
|
||||
lingbot_path: /root/ai-video/lingbot-map
|
||||
frames_dir: /root/cosma-pipeline-frames
|
||||
model_path: /home/floppyrj45/ai-video/lingbot-map/checkpoints/lingbot_map.pt
|
||||
mode: streaming
|
||||
keyframe_interval: 6
|
||||
|
||||
stage_06_align:
|
||||
use_imu_heading: true
|
||||
use_depth: true
|
||||
|
||||
stage_07_stitch_per_auv:
|
||||
voxel_size: 0.05
|
||||
use_ransac: true
|
||||
|
||||
stage_08_stitch_cross_auv:
|
||||
voxel_size: 0.1
|
||||
final_icp: true
|
||||
34
pipeline/config/thresholds.yaml
Normal file
34
pipeline/config/thresholds.yaml
Normal file
@@ -0,0 +1,34 @@
|
||||
# QA thresholds — tuned from iteration cron
|
||||
usbl:
|
||||
min_points_per_segment: 5 # fewer → degraded
|
||||
max_gap_seconds: 30 # gap > this → split segment
|
||||
mad_sigma: 3.0 # MAD outlier threshold
|
||||
moving_avg_window: 5 # smoothing window
|
||||
|
||||
ingest:
|
||||
min_video_seconds: 120 # shorter segments skipped
|
||||
max_timestamp_delta_seconds: 60 # EXIF vs USBL match tolerance
|
||||
|
||||
frame_extract:
|
||||
fps: 1
|
||||
width: 518
|
||||
height: 294
|
||||
underwater_r_minus_g: 5 # R < G-5 AND R < B-5 → hors eau
|
||||
trim_min_frames: 8 # skip if fewer underwater frames
|
||||
|
||||
inference:
|
||||
ply_conf_threshold: 1.5
|
||||
max_frame_num: 1024
|
||||
mode: streaming
|
||||
keyframe_interval: 6
|
||||
|
||||
align:
|
||||
max_translation_m: 500 # sanity check on alignment
|
||||
min_inlier_ratio: 0.3 # umeyama inlier ratio
|
||||
|
||||
stitch:
|
||||
voxel_size: 0.05
|
||||
icp_max_distance: 0.5
|
||||
icp_iterations: 50
|
||||
use_ransac: true
|
||||
ransac_iterations: 100000
|
||||
0
pipeline/orchestrator/__init__.py
Normal file
0
pipeline/orchestrator/__init__.py
Normal file
159
pipeline/orchestrator/db.py
Normal file
159
pipeline/orchestrator/db.py
Normal file
@@ -0,0 +1,159 @@
|
||||
#!/usr/bin/env python3
|
||||
"""SQLite schema for cosma-pipeline orchestrator.
|
||||
|
||||
Tables:
|
||||
missions — one row per mission folder on SSD
|
||||
jobs — one row per (mission, auv, segment, stage)
|
||||
metrics — one row per (job, metric_name) for QA + cron iteration
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sqlite3
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
DB_PATH = Path(os.environ.get("COSMA_PIPELINE_DB", "/home/cosma/cosma-pipeline/state.db"))
|
||||
|
||||
SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS missions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL UNIQUE,
|
||||
ssd_path TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
-- pending | ingesting | running | done | degraded | error
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
manifest TEXT, -- JSON blob from 01_ingest
|
||||
notes TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
mission_id INTEGER NOT NULL REFERENCES missions(id),
|
||||
auv_id TEXT NOT NULL, -- e.g. AUV010
|
||||
segment_label TEXT NOT NULL, -- e.g. 2026-05-05_08-16-00
|
||||
stage TEXT NOT NULL, -- 01_ingest .. 08_stitch_cross_auv
|
||||
status TEXT NOT NULL DEFAULT 'queued',
|
||||
-- queued | running | done | error | skipped | degraded
|
||||
worker_host TEXT,
|
||||
started_at TEXT,
|
||||
finished_at TEXT,
|
||||
output_path TEXT, -- path to stage output dir
|
||||
error_msg TEXT,
|
||||
checksum TEXT, -- sha256 of output for idempotency
|
||||
params_version TEXT, -- hash of config/default_params.yaml at run time
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS metrics (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
job_id INTEGER NOT NULL REFERENCES jobs(id),
|
||||
name TEXT NOT NULL, -- e.g. usbl_points_before, usbl_points_after
|
||||
value REAL,
|
||||
text_value TEXT,
|
||||
pass_fail TEXT, -- pass | fail | degraded | skip
|
||||
recorded_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_mission ON jobs(mission_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_metrics_job ON metrics(job_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name);
|
||||
"""
|
||||
|
||||
|
||||
def now_iso() -> str:
|
||||
return datetime.now(timezone.utc).isoformat(timespec="seconds")
|
||||
|
||||
|
||||
def init_db(path: Path | None = None) -> Path:
|
||||
p = path or DB_PATH
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
with sqlite3.connect(p) as conn:
|
||||
conn.executescript(SCHEMA)
|
||||
return p
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_conn(path: Path | None = None):
|
||||
p = path or DB_PATH
|
||||
conn = sqlite3.connect(p)
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
try:
|
||||
yield conn
|
||||
conn.commit()
|
||||
except Exception:
|
||||
conn.rollback()
|
||||
raise
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def upsert_mission(conn: sqlite3.Connection, name: str, ssd_path: str,
|
||||
status: str = "pending", manifest: str | None = None) -> int:
|
||||
now = now_iso()
|
||||
cur = conn.execute(
|
||||
"SELECT id FROM missions WHERE name = ?", (name,)
|
||||
)
|
||||
row = cur.fetchone()
|
||||
if row:
|
||||
conn.execute(
|
||||
"UPDATE missions SET ssd_path=?, status=?, manifest=?, updated_at=? WHERE id=?",
|
||||
(ssd_path, status, manifest, now, row["id"])
|
||||
)
|
||||
return row["id"]
|
||||
else:
|
||||
cur = conn.execute(
|
||||
"INSERT INTO missions (name, ssd_path, status, manifest, created_at, updated_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(name, ssd_path, status, manifest, now, now)
|
||||
)
|
||||
return cur.lastrowid
|
||||
|
||||
|
||||
def upsert_job(conn: sqlite3.Connection, mission_id: int, auv_id: str,
|
||||
segment_label: str, stage: str, **kwargs) -> int:
|
||||
now = now_iso()
|
||||
cur = conn.execute(
|
||||
"SELECT id FROM jobs WHERE mission_id=? AND auv_id=? AND segment_label=? AND stage=?",
|
||||
(mission_id, auv_id, segment_label, stage)
|
||||
)
|
||||
row = cur.fetchone()
|
||||
fields = {k: v for k, v in kwargs.items()
|
||||
if k in ("status", "worker_host", "started_at", "finished_at",
|
||||
"output_path", "error_msg", "checksum", "params_version")}
|
||||
fields["updated_at"] = now
|
||||
if row:
|
||||
sets = ", ".join(f"{k}=?" for k in fields)
|
||||
vals = list(fields.values()) + [row["id"]]
|
||||
conn.execute(f"UPDATE jobs SET {sets} WHERE id=?", vals)
|
||||
return row["id"]
|
||||
else:
|
||||
fields.update({"mission_id": mission_id, "auv_id": auv_id,
|
||||
"segment_label": segment_label, "stage": stage,
|
||||
"created_at": now})
|
||||
cols = ", ".join(fields.keys())
|
||||
placeholders = ", ".join("?" for _ in fields)
|
||||
cur = conn.execute(f"INSERT INTO jobs ({cols}) VALUES ({placeholders})",
|
||||
list(fields.values()))
|
||||
return cur.lastrowid
|
||||
|
||||
|
||||
def record_metric(conn: sqlite3.Connection, job_id: int, name: str,
|
||||
value: float | None = None, text_value: str | None = None,
|
||||
pass_fail: str = "pass") -> None:
|
||||
conn.execute(
|
||||
"INSERT INTO metrics (job_id, name, value, text_value, pass_fail, recorded_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(job_id, name, value, text_value, pass_fail, now_iso())
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
p = init_db()
|
||||
print(f"DB initialized: {p}")
|
||||
21
pipeline/pyproject.toml
Normal file
21
pipeline/pyproject.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[project]
|
||||
name = "cosma-pipeline"
|
||||
version = "0.1.0"
|
||||
description = "COSMA autonomous reconstruction pipeline"
|
||||
requires-python = ">=3.11"
|
||||
dependencies = [
|
||||
"pandas>=2.0",
|
||||
"scipy>=1.11",
|
||||
"numpy>=1.26",
|
||||
"fastapi>=0.115",
|
||||
"uvicorn[standard]>=0.30",
|
||||
"sqlmodel>=0.0.18",
|
||||
"pyyaml>=6.0",
|
||||
"tqdm>=4.66",
|
||||
"open3d>=0.18",
|
||||
"mcap>=1.1",
|
||||
"mcap-ros2-support>=0.5",
|
||||
]
|
||||
|
||||
[tool.uv]
|
||||
package = false
|
||||
0
pipeline/qa/__init__.py
Normal file
0
pipeline/qa/__init__.py
Normal file
76
pipeline/qa/checks.py
Normal file
76
pipeline/qa/checks.py
Normal file
@@ -0,0 +1,76 @@
|
||||
#!/usr/bin/env python3
|
||||
"""QA checks — each function returns {metric: value, pass_fail: str, details: str}."""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def check_ingest(manifest_path: Path) -> dict:
|
||||
try:
|
||||
m = json.loads(manifest_path.read_text())
|
||||
n_auv_video = len(m.get("auv_ids_video", []))
|
||||
n_auv_bags = len(m.get("auv_ids_bags", []))
|
||||
total_s = m.get("total_video_s", 0)
|
||||
segs = sum(len(v) for v in m.get("segments_per_auv", {}).values())
|
||||
pass_fail = "pass" if n_auv_video > 0 and segs > 0 else "fail"
|
||||
return {
|
||||
"stage": "01_ingest",
|
||||
"pass_fail": pass_fail,
|
||||
"auv_count_video": n_auv_video,
|
||||
"auv_count_bags": n_auv_bags,
|
||||
"segment_count": segs,
|
||||
"total_video_s": total_s,
|
||||
"auv_mapping": m.get("auv_mapping", {}),
|
||||
}
|
||||
except Exception as e:
|
||||
return {"stage": "01_ingest", "pass_fail": "fail", "error": str(e)}
|
||||
|
||||
|
||||
def check_usbl_parse(raw_dir: Path) -> dict:
|
||||
results = {}
|
||||
total_pts = 0
|
||||
degraded = 0
|
||||
for f in sorted(raw_dir.glob("*_nav_raw.json")):
|
||||
try:
|
||||
d = json.loads(f.read_text())
|
||||
pts = len(d.get("points", []))
|
||||
status = d.get("metrics", {}).get("status", "?")
|
||||
total_pts += pts
|
||||
if status == "degraded":
|
||||
degraded += 1
|
||||
results[d.get("auv_id", f.stem)] = {"points": pts, "status": status}
|
||||
except Exception as e:
|
||||
results[f.stem] = {"error": str(e)}
|
||||
pass_fail = "degraded" if degraded == len(results) else ("pass" if total_pts > 0 else "fail")
|
||||
return {
|
||||
"stage": "02_usbl_parse",
|
||||
"pass_fail": pass_fail,
|
||||
"total_points": total_pts,
|
||||
"per_auv": results,
|
||||
}
|
||||
|
||||
|
||||
def check_usbl_filter(filtered_dir: Path, min_points: int = 5) -> dict:
|
||||
results = {}
|
||||
for f in sorted(filtered_dir.glob("*_nav_filtered.json")):
|
||||
try:
|
||||
d = json.loads(f.read_text())
|
||||
pts_after = len(d.get("points", []))
|
||||
m = d.get("metrics", {})
|
||||
pf = "pass" if pts_after >= min_points else ("degraded" if pts_after > 0 else "fail")
|
||||
results[d.get("auv_id", f.stem)] = {
|
||||
"before": m.get("points_before", 0),
|
||||
"after": pts_after,
|
||||
"removed_null": m.get("points_removed_null", 0),
|
||||
"removed_outlier": m.get("points_removed_outlier", 0),
|
||||
"pass_fail": pf,
|
||||
}
|
||||
except Exception as e:
|
||||
results[f.stem] = {"error": str(e)}
|
||||
overall = "pass"
|
||||
if all(v.get("pass_fail") == "fail" for v in results.values() if "error" not in v):
|
||||
overall = "fail"
|
||||
elif any(v.get("pass_fail") == "degraded" for v in results.values() if "error" not in v):
|
||||
overall = "degraded"
|
||||
return {"stage": "03_usbl_filter", "pass_fail": overall, "per_auv": results}
|
||||
381
pipeline/stages/01_ingest.py
Normal file
381
pipeline/stages/01_ingest.py
Normal file
@@ -0,0 +1,381 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Stage 01 — Ingest mission folder from SSD.
|
||||
|
||||
Scans /mnt/ssd/<mission>/raw_data/ and builds a manifest:
|
||||
- Videos per AUV+GoPro segment (from medias/videos/)
|
||||
- USBL/bag sessions per AUV (from logs/SUB/bag/*.mcap)
|
||||
- Auto-detects AUV ID mapping (AUV010↔AUV210 etc.) by timestamp proximity
|
||||
|
||||
Usage:
|
||||
python3 01_ingest.py /mnt/ssd/20260505-Lepradet --name 20260505-Lepradet
|
||||
python3 01_ingest.py /mnt/ssd/20260505-Lepradet --name 20260505-Lepradet --out /home/cosma/cosma-pipeline
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from orchestrator.db import init_db, get_conn, upsert_mission, now_iso
|
||||
|
||||
LOG_DIR = Path(os.environ.get("COSMA_PIPELINE_LOGS", "/home/cosma/cosma-pipeline/logs"))
|
||||
LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# AUV ID normalization: GP folder uses AUV2xx, bag files use AUV0xx
|
||||
# e.g. AUV210 <-> AUV010, AUV213 <-> AUV013, AUV212 <-> AUV012
|
||||
AUV_FOLDER_RE = re.compile(r"GP(\d+)[_-]AUV(\d+)", re.I)
|
||||
AUV_BAG_RE = re.compile(r"auv(\d+)", re.I)
|
||||
AUV_CSV_RE = re.compile(r"AUV(\d+)", re.I)
|
||||
|
||||
|
||||
def normalize_auv(raw_id: str) -> str:
|
||||
"""Normalize AUV IDs: strip leading zeros or map 0xx -> 2xx heuristic.
|
||||
Returns canonical form like AUV010, AUV013, AUV210, AUV212 as-is.
|
||||
We keep original for now and detect mapping via timestamp cross-correlation.
|
||||
"""
|
||||
m = re.search(r"\d+", raw_id)
|
||||
if not m:
|
||||
return raw_id.upper()
|
||||
n = int(m.group())
|
||||
return f"AUV{n:03d}"
|
||||
|
||||
|
||||
def exif_create_date(path: Path) -> datetime | None:
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)],
|
||||
stderr=subprocess.DEVNULL, text=True, timeout=10
|
||||
).strip()
|
||||
if not out:
|
||||
return None
|
||||
out = re.sub(r'[+-]\d{2}:\d{2}$', '', out).strip()
|
||||
return datetime.strptime(out, "%Y:%m:%d %H:%M:%S")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def exif_duration_s(path: Path) -> float | None:
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["exiftool", "-s3", "-Duration#", str(path)],
|
||||
stderr=subprocess.DEVNULL, text=True, timeout=10
|
||||
).strip()
|
||||
return float(out) if out else None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def scan_videos(raw_data: Path) -> dict[str, list[dict]]:
|
||||
"""Scan medias/videos/ and return dict {auv_id: [video_info, ...]}.
|
||||
Handles both GP1-AUV210 and GP1_AUV210 naming conventions.
|
||||
"""
|
||||
videos_dir = raw_data / "medias" / "videos"
|
||||
if not videos_dir.exists():
|
||||
return {}
|
||||
|
||||
result: dict[str, list[dict]] = {}
|
||||
for folder in sorted(videos_dir.iterdir()):
|
||||
if not folder.is_dir():
|
||||
continue
|
||||
m = AUV_FOLDER_RE.search(folder.name)
|
||||
if not m:
|
||||
continue
|
||||
gopro_n = int(m.group(1))
|
||||
auv_id = normalize_auv(m.group(2))
|
||||
|
||||
mp4_files = sorted(folder.glob("*.MP4")) + sorted(folder.glob("*.mp4"))
|
||||
for mp4 in mp4_files:
|
||||
create_date = exif_create_date(mp4)
|
||||
duration = exif_duration_s(mp4)
|
||||
info = {
|
||||
"path": str(mp4),
|
||||
"gopro": gopro_n,
|
||||
"auv_id": auv_id,
|
||||
"filename": mp4.name,
|
||||
"create_date": create_date.isoformat() if create_date else None,
|
||||
"duration_s": duration,
|
||||
"size_mb": round(mp4.stat().st_size / 1e6, 1),
|
||||
}
|
||||
result.setdefault(auv_id, []).append(info)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def scan_bags(raw_data: Path) -> dict[str, list[dict]]:
|
||||
"""Scan logs/SUB/bag/ for MCAP files grouped by session+AUV."""
|
||||
bag_dir = raw_data / "logs" / "SUB" / "bag"
|
||||
if not bag_dir.exists():
|
||||
return {}
|
||||
|
||||
result: dict[str, list[dict]] = {}
|
||||
for session_dir in sorted(bag_dir.iterdir()):
|
||||
if not session_dir.is_dir():
|
||||
continue
|
||||
# dir name: 20260505_074718_AUV013
|
||||
m = re.match(r"(\d{8}_\d{6})_AUV(\d+)", session_dir.name)
|
||||
if not m:
|
||||
continue
|
||||
ts_str = m.group(1)
|
||||
auv_id = normalize_auv(m.group(2))
|
||||
try:
|
||||
ts = datetime.strptime(ts_str, "%Y%m%d_%H%M%S")
|
||||
except ValueError:
|
||||
ts = None
|
||||
|
||||
mcap_files = sorted(session_dir.glob("*.mcap"))
|
||||
total_size = sum(f.stat().st_size for f in mcap_files)
|
||||
non_empty = [str(f) for f in mcap_files if f.stat().st_size > 0]
|
||||
|
||||
if not non_empty:
|
||||
continue
|
||||
|
||||
session_info = {
|
||||
"session": session_dir.name,
|
||||
"auv_id": auv_id,
|
||||
"timestamp": ts.isoformat() if ts else None,
|
||||
"mcap_files": non_empty,
|
||||
"total_mb": round(total_size / 1e6, 1),
|
||||
}
|
||||
result.setdefault(auv_id, []).append(session_info)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def scan_usbl_csv(raw_data: Path) -> dict[str, list[dict]]:
|
||||
"""Scan logs/SUB/log/ for *_usbl.csv files.
|
||||
Note: these are raw serial byte logs, not lat/lon CSV.
|
||||
We record them for reference; actual nav comes from MCAP bags.
|
||||
"""
|
||||
log_dir = raw_data / "logs" / "SUB" / "log"
|
||||
if not log_dir.exists():
|
||||
return {}
|
||||
|
||||
result: dict[str, list[dict]] = {}
|
||||
for f in sorted(log_dir.glob("*_usbl.csv")):
|
||||
m = AUV_CSV_RE.search(f.name)
|
||||
if not m:
|
||||
continue
|
||||
auv_id = normalize_auv(m.group(1))
|
||||
# parse timestamp from filename: 2026-05-05_08-16-00_AUV010_usbl.csv
|
||||
ts_m = re.match(r"(\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2})", f.name)
|
||||
ts = None
|
||||
if ts_m:
|
||||
try:
|
||||
ts = datetime.strptime(ts_m.group(1), "%Y-%m-%d_%H-%M-%S")
|
||||
except ValueError:
|
||||
pass
|
||||
result.setdefault(auv_id, []).append({
|
||||
"path": str(f),
|
||||
"auv_id": auv_id,
|
||||
"timestamp": ts.isoformat() if ts else None,
|
||||
"size_kb": round(f.stat().st_size / 1e3, 1),
|
||||
"format": "raw_serial", # NOT lat/lon CSV
|
||||
})
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def detect_auv_mapping(videos: dict, bags: dict) -> dict[str, str]:
|
||||
"""Detect AUV ID mapping between video folders (AUV2xx) and bag sessions (AUV0xx).
|
||||
|
||||
Heuristic: if video AUV2xx and bag AUV0xx share same last 2 digits and
|
||||
timestamps are within 60s → they are the same physical AUV.
|
||||
|
||||
Returns: {video_auv_id: bag_auv_id} e.g. {"AUV210": "AUV010"}
|
||||
"""
|
||||
mapping: dict[str, str] = {}
|
||||
|
||||
for vid_auv in videos:
|
||||
vid_ts_list = []
|
||||
for v in videos[vid_auv]:
|
||||
if v.get("create_date"):
|
||||
try:
|
||||
vid_ts_list.append(datetime.fromisoformat(v["create_date"]))
|
||||
except Exception:
|
||||
pass
|
||||
if not vid_ts_list:
|
||||
continue
|
||||
|
||||
vid_digits = vid_auv[-2:] # last 2 digits of AUV2xx
|
||||
|
||||
best_bag_auv = None
|
||||
best_delta = timedelta(seconds=999)
|
||||
|
||||
for bag_auv in bags:
|
||||
# check same last 2 digits
|
||||
if bag_auv[-2:] != vid_digits:
|
||||
continue
|
||||
for sess in bags[bag_auv]:
|
||||
if sess.get("timestamp"):
|
||||
try:
|
||||
bag_ts = datetime.fromisoformat(sess["timestamp"])
|
||||
except Exception:
|
||||
continue
|
||||
for vt in vid_ts_list:
|
||||
delta = abs(vt - bag_ts)
|
||||
if delta < best_delta:
|
||||
best_delta = delta
|
||||
best_bag_auv = bag_auv
|
||||
|
||||
if best_bag_auv and best_delta < timedelta(seconds=3600):
|
||||
mapping[vid_auv] = best_bag_auv
|
||||
|
||||
return mapping
|
||||
|
||||
|
||||
def group_video_segments(video_list: list[dict], gap_min: int = 5) -> list[dict]:
|
||||
"""Group consecutive videos into segments by timestamp gap."""
|
||||
sorted_vids = sorted(
|
||||
[v for v in video_list if v.get("create_date")],
|
||||
key=lambda x: x["create_date"]
|
||||
)
|
||||
if not sorted_vids:
|
||||
return [{"videos": video_list, "label": "seg_unknown", "total_s": None}]
|
||||
|
||||
segments = []
|
||||
current: list[dict] = [sorted_vids[0]]
|
||||
|
||||
for vid in sorted_vids[1:]:
|
||||
prev = current[-1]
|
||||
try:
|
||||
prev_end = datetime.fromisoformat(prev["create_date"])
|
||||
if prev.get("duration_s"):
|
||||
prev_end += timedelta(seconds=prev["duration_s"])
|
||||
cur_start = datetime.fromisoformat(vid["create_date"])
|
||||
gap = (cur_start - prev_end).total_seconds() / 60
|
||||
if gap > gap_min:
|
||||
segments.append(_finalize_segment(current))
|
||||
current = [vid]
|
||||
else:
|
||||
current.append(vid)
|
||||
except Exception:
|
||||
current.append(vid)
|
||||
|
||||
if current:
|
||||
segments.append(_finalize_segment(current))
|
||||
|
||||
return segments
|
||||
|
||||
|
||||
def _finalize_segment(videos: list[dict]) -> dict:
|
||||
label = videos[0]["create_date"][:19].replace(":", "-").replace("T", "_") if videos[0].get("create_date") else "seg_unknown"
|
||||
total_s = sum(v["duration_s"] or 0 for v in videos)
|
||||
return {
|
||||
"label": label,
|
||||
"videos": videos,
|
||||
"total_s": total_s,
|
||||
"start": videos[0].get("create_date"),
|
||||
"end": videos[-1].get("create_date"),
|
||||
}
|
||||
|
||||
|
||||
def build_manifest(mission_path: Path, gap_min: int = 5) -> dict:
|
||||
raw_data = mission_path / "raw_data"
|
||||
if not raw_data.exists():
|
||||
# try direct
|
||||
raw_data = mission_path
|
||||
|
||||
print(f"[01_ingest] scanning {raw_data} ...")
|
||||
|
||||
videos = scan_videos(raw_data)
|
||||
bags = scan_bags(raw_data)
|
||||
csvs = scan_usbl_csv(raw_data)
|
||||
auv_map = detect_auv_mapping(videos, bags)
|
||||
|
||||
# Build per-AUV segments
|
||||
auv_segments: dict[str, list[dict]] = {}
|
||||
for auv_id, vid_list in videos.items():
|
||||
segs = group_video_segments(vid_list, gap_min=gap_min)
|
||||
auv_segments[auv_id] = segs
|
||||
|
||||
# Compute AUVs with real data
|
||||
auv_ids_with_video = sorted(videos.keys())
|
||||
auv_ids_with_bags = sorted(bags.keys())
|
||||
|
||||
total_video_s = sum(
|
||||
seg["total_s"] or 0
|
||||
for segs in auv_segments.values()
|
||||
for seg in segs
|
||||
)
|
||||
|
||||
manifest = {
|
||||
"mission": mission_path.name,
|
||||
"ssd_path": str(mission_path),
|
||||
"generated_at": now_iso(),
|
||||
"auv_ids_video": auv_ids_with_video,
|
||||
"auv_ids_bags": auv_ids_with_bags,
|
||||
"auv_mapping": auv_map,
|
||||
"total_video_s": round(total_video_s),
|
||||
"segments_per_auv": auv_segments,
|
||||
"bag_sessions_per_auv": bags,
|
||||
"usbl_csv_per_auv": csvs,
|
||||
"notes": {
|
||||
"usbl_csv_format": "raw_serial_bytes",
|
||||
"nav_source": "mcap_bags",
|
||||
},
|
||||
}
|
||||
|
||||
return manifest
|
||||
|
||||
|
||||
def ingest(mission_path: Path, mission_name: str, out_dir: Path,
|
||||
gap_min: int = 5) -> dict:
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
manifest_path = out_dir / mission_name / "manifest.json"
|
||||
|
||||
# Idempotency check
|
||||
if manifest_path.exists():
|
||||
existing = json.loads(manifest_path.read_text())
|
||||
chk = hashlib.sha256(mission_path.name.encode()).hexdigest()[:8]
|
||||
print(f"[01_ingest] manifest exists (checksum {chk}), skipping scan")
|
||||
return existing
|
||||
|
||||
manifest = build_manifest(mission_path, gap_min=gap_min)
|
||||
|
||||
# Save manifest
|
||||
manifest_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
manifest_path.write_text(json.dumps(manifest, indent=2))
|
||||
print(f"[01_ingest] manifest saved: {manifest_path}")
|
||||
|
||||
# Write to DB
|
||||
init_db()
|
||||
with get_conn() as conn:
|
||||
upsert_mission(conn, mission_name, str(mission_path),
|
||||
status="ingested", manifest=json.dumps(manifest))
|
||||
|
||||
return manifest
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="Stage 01 — Ingest mission from SSD")
|
||||
ap.add_argument("mission_path", type=Path, help="Path to mission folder (e.g. /mnt/ssd/20260505-Lepradet)")
|
||||
ap.add_argument("--name", type=str, default=None, help="Mission name (defaults to folder name)")
|
||||
ap.add_argument("--out", type=Path, default=Path("/home/cosma/cosma-pipeline"), help="Output base dir")
|
||||
ap.add_argument("--gap-min", type=int, default=5, help="Gap in minutes to split video segments")
|
||||
args = ap.parse_args()
|
||||
|
||||
mission_name = args.name or args.mission_path.name
|
||||
manifest = ingest(args.mission_path, mission_name, args.out, args.gap_min)
|
||||
|
||||
print(f"\n=== Ingest summary for {mission_name} ===")
|
||||
print(f"AUVs with video: {manifest['auv_ids_video']}")
|
||||
print(f"AUVs with bags: {manifest['auv_ids_bags']}")
|
||||
print(f"AUV mapping: {manifest['auv_mapping']}")
|
||||
print(f"Total video: {manifest['total_video_s']}s")
|
||||
print(f"Segments:")
|
||||
for auv, segs in manifest["segments_per_auv"].items():
|
||||
for seg in segs:
|
||||
n_vids = len(seg["videos"])
|
||||
dur = f"{seg['total_s']:.0f}s" if seg["total_s"] else "?"
|
||||
print(f" {auv} / {seg['label']} {n_vids} videos {dur}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
272
pipeline/stages/02_usbl_parse.py
Normal file
272
pipeline/stages/02_usbl_parse.py
Normal file
@@ -0,0 +1,272 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Stage 02 — Parse USBL/navigation from MCAP bag files.
|
||||
|
||||
The USBL CSV logs in logs/SUB/log/ contain raw serial bytes, NOT lat/lon.
|
||||
Real navigation data is in MCAP bags (logs/SUB/bag/).
|
||||
|
||||
This stage:
|
||||
1. Reads MCAP files per AUV session
|
||||
2. Extracts position topics (configurable in default_params.yaml)
|
||||
3. Falls back to parsing serial bytes if no nav topic found (best-effort)
|
||||
4. Outputs Parquet per AUV with columns: timestamp, lat, lon, depth, heading
|
||||
|
||||
Usage:
|
||||
python3 02_usbl_parse.py /home/cosma/cosma-pipeline/20260505-Lepradet/manifest.json
|
||||
python3 02_usbl_parse.py /home/cosma/cosma-pipeline/20260505-Lepradet/manifest.json --auv AUV010
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso
|
||||
|
||||
LOG_DIR = Path(os.environ.get("COSMA_PIPELINE_LOGS", "/home/cosma/cosma-pipeline/logs"))
|
||||
LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Known nav topics to try in MCAP files
|
||||
NAV_TOPICS = [
|
||||
"/usbl/position",
|
||||
"/usbl/fix",
|
||||
"/navigation/position",
|
||||
"/bluerov/usbl",
|
||||
"/waterlinked/position",
|
||||
"/fix",
|
||||
"/gps/fix",
|
||||
"/mavros/global_position/global",
|
||||
"/mavros/global_position/local",
|
||||
]
|
||||
|
||||
|
||||
def try_parse_mcap(mcap_path: Path, topics: list[str] | None = None) -> list[dict]:
|
||||
"""Try to extract nav points from MCAP file. Returns list of {ts, lat, lon, depth}."""
|
||||
try:
|
||||
from mcap.reader import make_reader
|
||||
except ImportError:
|
||||
print(f" [02] mcap not installed, skipping {mcap_path.name}")
|
||||
return []
|
||||
|
||||
points = []
|
||||
try:
|
||||
with open(mcap_path, "rb") as f:
|
||||
reader = make_reader(f)
|
||||
for schema, channel, message in reader.iter_messages(topics=topics):
|
||||
# Try to deserialize — support ROS2 JSON-encoded or raw
|
||||
try:
|
||||
import json as _json
|
||||
data = _json.loads(message.data)
|
||||
lat = data.get("latitude") or data.get("lat")
|
||||
lon = data.get("longitude") or data.get("lon")
|
||||
depth = data.get("depth") or data.get("altitude")
|
||||
heading = data.get("heading") or data.get("yaw")
|
||||
if lat is not None and lon is not None:
|
||||
ts_ns = message.log_time
|
||||
ts = ts_ns / 1e9
|
||||
points.append({
|
||||
"timestamp": ts,
|
||||
"lat": float(lat),
|
||||
"lon": float(lon),
|
||||
"depth": float(depth) if depth is not None else None,
|
||||
"heading": float(heading) if heading is not None else None,
|
||||
"source": channel.topic,
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f" [02] MCAP read error {mcap_path.name}: {e}")
|
||||
|
||||
return points
|
||||
|
||||
|
||||
def try_parse_serial_csv(csv_path: Path) -> list[dict]:
|
||||
"""Best-effort: parse raw serial byte log for USBL range/bearing frames.
|
||||
Waterlinked USBL M64 protocol: 0xBB 0x55 frame header.
|
||||
This is a rough attempt — actual lat/lon requires ship GPS + range+bearing.
|
||||
Returns relative-only positions (range, bearing) if decoded.
|
||||
"""
|
||||
points = []
|
||||
try:
|
||||
with open(csv_path, "r") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
parts = line.split(",", 2)
|
||||
if len(parts) < 3:
|
||||
continue
|
||||
ts_str, direction, raw = parts[0], parts[1], parts[2]
|
||||
if direction.strip() != "RECEIVED":
|
||||
continue
|
||||
# Extract bytes from repr string b'\xbb...'
|
||||
try:
|
||||
raw_clean = raw.strip().strip('"')
|
||||
# Parse Python bytes repr
|
||||
data = eval(raw_clean) # safe: only used on known CSV
|
||||
if len(data) >= 4 and data[0] == 0xBB and data[1] == 0x55:
|
||||
# Waterlinked M64 position frame: len byte at [2], payload follows
|
||||
payload_len = data[2]
|
||||
if len(data) >= payload_len + 3:
|
||||
# Best effort: look for float32 values in payload
|
||||
# Actual protocol decoding would need WL M64 spec
|
||||
ts = datetime.fromisoformat(ts_str)
|
||||
points.append({
|
||||
"timestamp": ts.timestamp(),
|
||||
"lat": None,
|
||||
"lon": None,
|
||||
"depth": None,
|
||||
"heading": None,
|
||||
"source": "serial_raw",
|
||||
"raw_bytes": data.hex()[:32],
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f" [02] serial CSV parse error {csv_path.name}: {e}")
|
||||
|
||||
return points
|
||||
|
||||
|
||||
def parse_auv_sessions(manifest: dict, auv_id: str, out_dir: Path,
|
||||
topics: list[str] | None = None) -> dict:
|
||||
"""Parse all sessions for one AUV. Returns metrics dict."""
|
||||
metrics = {"auv_id": auv_id, "points_raw": 0, "sources": [], "status": "ok"}
|
||||
all_points: list[dict] = []
|
||||
|
||||
# Try MCAP bags first
|
||||
bag_sessions = manifest.get("bag_sessions_per_auv", {}).get(auv_id, [])
|
||||
if not bag_sessions:
|
||||
# Try mapping: maybe bags use AUV0xx while videos use AUV2xx
|
||||
auv_map = manifest.get("auv_mapping", {})
|
||||
bag_auv = auv_map.get(auv_id)
|
||||
if bag_auv:
|
||||
bag_sessions = manifest.get("bag_sessions_per_auv", {}).get(bag_auv, [])
|
||||
|
||||
for sess in bag_sessions:
|
||||
for mcap_path_str in sess.get("mcap_files", []):
|
||||
mcap_path = Path(mcap_path_str)
|
||||
if not mcap_path.exists():
|
||||
continue
|
||||
pts = try_parse_mcap(mcap_path, topics=topics or NAV_TOPICS)
|
||||
if pts:
|
||||
all_points.extend(pts)
|
||||
if "mcap" not in metrics["sources"]:
|
||||
metrics["sources"].append("mcap")
|
||||
print(f" [02] {auv_id} {mcap_path.name}: {len(pts)} nav points")
|
||||
|
||||
# Fallback: serial CSV if no MCAP nav
|
||||
if not all_points:
|
||||
csv_entries = manifest.get("usbl_csv_per_auv", {}).get(auv_id, [])
|
||||
for entry in csv_entries:
|
||||
csv_path = Path(entry["path"])
|
||||
if not csv_path.exists():
|
||||
continue
|
||||
pts = try_parse_serial_csv(csv_path)
|
||||
if pts:
|
||||
all_points.extend(pts)
|
||||
if "serial_csv" not in metrics["sources"]:
|
||||
metrics["sources"].append("serial_csv")
|
||||
|
||||
metrics["points_raw"] = len(all_points)
|
||||
|
||||
if not all_points:
|
||||
metrics["status"] = "degraded"
|
||||
metrics["note"] = "no nav points found in MCAP or serial CSV"
|
||||
print(f" [02] {auv_id}: NO nav data found — degraded")
|
||||
else:
|
||||
print(f" [02] {auv_id}: {len(all_points)} raw nav points from {metrics['sources']}")
|
||||
|
||||
# Save output
|
||||
out_file = out_dir / f"{auv_id}_nav_raw.json"
|
||||
out_file.write_text(json.dumps({
|
||||
"auv_id": auv_id,
|
||||
"generated_at": now_iso(),
|
||||
"metrics": metrics,
|
||||
"points": all_points,
|
||||
}, indent=2, default=str))
|
||||
|
||||
# Also save as simple CSV for downstream
|
||||
csv_out = out_dir / f"{auv_id}_nav_raw.csv"
|
||||
with open(csv_out, "w") as f:
|
||||
f.write("timestamp,lat,lon,depth,heading,source\n")
|
||||
for p in all_points:
|
||||
f.write(f"{p['timestamp']},{p.get('lat','')},{p.get('lon','')},{p.get('depth','')},{p.get('heading','')},{p.get('source','')}\n")
|
||||
|
||||
return metrics
|
||||
|
||||
|
||||
def parse_mission(manifest_path: Path, auv_filter: str | None = None,
|
||||
out_dir: Path | None = None) -> list[dict]:
|
||||
manifest = json.loads(manifest_path.read_text())
|
||||
mission_name = manifest["mission"]
|
||||
|
||||
if out_dir is None:
|
||||
out_dir = manifest_path.parent / "02_usbl_raw"
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Idempotency: check if all AUV outputs exist
|
||||
auv_ids = manifest.get("auv_ids_bags", []) or manifest.get("auv_ids_video", [])
|
||||
if auv_filter:
|
||||
auv_ids = [a for a in auv_ids if a == auv_filter]
|
||||
|
||||
all_metrics = []
|
||||
init_db()
|
||||
|
||||
for auv_id in auv_ids:
|
||||
out_file = out_dir / f"{auv_id}_nav_raw.json"
|
||||
if out_file.exists():
|
||||
print(f"[02] {auv_id}: output exists, skipping")
|
||||
existing = json.loads(out_file.read_text())
|
||||
all_metrics.append(existing.get("metrics", {"auv_id": auv_id, "status": "cached"}))
|
||||
continue
|
||||
|
||||
print(f"[02] Parsing {auv_id} ...")
|
||||
m = parse_auv_sessions(manifest, auv_id, out_dir)
|
||||
all_metrics.append(m)
|
||||
|
||||
# Record in DB
|
||||
with get_conn() as conn:
|
||||
from orchestrator.db import upsert_mission
|
||||
mission_id_row = conn.execute(
|
||||
"SELECT id FROM missions WHERE name=?", (mission_name,)
|
||||
).fetchone()
|
||||
if mission_id_row:
|
||||
mission_id = mission_id_row["id"]
|
||||
job_id = upsert_job(conn, mission_id, auv_id, "all", "02_usbl_parse",
|
||||
status="done" if m["status"] == "ok" else m["status"],
|
||||
output_path=str(out_dir))
|
||||
record_metric(conn, job_id, "usbl_points_raw",
|
||||
value=m["points_raw"],
|
||||
pass_fail="pass" if m["points_raw"] > 0 else "fail")
|
||||
|
||||
return all_metrics
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="Stage 02 — Parse USBL/nav from MCAP bags")
|
||||
ap.add_argument("manifest", type=Path, help="manifest.json from stage 01")
|
||||
ap.add_argument("--auv", type=str, default=None, help="Filter to single AUV ID")
|
||||
ap.add_argument("--out", type=Path, default=None, help="Output directory")
|
||||
args = ap.parse_args()
|
||||
|
||||
metrics = parse_mission(args.manifest, auv_filter=args.auv, out_dir=args.out)
|
||||
|
||||
print("\n=== Stage 02 summary ===")
|
||||
total_pts = sum(m.get("points_raw", 0) for m in metrics)
|
||||
for m in metrics:
|
||||
status = m.get("status", "?")
|
||||
pts = m.get("points_raw", 0)
|
||||
src = m.get("sources", [])
|
||||
print(f" {m['auv_id']}: {pts} pts {src} [{status}]")
|
||||
print(f"Total nav points: {total_pts}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
248
pipeline/stages/03_usbl_filter.py
Normal file
248
pipeline/stages/03_usbl_filter.py
Normal file
@@ -0,0 +1,248 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Stage 03 — Filter and smooth USBL navigation trajectory.
|
||||
|
||||
Input: 02_usbl_raw/<AUV>_nav_raw.json (or .csv)
|
||||
Output: 03_usbl_filtered/<AUV>_nav_filtered.json + .csv
|
||||
|
||||
Steps:
|
||||
1. Drop points with null lat/lon
|
||||
2. MAD-3σ outlier removal on lat, lon, depth independently
|
||||
3. Moving-average smoothing (window=5 by default)
|
||||
4. Optional: simple 1D Kalman on each axis (KISS — no cross-covariance)
|
||||
|
||||
Usage:
|
||||
python3 03_usbl_filter.py /home/cosma/cosma-pipeline/20260505-Lepradet/02_usbl_raw/
|
||||
python3 03_usbl_filter.py /path/to/02_usbl_raw/ --auv AUV010 --sigma 2.5
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso
|
||||
|
||||
LOG_DIR = Path(os.environ.get("COSMA_PIPELINE_LOGS", "/home/cosma/cosma-pipeline/logs"))
|
||||
LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def mad_outlier_mask(arr: np.ndarray, sigma: float = 3.0) -> np.ndarray:
|
||||
"""Returns boolean mask: True = keep (inlier). Uses MAD-based sigma."""
|
||||
if len(arr) < 4:
|
||||
return np.ones(len(arr), dtype=bool)
|
||||
median = np.median(arr)
|
||||
mad = np.median(np.abs(arr - median))
|
||||
if mad == 0:
|
||||
return np.ones(len(arr), dtype=bool)
|
||||
modified_z = 0.6745 * (arr - median) / mad
|
||||
return np.abs(modified_z) < sigma
|
||||
|
||||
|
||||
def moving_average(arr: np.ndarray, window: int = 5) -> np.ndarray:
|
||||
"""Centered moving average with edge padding."""
|
||||
if len(arr) < window:
|
||||
return arr.copy()
|
||||
pad = window // 2
|
||||
padded = np.pad(arr, (pad, pad), mode="edge")
|
||||
kernel = np.ones(window) / window
|
||||
return np.convolve(padded, kernel, mode="valid")[:len(arr)]
|
||||
|
||||
|
||||
def simple_kalman_1d(measurements: np.ndarray,
|
||||
process_noise: float = 1e-4,
|
||||
measurement_noise: float = 1e-2) -> np.ndarray:
|
||||
"""Very simple 1D Kalman filter (scalar, no velocity state).
|
||||
KISS: just smooths, no cross-axis coupling.
|
||||
"""
|
||||
n = len(measurements)
|
||||
filtered = np.zeros(n)
|
||||
x_est = measurements[0]
|
||||
p_est = 1.0
|
||||
|
||||
for i, z in enumerate(measurements):
|
||||
# Predict
|
||||
p_pred = p_est + process_noise
|
||||
# Update
|
||||
K = p_pred / (p_pred + measurement_noise)
|
||||
x_est = x_est + K * (z - x_est)
|
||||
p_est = (1 - K) * p_pred
|
||||
filtered[i] = x_est
|
||||
|
||||
return filtered
|
||||
|
||||
|
||||
def filter_auv_nav(raw_path: Path, out_path: Path,
|
||||
sigma: float = 3.0, window: int = 5,
|
||||
use_kalman: bool = False) -> dict:
|
||||
"""Filter nav for one AUV. Returns metrics dict."""
|
||||
data = json.loads(raw_path.read_text())
|
||||
points = data.get("points", [])
|
||||
auv_id = data.get("auv_id", raw_path.stem.replace("_nav_raw", ""))
|
||||
|
||||
metrics = {
|
||||
"auv_id": auv_id,
|
||||
"points_before": len(points),
|
||||
"points_after": 0,
|
||||
"points_removed_null": 0,
|
||||
"points_removed_outlier": 0,
|
||||
"status": "ok",
|
||||
}
|
||||
|
||||
if not points:
|
||||
metrics["status"] = "degraded"
|
||||
metrics["note"] = "no points to filter"
|
||||
_save_output(auv_id, [], out_path, metrics)
|
||||
return metrics
|
||||
|
||||
# Step 1: Drop null lat/lon
|
||||
valid = [p for p in points if p.get("lat") is not None and p.get("lon") is not None]
|
||||
metrics["points_removed_null"] = len(points) - len(valid)
|
||||
|
||||
if not valid:
|
||||
metrics["status"] = "degraded"
|
||||
metrics["note"] = "all points have null lat/lon (serial-only data)"
|
||||
print(f" [03] {auv_id}: all null lat/lon — degraded (serial CSV source, no MCAP nav)")
|
||||
_save_output(auv_id, [], out_path, metrics)
|
||||
return metrics
|
||||
|
||||
# Step 2: MAD outlier removal
|
||||
lats = np.array([p["lat"] for p in valid])
|
||||
lons = np.array([p["lon"] for p in valid])
|
||||
|
||||
mask_lat = mad_outlier_mask(lats, sigma)
|
||||
mask_lon = mad_outlier_mask(lons, sigma)
|
||||
mask = mask_lat & mask_lon
|
||||
|
||||
# Also filter depth if present
|
||||
depths = np.array([p.get("depth") or np.nan for p in valid])
|
||||
if not np.all(np.isnan(depths)):
|
||||
mask_depth = mad_outlier_mask(depths[~np.isnan(depths)], sigma)
|
||||
# Map back — only filter where we have depth
|
||||
depth_idx = np.where(~np.isnan(depths))[0]
|
||||
for i, keep in zip(depth_idx, mask_depth):
|
||||
if not keep:
|
||||
mask[i] = False
|
||||
|
||||
filtered_points = [p for p, keep in zip(valid, mask) if keep]
|
||||
metrics["points_removed_outlier"] = int(np.sum(~mask))
|
||||
|
||||
# Step 3: Sort by timestamp
|
||||
filtered_points.sort(key=lambda p: p["timestamp"])
|
||||
|
||||
# Step 4: Smooth
|
||||
if len(filtered_points) >= window:
|
||||
filt_lats = moving_average(np.array([p["lat"] for p in filtered_points]), window)
|
||||
filt_lons = moving_average(np.array([p["lon"] for p in filtered_points]), window)
|
||||
for i, p in enumerate(filtered_points):
|
||||
p = dict(p)
|
||||
p["lat"] = float(filt_lats[i])
|
||||
p["lon"] = float(filt_lons[i])
|
||||
filtered_points[i] = p
|
||||
|
||||
# Step 5: Optional Kalman
|
||||
if use_kalman and len(filtered_points) > 4:
|
||||
k_lats = simple_kalman_1d(np.array([p["lat"] for p in filtered_points]))
|
||||
k_lons = simple_kalman_1d(np.array([p["lon"] for p in filtered_points]))
|
||||
for i, p in enumerate(filtered_points):
|
||||
p = dict(p)
|
||||
p["lat"] = float(k_lats[i])
|
||||
p["lon"] = float(k_lons[i])
|
||||
filtered_points[i] = p
|
||||
|
||||
metrics["points_after"] = len(filtered_points)
|
||||
if metrics["points_after"] < 5:
|
||||
metrics["status"] = "degraded"
|
||||
metrics["note"] = f"too few points after filter: {metrics['points_after']}"
|
||||
|
||||
print(f" [03] {auv_id}: {metrics['points_before']} → {metrics['points_after']} "
|
||||
f"(removed {metrics['points_removed_null']} null, {metrics['points_removed_outlier']} outliers)")
|
||||
|
||||
_save_output(auv_id, filtered_points, out_path, metrics)
|
||||
return metrics
|
||||
|
||||
|
||||
def _save_output(auv_id: str, points: list[dict], out_dir: Path, metrics: dict) -> None:
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
json_out = out_dir / f"{auv_id}_nav_filtered.json"
|
||||
json_out.write_text(json.dumps({
|
||||
"auv_id": auv_id,
|
||||
"generated_at": now_iso(),
|
||||
"metrics": metrics,
|
||||
"points": points,
|
||||
}, indent=2, default=str))
|
||||
|
||||
csv_out = out_dir / f"{auv_id}_nav_filtered.csv"
|
||||
with open(csv_out, "w") as f:
|
||||
f.write("timestamp,lat,lon,depth,heading,source\n")
|
||||
for p in points:
|
||||
f.write(f"{p['timestamp']},{p.get('lat','')},{p.get('lon','')},{p.get('depth','')},{p.get('heading','')},{p.get('source','')}\n")
|
||||
|
||||
|
||||
def filter_mission(raw_dir: Path, out_dir: Path | None = None,
|
||||
auv_filter: str | None = None,
|
||||
sigma: float = 3.0, window: int = 5,
|
||||
use_kalman: bool = False) -> list[dict]:
|
||||
if out_dir is None:
|
||||
out_dir = raw_dir.parent / "03_usbl_filtered"
|
||||
|
||||
raw_files = sorted(raw_dir.glob("*_nav_raw.json"))
|
||||
if auv_filter:
|
||||
raw_files = [f for f in raw_files if auv_filter in f.name]
|
||||
|
||||
all_metrics = []
|
||||
init_db()
|
||||
|
||||
for raw_file in raw_files:
|
||||
out_file = out_dir / raw_file.name.replace("_raw", "_filtered")
|
||||
if out_file.exists():
|
||||
print(f"[03] {raw_file.stem}: output exists, skipping")
|
||||
existing = json.loads(out_file.read_text())
|
||||
all_metrics.append(existing.get("metrics", {}))
|
||||
continue
|
||||
|
||||
print(f"[03] Filtering {raw_file.name} ...")
|
||||
m = filter_auv_nav(raw_file, out_dir, sigma=sigma, window=window, use_kalman=use_kalman)
|
||||
all_metrics.append(m)
|
||||
|
||||
# DB record
|
||||
with get_conn() as conn:
|
||||
mission_name = raw_dir.parent.name
|
||||
mission_row = conn.execute("SELECT id FROM missions WHERE name=?", (mission_name,)).fetchone()
|
||||
if mission_row:
|
||||
job_id = upsert_job(conn, mission_row["id"], m["auv_id"], "all", "03_usbl_filter",
|
||||
status="done" if m["status"] == "ok" else m["status"],
|
||||
output_path=str(out_dir))
|
||||
record_metric(conn, job_id, "usbl_points_before", value=m.get("points_before", 0))
|
||||
record_metric(conn, job_id, "usbl_points_after", value=m.get("points_after", 0),
|
||||
pass_fail="pass" if m.get("points_after", 0) >= 5 else "fail")
|
||||
record_metric(conn, job_id, "usbl_points_removed_outlier",
|
||||
value=m.get("points_removed_outlier", 0))
|
||||
|
||||
return all_metrics
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="Stage 03 — Filter USBL navigation")
|
||||
ap.add_argument("raw_dir", type=Path, help="Directory with *_nav_raw.json files")
|
||||
ap.add_argument("--out", type=Path, default=None)
|
||||
ap.add_argument("--auv", type=str, default=None)
|
||||
ap.add_argument("--sigma", type=float, default=3.0, help="MAD sigma threshold")
|
||||
ap.add_argument("--window", type=int, default=5, help="Moving average window")
|
||||
ap.add_argument("--kalman", action="store_true", help="Apply simple Kalman smoothing")
|
||||
args = ap.parse_args()
|
||||
|
||||
metrics = filter_mission(args.raw_dir, out_dir=args.out, auv_filter=args.auv,
|
||||
sigma=args.sigma, window=args.window, use_kalman=args.kalman)
|
||||
|
||||
print("\n=== Stage 03 summary ===")
|
||||
for m in metrics:
|
||||
print(f" {m.get('auv_id','?')}: {m.get('points_before',0)} → {m.get('points_after',0)} [{m.get('status','?')}]")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
0
pipeline/stages/__init__.py
Normal file
0
pipeline/stages/__init__.py
Normal file
Reference in New Issue
Block a user