Compare commits
4 Commits
20e19239eb
...
b0bbb51873
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0bbb51873 | ||
|
|
668d84c187 | ||
|
|
67b2121add | ||
|
|
f04d7c90c9 |
9
.env.example
Normal file
9
.env.example
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
NATS_URL= # empty = stdout fallback
|
||||||
|
MCAP_DIR=/data/mcap
|
||||||
|
POLL_INTERVAL_S=30
|
||||||
|
LOG_LEVEL=INFO
|
||||||
|
# thresholds (optional, defaults in code)
|
||||||
|
BATTERY_LOW_V=13.5
|
||||||
|
USBL_SNR_LOW=5.0
|
||||||
|
USBL_DIST_SPIKE_M=50
|
||||||
|
WATCHDOG_IMU_S=2.0
|
||||||
20
.gitignore
vendored
Normal file
20
.gitignore
vendored
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
*$py.class
|
||||||
|
*.egg-info/
|
||||||
|
.eggs/
|
||||||
|
build/
|
||||||
|
dist/
|
||||||
|
.venv/
|
||||||
|
venv/
|
||||||
|
env/
|
||||||
|
.env
|
||||||
|
.pytest_cache/
|
||||||
|
.coverage
|
||||||
|
htmlcov/
|
||||||
|
.mypy_cache/
|
||||||
|
.ruff_cache/
|
||||||
|
*.mcap
|
||||||
|
!tests/fixtures/*.mcap
|
||||||
|
.idea/
|
||||||
|
.vscode/
|
||||||
21
Dockerfile
Normal file
21
Dockerfile
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
FROM python:3.11-slim
|
||||||
|
|
||||||
|
ENV PYTHONUNBUFFERED=1 \
|
||||||
|
PYTHONDONTWRITEBYTECODE=1 \
|
||||||
|
PIP_NO_CACHE_DIR=1
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
COPY pyproject.toml README.md ./
|
||||||
|
COPY src ./src
|
||||||
|
|
||||||
|
RUN pip install --upgrade pip && pip install .
|
||||||
|
|
||||||
|
ENV MCAP_DIR=/data/mcap \
|
||||||
|
POLL_INTERVAL_S=30 \
|
||||||
|
LOG_LEVEL=INFO
|
||||||
|
|
||||||
|
VOLUME ["/data/mcap"]
|
||||||
|
|
||||||
|
ENTRYPOINT ["cosma-log-analyzer"]
|
||||||
|
CMD ["serve"]
|
||||||
148
README.md
148
README.md
@@ -1,3 +1,149 @@
|
|||||||
# cosma-log-analyzer
|
# cosma-log-analyzer
|
||||||
|
|
||||||
Détecteur anomalies logs AUV COSMA (règles déterministes + NATS)
|
Deterministic anomaly detection service for COSMA AUV logs. Ingests MCAP
|
||||||
|
files produced by the AUV/USV pipeline, evaluates a set of rules against
|
||||||
|
IMU / USBL / battery topics, and publishes each detection as a JSON event
|
||||||
|
on NATS (or stdout in dev).
|
||||||
|
|
||||||
|
## Context
|
||||||
|
|
||||||
|
COSMA (Flag) operates an AUV that streams telemetry to a surface USV. All
|
||||||
|
telemetry is persisted as MCAP (ROS2-native container). This service is
|
||||||
|
livrable #3: the first-pass observability layer before any statistical or
|
||||||
|
ML detection is added.
|
||||||
|
|
||||||
|
```
|
||||||
|
┌──────┐ MCAP ┌──────┐ MCAP ┌───────────────────┐ NATS ┌────────────────┐
|
||||||
|
│ AUV │────────▶│ USV │────────▶│ cosma-log-analyzer│──────────▶│ cosma-monitor │
|
||||||
|
└──────┘ └──────┘ │ (this repo) │ events │ UI │
|
||||||
|
└───────────────────┘ └────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
## Rules (v0)
|
||||||
|
|
||||||
|
| Rule | Threshold (default) | Severity | Topic |
|
||||||
|
|-----------------------|-------------------------------------------|----------|----------------------------------|
|
||||||
|
| `imu_outliers` | rolling 10 s window, \|z\| > 3 | warn | `/mavros/imu/data` |
|
||||||
|
| `watchdog_imu` | gap > 2 s between two IMU msgs | critical | `/mavros/imu/data` |
|
||||||
|
| `usbl_snr_low` | SNR < 5 dB for 3 consecutive samples | warn | `/usbl_reading/usbl_solution` |
|
||||||
|
| `usbl_distance_spike` | \|Δdistance\| > 50 m in less than 1 s | warn | `/usbl_reading/usbl_solution` |
|
||||||
|
| `battery_low` | voltage < 13.5 V for more than 5 s | critical | `/mavros/battery` |
|
||||||
|
|
||||||
|
All thresholds are tunable via environment variables (`BATTERY_LOW_V`,
|
||||||
|
`USBL_SNR_LOW`, `USBL_DIST_SPIKE_M`, `WATCHDOG_IMU_S`) or rule
|
||||||
|
constructor arguments.
|
||||||
|
|
||||||
|
## NATS subject
|
||||||
|
|
||||||
|
```
|
||||||
|
cosma.auv.{subject}.anomaly.{rule}
|
||||||
|
# ex: cosma.auv.AUV206.anomaly.battery_low
|
||||||
|
```
|
||||||
|
|
||||||
|
If `NATS_URL` is empty, events are written as JSON Lines to stdout —
|
||||||
|
useful in dev and CI.
|
||||||
|
|
||||||
|
## Example anomaly payload
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"rule": "battery_low",
|
||||||
|
"severity": "critical",
|
||||||
|
"timestamp": 1700000055.0,
|
||||||
|
"subject": "AUV206",
|
||||||
|
"topic": "/mavros/battery",
|
||||||
|
"value": 13.26,
|
||||||
|
"context": {
|
||||||
|
"min_voltage_v": 13.5,
|
||||||
|
"min_duration_s": 5.0,
|
||||||
|
"run_start_ts": 1700000051.0,
|
||||||
|
"below_duration_s": 9.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Install
|
||||||
|
|
||||||
|
Python 3.11+ recommended. Works on 3.10.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip install -e .[dev]
|
||||||
|
```
|
||||||
|
|
||||||
|
## CLI
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# One-shot on a single MCAP file
|
||||||
|
cosma-log-analyzer ingest path/to/log.mcap --subject AUV206
|
||||||
|
|
||||||
|
# Dry-run: force stdout even if NATS_URL is set
|
||||||
|
cosma-log-analyzer ingest path/to/log.mcap --dry-run
|
||||||
|
|
||||||
|
# Service mode: watch a directory for new MCAP files
|
||||||
|
cosma-log-analyzer serve --mcap-dir /data/mcap
|
||||||
|
```
|
||||||
|
|
||||||
|
## Docker
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker compose up --build
|
||||||
|
# drop MCAP files into ./data/mcap and watch NATS on :4222
|
||||||
|
```
|
||||||
|
|
||||||
|
## systemd
|
||||||
|
|
||||||
|
```bash
|
||||||
|
sudo cp systemd/cosma-log-analyzer.service /etc/systemd/system/
|
||||||
|
sudo systemctl enable --now cosma-log-analyzer
|
||||||
|
journalctl -u cosma-log-analyzer -f
|
||||||
|
```
|
||||||
|
|
||||||
|
## Tests
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pytest -v # 32 tests, runs the e2e against a fake MCAP
|
||||||
|
pytest --cov --cov-report=term # coverage (rules/ > 95%)
|
||||||
|
```
|
||||||
|
|
||||||
|
The fake MCAP generator (`tests/fixtures/generate_fake_mcap.py`) produces
|
||||||
|
a synthetic 60 s trace with one instance of each rule's trigger
|
||||||
|
condition — the e2e test asserts we detect exactly those.
|
||||||
|
|
||||||
|
## Adding a rule
|
||||||
|
|
||||||
|
1. Subclass `Rule` in `src/cosma_log_analyzer/rules/<name>.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
class MyRule(Rule):
|
||||||
|
name = "my_rule"
|
||||||
|
topic = "/my/topic"
|
||||||
|
severity = "warn"
|
||||||
|
|
||||||
|
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Register it in `rules/__init__.py::all_rules()`.
|
||||||
|
3. Add a test in `tests/test_rules.py`.
|
||||||
|
|
||||||
|
## Roadmap v1
|
||||||
|
|
||||||
|
- Rolling-stats rules (heading drift, GPS dropout correlated with USBL).
|
||||||
|
- Time alignment between MCAP IMU and CSV USV nav.
|
||||||
|
- ML anomaly layer (Isolation Forest) once we have > 50 h of nominal
|
||||||
|
dive datasets to train against.
|
||||||
|
- Backpressure + JetStream persistence for the NATS publisher.
|
||||||
|
|
||||||
|
## Layout
|
||||||
|
|
||||||
|
```
|
||||||
|
src/cosma_log_analyzer/ # package code
|
||||||
|
rules/ # one file per rule
|
||||||
|
main.py # Click CLI: `ingest` + `serve`
|
||||||
|
ingest.py # MCAP + CSV readers -> pandas
|
||||||
|
bus.py # NATS publisher + stdout fallback
|
||||||
|
models.py # Anomaly dataclass
|
||||||
|
tests/ # pytest suite + fake MCAP fixture
|
||||||
|
examples/run_on_fake.sh # end-to-end demo
|
||||||
|
systemd/ # unit file for on-prem deployment
|
||||||
|
```
|
||||||
|
|||||||
21
docker-compose.yml
Normal file
21
docker-compose.yml
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
services:
|
||||||
|
nats:
|
||||||
|
image: nats:2.10-alpine
|
||||||
|
command: ["-js", "-m", "8222"]
|
||||||
|
ports:
|
||||||
|
- "4222:4222"
|
||||||
|
- "8222:8222"
|
||||||
|
restart: unless-stopped
|
||||||
|
|
||||||
|
analyzer:
|
||||||
|
build: .
|
||||||
|
depends_on:
|
||||||
|
- nats
|
||||||
|
environment:
|
||||||
|
NATS_URL: nats://nats:4222
|
||||||
|
MCAP_DIR: /data/mcap
|
||||||
|
POLL_INTERVAL_S: 30
|
||||||
|
LOG_LEVEL: INFO
|
||||||
|
volumes:
|
||||||
|
- ./data/mcap:/data/mcap
|
||||||
|
restart: unless-stopped
|
||||||
13
examples/run_on_fake.sh
Executable file
13
examples/run_on_fake.sh
Executable file
@@ -0,0 +1,13 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
# End-to-end demo: generate a synthetic MCAP, run the analyzer on it,
|
||||||
|
# pipe stdout anomalies to the console (NATS fallback mode).
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
ROOT=$(cd "$(dirname "$0")/.." && pwd)
|
||||||
|
cd "$ROOT"
|
||||||
|
|
||||||
|
OUT=$(mktemp -d)/demo.mcap
|
||||||
|
python3 tests/fixtures/generate_fake_mcap.py "$OUT"
|
||||||
|
|
||||||
|
echo "---- Running analyzer on $OUT ----"
|
||||||
|
NATS_URL="" cosma-log-analyzer ingest "$OUT" --subject AUV206 --dry-run
|
||||||
44
pyproject.toml
Normal file
44
pyproject.toml
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["setuptools>=68", "wheel"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "cosma-log-analyzer"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "Deterministic anomaly detector for COSMA AUV logs (MCAP/CSV -> NATS)"
|
||||||
|
readme = "README.md"
|
||||||
|
requires-python = ">=3.10"
|
||||||
|
license = { text = "Proprietary" }
|
||||||
|
authors = [{ name = "Flag / COSMA" }]
|
||||||
|
dependencies = [
|
||||||
|
"mcap>=1.2",
|
||||||
|
"pandas>=2.1",
|
||||||
|
"numpy>=1.26",
|
||||||
|
"nats-py>=2.6",
|
||||||
|
"python-dotenv>=1.0",
|
||||||
|
"click>=8.1",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.optional-dependencies]
|
||||||
|
dev = [
|
||||||
|
"pytest>=7.4",
|
||||||
|
"pytest-cov>=4.1",
|
||||||
|
]
|
||||||
|
|
||||||
|
[project.scripts]
|
||||||
|
cosma-log-analyzer = "cosma_log_analyzer.main:cli"
|
||||||
|
|
||||||
|
[tool.setuptools.packages.find]
|
||||||
|
where = ["src"]
|
||||||
|
|
||||||
|
[tool.pytest.ini_options]
|
||||||
|
testpaths = ["tests"]
|
||||||
|
addopts = "-ra"
|
||||||
|
|
||||||
|
[tool.coverage.run]
|
||||||
|
source = ["src/cosma_log_analyzer"]
|
||||||
|
branch = true
|
||||||
|
|
||||||
|
[tool.coverage.report]
|
||||||
|
show_missing = true
|
||||||
|
skip_covered = false
|
||||||
1
src/cosma_log_analyzer/__init__.py
Normal file
1
src/cosma_log_analyzer/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
__version__ = "0.1.0"
|
||||||
77
src/cosma_log_analyzer/bus.py
Normal file
77
src/cosma_log_analyzer/bus.py
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
"""Event bus publisher: NATS when configured, stdout otherwise."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from typing import Protocol
|
||||||
|
|
||||||
|
from .models import Anomaly
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Publisher(Protocol):
|
||||||
|
def publish(self, anomaly: Anomaly) -> None: ...
|
||||||
|
def close(self) -> None: ...
|
||||||
|
|
||||||
|
|
||||||
|
class StdoutPublisher:
|
||||||
|
"""Fallback: writes JSON Lines to stdout. Thread/process-safe via writes."""
|
||||||
|
|
||||||
|
def __init__(self, stream=None) -> None:
|
||||||
|
self._stream = stream if stream is not None else sys.stdout
|
||||||
|
|
||||||
|
def publish(self, anomaly: Anomaly) -> None:
|
||||||
|
line = anomaly.to_json()
|
||||||
|
self._stream.write(line + "\n")
|
||||||
|
self._stream.flush()
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
try:
|
||||||
|
self._stream.flush()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class NatsPublisher:
|
||||||
|
"""Sync wrapper around nats-py async client. Keeps a dedicated loop."""
|
||||||
|
|
||||||
|
def __init__(self, url: str) -> None:
|
||||||
|
self._url = url
|
||||||
|
self._loop = asyncio.new_event_loop()
|
||||||
|
self._nc = None
|
||||||
|
self._loop.run_until_complete(self._connect())
|
||||||
|
|
||||||
|
async def _connect(self) -> None:
|
||||||
|
import nats # lazy import
|
||||||
|
|
||||||
|
self._nc = await nats.connect(self._url)
|
||||||
|
|
||||||
|
def publish(self, anomaly: Anomaly) -> None:
|
||||||
|
if self._nc is None:
|
||||||
|
raise RuntimeError("NATS client not connected")
|
||||||
|
payload = anomaly.to_json().encode("utf-8")
|
||||||
|
self._loop.run_until_complete(self._nc.publish(anomaly.nats_subject(), payload))
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
if self._nc is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self._loop.run_until_complete(self._nc.drain())
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("NATS drain failed: %s", exc)
|
||||||
|
finally:
|
||||||
|
self._loop.close()
|
||||||
|
self._nc = None
|
||||||
|
|
||||||
|
|
||||||
|
def make_publisher(nats_url: str | None = None) -> Publisher:
|
||||||
|
"""Pick NATS or stdout based on env/arg. Empty/None -> stdout fallback."""
|
||||||
|
url = nats_url if nats_url is not None else os.environ.get("NATS_URL", "")
|
||||||
|
if not url:
|
||||||
|
logger.info("NATS_URL empty -> using stdout fallback publisher")
|
||||||
|
return StdoutPublisher()
|
||||||
|
logger.info("Connecting NATS publisher to %s", url)
|
||||||
|
return NatsPublisher(url)
|
||||||
143
src/cosma_log_analyzer/ingest.py
Normal file
143
src/cosma_log_analyzer/ingest.py
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
"""MCAP + CSV readers.
|
||||||
|
|
||||||
|
MCAP decoding stays schema-agnostic: we read each message's bytes and try
|
||||||
|
JSON first (CDR/ROS2 is out of scope for v0 — field extraction relies on
|
||||||
|
known field names once decoded). If the message body is not JSON we skip it.
|
||||||
|
|
||||||
|
Rules operate on pandas DataFrames with per-topic columns:
|
||||||
|
imu -> ts, ax, ay, az, gx, gy, gz
|
||||||
|
usbl -> ts, distance_m, snr_db
|
||||||
|
battery -> ts, voltage_v
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import csv
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from collections.abc import Iterable, Iterator
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
TOPIC_IMU = "/mavros/imu/data"
|
||||||
|
TOPIC_USBL = "/usbl_reading/usbl_solution"
|
||||||
|
TOPIC_BATTERY = "/mavros/battery"
|
||||||
|
|
||||||
|
KNOWN_TOPICS = (TOPIC_IMU, TOPIC_USBL, TOPIC_BATTERY)
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_payload(data: bytes) -> dict[str, Any] | None:
|
||||||
|
try:
|
||||||
|
return json.loads(data.decode("utf-8"))
|
||||||
|
except (UnicodeDecodeError, json.JSONDecodeError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def iter_mcap_messages(
|
||||||
|
path: str | Path,
|
||||||
|
topics: Iterable[str] | None = None,
|
||||||
|
) -> Iterator[tuple[str, float, dict[str, Any]]]:
|
||||||
|
"""Yield (topic, ts_seconds, decoded_payload) for matching messages."""
|
||||||
|
from mcap.reader import make_reader # lazy: lib mcap
|
||||||
|
|
||||||
|
wanted = set(topics) if topics else None
|
||||||
|
with open(path, "rb") as fh:
|
||||||
|
reader = make_reader(fh)
|
||||||
|
for schema, channel, message in reader.iter_messages(topics=list(wanted) if wanted else None):
|
||||||
|
payload = _decode_payload(message.data)
|
||||||
|
if payload is None:
|
||||||
|
continue
|
||||||
|
ts = message.log_time / 1e9 # MCAP uses ns
|
||||||
|
yield channel.topic, ts, payload
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_imu(payload: dict[str, Any]) -> dict[str, float] | None:
|
||||||
|
lin = payload.get("linear_acceleration") or payload.get("accel") or {}
|
||||||
|
ang = payload.get("angular_velocity") or payload.get("gyro") or {}
|
||||||
|
if not lin and not ang:
|
||||||
|
return None
|
||||||
|
return {
|
||||||
|
"ax": float(lin.get("x", 0.0)),
|
||||||
|
"ay": float(lin.get("y", 0.0)),
|
||||||
|
"az": float(lin.get("z", 0.0)),
|
||||||
|
"gx": float(ang.get("x", 0.0)),
|
||||||
|
"gy": float(ang.get("y", 0.0)),
|
||||||
|
"gz": float(ang.get("z", 0.0)),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_usbl(payload: dict[str, Any]) -> dict[str, float] | None:
|
||||||
|
dist = payload.get("distance_m", payload.get("range_m"))
|
||||||
|
snr = payload.get("snr_db", payload.get("snr"))
|
||||||
|
if dist is None or snr is None:
|
||||||
|
return None
|
||||||
|
return {"distance_m": float(dist), "snr_db": float(snr)}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_battery(payload: dict[str, Any]) -> dict[str, float] | None:
|
||||||
|
v = payload.get("voltage_v", payload.get("voltage"))
|
||||||
|
if v is None:
|
||||||
|
return None
|
||||||
|
return {"voltage_v": float(v)}
|
||||||
|
|
||||||
|
|
||||||
|
_EXTRACTORS = {
|
||||||
|
TOPIC_IMU: _extract_imu,
|
||||||
|
TOPIC_USBL: _extract_usbl,
|
||||||
|
TOPIC_BATTERY: _extract_battery,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def load_mcap(path: str | Path) -> dict[str, pd.DataFrame]:
|
||||||
|
"""Load an MCAP file into per-topic DataFrames.
|
||||||
|
|
||||||
|
Returns a dict keyed by topic. Missing topics yield empty DataFrames.
|
||||||
|
"""
|
||||||
|
rows: dict[str, list[dict[str, Any]]] = {t: [] for t in KNOWN_TOPICS}
|
||||||
|
for topic, ts, payload in iter_mcap_messages(path, KNOWN_TOPICS):
|
||||||
|
extractor = _EXTRACTORS.get(topic)
|
||||||
|
if extractor is None:
|
||||||
|
continue
|
||||||
|
fields = extractor(payload)
|
||||||
|
if fields is None:
|
||||||
|
continue
|
||||||
|
fields["ts"] = ts
|
||||||
|
rows[topic].append(fields)
|
||||||
|
|
||||||
|
out: dict[str, pd.DataFrame] = {}
|
||||||
|
for topic, items in rows.items():
|
||||||
|
if not items:
|
||||||
|
out[topic] = pd.DataFrame()
|
||||||
|
continue
|
||||||
|
df = pd.DataFrame(items).sort_values("ts").reset_index(drop=True)
|
||||||
|
out[topic] = df
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def load_csv_nav(path: str | Path) -> pd.DataFrame:
|
||||||
|
"""Load a USV nav CSV. Expected columns: ts, lat, lon, heading (flexible).
|
||||||
|
|
||||||
|
Missing file or unparseable rows return an empty DataFrame.
|
||||||
|
"""
|
||||||
|
p = Path(path)
|
||||||
|
if not p.exists():
|
||||||
|
logger.warning("CSV nav file not found: %s", path)
|
||||||
|
return pd.DataFrame()
|
||||||
|
rows: list[dict[str, Any]] = []
|
||||||
|
with p.open(newline="") as fh:
|
||||||
|
reader = csv.DictReader(fh)
|
||||||
|
for row in reader:
|
||||||
|
try:
|
||||||
|
rows.append({k: float(v) if v not in ("", None) else None for k, v in row.items()})
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
if not rows:
|
||||||
|
return pd.DataFrame()
|
||||||
|
df = pd.DataFrame(rows)
|
||||||
|
if "ts" in df.columns:
|
||||||
|
df = df.sort_values("ts").reset_index(drop=True)
|
||||||
|
return df
|
||||||
148
src/cosma_log_analyzer/main.py
Normal file
148
src/cosma_log_analyzer/main.py
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
"""CLI + service entrypoint for cosma-log-analyzer."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
import click
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
from .bus import Publisher, StdoutPublisher, make_publisher
|
||||||
|
from .ingest import (
|
||||||
|
KNOWN_TOPICS,
|
||||||
|
TOPIC_BATTERY,
|
||||||
|
TOPIC_IMU,
|
||||||
|
TOPIC_USBL,
|
||||||
|
load_mcap,
|
||||||
|
)
|
||||||
|
from .models import Anomaly
|
||||||
|
from .rules import all_rules
|
||||||
|
from .rules.base import Rule
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
TOPIC_TO_KEY = {
|
||||||
|
TOPIC_IMU: TOPIC_IMU,
|
||||||
|
TOPIC_USBL: TOPIC_USBL,
|
||||||
|
TOPIC_BATTERY: TOPIC_BATTERY,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _setup_logging() -> None:
|
||||||
|
level = os.environ.get("LOG_LEVEL", "INFO").upper()
|
||||||
|
logging.basicConfig(
|
||||||
|
level=getattr(logging, level, logging.INFO),
|
||||||
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def analyze_mcap(
|
||||||
|
path: str | Path,
|
||||||
|
subject: str,
|
||||||
|
rules: Iterable[Rule] | None = None,
|
||||||
|
) -> list[Anomaly]:
|
||||||
|
"""Load an MCAP file, run all rules, return the anomaly list."""
|
||||||
|
dataframes = load_mcap(path)
|
||||||
|
rules = list(rules) if rules is not None else all_rules()
|
||||||
|
anomalies: list[Anomaly] = []
|
||||||
|
for rule in rules:
|
||||||
|
rule.bind(subject)
|
||||||
|
df = dataframes.get(rule.topic)
|
||||||
|
if df is None or df.empty:
|
||||||
|
continue
|
||||||
|
anomalies.extend(rule.detect(df))
|
||||||
|
anomalies.sort(key=lambda a: a.timestamp)
|
||||||
|
return anomalies
|
||||||
|
|
||||||
|
|
||||||
|
def emit(anomalies: Iterable[Anomaly], publisher: Publisher) -> int:
|
||||||
|
count = 0
|
||||||
|
for a in anomalies:
|
||||||
|
publisher.publish(a)
|
||||||
|
count += 1
|
||||||
|
return count
|
||||||
|
|
||||||
|
|
||||||
|
@click.group()
|
||||||
|
def cli() -> None:
|
||||||
|
"""cosma-log-analyzer: deterministic anomaly detection on AUV logs."""
|
||||||
|
load_dotenv()
|
||||||
|
_setup_logging()
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command("ingest")
|
||||||
|
@click.argument("path", type=click.Path(exists=True, dir_okay=False))
|
||||||
|
@click.option("--subject", default="AUV000", help="AUV identifier for NATS subject.")
|
||||||
|
@click.option("--dry-run", is_flag=True, help="Force stdout publisher (ignore NATS_URL).")
|
||||||
|
def ingest_cmd(path: str, subject: str, dry_run: bool) -> None:
|
||||||
|
"""Analyze a single MCAP file and publish anomalies."""
|
||||||
|
publisher: Publisher = StdoutPublisher() if dry_run else make_publisher()
|
||||||
|
try:
|
||||||
|
anomalies = analyze_mcap(path, subject)
|
||||||
|
n = emit(anomalies, publisher)
|
||||||
|
logger.info("Processed %s -> %d anomalies", path, n)
|
||||||
|
finally:
|
||||||
|
publisher.close()
|
||||||
|
|
||||||
|
|
||||||
|
@cli.command("serve")
|
||||||
|
@click.option(
|
||||||
|
"--mcap-dir",
|
||||||
|
default=None,
|
||||||
|
help="Directory to watch (default: MCAP_DIR env).",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--subject",
|
||||||
|
default="AUV000",
|
||||||
|
help="AUV identifier for NATS subject.",
|
||||||
|
)
|
||||||
|
@click.option(
|
||||||
|
"--poll-interval",
|
||||||
|
default=None,
|
||||||
|
type=float,
|
||||||
|
help="Seconds between scans (default: POLL_INTERVAL_S env, else 30).",
|
||||||
|
)
|
||||||
|
def serve_cmd(mcap_dir: str | None, subject: str, poll_interval: float | None) -> None:
|
||||||
|
"""Watch a directory and process new MCAP files as they appear."""
|
||||||
|
mcap_dir = mcap_dir or os.environ.get("MCAP_DIR") or "/data/mcap"
|
||||||
|
interval = poll_interval if poll_interval is not None else float(
|
||||||
|
os.environ.get("POLL_INTERVAL_S", "30")
|
||||||
|
)
|
||||||
|
watch_dir = Path(mcap_dir)
|
||||||
|
watch_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
logger.info("Watching %s (interval=%.1fs)", watch_dir, interval)
|
||||||
|
|
||||||
|
publisher = make_publisher()
|
||||||
|
seen: set[str] = set()
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
for mcap_path in sorted(watch_dir.glob("*.mcap")):
|
||||||
|
key = str(mcap_path)
|
||||||
|
if key in seen:
|
||||||
|
continue
|
||||||
|
logger.info("New MCAP: %s", mcap_path)
|
||||||
|
try:
|
||||||
|
anomalies = analyze_mcap(mcap_path, subject)
|
||||||
|
n = emit(anomalies, publisher)
|
||||||
|
logger.info("Emitted %d anomalies from %s", n, mcap_path.name)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.exception("Failed processing %s: %s", mcap_path, exc)
|
||||||
|
seen.add(key)
|
||||||
|
time.sleep(interval)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("Shutting down on SIGINT")
|
||||||
|
finally:
|
||||||
|
publisher.close()
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None: # pragma: no cover
|
||||||
|
cli(standalone_mode=True)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": # pragma: no cover
|
||||||
|
main()
|
||||||
40
src/cosma_log_analyzer/models.py
Normal file
40
src/cosma_log_analyzer/models.py
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from dataclasses import asdict, dataclass, field
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
SEVERITIES = ("info", "warn", "critical")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Anomaly:
|
||||||
|
rule: str
|
||||||
|
severity: str
|
||||||
|
timestamp: float
|
||||||
|
subject: str
|
||||||
|
topic: str
|
||||||
|
value: float | None = None
|
||||||
|
context: dict[str, Any] = field(default_factory=dict)
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
if self.severity not in SEVERITIES:
|
||||||
|
raise ValueError(
|
||||||
|
f"severity must be one of {SEVERITIES}, got {self.severity!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def to_dict(self) -> dict[str, Any]:
|
||||||
|
return asdict(self)
|
||||||
|
|
||||||
|
def to_json(self) -> str:
|
||||||
|
return json.dumps(self.to_dict(), sort_keys=True, default=_json_default)
|
||||||
|
|
||||||
|
def nats_subject(self) -> str:
|
||||||
|
return f"cosma.auv.{self.subject}.anomaly.{self.rule}"
|
||||||
|
|
||||||
|
|
||||||
|
def _json_default(obj: Any) -> Any:
|
||||||
|
if hasattr(obj, "isoformat"):
|
||||||
|
return obj.isoformat()
|
||||||
|
raise TypeError(f"Not JSON serializable: {type(obj).__name__}")
|
||||||
34
src/cosma_log_analyzer/rules/__init__.py
Normal file
34
src/cosma_log_analyzer/rules/__init__.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
"""Deterministic rules v0.
|
||||||
|
|
||||||
|
Adding a new rule = subclass Rule, register it in `ALL_RULES`, add a test.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from .base import Rule
|
||||||
|
from .battery_low import BatteryLowRule
|
||||||
|
from .imu_outliers import ImuOutliersRule
|
||||||
|
from .usbl_distance_spike import UsblDistanceSpikeRule
|
||||||
|
from .usbl_snr_low import UsblSnrLowRule
|
||||||
|
from .watchdog_imu import WatchdogImuRule
|
||||||
|
|
||||||
|
|
||||||
|
def all_rules() -> list[Rule]:
|
||||||
|
"""Default rule set, instantiated with env/default thresholds."""
|
||||||
|
return [
|
||||||
|
ImuOutliersRule(),
|
||||||
|
WatchdogImuRule(),
|
||||||
|
UsblSnrLowRule(),
|
||||||
|
UsblDistanceSpikeRule(),
|
||||||
|
BatteryLowRule(),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"Rule",
|
||||||
|
"ImuOutliersRule",
|
||||||
|
"WatchdogImuRule",
|
||||||
|
"UsblSnrLowRule",
|
||||||
|
"UsblDistanceSpikeRule",
|
||||||
|
"BatteryLowRule",
|
||||||
|
"all_rules",
|
||||||
|
]
|
||||||
47
src/cosma_log_analyzer/rules/base.py
Normal file
47
src/cosma_log_analyzer/rules/base.py
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from ..models import Anomaly
|
||||||
|
|
||||||
|
|
||||||
|
class Rule(ABC):
|
||||||
|
"""Base class for detection rules.
|
||||||
|
|
||||||
|
Subclasses declare class attributes `name`, `topic`, `severity`, then
|
||||||
|
implement `detect(df)` which receives the topic-specific DataFrame and
|
||||||
|
returns a list of Anomaly instances.
|
||||||
|
"""
|
||||||
|
|
||||||
|
name: str = "rule"
|
||||||
|
topic: str = ""
|
||||||
|
severity: str = "warn"
|
||||||
|
|
||||||
|
def __init__(self, subject: str = "AUV000") -> None:
|
||||||
|
self.subject = subject
|
||||||
|
|
||||||
|
def bind(self, subject: str) -> "Rule":
|
||||||
|
self.subject = subject
|
||||||
|
return self
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
|
||||||
|
...
|
||||||
|
|
||||||
|
def _make(
|
||||||
|
self,
|
||||||
|
ts: float,
|
||||||
|
value: float | None,
|
||||||
|
context: dict,
|
||||||
|
) -> Anomaly:
|
||||||
|
return Anomaly(
|
||||||
|
rule=self.name,
|
||||||
|
severity=self.severity,
|
||||||
|
timestamp=float(ts),
|
||||||
|
subject=self.subject,
|
||||||
|
topic=self.topic,
|
||||||
|
value=value,
|
||||||
|
context=context,
|
||||||
|
)
|
||||||
62
src/cosma_log_analyzer/rules/battery_low.py
Normal file
62
src/cosma_log_analyzer/rules/battery_low.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from ..ingest import TOPIC_BATTERY
|
||||||
|
from ..models import Anomaly
|
||||||
|
from .base import Rule
|
||||||
|
|
||||||
|
|
||||||
|
class BatteryLowRule(Rule):
|
||||||
|
"""Fire when voltage < threshold for more than `min_duration_s`."""
|
||||||
|
|
||||||
|
name = "battery_low"
|
||||||
|
topic = TOPIC_BATTERY
|
||||||
|
severity = "critical"
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
subject: str = "AUV000",
|
||||||
|
min_voltage_v: float | None = None,
|
||||||
|
min_duration_s: float = 5.0,
|
||||||
|
) -> None:
|
||||||
|
super().__init__(subject)
|
||||||
|
if min_voltage_v is None:
|
||||||
|
min_voltage_v = float(os.environ.get("BATTERY_LOW_V", 13.5))
|
||||||
|
self.min_voltage_v = min_voltage_v
|
||||||
|
self.min_duration_s = min_duration_s
|
||||||
|
|
||||||
|
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
|
||||||
|
if df.empty:
|
||||||
|
return []
|
||||||
|
below = df["voltage_v"] < self.min_voltage_v
|
||||||
|
if not below.any():
|
||||||
|
return []
|
||||||
|
run_id = (~below).cumsum()
|
||||||
|
anomalies: list[Anomaly] = []
|
||||||
|
for rid, group in df[below].groupby(run_id[below]):
|
||||||
|
duration = float(group["ts"].iloc[-1] - group["ts"].iloc[0])
|
||||||
|
if duration < self.min_duration_s:
|
||||||
|
continue
|
||||||
|
fire_row = None
|
||||||
|
for _, row in group.iterrows():
|
||||||
|
if row["ts"] - group["ts"].iloc[0] >= self.min_duration_s:
|
||||||
|
fire_row = row
|
||||||
|
break
|
||||||
|
if fire_row is None:
|
||||||
|
fire_row = group.iloc[-1]
|
||||||
|
anomalies.append(
|
||||||
|
self._make(
|
||||||
|
ts=float(fire_row["ts"]),
|
||||||
|
value=float(fire_row["voltage_v"]),
|
||||||
|
context={
|
||||||
|
"min_voltage_v": self.min_voltage_v,
|
||||||
|
"min_duration_s": self.min_duration_s,
|
||||||
|
"run_start_ts": float(group["ts"].iloc[0]),
|
||||||
|
"below_duration_s": duration,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return anomalies
|
||||||
52
src/cosma_log_analyzer/rules/imu_outliers.py
Normal file
52
src/cosma_log_analyzer/rules/imu_outliers.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from ..ingest import TOPIC_IMU
|
||||||
|
from ..models import Anomaly
|
||||||
|
from .base import Rule
|
||||||
|
|
||||||
|
|
||||||
|
class ImuOutliersRule(Rule):
|
||||||
|
"""Fire when instant accel magnitude deviates > z_thresh rolling sigmas."""
|
||||||
|
|
||||||
|
name = "imu_outliers"
|
||||||
|
topic = TOPIC_IMU
|
||||||
|
severity = "warn"
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
subject: str = "AUV000",
|
||||||
|
window_s: float = 10.0,
|
||||||
|
z_thresh: float = 3.0,
|
||||||
|
) -> None:
|
||||||
|
super().__init__(subject)
|
||||||
|
self.window_s = window_s
|
||||||
|
self.z_thresh = z_thresh
|
||||||
|
|
||||||
|
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
|
||||||
|
if df.empty or len(df) < 10:
|
||||||
|
return []
|
||||||
|
mag = np.sqrt(df["ax"] ** 2 + df["ay"] ** 2 + df["az"] ** 2)
|
||||||
|
dt = float(df["ts"].diff().median() or 0.02)
|
||||||
|
window = max(5, int(self.window_s / dt))
|
||||||
|
baseline_mean = mag.rolling(window, min_periods=window // 2).mean()
|
||||||
|
baseline_std = mag.rolling(window, min_periods=window // 2).std()
|
||||||
|
z = (mag - baseline_mean) / baseline_std.replace(0, np.nan)
|
||||||
|
mask = z.abs() > self.z_thresh
|
||||||
|
anomalies: list[Anomaly] = []
|
||||||
|
for idx in df.index[mask.fillna(False)]:
|
||||||
|
anomalies.append(
|
||||||
|
self._make(
|
||||||
|
ts=float(df.at[idx, "ts"]),
|
||||||
|
value=float(mag.iloc[idx]),
|
||||||
|
context={
|
||||||
|
"window_s": self.window_s,
|
||||||
|
"z_score": float(z.iloc[idx]),
|
||||||
|
"baseline_mean": float(baseline_mean.iloc[idx]),
|
||||||
|
"baseline_std": float(baseline_std.iloc[idx]),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return anomalies
|
||||||
50
src/cosma_log_analyzer/rules/usbl_distance_spike.py
Normal file
50
src/cosma_log_analyzer/rules/usbl_distance_spike.py
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from ..ingest import TOPIC_USBL
|
||||||
|
from ..models import Anomaly
|
||||||
|
from .base import Rule
|
||||||
|
|
||||||
|
|
||||||
|
class UsblDistanceSpikeRule(Rule):
|
||||||
|
"""Fire when |Δdistance| > spike_m within less than max_dt_s."""
|
||||||
|
|
||||||
|
name = "usbl_distance_spike"
|
||||||
|
topic = TOPIC_USBL
|
||||||
|
severity = "warn"
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
subject: str = "AUV000",
|
||||||
|
spike_m: float | None = None,
|
||||||
|
max_dt_s: float = 1.0,
|
||||||
|
) -> None:
|
||||||
|
super().__init__(subject)
|
||||||
|
if spike_m is None:
|
||||||
|
spike_m = float(os.environ.get("USBL_DIST_SPIKE_M", 50.0))
|
||||||
|
self.spike_m = spike_m
|
||||||
|
self.max_dt_s = max_dt_s
|
||||||
|
|
||||||
|
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
|
||||||
|
if df.empty or len(df) < 2:
|
||||||
|
return []
|
||||||
|
dt = df["ts"].diff()
|
||||||
|
ddist = df["distance_m"].diff().abs()
|
||||||
|
mask = (ddist > self.spike_m) & (dt < self.max_dt_s)
|
||||||
|
anomalies: list[Anomaly] = []
|
||||||
|
for idx in df.index[mask.fillna(False)]:
|
||||||
|
anomalies.append(
|
||||||
|
self._make(
|
||||||
|
ts=float(df.at[idx, "ts"]),
|
||||||
|
value=float(ddist.iloc[idx]),
|
||||||
|
context={
|
||||||
|
"delta_m": float(ddist.iloc[idx]),
|
||||||
|
"dt_s": float(dt.iloc[idx]),
|
||||||
|
"spike_m": self.spike_m,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return anomalies
|
||||||
52
src/cosma_log_analyzer/rules/usbl_snr_low.py
Normal file
52
src/cosma_log_analyzer/rules/usbl_snr_low.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from ..ingest import TOPIC_USBL
|
||||||
|
from ..models import Anomaly
|
||||||
|
from .base import Rule
|
||||||
|
|
||||||
|
|
||||||
|
class UsblSnrLowRule(Rule):
|
||||||
|
"""Fire when SNR stays below threshold for `consec` consecutive samples."""
|
||||||
|
|
||||||
|
name = "usbl_snr_low"
|
||||||
|
topic = TOPIC_USBL
|
||||||
|
severity = "warn"
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
subject: str = "AUV000",
|
||||||
|
min_snr_db: float | None = None,
|
||||||
|
consec: int = 3,
|
||||||
|
) -> None:
|
||||||
|
super().__init__(subject)
|
||||||
|
if min_snr_db is None:
|
||||||
|
min_snr_db = float(os.environ.get("USBL_SNR_LOW", 5.0))
|
||||||
|
self.min_snr_db = min_snr_db
|
||||||
|
self.consec = consec
|
||||||
|
|
||||||
|
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
|
||||||
|
if df.empty or len(df) < self.consec:
|
||||||
|
return []
|
||||||
|
low = df["snr_db"] < self.min_snr_db
|
||||||
|
run = low.astype(int).groupby((~low).cumsum()).cumsum()
|
||||||
|
anomalies: list[Anomaly] = []
|
||||||
|
fired_run = -1
|
||||||
|
runs_id = (~low).cumsum()
|
||||||
|
for idx in df.index:
|
||||||
|
if run.iloc[idx] >= self.consec and runs_id.iloc[idx] != fired_run:
|
||||||
|
fired_run = int(runs_id.iloc[idx])
|
||||||
|
anomalies.append(
|
||||||
|
self._make(
|
||||||
|
ts=float(df.at[idx, "ts"]),
|
||||||
|
value=float(df.at[idx, "snr_db"]),
|
||||||
|
context={
|
||||||
|
"min_snr_db": self.min_snr_db,
|
||||||
|
"consec": self.consec,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return anomalies
|
||||||
43
src/cosma_log_analyzer/rules/watchdog_imu.py
Normal file
43
src/cosma_log_analyzer/rules/watchdog_imu.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
|
||||||
|
from ..ingest import TOPIC_IMU
|
||||||
|
from ..models import Anomaly
|
||||||
|
from .base import Rule
|
||||||
|
|
||||||
|
|
||||||
|
class WatchdogImuRule(Rule):
|
||||||
|
"""Fire when the gap between two IMU messages exceeds `max_gap_s`."""
|
||||||
|
|
||||||
|
name = "watchdog_imu"
|
||||||
|
topic = TOPIC_IMU
|
||||||
|
severity = "critical"
|
||||||
|
|
||||||
|
def __init__(self, subject: str = "AUV000", max_gap_s: float | None = None) -> None:
|
||||||
|
super().__init__(subject)
|
||||||
|
if max_gap_s is None:
|
||||||
|
max_gap_s = float(os.environ.get("WATCHDOG_IMU_S", 2.0))
|
||||||
|
self.max_gap_s = max_gap_s
|
||||||
|
|
||||||
|
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
|
||||||
|
if df.empty or len(df) < 2:
|
||||||
|
return []
|
||||||
|
gaps = df["ts"].diff()
|
||||||
|
mask = gaps > self.max_gap_s
|
||||||
|
anomalies: list[Anomaly] = []
|
||||||
|
for idx in df.index[mask.fillna(False)]:
|
||||||
|
anomalies.append(
|
||||||
|
self._make(
|
||||||
|
ts=float(df.at[idx, "ts"]),
|
||||||
|
value=float(gaps.iloc[idx]),
|
||||||
|
context={
|
||||||
|
"gap_s": float(gaps.iloc[idx]),
|
||||||
|
"max_gap_s": self.max_gap_s,
|
||||||
|
"prev_ts": float(df.at[idx - 1, "ts"]),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return anomalies
|
||||||
19
systemd/cosma-log-analyzer.service
Normal file
19
systemd/cosma-log-analyzer.service
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
[Unit]
|
||||||
|
Description=COSMA log analyzer (MCAP -> NATS anomaly events)
|
||||||
|
After=network-online.target
|
||||||
|
Wants=network-online.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
User=cosma
|
||||||
|
Group=cosma
|
||||||
|
WorkingDirectory=/opt/cosma-log-analyzer
|
||||||
|
EnvironmentFile=/etc/cosma-log-analyzer.env
|
||||||
|
ExecStart=/opt/cosma-log-analyzer/.venv/bin/cosma-log-analyzer serve
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=5
|
||||||
|
StandardOutput=journal
|
||||||
|
StandardError=journal
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
13
tests/conftest.py
Normal file
13
tests/conftest.py
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from tests.fixtures.generate_fake_mcap import generate
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def fake_mcap(tmp_path_factory: pytest.TempPathFactory) -> Path:
|
||||||
|
path = tmp_path_factory.mktemp("mcap") / "fake.mcap"
|
||||||
|
return generate(path)
|
||||||
0
tests/fixtures/__init__.py
vendored
Normal file
0
tests/fixtures/__init__.py
vendored
Normal file
127
tests/fixtures/generate_fake_mcap.py
vendored
Normal file
127
tests/fixtures/generate_fake_mcap.py
vendored
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
"""Generate a synthetic MCAP file for AUV log analyzer testing.
|
||||||
|
|
||||||
|
Scenarios baked in (relative to t0):
|
||||||
|
- IMU 50 Hz over 60 s.
|
||||||
|
- IMU outlier at t=30 s (accel magnitude spike, ~6x baseline).
|
||||||
|
- IMU gap: no messages between t=45 s and t=48 s (watchdog fires).
|
||||||
|
- USBL 2 Hz over 60 s, nominal SNR=12 dB; 5 s window at t=20 s with SNR=2.
|
||||||
|
- USBL distance jump of 100 m at t=40 s.
|
||||||
|
- Battery 1 Hz, linearly from 16.0 V to 12.0 V over 60 s.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import random
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from mcap.writer import Writer
|
||||||
|
|
||||||
|
IMU_TOPIC = "/mavros/imu/data"
|
||||||
|
USBL_TOPIC = "/usbl_reading/usbl_solution"
|
||||||
|
BATTERY_TOPIC = "/mavros/battery"
|
||||||
|
|
||||||
|
T0_NS = 1_700_000_000_000_000_000 # 2023-11-14 22:13:20 UTC, stable ref
|
||||||
|
|
||||||
|
|
||||||
|
def _ns(seconds_from_t0: float) -> int:
|
||||||
|
return T0_NS + int(seconds_from_t0 * 1e9)
|
||||||
|
|
||||||
|
|
||||||
|
def _register(writer: Writer, topic: str) -> int:
|
||||||
|
schema_id = writer.register_schema(
|
||||||
|
name=f"{topic.strip('/').replace('/', '_')}_json",
|
||||||
|
encoding="jsonschema",
|
||||||
|
data=b"{}",
|
||||||
|
)
|
||||||
|
return writer.register_channel(
|
||||||
|
topic=topic, message_encoding="json", schema_id=schema_id
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _write_message(writer: Writer, channel_id: int, t_s: float, payload: dict, seq: int) -> None:
|
||||||
|
data = json.dumps(payload).encode("utf-8")
|
||||||
|
ns = _ns(t_s)
|
||||||
|
writer.add_message(
|
||||||
|
channel_id=channel_id,
|
||||||
|
log_time=ns,
|
||||||
|
data=data,
|
||||||
|
publish_time=ns,
|
||||||
|
sequence=seq,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def generate(path: Path, seed: int = 42) -> Path:
|
||||||
|
"""Write a synthetic MCAP to `path`. Returns `path`."""
|
||||||
|
rng = random.Random(seed)
|
||||||
|
path = Path(path)
|
||||||
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
with open(path, "wb") as fh:
|
||||||
|
w = Writer(fh)
|
||||||
|
w.start(profile="", library="cosma-fake")
|
||||||
|
imu_ch = _register(w, IMU_TOPIC)
|
||||||
|
usbl_ch = _register(w, USBL_TOPIC)
|
||||||
|
bat_ch = _register(w, BATTERY_TOPIC)
|
||||||
|
|
||||||
|
seq = 0
|
||||||
|
# --- IMU 50Hz, with outlier at t=30s and gap 45-48s
|
||||||
|
dt = 0.02 # 50 Hz
|
||||||
|
t = 0.0
|
||||||
|
while t <= 60.0:
|
||||||
|
if 45.0 < t < 48.0: # watchdog gap (skip publishing)
|
||||||
|
t += dt
|
||||||
|
continue
|
||||||
|
if abs(t - 30.0) < 1e-6: # outlier single sample
|
||||||
|
ax, ay, az = 30.0, 30.0, 30.0
|
||||||
|
else:
|
||||||
|
ax = rng.gauss(0.0, 0.05)
|
||||||
|
ay = rng.gauss(0.0, 0.05)
|
||||||
|
az = rng.gauss(9.81, 0.05)
|
||||||
|
payload = {
|
||||||
|
"linear_acceleration": {"x": ax, "y": ay, "z": az},
|
||||||
|
"angular_velocity": {
|
||||||
|
"x": rng.gauss(0.0, 0.001),
|
||||||
|
"y": rng.gauss(0.0, 0.001),
|
||||||
|
"z": rng.gauss(0.0, 0.001),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_write_message(w, imu_ch, t, payload, seq)
|
||||||
|
seq += 1
|
||||||
|
t += dt
|
||||||
|
|
||||||
|
# --- USBL 2Hz, SNR low 20-25s, distance spike at t=40s
|
||||||
|
t = 0.0
|
||||||
|
last_dist = 100.0
|
||||||
|
while t <= 60.0:
|
||||||
|
if 20.0 <= t < 25.0:
|
||||||
|
snr = 2.0
|
||||||
|
else:
|
||||||
|
snr = 12.0 + rng.gauss(0.0, 0.2)
|
||||||
|
if abs(t - 40.0) < 0.01:
|
||||||
|
dist = last_dist + 100.0 # 100m jump
|
||||||
|
else:
|
||||||
|
dist = last_dist + rng.gauss(0.0, 0.5)
|
||||||
|
last_dist = dist
|
||||||
|
payload = {"distance_m": dist, "snr_db": snr}
|
||||||
|
_write_message(w, usbl_ch, t, payload, seq)
|
||||||
|
seq += 1
|
||||||
|
t += 0.5
|
||||||
|
|
||||||
|
# --- Battery 1Hz, 16.0V -> 12.0V linear over 60s
|
||||||
|
for i in range(61):
|
||||||
|
t = float(i)
|
||||||
|
v = 16.0 - (4.0 * t / 60.0)
|
||||||
|
payload = {"voltage_v": v}
|
||||||
|
_write_message(w, bat_ch, t, payload, seq)
|
||||||
|
seq += 1
|
||||||
|
|
||||||
|
w.finish()
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__": # pragma: no cover
|
||||||
|
import sys
|
||||||
|
|
||||||
|
out = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("fake.mcap")
|
||||||
|
generate(out)
|
||||||
|
print(f"Wrote {out}")
|
||||||
73
tests/test_e2e.py
Normal file
73
tests/test_e2e.py
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import io
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from cosma_log_analyzer.bus import StdoutPublisher
|
||||||
|
from cosma_log_analyzer.main import analyze_mcap, emit
|
||||||
|
|
||||||
|
|
||||||
|
EXPECTED_RULES = {
|
||||||
|
"imu_outliers",
|
||||||
|
"watchdog_imu",
|
||||||
|
"usbl_snr_low",
|
||||||
|
"usbl_distance_spike",
|
||||||
|
"battery_low",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_analyze_fake_mcap_produces_all_rule_types(fake_mcap: Path) -> None:
|
||||||
|
anomalies = analyze_mcap(fake_mcap, subject="AUV206")
|
||||||
|
fired = {a.rule for a in anomalies}
|
||||||
|
assert EXPECTED_RULES <= fired, f"missing rules: {EXPECTED_RULES - fired}"
|
||||||
|
|
||||||
|
|
||||||
|
def test_analyze_fake_mcap_subject_propagated(fake_mcap: Path) -> None:
|
||||||
|
anomalies = analyze_mcap(fake_mcap, subject="AUV206")
|
||||||
|
assert anomalies
|
||||||
|
assert all(a.subject == "AUV206" for a in anomalies)
|
||||||
|
|
||||||
|
|
||||||
|
def test_watchdog_detects_the_gap(fake_mcap: Path) -> None:
|
||||||
|
anomalies = analyze_mcap(fake_mcap, subject="AUV206")
|
||||||
|
watchdog = [a for a in anomalies if a.rule == "watchdog_imu"]
|
||||||
|
# The fixture inserts exactly one 3s gap.
|
||||||
|
assert len(watchdog) == 1
|
||||||
|
assert watchdog[0].context["gap_s"] > 2.9
|
||||||
|
|
||||||
|
|
||||||
|
def test_battery_low_fires_once(fake_mcap: Path) -> None:
|
||||||
|
anomalies = analyze_mcap(fake_mcap, subject="AUV206")
|
||||||
|
bat = [a for a in anomalies if a.rule == "battery_low"]
|
||||||
|
assert len(bat) == 1
|
||||||
|
assert bat[0].severity == "critical"
|
||||||
|
assert bat[0].value < 13.5
|
||||||
|
|
||||||
|
|
||||||
|
def test_usbl_distance_spike_fires_once(fake_mcap: Path) -> None:
|
||||||
|
anomalies = analyze_mcap(fake_mcap, subject="AUV206")
|
||||||
|
spikes = [a for a in anomalies if a.rule == "usbl_distance_spike"]
|
||||||
|
assert len(spikes) == 1
|
||||||
|
assert spikes[0].context["delta_m"] > 50.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_usbl_snr_low_fires(fake_mcap: Path) -> None:
|
||||||
|
anomalies = analyze_mcap(fake_mcap, subject="AUV206")
|
||||||
|
snr = [a for a in anomalies if a.rule == "usbl_snr_low"]
|
||||||
|
assert len(snr) >= 1
|
||||||
|
assert all(a.value < 5.0 for a in snr)
|
||||||
|
|
||||||
|
|
||||||
|
def test_stdout_publisher_emits_json_lines(fake_mcap: Path) -> None:
|
||||||
|
anomalies = analyze_mcap(fake_mcap, subject="AUV206")
|
||||||
|
buf = io.StringIO()
|
||||||
|
publisher = StdoutPublisher(stream=buf)
|
||||||
|
n = emit(anomalies, publisher)
|
||||||
|
publisher.close()
|
||||||
|
lines = [ln for ln in buf.getvalue().splitlines() if ln]
|
||||||
|
assert n == len(lines) == len(anomalies)
|
||||||
|
for line in lines:
|
||||||
|
obj = json.loads(line)
|
||||||
|
assert obj["subject"] == "AUV206"
|
||||||
|
assert obj["rule"] in EXPECTED_RULES
|
||||||
58
tests/test_ingest.py
Normal file
58
tests/test_ingest.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from cosma_log_analyzer.ingest import (
|
||||||
|
KNOWN_TOPICS,
|
||||||
|
TOPIC_BATTERY,
|
||||||
|
TOPIC_IMU,
|
||||||
|
TOPIC_USBL,
|
||||||
|
iter_mcap_messages,
|
||||||
|
load_csv_nav,
|
||||||
|
load_mcap,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_mcap_returns_dataframes(fake_mcap: Path) -> None:
|
||||||
|
data = load_mcap(fake_mcap)
|
||||||
|
assert set(data.keys()) == set(KNOWN_TOPICS)
|
||||||
|
imu = data[TOPIC_IMU]
|
||||||
|
usbl = data[TOPIC_USBL]
|
||||||
|
bat = data[TOPIC_BATTERY]
|
||||||
|
assert not imu.empty
|
||||||
|
assert not usbl.empty
|
||||||
|
assert not bat.empty
|
||||||
|
# IMU ~50 Hz over 60s minus a 3s gap => ~2850 samples
|
||||||
|
assert 2700 < len(imu) < 3000
|
||||||
|
# USBL 2Hz over 60s => ~121 samples
|
||||||
|
assert 115 < len(usbl) < 130
|
||||||
|
# Battery 1Hz over 60s => 61 samples
|
||||||
|
assert len(bat) == 61
|
||||||
|
assert {"ts", "ax", "ay", "az", "gx", "gy", "gz"}.issubset(imu.columns)
|
||||||
|
assert {"ts", "distance_m", "snr_db"}.issubset(usbl.columns)
|
||||||
|
assert {"ts", "voltage_v"}.issubset(bat.columns)
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_mcap_sorted_by_ts(fake_mcap: Path) -> None:
|
||||||
|
data = load_mcap(fake_mcap)
|
||||||
|
for df in data.values():
|
||||||
|
assert df["ts"].is_monotonic_increasing
|
||||||
|
|
||||||
|
|
||||||
|
def test_iter_mcap_messages_filter(fake_mcap: Path) -> None:
|
||||||
|
msgs = list(iter_mcap_messages(fake_mcap, topics=[TOPIC_BATTERY]))
|
||||||
|
assert len(msgs) == 61
|
||||||
|
assert all(t == TOPIC_BATTERY for t, _, _ in msgs)
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_csv_nav_missing_file(tmp_path: Path) -> None:
|
||||||
|
df = load_csv_nav(tmp_path / "nope.csv")
|
||||||
|
assert df.empty
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_csv_nav_parses(tmp_path: Path) -> None:
|
||||||
|
p = tmp_path / "nav.csv"
|
||||||
|
p.write_text("ts,lat,lon\n1.0,48.0,2.0\n2.0,48.1,2.1\n")
|
||||||
|
df = load_csv_nav(p)
|
||||||
|
assert len(df) == 2
|
||||||
|
assert df["ts"].is_monotonic_increasing
|
||||||
188
tests/test_rules.py
Normal file
188
tests/test_rules.py
Normal file
@@ -0,0 +1,188 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pandas as pd
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from cosma_log_analyzer.models import Anomaly
|
||||||
|
from cosma_log_analyzer.rules import (
|
||||||
|
BatteryLowRule,
|
||||||
|
ImuOutliersRule,
|
||||||
|
UsblDistanceSpikeRule,
|
||||||
|
UsblSnrLowRule,
|
||||||
|
WatchdogImuRule,
|
||||||
|
all_rules,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _imu_df(n: int = 600, dt: float = 0.02) -> pd.DataFrame:
|
||||||
|
ts = [i * dt for i in range(n)]
|
||||||
|
return pd.DataFrame(
|
||||||
|
{
|
||||||
|
"ts": ts,
|
||||||
|
"ax": [0.0] * n,
|
||||||
|
"ay": [0.0] * n,
|
||||||
|
"az": [9.81] * n,
|
||||||
|
"gx": [0.0] * n,
|
||||||
|
"gy": [0.0] * n,
|
||||||
|
"gz": [0.0] * n,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_imu_outliers_fires_on_single_spike() -> None:
|
||||||
|
df = _imu_df()
|
||||||
|
df.loc[300, ["ax", "ay", "az"]] = [30.0, 30.0, 30.0]
|
||||||
|
anomalies = ImuOutliersRule(subject="AUV206").detect(df)
|
||||||
|
assert any(a.rule == "imu_outliers" for a in anomalies)
|
||||||
|
a = anomalies[0]
|
||||||
|
assert isinstance(a, Anomaly)
|
||||||
|
assert a.subject == "AUV206"
|
||||||
|
assert a.severity == "warn"
|
||||||
|
|
||||||
|
|
||||||
|
def test_imu_outliers_empty_df() -> None:
|
||||||
|
assert ImuOutliersRule().detect(pd.DataFrame()) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_imu_outliers_no_spike_no_anomaly() -> None:
|
||||||
|
df = _imu_df()
|
||||||
|
assert ImuOutliersRule().detect(df) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_watchdog_imu_fires_on_gap() -> None:
|
||||||
|
df = pd.DataFrame({"ts": [0.0, 0.5, 1.0, 5.0, 5.5]})
|
||||||
|
anomalies = WatchdogImuRule(max_gap_s=2.0).detect(df)
|
||||||
|
assert len(anomalies) == 1
|
||||||
|
assert anomalies[0].context["gap_s"] == pytest.approx(4.0)
|
||||||
|
assert anomalies[0].severity == "critical"
|
||||||
|
|
||||||
|
|
||||||
|
def test_watchdog_imu_no_gap() -> None:
|
||||||
|
df = pd.DataFrame({"ts": [i * 0.02 for i in range(100)]})
|
||||||
|
assert WatchdogImuRule(max_gap_s=2.0).detect(df) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_watchdog_imu_short_df() -> None:
|
||||||
|
assert WatchdogImuRule().detect(pd.DataFrame()) == []
|
||||||
|
assert WatchdogImuRule().detect(pd.DataFrame({"ts": [0.0]})) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_usbl_snr_low_needs_three_consec() -> None:
|
||||||
|
df = pd.DataFrame(
|
||||||
|
{
|
||||||
|
"ts": [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0],
|
||||||
|
"distance_m": [100.0] * 7,
|
||||||
|
"snr_db": [10.0, 2.0, 2.0, 2.0, 10.0, 2.0, 10.0],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
anomalies = UsblSnrLowRule(min_snr_db=5.0, consec=3).detect(df)
|
||||||
|
assert len(anomalies) == 1
|
||||||
|
assert anomalies[0].value == pytest.approx(2.0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_usbl_snr_low_no_run_long_enough() -> None:
|
||||||
|
df = pd.DataFrame(
|
||||||
|
{
|
||||||
|
"ts": [0.0, 0.5, 1.0, 1.5, 2.0],
|
||||||
|
"distance_m": [100.0] * 5,
|
||||||
|
"snr_db": [2.0, 10.0, 2.0, 10.0, 2.0],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert UsblSnrLowRule(min_snr_db=5.0, consec=3).detect(df) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_usbl_snr_low_empty() -> None:
|
||||||
|
assert UsblSnrLowRule().detect(pd.DataFrame()) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_usbl_distance_spike_fires() -> None:
|
||||||
|
df = pd.DataFrame(
|
||||||
|
{
|
||||||
|
"ts": [0.0, 0.5, 1.0, 1.5],
|
||||||
|
"distance_m": [100.0, 100.5, 250.0, 250.5],
|
||||||
|
"snr_db": [12.0] * 4,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
anomalies = UsblDistanceSpikeRule(spike_m=50.0, max_dt_s=1.0).detect(df)
|
||||||
|
assert len(anomalies) == 1
|
||||||
|
assert anomalies[0].value == pytest.approx(149.5)
|
||||||
|
|
||||||
|
|
||||||
|
def test_usbl_distance_spike_ignores_slow_drift() -> None:
|
||||||
|
df = pd.DataFrame(
|
||||||
|
{
|
||||||
|
"ts": [0.0, 2.0, 4.0], # dt > max_dt_s
|
||||||
|
"distance_m": [100.0, 250.0, 400.0],
|
||||||
|
"snr_db": [12.0] * 3,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert UsblDistanceSpikeRule(spike_m=50.0, max_dt_s=1.0).detect(df) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_usbl_distance_spike_empty() -> None:
|
||||||
|
assert UsblDistanceSpikeRule().detect(pd.DataFrame()) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_battery_low_fires_on_sustained_drop() -> None:
|
||||||
|
ts = [float(i) for i in range(20)]
|
||||||
|
voltage = [15.0] * 5 + [13.0] * 10 + [15.0] * 5
|
||||||
|
df = pd.DataFrame({"ts": ts, "voltage_v": voltage})
|
||||||
|
anomalies = BatteryLowRule(min_voltage_v=13.5, min_duration_s=5.0).detect(df)
|
||||||
|
assert len(anomalies) == 1
|
||||||
|
assert anomalies[0].severity == "critical"
|
||||||
|
assert anomalies[0].value == pytest.approx(13.0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_battery_low_ignores_short_dips() -> None:
|
||||||
|
ts = [float(i) for i in range(10)]
|
||||||
|
voltage = [15.0, 15.0, 13.0, 13.0, 15.0, 15.0, 13.0, 13.0, 15.0, 15.0]
|
||||||
|
df = pd.DataFrame({"ts": ts, "voltage_v": voltage})
|
||||||
|
assert BatteryLowRule(min_voltage_v=13.5, min_duration_s=5.0).detect(df) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_battery_low_empty() -> None:
|
||||||
|
assert BatteryLowRule().detect(pd.DataFrame()) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_battery_low_always_above() -> None:
|
||||||
|
df = pd.DataFrame({"ts": [0.0, 1.0, 2.0], "voltage_v": [16.0, 15.5, 15.0]})
|
||||||
|
assert BatteryLowRule(min_voltage_v=13.5).detect(df) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_all_rules_returns_five() -> None:
|
||||||
|
rules = all_rules()
|
||||||
|
assert len(rules) == 5
|
||||||
|
assert {r.name for r in rules} == {
|
||||||
|
"imu_outliers",
|
||||||
|
"watchdog_imu",
|
||||||
|
"usbl_snr_low",
|
||||||
|
"usbl_distance_spike",
|
||||||
|
"battery_low",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_rule_bind_sets_subject() -> None:
|
||||||
|
r = BatteryLowRule()
|
||||||
|
r.bind("AUV206")
|
||||||
|
assert r.subject == "AUV206"
|
||||||
|
|
||||||
|
|
||||||
|
def test_anomaly_severity_validation() -> None:
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
Anomaly(
|
||||||
|
rule="x", severity="bogus", timestamp=0.0, subject="s", topic="t"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_anomaly_json_and_subject() -> None:
|
||||||
|
a = Anomaly(
|
||||||
|
rule="battery_low",
|
||||||
|
severity="critical",
|
||||||
|
timestamp=123.0,
|
||||||
|
subject="AUV206",
|
||||||
|
topic="/mavros/battery",
|
||||||
|
value=12.5,
|
||||||
|
context={"k": 1},
|
||||||
|
)
|
||||||
|
assert a.nats_subject() == "cosma.auv.AUV206.anomaly.battery_low"
|
||||||
|
assert "battery_low" in a.to_json()
|
||||||
Reference in New Issue
Block a user