Compare commits


4 Commits

Author SHA1 Message Date
floppyrj45
b0bbb51873 feat: scaffold cosma-log-analyzer with 5 deterministic rules + fake MCAP e2e test
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 15:20:20 +00:00
floppyrj45
668d84c187 feat: add 5 deterministic rules (IMU outliers/watchdog, USBL SNR/spike, battery_low)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 15:20:17 +00:00
floppyrj45
67b2121add feat: add MCAP/CSV ingest, NATS publisher with stdout fallback, and CLI
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 15:20:13 +00:00
floppyrj45
f04d7c90c9 chore: scaffold project skeleton (pyproject, Docker, systemd, README)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 15:20:10 +00:00
27 changed files with 1502 additions and 1 deletions

.env.example Normal file

@@ -0,0 +1,9 @@
NATS_URL= # empty = stdout fallback
MCAP_DIR=/data/mcap
POLL_INTERVAL_S=30
LOG_LEVEL=INFO
# thresholds (optional, defaults in code)
BATTERY_LOW_V=13.5
USBL_SNR_LOW=5.0
USBL_DIST_SPIKE_M=50
WATCHDOG_IMU_S=2.0

.gitignore vendored Normal file

@@ -0,0 +1,20 @@
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
.eggs/
build/
dist/
.venv/
venv/
env/
.env
.pytest_cache/
.coverage
htmlcov/
.mypy_cache/
.ruff_cache/
*.mcap
!tests/fixtures/*.mcap
.idea/
.vscode/

Dockerfile Normal file

@@ -0,0 +1,21 @@
FROM python:3.11-slim
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1
WORKDIR /app
COPY pyproject.toml README.md ./
COPY src ./src
RUN pip install --upgrade pip && pip install .
ENV MCAP_DIR=/data/mcap \
POLL_INTERVAL_S=30 \
LOG_LEVEL=INFO
VOLUME ["/data/mcap"]
ENTRYPOINT ["cosma-log-analyzer"]
CMD ["serve"]

README.md

@@ -1,3 +1,149 @@
# cosma-log-analyzer

Deterministic anomaly detection service for COSMA AUV logs. Ingests MCAP
files produced by the AUV/USV pipeline, evaluates a set of rules against
IMU / USBL / battery topics, and publishes each detection as a JSON event
on NATS (or stdout in dev).
## Context
COSMA (Flag) operates an AUV that streams telemetry to a surface USV. All
telemetry is persisted as MCAP (ROS2-native container). This service is
deliverable #3: the first-pass observability layer before any statistical or
ML detection is added.
```
┌──────┐ MCAP ┌──────┐ MCAP ┌───────────────────┐ NATS ┌────────────────┐
│ AUV │────────▶│ USV │────────▶│ cosma-log-analyzer│──────────▶│ cosma-monitor │
└──────┘ └──────┘ │ (this repo) │ events │ UI │
└───────────────────┘ └────────────────┘
```
## Rules (v0)
| Rule | Threshold (default) | Severity | Topic |
|-----------------------|-------------------------------------------|----------|----------------------------------|
| `imu_outliers` | rolling 10 s window, \|z\| > 3 | warn | `/mavros/imu/data` |
| `watchdog_imu` | gap > 2 s between two IMU msgs | critical | `/mavros/imu/data` |
| `usbl_snr_low` | SNR < 5 dB for 3 consecutive samples | warn | `/usbl_reading/usbl_solution` |
| `usbl_distance_spike` | \|Δdistance\| > 50 m in less than 1 s | warn | `/usbl_reading/usbl_solution` |
| `battery_low` | voltage < 13.5 V for more than 5 s | critical | `/mavros/battery` |
All thresholds are tunable via environment variables (`BATTERY_LOW_V`,
`USBL_SNR_LOW`, `USBL_DIST_SPIKE_M`, `WATCHDOG_IMU_S`) or rule
constructor arguments.
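For illustration, the env-first lookup the rules use internally can be sketched as follows (the helper name `threshold` is ours, not part of the package; it mirrors the pattern in `BatteryLowRule.__init__`):

```python
import os

# Env var wins, otherwise the coded default, same pattern as
# float(os.environ.get("BATTERY_LOW_V", 13.5)) in the battery rule.
def threshold(env_var: str, default: float) -> float:
    return float(os.environ.get(env_var, default))

os.environ["BATTERY_LOW_V"] = "13.0"
print(threshold("BATTERY_LOW_V", 13.5))  # 13.0 (overridden by env)
print(threshold("USBL_SNR_LOW", 5.0))    # 5.0 (coded default)
```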
## NATS subject
```
cosma.auv.{subject}.anomaly.{rule}
# ex: cosma.auv.AUV206.anomaly.battery_low
```
If `NATS_URL` is empty, events are written as JSON Lines to stdout —
useful in dev and CI.
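As a sketch, the subject is assembled exactly like `Anomaly.nats_subject()` in `models.py`; with standard NATS wildcards, a consumer can subscribe to `cosma.auv.*.anomaly.>` to receive every rule from every vehicle:

```python
def nats_subject(subject: str, rule: str) -> str:
    # Mirrors Anomaly.nats_subject() in models.py.
    return f"cosma.auv.{subject}.anomaly.{rule}"

print(nats_subject("AUV206", "battery_low"))
# cosma.auv.AUV206.anomaly.battery_low
```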
## Example anomaly payload
```json
{
"rule": "battery_low",
"severity": "critical",
  "timestamp": 1700000056.0,
"subject": "AUV206",
"topic": "/mavros/battery",
"value": 13.26,
"context": {
"min_voltage_v": 13.5,
"min_duration_s": 5.0,
"run_start_ts": 1700000051.0,
"below_duration_s": 9.0
}
}
```
## Install
Python 3.11+ recommended. Works on 3.10.
```bash
pip install -e .[dev]
```
## CLI
```bash
# One-shot on a single MCAP file
cosma-log-analyzer ingest path/to/log.mcap --subject AUV206
# Dry-run: force stdout even if NATS_URL is set
cosma-log-analyzer ingest path/to/log.mcap --dry-run
# Service mode: watch a directory for new MCAP files
cosma-log-analyzer serve --mcap-dir /data/mcap
```
## Docker
```bash
docker compose up --build
# drop MCAP files into ./data/mcap and watch NATS on :4222
```
## systemd
```bash
sudo cp systemd/cosma-log-analyzer.service /etc/systemd/system/
sudo systemctl enable --now cosma-log-analyzer
journalctl -u cosma-log-analyzer -f
```
## Tests
```bash
pytest -v # 32 tests, runs the e2e against a fake MCAP
pytest --cov --cov-report=term # coverage (rules/ > 95%)
```
The fake MCAP generator (`tests/fixtures/generate_fake_mcap.py`) produces
a synthetic 60 s trace with one instance of each rule's trigger
condition — the e2e test asserts we detect exactly those.
## Adding a rule
1. Subclass `Rule` in `src/cosma_log_analyzer/rules/<name>.py`:
```python
class MyRule(Rule):
name = "my_rule"
topic = "/my/topic"
severity = "warn"
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
...
```
2. Register it in `rules/__init__.py::all_rules()`.
3. Add a test in `tests/test_rules.py`.
## Roadmap v1
- Rolling-stats rules (heading drift, GPS dropout correlated with USBL).
- Time alignment between MCAP IMU and CSV USV nav.
- ML anomaly layer (Isolation Forest) once we have > 50 h of nominal
dive datasets to train against.
- Backpressure + JetStream persistence for the NATS publisher.
## Layout
```
src/cosma_log_analyzer/ # package code
rules/ # one file per rule
main.py # Click CLI: `ingest` + `serve`
ingest.py # MCAP + CSV readers -> pandas
bus.py # NATS publisher + stdout fallback
models.py # Anomaly dataclass
tests/ # pytest suite + fake MCAP fixture
examples/run_on_fake.sh # end-to-end demo
systemd/ # unit file for on-prem deployment
```

docker-compose.yml Normal file

@@ -0,0 +1,21 @@
services:
nats:
image: nats:2.10-alpine
command: ["-js", "-m", "8222"]
ports:
- "4222:4222"
- "8222:8222"
restart: unless-stopped
analyzer:
build: .
depends_on:
- nats
environment:
NATS_URL: nats://nats:4222
MCAP_DIR: /data/mcap
POLL_INTERVAL_S: 30
LOG_LEVEL: INFO
volumes:
- ./data/mcap:/data/mcap
restart: unless-stopped

examples/run_on_fake.sh Executable file

@@ -0,0 +1,13 @@
#!/usr/bin/env bash
# End-to-end demo: generate a synthetic MCAP, run the analyzer on it,
# pipe stdout anomalies to the console (NATS fallback mode).
set -euo pipefail
ROOT=$(cd "$(dirname "$0")/.." && pwd)
cd "$ROOT"
OUT=$(mktemp -d)/demo.mcap
python3 tests/fixtures/generate_fake_mcap.py "$OUT"
echo "---- Running analyzer on $OUT ----"
NATS_URL="" cosma-log-analyzer ingest "$OUT" --subject AUV206 --dry-run

pyproject.toml Normal file

@@ -0,0 +1,44 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "cosma-log-analyzer"
version = "0.1.0"
description = "Deterministic anomaly detector for COSMA AUV logs (MCAP/CSV -> NATS)"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "Proprietary" }
authors = [{ name = "Flag / COSMA" }]
dependencies = [
"mcap>=1.2",
"pandas>=2.1",
"numpy>=1.26",
"nats-py>=2.6",
"python-dotenv>=1.0",
"click>=8.1",
]
[project.optional-dependencies]
dev = [
"pytest>=7.4",
"pytest-cov>=4.1",
]
[project.scripts]
cosma-log-analyzer = "cosma_log_analyzer.main:cli"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-ra"
[tool.coverage.run]
source = ["src/cosma_log_analyzer"]
branch = true
[tool.coverage.report]
show_missing = true
skip_covered = false

src/cosma_log_analyzer/__init__.py Normal file

@@ -0,0 +1 @@
__version__ = "0.1.0"

src/cosma_log_analyzer/bus.py Normal file

@@ -0,0 +1,77 @@
"""Event bus publisher: NATS when configured, stdout otherwise."""
from __future__ import annotations
import asyncio
import logging
import os
import sys
from typing import Protocol
from .models import Anomaly
logger = logging.getLogger(__name__)
class Publisher(Protocol):
def publish(self, anomaly: Anomaly) -> None: ...
def close(self) -> None: ...
class StdoutPublisher:
    """Fallback publisher: writes one JSON line per anomaly to stdout, flushing after each write."""
def __init__(self, stream=None) -> None:
self._stream = stream if stream is not None else sys.stdout
def publish(self, anomaly: Anomaly) -> None:
line = anomaly.to_json()
self._stream.write(line + "\n")
self._stream.flush()
def close(self) -> None:
try:
self._stream.flush()
except Exception:
pass
class NatsPublisher:
"""Sync wrapper around nats-py async client. Keeps a dedicated loop."""
def __init__(self, url: str) -> None:
self._url = url
self._loop = asyncio.new_event_loop()
self._nc = None
self._loop.run_until_complete(self._connect())
async def _connect(self) -> None:
import nats # lazy import
self._nc = await nats.connect(self._url)
def publish(self, anomaly: Anomaly) -> None:
if self._nc is None:
raise RuntimeError("NATS client not connected")
payload = anomaly.to_json().encode("utf-8")
self._loop.run_until_complete(self._nc.publish(anomaly.nats_subject(), payload))
def close(self) -> None:
if self._nc is None:
return
try:
self._loop.run_until_complete(self._nc.drain())
except Exception as exc:
logger.warning("NATS drain failed: %s", exc)
finally:
self._loop.close()
self._nc = None
def make_publisher(nats_url: str | None = None) -> Publisher:
"""Pick NATS or stdout based on env/arg. Empty/None -> stdout fallback."""
url = nats_url if nats_url is not None else os.environ.get("NATS_URL", "")
if not url:
logger.info("NATS_URL empty -> using stdout fallback publisher")
return StdoutPublisher()
logger.info("Connecting NATS publisher to %s", url)
return NatsPublisher(url)

src/cosma_log_analyzer/ingest.py Normal file

@@ -0,0 +1,143 @@
"""MCAP + CSV readers.
MCAP decoding stays schema-agnostic: we read each message's bytes and try
JSON first (CDR/ROS2 is out of scope for v0 — field extraction relies on
known field names once decoded). If the message body is not JSON we skip it.
Rules operate on pandas DataFrames with per-topic columns:
imu -> ts, ax, ay, az, gx, gy, gz
usbl -> ts, distance_m, snr_db
battery -> ts, voltage_v
"""
from __future__ import annotations
import csv
import json
import logging
from collections.abc import Iterable, Iterator
from pathlib import Path
from typing import Any
import pandas as pd
logger = logging.getLogger(__name__)
TOPIC_IMU = "/mavros/imu/data"
TOPIC_USBL = "/usbl_reading/usbl_solution"
TOPIC_BATTERY = "/mavros/battery"
KNOWN_TOPICS = (TOPIC_IMU, TOPIC_USBL, TOPIC_BATTERY)
def _decode_payload(data: bytes) -> dict[str, Any] | None:
try:
return json.loads(data.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError):
return None
def iter_mcap_messages(
path: str | Path,
topics: Iterable[str] | None = None,
) -> Iterator[tuple[str, float, dict[str, Any]]]:
"""Yield (topic, ts_seconds, decoded_payload) for matching messages."""
    from mcap.reader import make_reader  # lazy import of the mcap library
wanted = set(topics) if topics else None
with open(path, "rb") as fh:
reader = make_reader(fh)
for schema, channel, message in reader.iter_messages(topics=list(wanted) if wanted else None):
payload = _decode_payload(message.data)
if payload is None:
continue
ts = message.log_time / 1e9 # MCAP uses ns
yield channel.topic, ts, payload
def _extract_imu(payload: dict[str, Any]) -> dict[str, float] | None:
lin = payload.get("linear_acceleration") or payload.get("accel") or {}
ang = payload.get("angular_velocity") or payload.get("gyro") or {}
if not lin and not ang:
return None
return {
"ax": float(lin.get("x", 0.0)),
"ay": float(lin.get("y", 0.0)),
"az": float(lin.get("z", 0.0)),
"gx": float(ang.get("x", 0.0)),
"gy": float(ang.get("y", 0.0)),
"gz": float(ang.get("z", 0.0)),
}
def _extract_usbl(payload: dict[str, Any]) -> dict[str, float] | None:
dist = payload.get("distance_m", payload.get("range_m"))
snr = payload.get("snr_db", payload.get("snr"))
if dist is None or snr is None:
return None
return {"distance_m": float(dist), "snr_db": float(snr)}
def _extract_battery(payload: dict[str, Any]) -> dict[str, float] | None:
v = payload.get("voltage_v", payload.get("voltage"))
if v is None:
return None
return {"voltage_v": float(v)}
_EXTRACTORS = {
TOPIC_IMU: _extract_imu,
TOPIC_USBL: _extract_usbl,
TOPIC_BATTERY: _extract_battery,
}
def load_mcap(path: str | Path) -> dict[str, pd.DataFrame]:
"""Load an MCAP file into per-topic DataFrames.
Returns a dict keyed by topic. Missing topics yield empty DataFrames.
"""
rows: dict[str, list[dict[str, Any]]] = {t: [] for t in KNOWN_TOPICS}
for topic, ts, payload in iter_mcap_messages(path, KNOWN_TOPICS):
extractor = _EXTRACTORS.get(topic)
if extractor is None:
continue
fields = extractor(payload)
if fields is None:
continue
fields["ts"] = ts
rows[topic].append(fields)
out: dict[str, pd.DataFrame] = {}
for topic, items in rows.items():
if not items:
out[topic] = pd.DataFrame()
continue
df = pd.DataFrame(items).sort_values("ts").reset_index(drop=True)
out[topic] = df
return out
def load_csv_nav(path: str | Path) -> pd.DataFrame:
"""Load a USV nav CSV. Expected columns: ts, lat, lon, heading (flexible).
Missing file or unparseable rows return an empty DataFrame.
"""
p = Path(path)
if not p.exists():
logger.warning("CSV nav file not found: %s", path)
return pd.DataFrame()
rows: list[dict[str, Any]] = []
with p.open(newline="") as fh:
reader = csv.DictReader(fh)
for row in reader:
try:
rows.append({k: float(v) if v not in ("", None) else None for k, v in row.items()})
except ValueError:
continue
if not rows:
return pd.DataFrame()
df = pd.DataFrame(rows)
if "ts" in df.columns:
df = df.sort_values("ts").reset_index(drop=True)
return df
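The JSON-first decode above can be illustrated standalone (a sketch mirroring `_decode_payload`): some binary payloads decode as UTF-8 but still fail JSON parsing, so both exception paths matter.

```python
import json

def decode_payload(data: bytes):
    # JSON first; anything else (e.g. CDR-encoded ROS2) is skipped in v0.
    try:
        return json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None

print(decode_payload(b'{"voltage_v": 14.2}'))  # {'voltage_v': 14.2}
print(decode_payload(b"\xff\xfe\x00\x00"))     # None (not valid UTF-8)
```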

src/cosma_log_analyzer/main.py Normal file

@@ -0,0 +1,148 @@
"""CLI + service entrypoint for cosma-log-analyzer."""
from __future__ import annotations
import logging
import os
import sys
import time
from pathlib import Path
from typing import Iterable
import click
from dotenv import load_dotenv
from .bus import Publisher, StdoutPublisher, make_publisher
from .ingest import (
KNOWN_TOPICS,
TOPIC_BATTERY,
TOPIC_IMU,
TOPIC_USBL,
load_mcap,
)
from .models import Anomaly
from .rules import all_rules
from .rules.base import Rule
logger = logging.getLogger(__name__)
TOPIC_TO_KEY = {
TOPIC_IMU: TOPIC_IMU,
TOPIC_USBL: TOPIC_USBL,
TOPIC_BATTERY: TOPIC_BATTERY,
}
def _setup_logging() -> None:
level = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, level, logging.INFO),
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
def analyze_mcap(
path: str | Path,
subject: str,
rules: Iterable[Rule] | None = None,
) -> list[Anomaly]:
"""Load an MCAP file, run all rules, return the anomaly list."""
dataframes = load_mcap(path)
rules = list(rules) if rules is not None else all_rules()
anomalies: list[Anomaly] = []
for rule in rules:
rule.bind(subject)
df = dataframes.get(rule.topic)
if df is None or df.empty:
continue
anomalies.extend(rule.detect(df))
anomalies.sort(key=lambda a: a.timestamp)
return anomalies
def emit(anomalies: Iterable[Anomaly], publisher: Publisher) -> int:
count = 0
for a in anomalies:
publisher.publish(a)
count += 1
return count
@click.group()
def cli() -> None:
"""cosma-log-analyzer: deterministic anomaly detection on AUV logs."""
load_dotenv()
_setup_logging()
@cli.command("ingest")
@click.argument("path", type=click.Path(exists=True, dir_okay=False))
@click.option("--subject", default="AUV000", help="AUV identifier for NATS subject.")
@click.option("--dry-run", is_flag=True, help="Force stdout publisher (ignore NATS_URL).")
def ingest_cmd(path: str, subject: str, dry_run: bool) -> None:
"""Analyze a single MCAP file and publish anomalies."""
publisher: Publisher = StdoutPublisher() if dry_run else make_publisher()
try:
anomalies = analyze_mcap(path, subject)
n = emit(anomalies, publisher)
logger.info("Processed %s -> %d anomalies", path, n)
finally:
publisher.close()
@cli.command("serve")
@click.option(
"--mcap-dir",
default=None,
help="Directory to watch (default: MCAP_DIR env).",
)
@click.option(
"--subject",
default="AUV000",
help="AUV identifier for NATS subject.",
)
@click.option(
"--poll-interval",
default=None,
type=float,
help="Seconds between scans (default: POLL_INTERVAL_S env, else 30).",
)
def serve_cmd(mcap_dir: str | None, subject: str, poll_interval: float | None) -> None:
"""Watch a directory and process new MCAP files as they appear."""
mcap_dir = mcap_dir or os.environ.get("MCAP_DIR") or "/data/mcap"
interval = poll_interval if poll_interval is not None else float(
os.environ.get("POLL_INTERVAL_S", "30")
)
watch_dir = Path(mcap_dir)
watch_dir.mkdir(parents=True, exist_ok=True)
logger.info("Watching %s (interval=%.1fs)", watch_dir, interval)
publisher = make_publisher()
seen: set[str] = set()
try:
while True:
for mcap_path in sorted(watch_dir.glob("*.mcap")):
key = str(mcap_path)
if key in seen:
continue
logger.info("New MCAP: %s", mcap_path)
try:
anomalies = analyze_mcap(mcap_path, subject)
n = emit(anomalies, publisher)
logger.info("Emitted %d anomalies from %s", n, mcap_path.name)
except Exception as exc:
logger.exception("Failed processing %s: %s", mcap_path, exc)
seen.add(key)
time.sleep(interval)
except KeyboardInterrupt:
logger.info("Shutting down on SIGINT")
finally:
publisher.close()
def main() -> None: # pragma: no cover
cli(standalone_mode=True)
if __name__ == "__main__": # pragma: no cover
main()

src/cosma_log_analyzer/models.py Normal file

@@ -0,0 +1,40 @@
from __future__ import annotations
import json
from dataclasses import asdict, dataclass, field
from typing import Any
SEVERITIES = ("info", "warn", "critical")
@dataclass
class Anomaly:
rule: str
severity: str
timestamp: float
subject: str
topic: str
value: float | None = None
context: dict[str, Any] = field(default_factory=dict)
def __post_init__(self) -> None:
if self.severity not in SEVERITIES:
raise ValueError(
f"severity must be one of {SEVERITIES}, got {self.severity!r}"
)
def to_dict(self) -> dict[str, Any]:
return asdict(self)
def to_json(self) -> str:
return json.dumps(self.to_dict(), sort_keys=True, default=_json_default)
def nats_subject(self) -> str:
return f"cosma.auv.{self.subject}.anomaly.{self.rule}"
def _json_default(obj: Any) -> Any:
if hasattr(obj, "isoformat"):
return obj.isoformat()
raise TypeError(f"Not JSON serializable: {type(obj).__name__}")

src/cosma_log_analyzer/rules/__init__.py Normal file

@@ -0,0 +1,34 @@
"""Deterministic rules v0.
Adding a new rule = subclass Rule, register it in `ALL_RULES`, add a test.
"""
from __future__ import annotations
from .base import Rule
from .battery_low import BatteryLowRule
from .imu_outliers import ImuOutliersRule
from .usbl_distance_spike import UsblDistanceSpikeRule
from .usbl_snr_low import UsblSnrLowRule
from .watchdog_imu import WatchdogImuRule
def all_rules() -> list[Rule]:
"""Default rule set, instantiated with env/default thresholds."""
return [
ImuOutliersRule(),
WatchdogImuRule(),
UsblSnrLowRule(),
UsblDistanceSpikeRule(),
BatteryLowRule(),
]
__all__ = [
"Rule",
"ImuOutliersRule",
"WatchdogImuRule",
"UsblSnrLowRule",
"UsblDistanceSpikeRule",
"BatteryLowRule",
"all_rules",
]

src/cosma_log_analyzer/rules/base.py Normal file

@@ -0,0 +1,47 @@
from __future__ import annotations
from abc import ABC, abstractmethod
import pandas as pd
from ..models import Anomaly
class Rule(ABC):
"""Base class for detection rules.
Subclasses declare class attributes `name`, `topic`, `severity`, then
implement `detect(df)` which receives the topic-specific DataFrame and
returns a list of Anomaly instances.
"""
name: str = "rule"
topic: str = ""
severity: str = "warn"
def __init__(self, subject: str = "AUV000") -> None:
self.subject = subject
def bind(self, subject: str) -> "Rule":
self.subject = subject
return self
@abstractmethod
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
...
def _make(
self,
ts: float,
value: float | None,
context: dict,
) -> Anomaly:
return Anomaly(
rule=self.name,
severity=self.severity,
timestamp=float(ts),
subject=self.subject,
topic=self.topic,
value=value,
context=context,
)

src/cosma_log_analyzer/rules/battery_low.py Normal file

@@ -0,0 +1,62 @@
from __future__ import annotations
import os
import pandas as pd
from ..ingest import TOPIC_BATTERY
from ..models import Anomaly
from .base import Rule
class BatteryLowRule(Rule):
"""Fire when voltage < threshold for more than `min_duration_s`."""
name = "battery_low"
topic = TOPIC_BATTERY
severity = "critical"
def __init__(
self,
subject: str = "AUV000",
min_voltage_v: float | None = None,
min_duration_s: float = 5.0,
) -> None:
super().__init__(subject)
if min_voltage_v is None:
min_voltage_v = float(os.environ.get("BATTERY_LOW_V", 13.5))
self.min_voltage_v = min_voltage_v
self.min_duration_s = min_duration_s
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
if df.empty:
return []
below = df["voltage_v"] < self.min_voltage_v
if not below.any():
return []
run_id = (~below).cumsum()
anomalies: list[Anomaly] = []
for rid, group in df[below].groupby(run_id[below]):
duration = float(group["ts"].iloc[-1] - group["ts"].iloc[0])
if duration < self.min_duration_s:
continue
fire_row = None
for _, row in group.iterrows():
if row["ts"] - group["ts"].iloc[0] >= self.min_duration_s:
fire_row = row
break
if fire_row is None:
fire_row = group.iloc[-1]
anomalies.append(
self._make(
ts=float(fire_row["ts"]),
value=float(fire_row["voltage_v"]),
context={
"min_voltage_v": self.min_voltage_v,
"min_duration_s": self.min_duration_s,
"run_start_ts": float(group["ts"].iloc[0]),
"below_duration_s": duration,
},
)
)
return anomalies
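The run segmentation above relies on a compact pandas idiom: `(~below).cumsum()` is constant across each consecutive run of `True`, so grouping by it isolates the below-threshold runs. A standalone sketch with made-up voltages:

```python
import pandas as pd

df = pd.DataFrame({
    "ts":        [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
    "voltage_v": [14.0, 13.2, 13.1, 14.1, 13.0, 13.2, 13.1, 13.3],
})
below = df["voltage_v"] < 13.5
run_id = (~below).cumsum()  # increments on each healthy sample, so it is
                            # constant inside every consecutive low run
for rid, group in df[below].groupby(run_id[below]):
    duration = group["ts"].iloc[-1] - group["ts"].iloc[0]
    print(rid, list(group["ts"]), duration)
# 1 [1.0, 2.0] 1.0
# 2 [4.0, 5.0, 6.0, 7.0] 3.0
```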

src/cosma_log_analyzer/rules/imu_outliers.py Normal file

@@ -0,0 +1,52 @@
from __future__ import annotations
import numpy as np
import pandas as pd
from ..ingest import TOPIC_IMU
from ..models import Anomaly
from .base import Rule
class ImuOutliersRule(Rule):
"""Fire when instant accel magnitude deviates > z_thresh rolling sigmas."""
name = "imu_outliers"
topic = TOPIC_IMU
severity = "warn"
def __init__(
self,
subject: str = "AUV000",
window_s: float = 10.0,
z_thresh: float = 3.0,
) -> None:
super().__init__(subject)
self.window_s = window_s
self.z_thresh = z_thresh
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
if df.empty or len(df) < 10:
return []
mag = np.sqrt(df["ax"] ** 2 + df["ay"] ** 2 + df["az"] ** 2)
dt = float(df["ts"].diff().median() or 0.02)
window = max(5, int(self.window_s / dt))
baseline_mean = mag.rolling(window, min_periods=window // 2).mean()
baseline_std = mag.rolling(window, min_periods=window // 2).std()
z = (mag - baseline_mean) / baseline_std.replace(0, np.nan)
mask = z.abs() > self.z_thresh
anomalies: list[Anomaly] = []
for idx in df.index[mask.fillna(False)]:
anomalies.append(
self._make(
ts=float(df.at[idx, "ts"]),
value=float(mag.iloc[idx]),
context={
"window_s": self.window_s,
"z_score": float(z.iloc[idx]),
"baseline_mean": float(baseline_mean.iloc[idx]),
"baseline_std": float(baseline_std.iloc[idx]),
},
)
)
return anomalies
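The rolling z-score logic can be sketched on a synthetic magnitude series (values are made up: a single spike against a flat 1 g baseline):

```python
import numpy as np
import pandas as pd

mag = pd.Series(np.r_[np.full(50, 9.81), [60.0], np.full(50, 9.81)])
window = 20
mean = mag.rolling(window, min_periods=window // 2).mean()
std = mag.rolling(window, min_periods=window // 2).std()
# std is 0 on flat stretches; replacing with NaN avoids infinite z-scores,
# same guard as the rule.
z = (mag - mean) / std.replace(0, np.nan)
print(int(z.abs().idxmax()))  # 50, the index of the spike
```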

src/cosma_log_analyzer/rules/usbl_distance_spike.py Normal file

@@ -0,0 +1,50 @@
from __future__ import annotations
import os
import pandas as pd
from ..ingest import TOPIC_USBL
from ..models import Anomaly
from .base import Rule
class UsblDistanceSpikeRule(Rule):
"""Fire when |Δdistance| > spike_m within less than max_dt_s."""
name = "usbl_distance_spike"
topic = TOPIC_USBL
severity = "warn"
def __init__(
self,
subject: str = "AUV000",
spike_m: float | None = None,
max_dt_s: float = 1.0,
) -> None:
super().__init__(subject)
if spike_m is None:
spike_m = float(os.environ.get("USBL_DIST_SPIKE_M", 50.0))
self.spike_m = spike_m
self.max_dt_s = max_dt_s
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
if df.empty or len(df) < 2:
return []
dt = df["ts"].diff()
ddist = df["distance_m"].diff().abs()
mask = (ddist > self.spike_m) & (dt < self.max_dt_s)
anomalies: list[Anomaly] = []
for idx in df.index[mask.fillna(False)]:
anomalies.append(
self._make(
ts=float(df.at[idx, "ts"]),
value=float(ddist.iloc[idx]),
context={
"delta_m": float(ddist.iloc[idx]),
"dt_s": float(dt.iloc[idx]),
"spike_m": self.spike_m,
},
)
)
return anomalies

src/cosma_log_analyzer/rules/usbl_snr_low.py Normal file

@@ -0,0 +1,52 @@
from __future__ import annotations
import os
import pandas as pd
from ..ingest import TOPIC_USBL
from ..models import Anomaly
from .base import Rule
class UsblSnrLowRule(Rule):
"""Fire when SNR stays below threshold for `consec` consecutive samples."""
name = "usbl_snr_low"
topic = TOPIC_USBL
severity = "warn"
def __init__(
self,
subject: str = "AUV000",
min_snr_db: float | None = None,
consec: int = 3,
) -> None:
super().__init__(subject)
if min_snr_db is None:
min_snr_db = float(os.environ.get("USBL_SNR_LOW", 5.0))
self.min_snr_db = min_snr_db
self.consec = consec
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
if df.empty or len(df) < self.consec:
return []
low = df["snr_db"] < self.min_snr_db
run = low.astype(int).groupby((~low).cumsum()).cumsum()
anomalies: list[Anomaly] = []
fired_run = -1
runs_id = (~low).cumsum()
for idx in df.index:
if run.iloc[idx] >= self.consec and runs_id.iloc[idx] != fired_run:
fired_run = int(runs_id.iloc[idx])
anomalies.append(
self._make(
ts=float(df.at[idx, "ts"]),
value=float(df.at[idx, "snr_db"]),
context={
"min_snr_db": self.min_snr_db,
"consec": self.consec,
},
)
)
return anomalies
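The consecutive-sample counter is the same cumsum-grouping trick used for run detection, here counting run length instead of duration; a standalone sketch with made-up SNR values:

```python
import pandas as pd

snr = pd.Series([12.0, 4.0, 4.5, 3.0, 12.0, 4.0, 12.0])
low = snr < 5.0
# Within each consecutive run of low readings, count how many so far;
# the counter resets to 0 whenever SNR recovers.
run = low.astype(int).groupby((~low).cumsum()).cumsum()
print(run.tolist())  # [0, 1, 2, 3, 0, 1, 0]
```

With `consec = 3`, the rule fires at index 3, the first sample where the counter reaches 3.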

src/cosma_log_analyzer/rules/watchdog_imu.py Normal file

@@ -0,0 +1,43 @@
from __future__ import annotations
import os
import pandas as pd
from ..ingest import TOPIC_IMU
from ..models import Anomaly
from .base import Rule
class WatchdogImuRule(Rule):
"""Fire when the gap between two IMU messages exceeds `max_gap_s`."""
name = "watchdog_imu"
topic = TOPIC_IMU
severity = "critical"
def __init__(self, subject: str = "AUV000", max_gap_s: float | None = None) -> None:
super().__init__(subject)
if max_gap_s is None:
max_gap_s = float(os.environ.get("WATCHDOG_IMU_S", 2.0))
self.max_gap_s = max_gap_s
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
if df.empty or len(df) < 2:
return []
gaps = df["ts"].diff()
mask = gaps > self.max_gap_s
anomalies: list[Anomaly] = []
for idx in df.index[mask.fillna(False)]:
anomalies.append(
self._make(
ts=float(df.at[idx, "ts"]),
value=float(gaps.iloc[idx]),
context={
"gap_s": float(gaps.iloc[idx]),
"max_gap_s": self.max_gap_s,
"prev_ts": float(df.at[idx - 1, "ts"]),
},
)
)
return anomalies

systemd/cosma-log-analyzer.service Normal file

@@ -0,0 +1,19 @@
[Unit]
Description=COSMA log analyzer (MCAP -> NATS anomaly events)
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
User=cosma
Group=cosma
WorkingDirectory=/opt/cosma-log-analyzer
EnvironmentFile=/etc/cosma-log-analyzer.env
ExecStart=/opt/cosma-log-analyzer/.venv/bin/cosma-log-analyzer serve
Restart=on-failure
RestartSec=5
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target

tests/__init__.py Normal file

tests/conftest.py Normal file

@@ -0,0 +1,13 @@
from __future__ import annotations
from pathlib import Path
import pytest
from tests.fixtures.generate_fake_mcap import generate
@pytest.fixture(scope="session")
def fake_mcap(tmp_path_factory: pytest.TempPathFactory) -> Path:
path = tmp_path_factory.mktemp("mcap") / "fake.mcap"
return generate(path)

tests/fixtures/__init__.py vendored Normal file

tests/fixtures/generate_fake_mcap.py vendored Normal file

@@ -0,0 +1,127 @@
"""Generate a synthetic MCAP file for AUV log analyzer testing.
Scenarios baked in (relative to t0):
- IMU 50 Hz over 60 s.
- IMU outlier at t=30 s (accel magnitude spike, ~6x baseline).
- IMU gap: no messages between t=45 s and t=48 s (watchdog fires).
- USBL 2 Hz over 60 s, nominal SNR=12 dB; 5 s window at t=20 s with SNR=2.
- USBL distance jump of 100 m at t=40 s.
- Battery 1 Hz, linearly from 16.0 V to 12.0 V over 60 s.
"""
from __future__ import annotations
import json
import math
import random
from pathlib import Path
from mcap.writer import Writer
IMU_TOPIC = "/mavros/imu/data"
USBL_TOPIC = "/usbl_reading/usbl_solution"
BATTERY_TOPIC = "/mavros/battery"
T0_NS = 1_700_000_000_000_000_000 # 2023-11-14 22:13:20 UTC, stable ref
def _ns(seconds_from_t0: float) -> int:
return T0_NS + int(seconds_from_t0 * 1e9)
def _register(writer: Writer, topic: str) -> int:
schema_id = writer.register_schema(
name=f"{topic.strip('/').replace('/', '_')}_json",
encoding="jsonschema",
data=b"{}",
)
return writer.register_channel(
topic=topic, message_encoding="json", schema_id=schema_id
)
def _write_message(writer: Writer, channel_id: int, t_s: float, payload: dict, seq: int) -> None:
data = json.dumps(payload).encode("utf-8")
ns = _ns(t_s)
writer.add_message(
channel_id=channel_id,
log_time=ns,
data=data,
publish_time=ns,
sequence=seq,
)
def generate(path: Path, seed: int = 42) -> Path:
"""Write a synthetic MCAP to `path`. Returns `path`."""
rng = random.Random(seed)
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "wb") as fh:
w = Writer(fh)
w.start(profile="", library="cosma-fake")
imu_ch = _register(w, IMU_TOPIC)
usbl_ch = _register(w, USBL_TOPIC)
bat_ch = _register(w, BATTERY_TOPIC)
seq = 0
# --- IMU 50Hz, with outlier at t=30s and gap 45-48s
dt = 0.02 # 50 Hz
t = 0.0
while t <= 60.0:
if 45.0 < t < 48.0: # watchdog gap (skip publishing)
t += dt
continue
if abs(t - 30.0) < 1e-6: # outlier single sample
ax, ay, az = 30.0, 30.0, 30.0
else:
ax = rng.gauss(0.0, 0.05)
ay = rng.gauss(0.0, 0.05)
az = rng.gauss(9.81, 0.05)
payload = {
"linear_acceleration": {"x": ax, "y": ay, "z": az},
"angular_velocity": {
"x": rng.gauss(0.0, 0.001),
"y": rng.gauss(0.0, 0.001),
"z": rng.gauss(0.0, 0.001),
},
}
_write_message(w, imu_ch, t, payload, seq)
seq += 1
t += dt
# --- USBL 2Hz, SNR low 20-25s, distance spike at t=40s
t = 0.0
last_dist = 100.0
while t <= 60.0:
if 20.0 <= t < 25.0:
snr = 2.0
else:
snr = 12.0 + rng.gauss(0.0, 0.2)
if abs(t - 40.0) < 0.01:
dist = last_dist + 100.0 # 100m jump
else:
dist = last_dist + rng.gauss(0.0, 0.5)
last_dist = dist
payload = {"distance_m": dist, "snr_db": snr}
_write_message(w, usbl_ch, t, payload, seq)
seq += 1
t += 0.5
# --- Battery 1Hz, 16.0V -> 12.0V linear over 60s
for i in range(61):
t = float(i)
v = 16.0 - (4.0 * t / 60.0)
payload = {"voltage_v": v}
_write_message(w, bat_ch, t, payload, seq)
seq += 1
w.finish()
return path
if __name__ == "__main__": # pragma: no cover
import sys
out = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("fake.mcap")
generate(out)
print(f"Wrote {out}")

tests/test_e2e.py Normal file

@@ -0,0 +1,73 @@
from __future__ import annotations

import io
import json
from pathlib import Path

from cosma_log_analyzer.bus import StdoutPublisher
from cosma_log_analyzer.main import analyze_mcap, emit

EXPECTED_RULES = {
    "imu_outliers",
    "watchdog_imu",
    "usbl_snr_low",
    "usbl_distance_spike",
    "battery_low",
}


def test_analyze_fake_mcap_produces_all_rule_types(fake_mcap: Path) -> None:
    anomalies = analyze_mcap(fake_mcap, subject="AUV206")
    fired = {a.rule for a in anomalies}
    assert EXPECTED_RULES <= fired, f"missing rules: {EXPECTED_RULES - fired}"


def test_analyze_fake_mcap_subject_propagated(fake_mcap: Path) -> None:
    anomalies = analyze_mcap(fake_mcap, subject="AUV206")
    assert anomalies
    assert all(a.subject == "AUV206" for a in anomalies)


def test_watchdog_detects_the_gap(fake_mcap: Path) -> None:
    anomalies = analyze_mcap(fake_mcap, subject="AUV206")
    watchdog = [a for a in anomalies if a.rule == "watchdog_imu"]
    # The fixture inserts exactly one 3 s gap.
    assert len(watchdog) == 1
    assert watchdog[0].context["gap_s"] > 2.9


def test_battery_low_fires_once(fake_mcap: Path) -> None:
    anomalies = analyze_mcap(fake_mcap, subject="AUV206")
    bat = [a for a in anomalies if a.rule == "battery_low"]
    assert len(bat) == 1
    assert bat[0].severity == "critical"
    assert bat[0].value < 13.5


def test_usbl_distance_spike_fires_once(fake_mcap: Path) -> None:
    anomalies = analyze_mcap(fake_mcap, subject="AUV206")
    spikes = [a for a in anomalies if a.rule == "usbl_distance_spike"]
    assert len(spikes) == 1
    assert spikes[0].context["delta_m"] > 50.0


def test_usbl_snr_low_fires(fake_mcap: Path) -> None:
    anomalies = analyze_mcap(fake_mcap, subject="AUV206")
    snr = [a for a in anomalies if a.rule == "usbl_snr_low"]
    assert len(snr) >= 1
    assert all(a.value < 5.0 for a in snr)


def test_stdout_publisher_emits_json_lines(fake_mcap: Path) -> None:
    anomalies = analyze_mcap(fake_mcap, subject="AUV206")
    buf = io.StringIO()
    publisher = StdoutPublisher(stream=buf)
    n = emit(anomalies, publisher)
    publisher.close()
    lines = [ln for ln in buf.getvalue().splitlines() if ln]
    assert n == len(lines) == len(anomalies)
    for line in lines:
        obj = json.loads(line)
        assert obj["subject"] == "AUV206"
        assert obj["rule"] in EXPECTED_RULES

tests/test_ingest.py Normal file

@@ -0,0 +1,58 @@
from __future__ import annotations

from pathlib import Path

from cosma_log_analyzer.ingest import (
    KNOWN_TOPICS,
    TOPIC_BATTERY,
    TOPIC_IMU,
    TOPIC_USBL,
    iter_mcap_messages,
    load_csv_nav,
    load_mcap,
)


def test_load_mcap_returns_dataframes(fake_mcap: Path) -> None:
    data = load_mcap(fake_mcap)
    assert set(data.keys()) == set(KNOWN_TOPICS)
    imu = data[TOPIC_IMU]
    usbl = data[TOPIC_USBL]
    bat = data[TOPIC_BATTERY]
    assert not imu.empty
    assert not usbl.empty
    assert not bat.empty
    # IMU ~50 Hz over 60 s minus a 3 s gap => ~2850 samples
    assert 2700 < len(imu) < 3000
    # USBL 2 Hz over 60 s => ~121 samples
    assert 115 < len(usbl) < 130
    # Battery 1 Hz over 60 s => 61 samples
    assert len(bat) == 61
    assert {"ts", "ax", "ay", "az", "gx", "gy", "gz"}.issubset(imu.columns)
    assert {"ts", "distance_m", "snr_db"}.issubset(usbl.columns)
    assert {"ts", "voltage_v"}.issubset(bat.columns)


def test_load_mcap_sorted_by_ts(fake_mcap: Path) -> None:
    data = load_mcap(fake_mcap)
    for df in data.values():
        assert df["ts"].is_monotonic_increasing


def test_iter_mcap_messages_filter(fake_mcap: Path) -> None:
    msgs = list(iter_mcap_messages(fake_mcap, topics=[TOPIC_BATTERY]))
    assert len(msgs) == 61
    assert all(t == TOPIC_BATTERY for t, _, _ in msgs)


def test_load_csv_nav_missing_file(tmp_path: Path) -> None:
    df = load_csv_nav(tmp_path / "nope.csv")
    assert df.empty


def test_load_csv_nav_parses(tmp_path: Path) -> None:
    p = tmp_path / "nav.csv"
    p.write_text("ts,lat,lon\n1.0,48.0,2.0\n2.0,48.1,2.1\n")
    df = load_csv_nav(p)
    assert len(df) == 2
    assert df["ts"].is_monotonic_increasing

tests/test_rules.py Normal file

@@ -0,0 +1,188 @@
from __future__ import annotations

import pandas as pd
import pytest

from cosma_log_analyzer.models import Anomaly
from cosma_log_analyzer.rules import (
    BatteryLowRule,
    ImuOutliersRule,
    UsblDistanceSpikeRule,
    UsblSnrLowRule,
    WatchdogImuRule,
    all_rules,
)


def _imu_df(n: int = 600, dt: float = 0.02) -> pd.DataFrame:
    ts = [i * dt for i in range(n)]
    return pd.DataFrame(
        {
            "ts": ts,
            "ax": [0.0] * n,
            "ay": [0.0] * n,
            "az": [9.81] * n,
            "gx": [0.0] * n,
            "gy": [0.0] * n,
            "gz": [0.0] * n,
        }
    )


def test_imu_outliers_fires_on_single_spike() -> None:
    df = _imu_df()
    df.loc[300, ["ax", "ay", "az"]] = [30.0, 30.0, 30.0]
    anomalies = ImuOutliersRule(subject="AUV206").detect(df)
    assert any(a.rule == "imu_outliers" for a in anomalies)
    a = anomalies[0]
    assert isinstance(a, Anomaly)
    assert a.subject == "AUV206"
    assert a.severity == "warn"


def test_imu_outliers_empty_df() -> None:
    assert ImuOutliersRule().detect(pd.DataFrame()) == []


def test_imu_outliers_no_spike_no_anomaly() -> None:
    df = _imu_df()
    assert ImuOutliersRule().detect(df) == []
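The three tests above pin down *behavior* (one spike fires, flat data doesn't) without fixing the detection method. One plausible sketch of such a detector, under the assumption of a MAD-based robust z-score on acceleration magnitude (the real `ImuOutliersRule` internals are not shown in this diff), is:

```python
import pandas as pd


def accel_outlier_indices(df: pd.DataFrame, z_thresh: float = 6.0) -> list[int]:
    """Illustrative sketch only: flag rows whose acceleration magnitude sits
    far from the median, scored with a MAD-based robust z-score."""
    if df.empty:
        return []
    mag = (df["ax"] ** 2 + df["ay"] ** 2 + df["az"] ** 2) ** 0.5
    med = mag.median()
    mad = (mag - med).abs().median()
    scale = 1.4826 * mad if mad > 0 else 1e-9  # guard against a constant signal
    z = (mag - med).abs() / scale
    return [int(i) for i in df.index[z > z_thresh]]


# A 600-sample stream with one injected spike, mirroring the fixture above.
df = pd.DataFrame({"ax": [0.0] * 600, "ay": [0.0] * 600, "az": [9.81] * 600})
df.loc[300, ["ax", "ay", "az"]] = [30.0, 30.0, 30.0]
print(accel_outlier_indices(df))  # → [300]
```

The MAD (rather than the standard deviation) keeps a single spike from inflating its own threshold.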


def test_watchdog_imu_fires_on_gap() -> None:
    df = pd.DataFrame({"ts": [0.0, 0.5, 1.0, 5.0, 5.5]})
    anomalies = WatchdogImuRule(max_gap_s=2.0).detect(df)
    assert len(anomalies) == 1
    assert anomalies[0].context["gap_s"] == pytest.approx(4.0)
    assert anomalies[0].severity == "critical"


def test_watchdog_imu_no_gap() -> None:
    df = pd.DataFrame({"ts": [i * 0.02 for i in range(100)]})
    assert WatchdogImuRule(max_gap_s=2.0).detect(df) == []


def test_watchdog_imu_short_df() -> None:
    assert WatchdogImuRule().detect(pd.DataFrame()) == []
    assert WatchdogImuRule().detect(pd.DataFrame({"ts": [0.0]})) == []
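The watchdog behavior these tests fix is a one-liner on timestamp deltas. A minimal sketch of the gap-detection core, assuming it reduces to `Series.diff()` (the real `WatchdogImuRule` wraps hits in `Anomaly` objects instead of tuples):

```python
import pandas as pd


def imu_gaps(ts: pd.Series, max_gap_s: float = 2.0) -> list[tuple[float, float]]:
    """Illustrative sketch: return (timestamp, gap_s) for every pair of
    consecutive samples spaced further apart than max_gap_s."""
    if len(ts) < 2:
        return []
    gaps = ts.diff()  # NaN for the first sample, so it can never match
    mask = gaps > max_gap_s
    return [(float(t), float(g)) for t, g in zip(ts[mask], gaps[mask])]


# Same series as test_watchdog_imu_fires_on_gap: one 4 s hole.
print(imu_gaps(pd.Series([0.0, 0.5, 1.0, 5.0, 5.5])))  # → [(5.0, 4.0)]
```

Reporting the timestamp *after* the gap matches the test, which asserts a single anomaly with `gap_s ≈ 4.0`.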


def test_usbl_snr_low_needs_three_consec() -> None:
    df = pd.DataFrame(
        {
            "ts": [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0],
            "distance_m": [100.0] * 7,
            "snr_db": [10.0, 2.0, 2.0, 2.0, 10.0, 2.0, 10.0],
        }
    )
    anomalies = UsblSnrLowRule(min_snr_db=5.0, consec=3).detect(df)
    assert len(anomalies) == 1
    assert anomalies[0].value == pytest.approx(2.0)


def test_usbl_snr_low_no_run_long_enough() -> None:
    df = pd.DataFrame(
        {
            "ts": [0.0, 0.5, 1.0, 1.5, 2.0],
            "distance_m": [100.0] * 5,
            "snr_db": [2.0, 10.0, 2.0, 10.0, 2.0],
        }
    )
    assert UsblSnrLowRule(min_snr_db=5.0, consec=3).detect(df) == []


def test_usbl_snr_low_empty() -> None:
    assert UsblSnrLowRule().detect(pd.DataFrame()) == []
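The "N consecutive low readings" requirement tested here is the classic pandas run-length idiom: label contiguous runs with `shift()`/`cumsum()`, then keep runs long enough. A sketch under that assumption (details of the real `UsblSnrLowRule` may differ):

```python
import pandas as pd


def low_snr_run_starts(snr: pd.Series, min_snr_db: float = 5.0, consec: int = 3) -> list[int]:
    """Illustrative sketch: start indices of runs of at least `consec`
    consecutive samples below min_snr_db."""
    if snr.empty:
        return []
    low = snr < min_snr_db
    run_id = (low != low.shift()).cumsum()  # increments at every low/high edge
    starts = []
    for _, run in snr[low].groupby(run_id[low]):
        if len(run) >= consec:
            starts.append(int(run.index[0]))
    return starts


# Same pattern as test_usbl_snr_low_needs_three_consec: one run of 3, one of 1.
snr = pd.Series([10.0, 2.0, 2.0, 2.0, 10.0, 2.0, 10.0])
print(low_snr_run_starts(snr))  # → [1]
```

Only the length-3 run qualifies, which is why the test expects exactly one anomaly despite two dips below 5 dB.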


def test_usbl_distance_spike_fires() -> None:
    df = pd.DataFrame(
        {
            "ts": [0.0, 0.5, 1.0, 1.5],
            "distance_m": [100.0, 100.5, 250.0, 250.5],
            "snr_db": [12.0] * 4,
        }
    )
    anomalies = UsblDistanceSpikeRule(spike_m=50.0, max_dt_s=1.0).detect(df)
    assert len(anomalies) == 1
    assert anomalies[0].value == pytest.approx(149.5)


def test_usbl_distance_spike_ignores_slow_drift() -> None:
    df = pd.DataFrame(
        {
            "ts": [0.0, 2.0, 4.0],  # dt > max_dt_s
            "distance_m": [100.0, 250.0, 400.0],
            "snr_db": [12.0] * 3,
        }
    )
    assert UsblDistanceSpikeRule(spike_m=50.0, max_dt_s=1.0).detect(df) == []


def test_usbl_distance_spike_empty() -> None:
    assert UsblDistanceSpikeRule().detect(pd.DataFrame()) == []
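These tests encode why the spike rule needs `max_dt_s` at all: a 150 m jump in 0.5 s is physically implausible for an AUV, while the same jump over 2 s of missing fixes may just be drift. A sketch of that heuristic (parameter names mirror the rule under test but are assumptions):

```python
import pandas as pd


def distance_spike_indices(df: pd.DataFrame, spike_m: float = 50.0, max_dt_s: float = 1.0) -> list[int]:
    """Illustrative sketch: indices where distance jumps more than spike_m
    between two pings closer together in time than max_dt_s."""
    if df.empty:
        return []
    d_m = df["distance_m"].diff().abs()
    d_t = df["ts"].diff()
    mask = (d_m > spike_m) & (d_t <= max_dt_s)
    return [int(i) for i in df.index[mask]]


fast_jump = pd.DataFrame({"ts": [0.0, 0.5, 1.0, 1.5],
                          "distance_m": [100.0, 100.5, 250.0, 250.5]})
slow_drift = pd.DataFrame({"ts": [0.0, 2.0, 4.0],
                           "distance_m": [100.0, 250.0, 400.0]})
print(distance_spike_indices(fast_jump))   # → [2]
print(distance_spike_indices(slow_drift))  # → []
```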


def test_battery_low_fires_on_sustained_drop() -> None:
    ts = [float(i) for i in range(20)]
    voltage = [15.0] * 5 + [13.0] * 10 + [15.0] * 5
    df = pd.DataFrame({"ts": ts, "voltage_v": voltage})
    anomalies = BatteryLowRule(min_voltage_v=13.5, min_duration_s=5.0).detect(df)
    assert len(anomalies) == 1
    assert anomalies[0].severity == "critical"
    assert anomalies[0].value == pytest.approx(13.0)


def test_battery_low_ignores_short_dips() -> None:
    ts = [float(i) for i in range(10)]
    voltage = [15.0, 15.0, 13.0, 13.0, 15.0, 15.0, 13.0, 13.0, 15.0, 15.0]
    df = pd.DataFrame({"ts": ts, "voltage_v": voltage})
    assert BatteryLowRule(min_voltage_v=13.5, min_duration_s=5.0).detect(df) == []


def test_battery_low_empty() -> None:
    assert BatteryLowRule().detect(pd.DataFrame()) == []


def test_battery_low_always_above() -> None:
    df = pd.DataFrame({"ts": [0.0, 1.0, 2.0], "voltage_v": [16.0, 15.5, 15.0]})
    assert BatteryLowRule(min_voltage_v=13.5).detect(df) == []


def test_all_rules_returns_five() -> None:
    rules = all_rules()
    assert len(rules) == 5
    assert {r.name for r in rules} == {
        "imu_outliers",
        "watchdog_imu",
        "usbl_snr_low",
        "usbl_distance_spike",
        "battery_low",
    }


def test_rule_bind_sets_subject() -> None:
    r = BatteryLowRule()
    r.bind("AUV206")
    assert r.subject == "AUV206"


def test_anomaly_severity_validation() -> None:
    with pytest.raises(ValueError):
        Anomaly(rule="x", severity="bogus", timestamp=0.0, subject="s", topic="t")


def test_anomaly_json_and_subject() -> None:
    a = Anomaly(
        rule="battery_low",
        severity="critical",
        timestamp=123.0,
        subject="AUV206",
        topic="/mavros/battery",
        value=12.5,
        context={"k": 1},
    )
    assert a.nats_subject() == "cosma.auv.AUV206.anomaly.battery_low"
    assert "battery_low" in a.to_json()