feat: add MCAP/CSV ingest, NATS publisher with stdout fallback, and CLI

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Author: floppyrj45
Date: 2026-04-19 15:20:13 +00:00
Parent: f04d7c90c9
Commit: 67b2121add
4 changed files with 408 additions and 0 deletions


@@ -0,0 +1,77 @@
"""Event bus publisher: NATS when configured, stdout otherwise."""
from __future__ import annotations
import asyncio
import logging
import os
import sys
from typing import Protocol
from .models import Anomaly
logger = logging.getLogger(__name__)
class Publisher(Protocol):
def publish(self, anomaly: Anomaly) -> None: ...
def close(self) -> None: ...
class StdoutPublisher:
"""Fallback: writes JSON Lines to stdout. Thread/process-safe via writes."""
def __init__(self, stream=None) -> None:
self._stream = stream if stream is not None else sys.stdout
def publish(self, anomaly: Anomaly) -> None:
line = anomaly.to_json()
self._stream.write(line + "\n")
self._stream.flush()
def close(self) -> None:
try:
self._stream.flush()
except Exception:
pass
class NatsPublisher:
"""Sync wrapper around nats-py async client. Keeps a dedicated loop."""
def __init__(self, url: str) -> None:
self._url = url
self._loop = asyncio.new_event_loop()
self._nc = None
self._loop.run_until_complete(self._connect())
async def _connect(self) -> None:
import nats # lazy import
self._nc = await nats.connect(self._url)
def publish(self, anomaly: Anomaly) -> None:
if self._nc is None:
raise RuntimeError("NATS client not connected")
payload = anomaly.to_json().encode("utf-8")
self._loop.run_until_complete(self._nc.publish(anomaly.nats_subject(), payload))
def close(self) -> None:
if self._nc is None:
return
try:
self._loop.run_until_complete(self._nc.drain())
except Exception as exc:
logger.warning("NATS drain failed: %s", exc)
finally:
self._loop.close()
self._nc = None
def make_publisher(nats_url: str | None = None) -> Publisher:
"""Pick NATS or stdout based on env/arg. Empty/None -> stdout fallback."""
url = nats_url if nats_url is not None else os.environ.get("NATS_URL", "")
if not url:
logger.info("NATS_URL empty -> using stdout fallback publisher")
return StdoutPublisher()
logger.info("Connecting NATS publisher to %s", url)
return NatsPublisher(url)
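
A minimal usage sketch of the stdout fallback (the absolute package name cosma_log_analyzer and the rule name battery_low are illustrative assumptions; this diff only shows relative imports):

from cosma_log_analyzer.bus import make_publisher
from cosma_log_analyzer.models import Anomaly

publisher = make_publisher(nats_url="")  # empty URL -> StdoutPublisher, no broker needed
try:
    publisher.publish(
        Anomaly(
            rule="battery_low",        # illustrative rule name
            severity="warn",
            timestamp=1713539000.0,
            subject="AUV000",
            topic="/mavros/battery",
            value=13.2,
        )
    )
finally:
    publisher.close()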


@@ -0,0 +1,143 @@
"""MCAP + CSV readers.
MCAP decoding stays schema-agnostic: we read each message's raw bytes and
try to decode them as JSON (CDR/ROS 2 deserialization is out of scope for v0;
field extraction relies on known field names once decoded). Messages whose
bodies are not valid JSON are skipped.
Rules operate on pandas DataFrames with per-topic columns:
imu -> ts, ax, ay, az, gx, gy, gz
usbl -> ts, distance_m, snr_db
battery -> ts, voltage_v
"""
from __future__ import annotations
import csv
import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path
from typing import Any
import pandas as pd
logger = logging.getLogger(__name__)
TOPIC_IMU = "/mavros/imu/data"
TOPIC_USBL = "/usbl_reading/usbl_solution"
TOPIC_BATTERY = "/mavros/battery"
KNOWN_TOPICS = (TOPIC_IMU, TOPIC_USBL, TOPIC_BATTERY)
def _decode_payload(data: bytes) -> dict[str, Any] | None:
try:
return json.loads(data.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError):
return None
def iter_mcap_messages(
path: str | Path,
topics: Iterable[str] | None = None,
) -> Iterator[tuple[str, float, dict[str, Any]]]:
"""Yield (topic, ts_seconds, decoded_payload) for matching messages."""
from mcap.reader import make_reader  # lazy import: only needed when reading MCAP files
wanted = set(topics) if topics else None
with open(path, "rb") as fh:
reader = make_reader(fh)
for schema, channel, message in reader.iter_messages(topics=list(wanted) if wanted else None):
payload = _decode_payload(message.data)
if payload is None:
continue
ts = message.log_time / 1e9 # MCAP uses ns
yield channel.topic, ts, payload
def _extract_imu(payload: dict[str, Any]) -> dict[str, float] | None:
lin = payload.get("linear_acceleration") or payload.get("accel") or {}
ang = payload.get("angular_velocity") or payload.get("gyro") or {}
if not lin and not ang:
return None
return {
"ax": float(lin.get("x", 0.0)),
"ay": float(lin.get("y", 0.0)),
"az": float(lin.get("z", 0.0)),
"gx": float(ang.get("x", 0.0)),
"gy": float(ang.get("y", 0.0)),
"gz": float(ang.get("z", 0.0)),
}
def _extract_usbl(payload: dict[str, Any]) -> dict[str, float] | None:
dist = payload.get("distance_m", payload.get("range_m"))
snr = payload.get("snr_db", payload.get("snr"))
if dist is None or snr is None:
return None
return {"distance_m": float(dist), "snr_db": float(snr)}
def _extract_battery(payload: dict[str, Any]) -> dict[str, float] | None:
v = payload.get("voltage_v", payload.get("voltage"))
if v is None:
return None
return {"voltage_v": float(v)}
_EXTRACTORS = {
TOPIC_IMU: _extract_imu,
TOPIC_USBL: _extract_usbl,
TOPIC_BATTERY: _extract_battery,
}
def load_mcap(path: str | Path) -> dict[str, pd.DataFrame]:
"""Load an MCAP file into per-topic DataFrames.
Returns a dict keyed by topic. Missing topics yield empty DataFrames.
"""
rows: dict[str, list[dict[str, Any]]] = {t: [] for t in KNOWN_TOPICS}
for topic, ts, payload in iter_mcap_messages(path, KNOWN_TOPICS):
extractor = _EXTRACTORS.get(topic)
if extractor is None:
continue
fields = extractor(payload)
if fields is None:
continue
fields["ts"] = ts
rows[topic].append(fields)
out: dict[str, pd.DataFrame] = {}
for topic, items in rows.items():
if not items:
out[topic] = pd.DataFrame()
continue
df = pd.DataFrame(items).sort_values("ts").reset_index(drop=True)
out[topic] = df
return out
def load_csv_nav(path: str | Path) -> pd.DataFrame:
"""Load a USV nav CSV. Expected columns: ts, lat, lon, heading (flexible).
Missing file or unparseable rows return an empty DataFrame.
"""
p = Path(path)
if not p.exists():
logger.warning("CSV nav file not found: %s", path)
return pd.DataFrame()
rows: list[dict[str, Any]] = []
with p.open(newline="") as fh:
reader = csv.DictReader(fh)
for row in reader:
try:
rows.append({k: float(v) if v not in ("", None) else None for k, v in row.items()})
except ValueError:
continue
if not rows:
return pd.DataFrame()
df = pd.DataFrame(rows)
if "ts" in df.columns:
df = df.sort_values("ts").reset_index(drop=True)
return df
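
A minimal loading sketch (the package name cosma_log_analyzer and both file paths are placeholders):

from cosma_log_analyzer.ingest import TOPIC_BATTERY, load_csv_nav, load_mcap

frames = load_mcap("mission_0420.mcap")   # dict: topic -> DataFrame (empty if topic absent)
battery = frames[TOPIC_BATTERY]           # columns: ts, voltage_v
nav = load_csv_nav("usv_nav.csv")         # columns: ts, lat, lon, heading (per docstring)
print(len(battery), "battery samples,", len(nav), "nav rows")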


@@ -0,0 +1,148 @@
"""CLI + service entrypoint for cosma-log-analyzer."""
from __future__ import annotations
import logging
import os
import sys
import time
from pathlib import Path
from collections.abc import Iterable
import click
from dotenv import load_dotenv
from .bus import Publisher, StdoutPublisher, make_publisher
from .ingest import (
KNOWN_TOPICS,
TOPIC_BATTERY,
TOPIC_IMU,
TOPIC_USBL,
load_mcap,
)
from .models import Anomaly
from .rules import all_rules
from .rules.base import Rule
logger = logging.getLogger(__name__)
TOPIC_TO_KEY = {
TOPIC_IMU: TOPIC_IMU,
TOPIC_USBL: TOPIC_USBL,
TOPIC_BATTERY: TOPIC_BATTERY,
}
def _setup_logging() -> None:
level = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, level, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
def analyze_mcap(
path: str | Path,
subject: str,
rules: Iterable[Rule] | None = None,
) -> list[Anomaly]:
"""Load an MCAP file, run all rules, return the anomaly list."""
dataframes = load_mcap(path)
rules = list(rules) if rules is not None else all_rules()
anomalies: list[Anomaly] = []
for rule in rules:
rule.bind(subject)
df = dataframes.get(rule.topic)
if df is None or df.empty:
continue
anomalies.extend(rule.detect(df))
anomalies.sort(key=lambda a: a.timestamp)
return anomalies
def emit(anomalies: Iterable[Anomaly], publisher: Publisher) -> int:
count = 0
for a in anomalies:
publisher.publish(a)
count += 1
return count
@click.group()
def cli() -> None:
"""cosma-log-analyzer: deterministic anomaly detection on AUV logs."""
load_dotenv()
_setup_logging()
@cli.command("ingest")
@click.argument("path", type=click.Path(exists=True, dir_okay=False))
@click.option("--subject", default="AUV000", help="AUV identifier for NATS subject.")
@click.option("--dry-run", is_flag=True, help="Force stdout publisher (ignore NATS_URL).")
def ingest_cmd(path: str, subject: str, dry_run: bool) -> None:
"""Analyze a single MCAP file and publish anomalies."""
publisher: Publisher = StdoutPublisher() if dry_run else make_publisher()
try:
anomalies = analyze_mcap(path, subject)
n = emit(anomalies, publisher)
logger.info("Processed %s -> %d anomalies", path, n)
finally:
publisher.close()
@cli.command("serve")
@click.option(
"--mcap-dir",
default=None,
help="Directory to watch (default: MCAP_DIR env).",
)
@click.option(
"--subject",
default="AUV000",
help="AUV identifier for NATS subject.",
)
@click.option(
"--poll-interval",
default=None,
type=float,
help="Seconds between scans (default: POLL_INTERVAL_S env, else 30).",
)
def serve_cmd(mcap_dir: str | None, subject: str, poll_interval: float | None) -> None:
"""Watch a directory and process new MCAP files as they appear."""
mcap_dir = mcap_dir or os.environ.get("MCAP_DIR") or "/data/mcap"
interval = poll_interval if poll_interval is not None else float(
os.environ.get("POLL_INTERVAL_S", "30")
)
watch_dir = Path(mcap_dir)
watch_dir.mkdir(parents=True, exist_ok=True)
logger.info("Watching %s (interval=%.1fs)", watch_dir, interval)
publisher = make_publisher()
seen: set[str] = set()
try:
while True:
for mcap_path in sorted(watch_dir.glob("*.mcap")):
key = str(mcap_path)
if key in seen:
continue
logger.info("New MCAP: %s", mcap_path)
try:
anomalies = analyze_mcap(mcap_path, subject)
n = emit(anomalies, publisher)
logger.info("Emitted %d anomalies from %s", n, mcap_path.name)
except Exception as exc:
logger.exception("Failed processing %s: %s", mcap_path, exc)
seen.add(key)
time.sleep(interval)
except KeyboardInterrupt:
logger.info("Shutting down on SIGINT")
finally:
publisher.close()
def main() -> None: # pragma: no cover
cli(standalone_mode=True)
if __name__ == "__main__": # pragma: no cover
main()
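
A programmatic equivalent of `ingest --dry-run`, sketched under the assumption that this CLI module is importable as cosma_log_analyzer.cli and that mission_0420.mcap exists:

from cosma_log_analyzer.bus import StdoutPublisher
from cosma_log_analyzer.cli import analyze_mcap, emit

anomalies = analyze_mcap("mission_0420.mcap", subject="AUV000")
count = emit(anomalies, StdoutPublisher())  # one JSON line per anomaly on stdout
print(f"{count} anomalies detected")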


@@ -0,0 +1,40 @@
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field
from typing import Any
SEVERITIES = ("info", "warn", "critical")
@dataclass
class Anomaly:
rule: str
severity: str
timestamp: float
subject: str
topic: str
value: float | None = None
context: dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
if self.severity not in SEVERITIES:
raise ValueError(
f"severity must be one of {SEVERITIES}, got {self.severity!r}"
)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
def to_json(self) -> str:
return json.dumps(self.to_dict(), sort_keys=True, default=_json_default)
def nats_subject(self) -> str:
return f"cosma.auv.{self.subject}.anomaly.{self.rule}"
def _json_default(obj: Any) -> Any:
if hasattr(obj, "isoformat"):
return obj.isoformat()
raise TypeError(f"Not JSON serializable: {type(obj).__name__}")
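
A minimal sketch of the payload an Anomaly produces (the rule name imu_accel_spike, the numbers, and the absolute import path are illustrative):

from cosma_log_analyzer.models import Anomaly  # package name assumed

a = Anomaly(
    rule="imu_accel_spike",
    severity="warn",
    timestamp=1713539000.0,
    subject="AUV000",
    topic="/mavros/imu/data",
    value=22.4,
    context={"threshold_ms2": 20.0},
)
print(a.nats_subject())  # cosma.auv.AUV000.anomaly.imu_accel_spike
print(a.to_json())       # sorted-key JSON: {"context": {"threshold_ms2": 20.0}, "rule": ...}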