"""Event bus publisher: NATS when configured, stdout otherwise."""
from __future__ import annotations

import asyncio
import logging
import os
import sys
from typing import TYPE_CHECKING, Protocol

if TYPE_CHECKING:
    # Only needed for annotations (lazy via __future__ import); keeping this
    # out of runtime avoids a models<->bus circular-import hazard.
    from .models import Anomaly

logger = logging.getLogger(__name__)


class Publisher(Protocol):
    """Structural interface every publisher implements."""

    def publish(self, anomaly: Anomaly) -> None: ...
    def close(self) -> None: ...


class StdoutPublisher:
    """Fallback publisher: writes JSON Lines to stdout.

    Each publish is one ``write`` call followed by a flush; no stronger
    atomicity guarantee is made for concurrent writers.
    """

    def __init__(self, stream=None) -> None:
        # Default to process stdout; tests may inject any write/flush object.
        self._stream = stream if stream is not None else sys.stdout

    def publish(self, anomaly: Anomaly) -> None:
        line = anomaly.to_json()
        self._stream.write(line + "\n")
        self._stream.flush()

    def close(self) -> None:
        # Best-effort: the stream may already be closed at interpreter exit.
        try:
            self._stream.flush()
        except Exception:
            pass


class NatsPublisher:
    """Sync wrapper around the nats-py async client.

    Keeps a dedicated event loop so synchronous callers can publish without
    running their own asyncio machinery.

    Raises:
        Whatever ``nats.connect`` raises if the initial connection fails.
    """

    def __init__(self, url: str) -> None:
        self._url = url
        self._loop = asyncio.new_event_loop()
        self._nc = None
        try:
            self._loop.run_until_complete(self._connect())
        except BaseException:
            # Fix: the original leaked the freshly-created event loop when
            # the connection attempt raised; close it before propagating.
            self._loop.close()
            raise

    async def _connect(self) -> None:
        import nats  # lazy import: only required when NATS is configured

        self._nc = await nats.connect(self._url)

    def publish(self, anomaly: Anomaly) -> None:
        if self._nc is None:
            raise RuntimeError("NATS client not connected")
        payload = anomaly.to_json().encode("utf-8")
        self._loop.run_until_complete(self._nc.publish(anomaly.nats_subject(), payload))

    def close(self) -> None:
        """Drain pending messages, then always close the private loop."""
        if self._nc is None:
            return
        try:
            self._loop.run_until_complete(self._nc.drain())
        except Exception as exc:
            logger.warning("NATS drain failed: %s", exc)
        finally:
            self._loop.close()
            self._nc = None


def make_publisher(nats_url: str | None = None) -> Publisher:
    """Pick NATS or stdout based on env/arg. Empty/None -> stdout fallback."""
    url = nats_url if nats_url is not None else os.environ.get("NATS_URL", "")
    if not url:
        logger.info("NATS_URL empty -> using stdout fallback publisher")
        return StdoutPublisher()
    logger.info("Connecting NATS publisher to %s", url)
    return NatsPublisher(url)
"""MCAP + CSV readers.

MCAP decoding stays schema-agnostic: we read each message's bytes and try
JSON first (CDR/ROS2 is out of scope for v0 — field extraction relies on
known field names once decoded). If the message body is not JSON we skip it.

Rules operate on pandas DataFrames with per-topic columns:
    imu     -> ts, ax, ay, az, gx, gy, gz
    usbl    -> ts, distance_m, snr_db
    battery -> ts, voltage_v
"""
from __future__ import annotations

import csv
import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path
from typing import Any

import pandas as pd

logger = logging.getLogger(__name__)


TOPIC_IMU = "/mavros/imu/data"
TOPIC_USBL = "/usbl_reading/usbl_solution"
TOPIC_BATTERY = "/mavros/battery"

KNOWN_TOPICS = (TOPIC_IMU, TOPIC_USBL, TOPIC_BATTERY)


def _decode_payload(data: bytes) -> dict[str, Any] | None:
    """Decode a message body as UTF-8 JSON; return None if it is not JSON."""
    try:
        return json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


def iter_mcap_messages(
    path: str | Path,
    topics: Iterable[str] | None = None,
) -> Iterator[tuple[str, float, dict[str, Any]]]:
    """Yield (topic, ts_seconds, decoded_payload) for matching messages.

    Non-JSON message bodies are silently skipped. ``topics=None`` (or an
    empty iterable) reads every channel in the file.
    """
    from mcap.reader import make_reader  # lazy: lib mcap

    wanted = set(topics) if topics else None
    with open(path, "rb") as fh:
        reader = make_reader(fh)
        for _schema, channel, message in reader.iter_messages(
            topics=list(wanted) if wanted else None
        ):
            payload = _decode_payload(message.data)
            if payload is None:
                continue
            ts = message.log_time / 1e9  # MCAP log_time is in nanoseconds
            yield channel.topic, ts, payload


