From b0bbb518736b6c2a1b77ccf3c036323bf6a484f1 Mon Sep 17 00:00:00 2001 From: floppyrj45 Date: Sun, 19 Apr 2026 15:20:20 +0000 Subject: [PATCH] feat: scaffold cosma-log-analyzer with 5 deterministic rules + fake MCAP e2e test Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/__init__.py | 0 tests/conftest.py | 13 ++ tests/fixtures/__init__.py | 0 tests/fixtures/generate_fake_mcap.py | 127 ++++++++++++++++++ tests/test_e2e.py | 73 +++++++++++ tests/test_ingest.py | 58 +++++++++ tests/test_rules.py | 188 +++++++++++++++++++++++++++ 7 files changed, 459 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/fixtures/__init__.py create mode 100644 tests/fixtures/generate_fake_mcap.py create mode 100644 tests/test_e2e.py create mode 100644 tests/test_ingest.py create mode 100644 tests/test_rules.py diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..b9d743b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from pathlib import Path + +import pytest + +from tests.fixtures.generate_fake_mcap import generate + + +@pytest.fixture(scope="session") +def fake_mcap(tmp_path_factory: pytest.TempPathFactory) -> Path: + path = tmp_path_factory.mktemp("mcap") / "fake.mcap" + return generate(path) diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures/generate_fake_mcap.py b/tests/fixtures/generate_fake_mcap.py new file mode 100644 index 0000000..1765b9d --- /dev/null +++ b/tests/fixtures/generate_fake_mcap.py @@ -0,0 +1,127 @@ +"""Generate a synthetic MCAP file for AUV log analyzer testing. + +Scenarios baked in (relative to t0): +- IMU 50 Hz over 60 s. +- IMU outlier at t=30 s (accel magnitude spike, ~6x baseline). +- IMU gap: no messages between t=45 s and t=48 s (watchdog fires). +- USBL 2 Hz over 60 s, nominal SNR=12 dB; 5 s window at t=20 s with SNR=2. +- USBL distance jump of 100 m at t=40 s. +- Battery 1 Hz, linearly from 16.0 V to 12.0 V over 60 s. +""" +from __future__ import annotations + +import json +import math +import random +from pathlib import Path + +from mcap.writer import Writer + +IMU_TOPIC = "/mavros/imu/data" +USBL_TOPIC = "/usbl_reading/usbl_solution" +BATTERY_TOPIC = "/mavros/battery" + +T0_NS = 1_700_000_000_000_000_000 # 2023-11-14 22:13:20 UTC, stable ref + + +def _ns(seconds_from_t0: float) -> int: + return T0_NS + int(seconds_from_t0 * 1e9) + + +def _register(writer: Writer, topic: str) -> int: + schema_id = writer.register_schema( + name=f"{topic.strip('/').replace('/', '_')}_json", + encoding="jsonschema", + data=b"{}", + ) + return writer.register_channel( + topic=topic, message_encoding="json", schema_id=schema_id + ) + + +def _write_message(writer: Writer, channel_id: int, t_s: float, payload: dict, seq: int) -> None: + data = json.dumps(payload).encode("utf-8") + ns = _ns(t_s) + writer.add_message( + channel_id=channel_id, + log_time=ns, + data=data, + publish_time=ns, + sequence=seq, + ) + + +def generate(path: Path, seed: int = 42) -> Path: + """Write a synthetic MCAP to `path`. Returns `path`.""" + rng = random.Random(seed) + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "wb") as fh: + w = Writer(fh) + w.start(profile="", library="cosma-fake") + imu_ch = _register(w, IMU_TOPIC) + usbl_ch = _register(w, USBL_TOPIC) + bat_ch = _register(w, BATTERY_TOPIC) + + seq = 0 + # --- IMU 50Hz, with outlier at t=30s and gap 45-48s + dt = 0.02 # 50 Hz + t = 0.0 + while t <= 60.0: + if 45.0 < t < 48.0: # watchdog gap (skip publishing) + t += dt + continue + if abs(t - 30.0) < 1e-6: # outlier single sample + ax, ay, az = 30.0, 30.0, 30.0 + else: + ax = rng.gauss(0.0, 0.05) + ay = rng.gauss(0.0, 0.05) + az = rng.gauss(9.81, 0.05) + payload = { + "linear_acceleration": {"x": ax, "y": ay, "z": az}, + "angular_velocity": { + "x": rng.gauss(0.0, 0.001), + "y": rng.gauss(0.0, 0.001), + "z": rng.gauss(0.0, 0.001), + }, + } + _write_message(w, imu_ch, t, payload, seq) + seq += 1 + t += dt + + # --- USBL 2Hz, SNR low 20-25s, distance spike at t=40s + t = 0.0 + last_dist = 100.0 + while t <= 60.0: + if 20.0 <= t < 25.0: + snr = 2.0 + else: + snr = 12.0 + rng.gauss(0.0, 0.2) + if abs(t - 40.0) < 0.01: + dist = last_dist + 100.0 # 100m jump + else: + dist = last_dist + rng.gauss(0.0, 0.5) + last_dist = dist + payload = {"distance_m": dist, "snr_db": snr} + _write_message(w, usbl_ch, t, payload, seq) + seq += 1 + t += 0.5 + + # --- Battery 1Hz, 16.0V -> 12.0V linear over 60s + for i in range(61): + t = float(i) + v = 16.0 - (4.0 * t / 60.0) + payload = {"voltage_v": v} + _write_message(w, bat_ch, t, payload, seq) + seq += 1 + + w.finish() + return path + + +if __name__ == "__main__": # pragma: no cover + import sys + + out = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("fake.mcap") + generate(out) + print(f"Wrote {out}") diff --git a/tests/test_e2e.py b/tests/test_e2e.py new file mode 100644 index 0000000..49bbcd3 --- /dev/null +++ b/tests/test_e2e.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +import io +import json +from pathlib import Path + +from cosma_log_analyzer.bus import StdoutPublisher +from cosma_log_analyzer.main import analyze_mcap, emit + + +EXPECTED_RULES = { + "imu_outliers", + "watchdog_imu", + "usbl_snr_low", + "usbl_distance_spike", + "battery_low", +} + + +def test_analyze_fake_mcap_produces_all_rule_types(fake_mcap: Path) -> None: + anomalies = analyze_mcap(fake_mcap, subject="AUV206") + fired = {a.rule for a in anomalies} + assert EXPECTED_RULES <= fired, f"missing rules: {EXPECTED_RULES - fired}" + + +def test_analyze_fake_mcap_subject_propagated(fake_mcap: Path) -> None: + anomalies = analyze_mcap(fake_mcap, subject="AUV206") + assert anomalies + assert all(a.subject == "AUV206" for a in anomalies) + + +def test_watchdog_detects_the_gap(fake_mcap: Path) -> None: + anomalies = analyze_mcap(fake_mcap, subject="AUV206") + watchdog = [a for a in anomalies if a.rule == "watchdog_imu"] + # The fixture inserts exactly one 3s gap. + assert len(watchdog) == 1 + assert watchdog[0].context["gap_s"] > 2.9 + + +def test_battery_low_fires_once(fake_mcap: Path) -> None: + anomalies = analyze_mcap(fake_mcap, subject="AUV206") + bat = [a for a in anomalies if a.rule == "battery_low"] + assert len(bat) == 1 + assert bat[0].severity == "critical" + assert bat[0].value < 13.5 + + +def test_usbl_distance_spike_fires_once(fake_mcap: Path) -> None: + anomalies = analyze_mcap(fake_mcap, subject="AUV206") + spikes = [a for a in anomalies if a.rule == "usbl_distance_spike"] + assert len(spikes) == 1 + assert spikes[0].context["delta_m"] > 50.0 + + +def test_usbl_snr_low_fires(fake_mcap: Path) -> None: + anomalies = analyze_mcap(fake_mcap, subject="AUV206") + snr = [a for a in anomalies if a.rule == "usbl_snr_low"] + assert len(snr) >= 1 + assert all(a.value < 5.0 for a in snr) + + +def test_stdout_publisher_emits_json_lines(fake_mcap: Path) -> None: + anomalies = analyze_mcap(fake_mcap, subject="AUV206") + buf = io.StringIO() + publisher = StdoutPublisher(stream=buf) + n = emit(anomalies, publisher) + publisher.close() + lines = [ln for ln in buf.getvalue().splitlines() if ln] + assert n == len(lines) == len(anomalies) + for line in lines: + obj = json.loads(line) + assert obj["subject"] == "AUV206" + assert obj["rule"] in EXPECTED_RULES diff --git a/tests/test_ingest.py b/tests/test_ingest.py new file mode 100644 index 0000000..1d1746d --- /dev/null +++ b/tests/test_ingest.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from pathlib import Path + +from cosma_log_analyzer.ingest import ( + KNOWN_TOPICS, + TOPIC_BATTERY, + TOPIC_IMU, + TOPIC_USBL, + iter_mcap_messages, + load_csv_nav, + load_mcap, +) + + +def test_load_mcap_returns_dataframes(fake_mcap: Path) -> None: + data = load_mcap(fake_mcap) + assert set(data.keys()) == set(KNOWN_TOPICS) + imu = data[TOPIC_IMU] + usbl = data[TOPIC_USBL] + bat = data[TOPIC_BATTERY] + assert not imu.empty + assert not usbl.empty + assert not bat.empty + # IMU ~50 Hz over 60s minus a 3s gap => ~2850 samples + assert 2700 < len(imu) < 3000 + # USBL 2Hz over 60s => ~121 samples + assert 115 < len(usbl) < 130 + # Battery 1Hz over 60s => 61 samples + assert len(bat) == 61 + assert {"ts", "ax", "ay", "az", "gx", "gy", "gz"}.issubset(imu.columns) + assert {"ts", "distance_m", "snr_db"}.issubset(usbl.columns) + assert {"ts", "voltage_v"}.issubset(bat.columns) + + +def test_load_mcap_sorted_by_ts(fake_mcap: Path) -> None: + data = load_mcap(fake_mcap) + for df in data.values(): + assert df["ts"].is_monotonic_increasing + + +def test_iter_mcap_messages_filter(fake_mcap: Path) -> None: + msgs = list(iter_mcap_messages(fake_mcap, topics=[TOPIC_BATTERY])) + assert len(msgs) == 61 + assert all(t == TOPIC_BATTERY for t, _, _ in msgs) + + +def test_load_csv_nav_missing_file(tmp_path: Path) -> None: + df = load_csv_nav(tmp_path / "nope.csv") + assert df.empty + + +def test_load_csv_nav_parses(tmp_path: Path) -> None: + p = tmp_path / "nav.csv" + p.write_text("ts,lat,lon\n1.0,48.0,2.0\n2.0,48.1,2.1\n") + df = load_csv_nav(p) + assert len(df) == 2 + assert df["ts"].is_monotonic_increasing diff --git a/tests/test_rules.py b/tests/test_rules.py new file mode 100644 index 0000000..2ab12b4 --- /dev/null +++ b/tests/test_rules.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +import pandas as pd +import pytest + +from cosma_log_analyzer.models import Anomaly +from cosma_log_analyzer.rules import ( + BatteryLowRule, + ImuOutliersRule, + UsblDistanceSpikeRule, + UsblSnrLowRule, + WatchdogImuRule, + all_rules, +) + + +def _imu_df(n: int = 600, dt: float = 0.02) -> pd.DataFrame: + ts = [i * dt for i in range(n)] + return pd.DataFrame( + { + "ts": ts, + "ax": [0.0] * n, + "ay": [0.0] * n, + "az": [9.81] * n, + "gx": [0.0] * n, + "gy": [0.0] * n, + "gz": [0.0] * n, + } + ) + + +def test_imu_outliers_fires_on_single_spike() -> None: + df = _imu_df() + df.loc[300, ["ax", "ay", "az"]] = [30.0, 30.0, 30.0] + anomalies = ImuOutliersRule(subject="AUV206").detect(df) + assert any(a.rule == "imu_outliers" for a in anomalies) + a = anomalies[0] + assert isinstance(a, Anomaly) + assert a.subject == "AUV206" + assert a.severity == "warn" + + +def test_imu_outliers_empty_df() -> None: + assert ImuOutliersRule().detect(pd.DataFrame()) == [] + + +def test_imu_outliers_no_spike_no_anomaly() -> None: + df = _imu_df() + assert ImuOutliersRule().detect(df) == [] + + +def test_watchdog_imu_fires_on_gap() -> None: + df = pd.DataFrame({"ts": [0.0, 0.5, 1.0, 5.0, 5.5]}) + anomalies = WatchdogImuRule(max_gap_s=2.0).detect(df) + assert len(anomalies) == 1 + assert anomalies[0].context["gap_s"] == pytest.approx(4.0) + assert anomalies[0].severity == "critical" + + +def test_watchdog_imu_no_gap() -> None: + df = pd.DataFrame({"ts": [i * 0.02 for i in range(100)]}) + assert WatchdogImuRule(max_gap_s=2.0).detect(df) == [] + + +def test_watchdog_imu_short_df() -> None: + assert WatchdogImuRule().detect(pd.DataFrame()) == [] + assert WatchdogImuRule().detect(pd.DataFrame({"ts": [0.0]})) == [] + + +def test_usbl_snr_low_needs_three_consec() -> None: + df = pd.DataFrame( + { + "ts": [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0], + "distance_m": [100.0] * 7, + "snr_db": [10.0, 2.0, 2.0, 2.0, 10.0, 2.0, 10.0], + } + ) + anomalies = UsblSnrLowRule(min_snr_db=5.0, consec=3).detect(df) + assert len(anomalies) == 1 + assert anomalies[0].value == pytest.approx(2.0) + + +def test_usbl_snr_low_no_run_long_enough() -> None: + df = pd.DataFrame( + { + "ts": [0.0, 0.5, 1.0, 1.5, 2.0], + "distance_m": [100.0] * 5, + "snr_db": [2.0, 10.0, 2.0, 10.0, 2.0], + } + ) + assert UsblSnrLowRule(min_snr_db=5.0, consec=3).detect(df) == [] + + +def test_usbl_snr_low_empty() -> None: + assert UsblSnrLowRule().detect(pd.DataFrame()) == [] + + +def test_usbl_distance_spike_fires() -> None: + df = pd.DataFrame( + { + "ts": [0.0, 0.5, 1.0, 1.5], + "distance_m": [100.0, 100.5, 250.0, 250.5], + "snr_db": [12.0] * 4, + } + ) + anomalies = UsblDistanceSpikeRule(spike_m=50.0, max_dt_s=1.0).detect(df) + assert len(anomalies) == 1 + assert anomalies[0].value == pytest.approx(149.5) + + +def test_usbl_distance_spike_ignores_slow_drift() -> None: + df = pd.DataFrame( + { + "ts": [0.0, 2.0, 4.0], # dt > max_dt_s + "distance_m": [100.0, 250.0, 400.0], + "snr_db": [12.0] * 3, + } + ) + assert UsblDistanceSpikeRule(spike_m=50.0, max_dt_s=1.0).detect(df) == [] + + +def test_usbl_distance_spike_empty() -> None: + assert UsblDistanceSpikeRule().detect(pd.DataFrame()) == [] + + +def test_battery_low_fires_on_sustained_drop() -> None: + ts = [float(i) for i in range(20)] + voltage = [15.0] * 5 + [13.0] * 10 + [15.0] * 5 + df = pd.DataFrame({"ts": ts, "voltage_v": voltage}) + anomalies = BatteryLowRule(min_voltage_v=13.5, min_duration_s=5.0).detect(df) + assert len(anomalies) == 1 + assert anomalies[0].severity == "critical" + assert anomalies[0].value == pytest.approx(13.0) + + +def test_battery_low_ignores_short_dips() -> None: + ts = [float(i) for i in range(10)] + voltage = [15.0, 15.0, 13.0, 13.0, 15.0, 15.0, 13.0, 13.0, 15.0, 15.0] + df = pd.DataFrame({"ts": ts, "voltage_v": voltage}) + assert BatteryLowRule(min_voltage_v=13.5, min_duration_s=5.0).detect(df) == [] + + +def test_battery_low_empty() -> None: + assert BatteryLowRule().detect(pd.DataFrame()) == [] + + +def test_battery_low_always_above() -> None: + df = pd.DataFrame({"ts": [0.0, 1.0, 2.0], "voltage_v": [16.0, 15.5, 15.0]}) + assert BatteryLowRule(min_voltage_v=13.5).detect(df) == [] + + +def test_all_rules_returns_five() -> None: + rules = all_rules() + assert len(rules) == 5 + assert {r.name for r in rules} == { + "imu_outliers", + "watchdog_imu", + "usbl_snr_low", + "usbl_distance_spike", + "battery_low", + } + + +def test_rule_bind_sets_subject() -> None: + r = BatteryLowRule() + r.bind("AUV206") + assert r.subject == "AUV206" + + +def test_anomaly_severity_validation() -> None: + with pytest.raises(ValueError): + Anomaly( + rule="x", severity="bogus", timestamp=0.0, subject="s", topic="t" + ) + + +def test_anomaly_json_and_subject() -> None: + a = Anomaly( + rule="battery_low", + severity="critical", + timestamp=123.0, + subject="AUV206", + topic="/mavros/battery", + value=12.5, + context={"k": 1}, + ) + assert a.nats_subject() == "cosma.auv.AUV206.anomaly.battery_low" + assert "battery_low" in a.to_json()