189 lines
5.3 KiB
Python
189 lines
5.3 KiB
Python
from __future__ import annotations
|
|
|
|
import pandas as pd
|
|
import pytest
|
|
|
|
from cosma_log_analyzer.models import Anomaly
|
|
from cosma_log_analyzer.rules import (
|
|
BatteryLowRule,
|
|
ImuOutliersRule,
|
|
UsblDistanceSpikeRule,
|
|
UsblSnrLowRule,
|
|
WatchdogImuRule,
|
|
all_rules,
|
|
)
|
|
|
|
|
|
def _imu_df(n: int = 600, dt: float = 0.02) -> pd.DataFrame:
    """Build a constant-valued IMU frame with ``n`` evenly spaced rows.

    The ``ts`` column holds ``0, dt, 2*dt, ...``. Columns ``ax``, ``ay``,
    ``gx``, ``gy`` and ``gz`` are all 0.0 and ``az`` is 9.81 on every row,
    so the frame contains no spikes until a test injects one.
    """
    constant_levels = (
        ("ax", 0.0),
        ("ay", 0.0),
        ("az", 9.81),
        ("gx", 0.0),
        ("gy", 0.0),
        ("gz", 0.0),
    )
    # Insertion order fixes the column order: ts first, then the six axes.
    columns: dict[str, list[float]] = {"ts": [step * dt for step in range(n)]}
    for column_name, level in constant_levels:
        columns[column_name] = [level] * n
    return pd.DataFrame(columns)
|
|
|
|
|
|
def test_imu_outliers_fires_on_single_spike() -> None:
    """A single row with ax/ay/az set to 30.0 is reported as a 'warn' anomaly."""
    spike_row = 300
    spike_columns = ["ax", "ay", "az"]
    frame = _imu_df()
    frame.loc[spike_row, spike_columns] = [30.0, 30.0, 30.0]

    rule = ImuOutliersRule(subject="AUV206")
    found = rule.detect(frame)

    assert any(item.rule == "imu_outliers" for item in found)

    # The remaining checks look at the first reported anomaly only.
    first = found[0]
    assert isinstance(first, Anomaly)
    assert first.subject == "AUV206"
    assert first.severity == "warn"
|
|
|
|
|
|
def test_imu_outliers_empty_df() -> None:
    """An empty DataFrame yields an empty anomaly list."""
    rule = ImuOutliersRule()
    result = rule.detect(pd.DataFrame())
    assert result == []
|
|
|
|
|
|
def test_imu_outliers_no_spike_no_anomaly() -> None:
    """The unmodified constant frame from ``_imu_df`` produces no anomalies."""
    flat_frame = _imu_df()
    result = ImuOutliersRule().detect(flat_frame)
    assert result == []
|
|
|
|
|
|
def test_watchdog_imu_fires_on_gap() -> None:
    """A 4.0 s hole between timestamps 1.0 and 5.0 is one critical anomaly."""
    timestamps = [0.0, 0.5, 1.0, 5.0, 5.5]
    frame = pd.DataFrame({"ts": timestamps})

    rule = WatchdogImuRule(max_gap_s=2.0)
    found = rule.detect(frame)

    assert len(found) == 1
    gap_anomaly = found[0]
    assert gap_anomaly.context["gap_s"] == pytest.approx(4.0)
    assert gap_anomaly.severity == "critical"
|
|
|
|
|
|
def test_watchdog_imu_no_gap() -> None:
    """100 samples spaced 0.02 s apart never exceed the 2.0 s gap limit."""
    timestamps = [index * 0.02 for index in range(100)]
    frame = pd.DataFrame({"ts": timestamps})
    result = WatchdogImuRule(max_gap_s=2.0).detect(frame)
    assert result == []
|
|
|
|
|
|
def test_watchdog_imu_short_df() -> None:
    """Frames with zero rows or a single row produce no anomalies."""
    # Case 1: completely empty frame (no columns at all).
    empty_result = WatchdogImuRule().detect(pd.DataFrame())
    assert empty_result == []

    # Case 2: one timestamp only.
    single_result = WatchdogImuRule().detect(pd.DataFrame({"ts": [0.0]}))
    assert single_result == []
|
|
|
|
|
|
def test_usbl_snr_low_needs_three_consec() -> None:
    """Only the run of three consecutive sub-threshold SNR samples is reported.

    The series has a run of three 2.0 dB samples followed later by an
    isolated 2.0 dB sample; with ``consec=3`` exactly one anomaly is
    expected and its value is 2.0.
    """
    timestamps = [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0]
    snr_series = [10.0, 2.0, 2.0, 2.0, 10.0, 2.0, 10.0]
    columns = {
        "ts": timestamps,
        "distance_m": [100.0] * 7,
        "snr_db": snr_series,
    }
    frame = pd.DataFrame(columns)

    rule = UsblSnrLowRule(min_snr_db=5.0, consec=3)
    found = rule.detect(frame)

    assert len(found) == 1
    assert found[0].value == pytest.approx(2.0)
|
|
|
|
|
|
def test_usbl_snr_low_no_run_long_enough() -> None:
    """Alternating low/high SNR samples never form a run of three."""
    columns = {
        "ts": [0.0, 0.5, 1.0, 1.5, 2.0],
        "distance_m": [100.0] * 5,
        "snr_db": [2.0, 10.0, 2.0, 10.0, 2.0],
    }
    frame = pd.DataFrame(columns)
    rule = UsblSnrLowRule(min_snr_db=5.0, consec=3)
    result = rule.detect(frame)
    assert result == []
