cosma-log-analyzer
Deterministic anomaly detection service for COSMA AUV logs. Ingests MCAP files produced by the AUV/USV pipeline, evaluates a set of rules against IMU / USBL / battery topics, and publishes each detection as a JSON event on NATS (or stdout in dev).
Context
COSMA (Flag) operates an AUV that streams telemetry to a surface USV. All telemetry is persisted as MCAP (ROS2-native container). This service is livrable #3: the first-pass observability layer before any statistical or ML detection is added.
┌──────┐ MCAP ┌──────┐ MCAP ┌───────────────────┐ NATS ┌────────────────┐
│ AUV │────────▶│ USV │────────▶│ cosma-log-analyzer│──────────▶│ cosma-monitor │
└──────┘ └──────┘ │ (this repo) │ events │ UI │
└───────────────────┘ └────────────────┘
Rules (v0)
| Rule | Threshold (default) | Severity | Topic |
|---|---|---|---|
imu_outliers |
rolling 10 s window, |z| > 3 | warn | /mavros/imu/data |
watchdog_imu |
gap > 2 s between two IMU msgs | critical | /mavros/imu/data |
usbl_snr_low |
SNR < 5 dB for 3 consecutive samples | warn | /usbl_reading/usbl_solution |
usbl_distance_spike |
|Δdistance| > 50 m in less than 1 s | warn | /usbl_reading/usbl_solution |
battery_low |
voltage < 13.5 V for more than 5 s | critical | /mavros/battery |
All thresholds are tunable via environment variables (BATTERY_LOW_V,
USBL_SNR_LOW, USBL_DIST_SPIKE_M, WATCHDOG_IMU_S) or rule
constructor arguments.
NATS subject
cosma.auv.{subject}.anomaly.{rule}
# ex: cosma.auv.AUV206.anomaly.battery_low
If NATS_URL is empty, events are written as JSON Lines to stdout —
useful in dev and CI.
Example anomaly payload
{
"rule": "battery_low",
"severity": "critical",
"timestamp": 1700000055.0,
"subject": "AUV206",
"topic": "/mavros/battery",
"value": 13.26,
"context": {
"min_voltage_v": 13.5,
"min_duration_s": 5.0,
"run_start_ts": 1700000051.0,
"below_duration_s": 9.0
}
}
Install
Python 3.11+ recommended. Works on 3.10.
pip install -e .[dev]
CLI
# One-shot on a single MCAP file
cosma-log-analyzer ingest path/to/log.mcap --subject AUV206
# Dry-run: force stdout even if NATS_URL is set
cosma-log-analyzer ingest path/to/log.mcap --dry-run
# Service mode: watch a directory for new MCAP files
cosma-log-analyzer serve --mcap-dir /data/mcap
Docker
docker compose up --build
# drop MCAP files into ./data/mcap and watch NATS on :4222
systemd
sudo cp systemd/cosma-log-analyzer.service /etc/systemd/system/
sudo systemctl enable --now cosma-log-analyzer
journalctl -u cosma-log-analyzer -f
Tests
pytest -v # 32 tests, runs the e2e against a fake MCAP
pytest --cov --cov-report=term # coverage (rules/ > 95%)
The fake MCAP generator (tests/fixtures/generate_fake_mcap.py) produces
a synthetic 60 s trace with one instance of each rule's trigger
condition — the e2e test asserts we detect exactly those.
Adding a rule
-
Subclass
Ruleinsrc/cosma_log_analyzer/rules/<name>.py:class MyRule(Rule): name = "my_rule" topic = "/my/topic" severity = "warn" def detect(self, df: pd.DataFrame) -> list[Anomaly]: ... -
Register it in
rules/__init__.py::all_rules(). -
Add a test in
tests/test_rules.py.
Roadmap v1
- Rolling-stats rules (heading drift, GPS dropout correlated with USBL).
- Time alignment between MCAP IMU and CSV USV nav.
- ML anomaly layer (Isolation Forest) once we have > 50 h of nominal dive datasets to train against.
- Backpressure + JetStream persistence for the NATS publisher.
Layout
src/cosma_log_analyzer/ # package code
rules/ # one file per rule
main.py # Click CLI: `ingest` + `serve`
ingest.py # MCAP + CSV readers -> pandas
bus.py # NATS publisher + stdout fallback
models.py # Anomaly dataclass
tests/ # pytest suite + fake MCAP fixture
examples/run_on_fake.sh # end-to-end demo
systemd/ # unit file for on-prem deployment