def _extract_imu(payload: dict[str, Any]) -> dict[str, float] | None:
    """Pull accel/gyro axes from an IMU payload; missing axes default to 0.0."""
    lin = payload.get("linear_acceleration") or payload.get("accel") or {}
    ang = payload.get("angular_velocity") or payload.get("gyro") or {}
    if not lin and not ang:
        return None
    return {
        "ax": float(lin.get("x", 0.0)),
        "ay": float(lin.get("y", 0.0)),
        "az": float(lin.get("z", 0.0)),
        "gx": float(ang.get("x", 0.0)),
        "gy": float(ang.get("y", 0.0)),
        "gz": float(ang.get("z", 0.0)),
    }


def _extract_usbl(payload: dict[str, Any]) -> dict[str, float] | None:
    """Pull distance/SNR from a USBL payload; both fields are required."""
    dist = payload.get("distance_m", payload.get("range_m"))
    snr = payload.get("snr_db", payload.get("snr"))
    if dist is None or snr is None:
        return None
    return {"distance_m": float(dist), "snr_db": float(snr)}


def _extract_battery(payload: dict[str, Any]) -> dict[str, float] | None:
    """Pull the pack voltage from a battery payload."""
    v = payload.get("voltage_v", payload.get("voltage"))
    if v is None:
        return None
    return {"voltage_v": float(v)}


_EXTRACTORS = {
    TOPIC_IMU: _extract_imu,
    TOPIC_USBL: _extract_usbl,
    TOPIC_BATTERY: _extract_battery,
}


def load_mcap(path: str | Path) -> dict[str, pd.DataFrame]:
    """Load an MCAP file into per-topic DataFrames.

    Returns a dict keyed by topic, each frame sorted by ``ts``.
    Missing topics yield empty DataFrames.
    """
    rows: dict[str, list[dict[str, Any]]] = {t: [] for t in KNOWN_TOPICS}
    for topic, ts, payload in iter_mcap_messages(path, KNOWN_TOPICS):
        extractor = _EXTRACTORS.get(topic)
        if extractor is None:
            continue
        fields = extractor(payload)
        if fields is None:
            continue
        fields["ts"] = ts
        rows[topic].append(fields)

    out: dict[str, pd.DataFrame] = {}
    for topic, items in rows.items():
        if not items:
            out[topic] = pd.DataFrame()
            continue
        df = pd.DataFrame(items).sort_values("ts").reset_index(drop=True)
        out[topic] = df
    return out


def load_csv_nav(path: str | Path) -> pd.DataFrame:
    """Load a USV nav CSV. Expected columns: ts, lat, lon, heading (flexible).

    A missing file returns an empty DataFrame. Cells are converted to float
    where possible; empty cells become None.

    Fix: fields that are not parseable as float (e.g. a text "mode" column)
    are kept verbatim instead of discarding the whole row — the original
    skipped every row of any CSV containing a single text column, yielding
    an empty DataFrame despite valid numeric data.
    """
    p = Path(path)
    if not p.exists():
        logger.warning("CSV nav file not found: %s", path)
        return pd.DataFrame()
    rows: list[dict[str, Any]] = []
    with p.open(newline="") as fh:
        reader = csv.DictReader(fh)
        for row in reader:
            parsed: dict[str, Any] = {}
            for key, raw in row.items():
                if raw in ("", None):
                    parsed[key] = None
                    continue
                try:
                    parsed[key] = float(raw)
                except (TypeError, ValueError):
                    parsed[key] = raw  # keep non-numeric fields as-is
            rows.append(parsed)
    if not rows:
        return pd.DataFrame()
    df = pd.DataFrame(rows)
    if "ts" in df.columns:
        # NOTE(review): assumes ts values are uniformly numeric (or None);
        # a text ts column would make this sort raise — confirm upstream.
        df = df.sort_values("ts").reset_index(drop=True)
    return df
