cosma-log-analyzer/README.md
2026-04-19 15:20:10 +00:00

cosma-log-analyzer

Deterministic anomaly detection service for COSMA AUV logs. Ingests MCAP files produced by the AUV/USV pipeline, evaluates a set of rules against IMU / USBL / battery topics, and publishes each detection as a JSON event on NATS (or stdout in dev).

Context

COSMA (Flag) operates an AUV that streams telemetry to a surface USV. All telemetry is persisted as MCAP (ROS 2-native container). This service is deliverable #3: the first-pass observability layer before any statistical or ML detection is added.

  ┌──────┐  MCAP   ┌──────┐  MCAP   ┌───────────────────┐   NATS    ┌────────────────┐
  │ AUV  │────────▶│ USV  │────────▶│ cosma-log-analyzer│──────────▶│ cosma-monitor  │
  └──────┘         └──────┘         │   (this repo)      │  events   │      UI        │
                                    └───────────────────┘           └────────────────┘

Rules (v0)

Rule                 Threshold (default)                    Severity  Topic
imu_outliers         rolling 10 s window, |z| > 3           warn      /mavros/imu/data
watchdog_imu         gap > 2 s between two IMU msgs         critical  /mavros/imu/data
usbl_snr_low         SNR < 5 dB for 3 consecutive samples   warn      /usbl_reading/usbl_solution
usbl_distance_spike  |Δdistance| > 50 m in less than 1 s    warn      /usbl_reading/usbl_solution
battery_low          voltage < 13.5 V for more than 5 s     critical  /mavros/battery
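
As a rough illustration of two threshold types from the table, the sketch below checks for message gaps (as in watchdog_imu) and for a value that stays under a limit for longer than a minimum duration (as in battery_low). The function names and the list-based inputs are assumptions made for this example; the actual rules work on pandas DataFrames and may be implemented differently.

```python
def find_gaps(timestamps: list[float], max_gap_s: float = 2.0) -> list[tuple[float, float]]:
    """Return (previous_ts, current_ts) pairs whose spacing exceeds max_gap_s."""
    gaps = []
    for prev, cur in zip(timestamps, timestamps[1:]):
        if cur - prev > max_gap_s:
            gaps.append((prev, cur))
    return gaps


def sustained_below(
    samples: list[tuple[float, float]],
    threshold: float = 13.5,
    min_duration_s: float = 5.0,
) -> list[tuple[float, float]]:
    """Return (run_start_ts, run_end_ts) for every run of consecutive samples
    below `threshold` that spans more than `min_duration_s`.

    `samples` is a time-ordered list of (timestamp, value) pairs.
    """
    runs = []
    run_start = None  # timestamp of the first sample in the current low run
    last_low = None   # timestamp of the latest sample in the current low run
    for ts, value in samples:
        if value < threshold:
            if run_start is None:
                run_start = ts
            last_low = ts
        else:
            if run_start is not None and last_low - run_start > min_duration_s:
                runs.append((run_start, last_low))
            run_start = None
    # Close a run that is still open at the end of the trace.
    if run_start is not None and last_low - run_start > min_duration_s:
        runs.append((run_start, last_low))
    return runs
```

Both helpers expect time-ordered input; the defaults mirror the table values (2 s gap, 13.5 V for more than 5 s).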

All thresholds are tunable via environment variables (BATTERY_LOW_V, USBL_SNR_LOW, USBL_DIST_SPIKE_M, WATCHDOG_IMU_S) or rule constructor arguments.
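
A minimal sketch of how such environment overrides could be resolved, assuming each variable holds a plain number and falls back to the table default when unset or empty. The `load_thresholds` helper and the `DEFAULTS` mapping are hypothetical and not taken from the package.

```python
import os

# Variable names come from the README; pairing each one with a default
# from the rules table is an assumption of this sketch.
DEFAULTS = {
    "BATTERY_LOW_V": 13.5,      # volts
    "USBL_SNR_LOW": 5.0,        # dB
    "USBL_DIST_SPIKE_M": 50.0,  # metres
    "WATCHDOG_IMU_S": 2.0,      # seconds
}


def load_thresholds(env=None) -> dict[str, float]:
    """Return thresholds, letting non-empty environment values override defaults."""
    env = os.environ if env is None else env
    thresholds = {}
    for name, default in DEFAULTS.items():
        raw = env.get(name, "")
        thresholds[name] = float(raw) if raw.strip() else default
    return thresholds
```

Passing a plain dict as `env` makes the override logic easy to exercise in tests without touching the real environment.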

NATS subject

cosma.auv.{subject}.anomaly.{rule}
# e.g. cosma.auv.AUV206.anomaly.battery_low

If NATS_URL is empty, events are written as JSON Lines to stdout — useful in dev and CI.
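
The subject pattern and the stdout fallback can be sketched as follows. This is an illustration only: `anomaly_subject` and `emit_stdout` are hypothetical names, and the NATS publishing path is omitted. The token check reflects the fact that NATS uses `.` as the token separator and `*` / `>` as wildcards.

```python
import json
import sys


def _check_token(token: str) -> str:
    """Reject values that would break the subject hierarchy."""
    if not token or any(c in token for c in ".*>") or any(c.isspace() for c in token):
        raise ValueError(f"invalid NATS subject token: {token!r}")
    return token


def anomaly_subject(subject: str, rule: str) -> str:
    """Build cosma.auv.{subject}.anomaly.{rule}."""
    return f"cosma.auv.{_check_token(subject)}.anomaly.{_check_token(rule)}"


def emit_stdout(event: dict, stream=None) -> None:
    """Write one event as a single JSON Lines record."""
    stream = sys.stdout if stream is None else stream
    stream.write(json.dumps(event) + "\n")
```

One JSON object per line keeps the dev output easy to pipe into tools such as `jq` or to diff in CI.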

Example anomaly payload

{
  "rule": "battery_low",
  "severity": "critical",
  "timestamp": 1700000055.0,
  "subject": "AUV206",
  "topic": "/mavros/battery",
  "value": 13.26,
  "context": {
    "min_voltage_v": 13.5,
    "min_duration_s": 5.0,
    "run_start_ts": 1700000051.0,
    "below_duration_s": 9.0
  }
}
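
For illustration, a dataclass with the same fields as the payload above could be serialized like this. The field names are taken from the example; the class body and the `to_json` method are assumptions and may not match the actual `models.py`.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class Anomaly:
    """Hypothetical shape mirroring the example payload."""
    rule: str
    severity: str
    timestamp: float
    subject: str
    topic: str
    value: float
    context: dict = field(default_factory=dict)

    def to_json(self) -> str:
        # asdict() recurses into nested containers, so `context` is included.
        return json.dumps(asdict(self))


event = Anomaly(
    rule="battery_low",
    severity="critical",
    timestamp=1700000055.0,
    subject="AUV206",
    topic="/mavros/battery",
    value=13.26,
    context={"min_voltage_v": 13.5, "min_duration_s": 5.0},
)
```

Calling `event.to_json()` yields a single-line JSON string with the keys in declaration order.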

Install

Python 3.11+ is recommended; 3.10 also works.

pip install -e ".[dev]"   # quotes keep shells such as zsh from expanding the brackets

CLI

# One-shot on a single MCAP file
cosma-log-analyzer ingest path/to/log.mcap --subject AUV206

# Dry-run: force stdout even if NATS_URL is set
cosma-log-analyzer ingest path/to/log.mcap --dry-run

# Service mode: watch a directory for new MCAP files
cosma-log-analyzer serve --mcap-dir /data/mcap
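
One simple way a service mode could discover new files is to poll the directory and remember what it has already handled. The sketch below shows that idea only; `new_mcap_files` is a hypothetical helper, and the real `serve` command may rely on a different mechanism (for example filesystem notifications).

```python
from pathlib import Path


def new_mcap_files(mcap_dir, seen: set[Path]) -> list[Path]:
    """Return *.mcap files not yet in `seen`, oldest first, and mark them as seen."""
    fresh = sorted(
        (p for p in Path(mcap_dir).glob("*.mcap") if p not in seen),
        key=lambda p: (p.stat().st_mtime, p.name),
    )
    seen.update(fresh)
    return fresh
```

A caller would invoke this in a loop with a short sleep between polls. Note that a file still being copied can appear before it is complete, so a production watcher would need an extra guard such as waiting for a stable size.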

Docker

docker compose up --build
# drop MCAP files into ./data/mcap and watch NATS on :4222

systemd

sudo cp systemd/cosma-log-analyzer.service /etc/systemd/system/
sudo systemctl enable --now cosma-log-analyzer
journalctl -u cosma-log-analyzer -f

Tests

pytest -v                        # 32 tests, runs the e2e against a fake MCAP
pytest --cov --cov-report=term   # coverage (rules/ > 95%)

The fake MCAP generator (tests/fixtures/generate_fake_mcap.py) produces a synthetic 60 s trace containing one instance of each rule's trigger condition. The e2e test asserts that exactly those anomalies are detected.

Adding a rule

  1. Subclass Rule in src/cosma_log_analyzer/rules/<name>.py:

    class MyRule(Rule):
        name = "my_rule"
        topic = "/my/topic"
        severity = "warn"
    
        def detect(self, df: pd.DataFrame) -> list[Anomaly]:
            ...
    
  2. Register it in rules/__init__.py::all_rules().

  3. Add a test in tests/test_rules.py.
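
To make the three steps concrete, here is a self-contained sketch of a rule in the spirit of usbl_snr_low. Everything in it is a stand-in: the `Rule` and `Anomaly` classes are simplified placeholders for the package's own, the input is a list of dicts rather than a DataFrame so the example runs without pandas, and the `ts` / `snr_db` keys are hypothetical.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class Anomaly:  # simplified stand-in for models.Anomaly
    rule: str
    severity: str
    timestamp: float
    value: float


class Rule(ABC):  # simplified stand-in for the package's base class
    name: str
    topic: str
    severity: str

    @abstractmethod
    def detect(self, rows: list[dict]) -> list[Anomaly]: ...


class SnrLowRule(Rule):
    name = "usbl_snr_low"
    topic = "/usbl_reading/usbl_solution"
    severity = "warn"

    def __init__(self, min_snr_db: float = 5.0, consecutive: int = 3):
        self.min_snr_db = min_snr_db
        self.consecutive = consecutive

    def detect(self, rows: list[dict]) -> list[Anomaly]:
        anomalies = []
        streak = 0  # number of consecutive low-SNR samples seen so far
        for row in rows:
            if row["snr_db"] < self.min_snr_db:
                streak += 1
                # Fire once, at the sample that completes the streak.
                if streak == self.consecutive:
                    anomalies.append(
                        Anomaly(self.name, self.severity, row["ts"], row["snr_db"])
                    )
            else:
                streak = 0
        return anomalies


def all_rules() -> list[Rule]:
    """Step 2: the registry returns one instance of each rule."""
    return [SnrLowRule()]
```

Firing only when the streak first reaches the limit avoids emitting a new event for every further low sample in the same run; whether the real rules behave this way is not stated here.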

Roadmap v1

  • Rolling-stats rules (heading drift, GPS dropout correlated with USBL).
  • Time alignment between MCAP IMU and CSV USV nav.
  • ML anomaly layer (Isolation Forest) once more than 50 h of nominal dive data is available for training.
  • Backpressure + JetStream persistence for the NATS publisher.

Layout

src/cosma_log_analyzer/    # package code
  rules/                   # one file per rule
  main.py                  # Click CLI: `ingest` + `serve`
  ingest.py                # MCAP + CSV readers -> pandas
  bus.py                   # NATS publisher + stdout fallback
  models.py                # Anomaly dataclass
tests/                     # pytest suite + fake MCAP fixture
examples/run_on_fake.sh    # end-to-end demo
systemd/                   # unit file for on-prem deployment