|
|
|
|
|
|
def test_usbl_snr_low_empty() -> None:
    """An empty DataFrame yields an empty anomaly list."""
    rule = UsblSnrLowRule()
    result = rule.detect(pd.DataFrame())
    assert result == []
|
|
|
|
|
|
def test_usbl_distance_spike_fires() -> None:
    """A 149.5 m jump between samples 0.5 s apart is reported exactly once."""
    distances = [100.0, 100.5, 250.0, 250.5]
    columns = {
        "ts": [0.0, 0.5, 1.0, 1.5],
        "distance_m": distances,
        "snr_db": [12.0] * 4,
    }
    frame = pd.DataFrame(columns)

    rule = UsblDistanceSpikeRule(spike_m=50.0, max_dt_s=1.0)
    found = rule.detect(frame)

    assert len(found) == 1
    # 250.0 - 100.5 is the only step larger than spike_m.
    assert found[0].value == pytest.approx(149.5)
|
|
|
|
|
|
def test_usbl_distance_spike_ignores_slow_drift() -> None:
    """150 m steps are not reported when samples are 2.0 s apart."""
    # Sample spacing is 2.0 s, i.e. dt > max_dt_s for every pair.
    timestamps = [0.0, 2.0, 4.0]
    columns = {
        "ts": timestamps,
        "distance_m": [100.0, 250.0, 400.0],
        "snr_db": [12.0] * 3,
    }
    frame = pd.DataFrame(columns)
    rule = UsblDistanceSpikeRule(spike_m=50.0, max_dt_s=1.0)
    result = rule.detect(frame)
    assert result == []
|
|
|
|
|
|
def test_usbl_distance_spike_empty() -> None:
    """An empty DataFrame yields an empty anomaly list."""
    rule = UsblDistanceSpikeRule()
    result = rule.detect(pd.DataFrame())
    assert result == []
|
|
|
|
|
|
def test_battery_low_fires_on_sustained_drop() -> None:
    """Ten consecutive 1 s samples at 13.0 V raise one critical anomaly."""
    timestamps = [float(second) for second in range(20)]
    # 5 samples healthy, 10 samples below the threshold, 5 samples healthy.
    healthy_head = [15.0] * 5
    low_stretch = [13.0] * 10
    healthy_tail = [15.0] * 5
    voltages = healthy_head + low_stretch + healthy_tail
    frame = pd.DataFrame({"ts": timestamps, "voltage_v": voltages})

    rule = BatteryLowRule(min_voltage_v=13.5, min_duration_s=5.0)
    found = rule.detect(frame)

    assert len(found) == 1
    assert found[0].severity == "critical"
    assert found[0].value == pytest.approx(13.0)
|
|
|
|
|
|
def test_battery_low_ignores_short_dips() -> None:
    """Two-sample dips to 13.0 V are shorter than ``min_duration_s`` and ignored."""
    timestamps = [float(second) for second in range(10)]
    voltages = [15.0, 15.0, 13.0, 13.0, 15.0, 15.0, 13.0, 13.0, 15.0, 15.0]
    frame = pd.DataFrame({"ts": timestamps, "voltage_v": voltages})
    rule = BatteryLowRule(min_voltage_v=13.5, min_duration_s=5.0)
    result = rule.detect(frame)
    assert result == []
|
|
|
|
|
|
def test_battery_low_empty() -> None:
    """An empty DataFrame yields an empty anomaly list."""
    rule = BatteryLowRule()
    result = rule.detect(pd.DataFrame())
    assert result == []
|
|
|
|
|
|
def test_battery_low_always_above() -> None:
    """Voltages that stay above the 13.5 V threshold produce no anomalies."""
    columns = {"ts": [0.0, 1.0, 2.0], "voltage_v": [16.0, 15.5, 15.0]}
    frame = pd.DataFrame(columns)
    result = BatteryLowRule(min_voltage_v=13.5).detect(frame)
    assert result == []
|
|
|
|
|
|
def test_all_rules_returns_five() -> None:
    """``all_rules()`` returns five rules whose names match the expected set."""
    expected_names = {
        "imu_outliers",
        "watchdog_imu",
        "usbl_snr_low",
        "usbl_distance_spike",
        "battery_low",
    }
    registry = all_rules()
    assert len(registry) == 5
    actual_names = {rule.name for rule in registry}
    assert actual_names == expected_names
|
|
|
|
|
|
def test_rule_bind_sets_subject() -> None:
    """Calling ``bind`` stores the given subject on the rule instance."""
    rule = BatteryLowRule()
    rule.bind("AUV206")
    bound_subject = rule.subject
    assert bound_subject == "AUV206"
|
|
|
|
|
|
def test_anomaly_severity_validation() -> None:
    """Constructing an ``Anomaly`` with severity "bogus" raises ``ValueError``."""
    invalid_fields = {
        "rule": "x",
        "severity": "bogus",
        "timestamp": 0.0,
        "subject": "s",
        "topic": "t",
    }
    with pytest.raises(ValueError):
        Anomaly(**invalid_fields)
|
|
|
|
|
|
def test_anomaly_json_and_subject() -> None:
    """Check the NATS subject string and that the JSON mentions the rule name."""
    fields = {
        "rule": "battery_low",
        "severity": "critical",
        "timestamp": 123.0,
        "subject": "AUV206",
        "topic": "/mavros/battery",
        "value": 12.5,
        "context": {"k": 1},
    }
    anomaly = Anomaly(**fields)

    subject_line = anomaly.nats_subject()
    assert subject_line == "cosma.auv.AUV206.anomaly.battery_low"

    serialized = anomaly.to_json()
    assert "battery_low" in serialized
|