+ """ + p = Path(path) + if not p.exists(): + logger.warning("CSV nav file not found: %s", path) + return pd.DataFrame() + rows: list[dict[str, Any]] = [] + with p.open(newline="") as fh: + reader = csv.DictReader(fh) + for row in reader: + try: + rows.append({k: float(v) if v not in ("", None) else None for k, v in row.items()}) + except ValueError: + continue + if not rows: + return pd.DataFrame() + df = pd.DataFrame(rows) + if "ts" in df.columns: + df = df.sort_values("ts").reset_index(drop=True) + return df diff --git a/src/cosma_log_analyzer/main.py b/src/cosma_log_analyzer/main.py new file mode 100644 index 0000000..76a438e --- /dev/null +++ b/src/cosma_log_analyzer/main.py @@ -0,0 +1,148 @@ +"""CLI + service entrypoint for cosma-log-analyzer.""" +from __future__ import annotations + +import logging +import os +import sys +import time +from pathlib import Path +from typing import Iterable + +import click +from dotenv import load_dotenv + +from .bus import Publisher, StdoutPublisher, make_publisher +from .ingest import ( + KNOWN_TOPICS, + TOPIC_BATTERY, + TOPIC_IMU, + TOPIC_USBL, + load_mcap, +) +from .models import Anomaly +from .rules import all_rules +from .rules.base import Rule + +logger = logging.getLogger(__name__) + + +TOPIC_TO_KEY = { + TOPIC_IMU: TOPIC_IMU, + TOPIC_USBL: TOPIC_USBL, + TOPIC_BATTERY: TOPIC_BATTERY, +} + + +def _setup_logging() -> None: + level = os.environ.get("LOG_LEVEL", "INFO").upper() + logging.basicConfig( + level=getattr(logging, level, logging.INFO), + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + + +def analyze_mcap( + path: str | Path, + subject: str, + rules: Iterable[Rule] | None = None, +) -> list[Anomaly]: + """Load an MCAP file, run all rules, return the anomaly list.""" + dataframes = load_mcap(path) + rules = list(rules) if rules is not None else all_rules() + anomalies: list[Anomaly] = [] + for rule in rules: + rule.bind(subject) + df = dataframes.get(rule.topic) + if df is None or df.empty: + 
continue + anomalies.extend(rule.detect(df)) + anomalies.sort(key=lambda a: a.timestamp) + return anomalies + + +def emit(anomalies: Iterable[Anomaly], publisher: Publisher) -> int: + count = 0 + for a in anomalies: + publisher.publish(a) + count += 1 + return count + + +@click.group() +def cli() -> None: + """cosma-log-analyzer: deterministic anomaly detection on AUV logs.""" + load_dotenv() + _setup_logging() + + +@cli.command("ingest") +@click.argument("path", type=click.Path(exists=True, dir_okay=False)) +@click.option("--subject", default="AUV000", help="AUV identifier for NATS subject.") +@click.option("--dry-run", is_flag=True, help="Force stdout publisher (ignore NATS_URL).") +def ingest_cmd(path: str, subject: str, dry_run: bool) -> None: + """Analyze a single MCAP file and publish anomalies.""" + publisher: Publisher = StdoutPublisher() if dry_run else make_publisher() + try: + anomalies = analyze_mcap(path, subject) + n = emit(anomalies, publisher) + logger.info("Processed %s -> %d anomalies", path, n) + finally: + publisher.close() + + +@cli.command("serve") +@click.option( + "--mcap-dir", + default=None, + help="Directory to watch (default: MCAP_DIR env).", +) +@click.option( + "--subject", + default="AUV000", + help="AUV identifier for NATS subject.", +) +@click.option( + "--poll-interval", + default=None, + type=float, + help="Seconds between scans (default: POLL_INTERVAL_S env, else 30).", +) +def serve_cmd(mcap_dir: str | None, subject: str, poll_interval: float | None) -> None: + """Watch a directory and process new MCAP files as they appear.""" + mcap_dir = mcap_dir or os.environ.get("MCAP_DIR") or "/data/mcap" + interval = poll_interval if poll_interval is not None else float( + os.environ.get("POLL_INTERVAL_S", "30") + ) + watch_dir = Path(mcap_dir) + watch_dir.mkdir(parents=True, exist_ok=True) + logger.info("Watching %s (interval=%.1fs)", watch_dir, interval) + + publisher = make_publisher() + seen: set[str] = set() + try: + while True: + for 
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Any


# Closed set of accepted severity labels, in increasing order of urgency.
SEVERITIES = ("info", "warn", "critical")


@dataclass
class Anomaly:
    """One detected anomaly, serializable to stable JSON for the event bus."""

    rule: str
    severity: str
    timestamp: float
    subject: str
    topic: str
    value: float | None = None
    context: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Reject unknown severity labels at construction time.
        if self.severity in SEVERITIES:
            return
        raise ValueError(
            f"severity must be one of {SEVERITIES}, got {self.severity!r}"
        )

    def to_dict(self) -> dict[str, Any]:
        """Deep-copy this anomaly into a plain dict (via dataclasses.asdict)."""
        return asdict(self)

    def to_json(self) -> str:
        """Serialize with sorted keys so output is byte-stable across runs."""
        payload = self.to_dict()
        return json.dumps(payload, sort_keys=True, default=_json_default)

    def nats_subject(self) -> str:
        """NATS subject of the form cosma.auv.<subject>.anomaly.<rule>."""
        return ".".join(("cosma", "auv", self.subject, "anomaly", self.rule))


def _json_default(obj: Any) -> Any:
    """JSON fallback: objects with an isoformat() (dates/times) become strings."""
    iso = getattr(obj, "isoformat", None)
    if iso is not None:
        return iso()
    raise TypeError(f"Not JSON serializable: {type(obj).__name__}")