diff --git a/src/cosma_log_analyzer/rules/__init__.py b/src/cosma_log_analyzer/rules/__init__.py new file mode 100644 index 0000000..d63eb9c --- /dev/null +++ b/src/cosma_log_analyzer/rules/__init__.py @@ -0,0 +1,34 @@ +"""Deterministic rules v0. + +Adding a new rule = subclass Rule, register it in `ALL_RULES`, add a test. +""" +from __future__ import annotations + +from .base import Rule +from .battery_low import BatteryLowRule +from .imu_outliers import ImuOutliersRule +from .usbl_distance_spike import UsblDistanceSpikeRule +from .usbl_snr_low import UsblSnrLowRule +from .watchdog_imu import WatchdogImuRule + + +def all_rules() -> list[Rule]: + """Default rule set, instantiated with env/default thresholds.""" + return [ + ImuOutliersRule(), + WatchdogImuRule(), + UsblSnrLowRule(), + UsblDistanceSpikeRule(), + BatteryLowRule(), + ] + + +__all__ = [ + "Rule", + "ImuOutliersRule", + "WatchdogImuRule", + "UsblSnrLowRule", + "UsblDistanceSpikeRule", + "BatteryLowRule", + "all_rules", +] diff --git a/src/cosma_log_analyzer/rules/base.py b/src/cosma_log_analyzer/rules/base.py new file mode 100644 index 0000000..68c0c64 --- /dev/null +++ b/src/cosma_log_analyzer/rules/base.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod + +import pandas as pd + +from ..models import Anomaly + + +class Rule(ABC): + """Base class for detection rules. + + Subclasses declare class attributes `name`, `topic`, `severity`, then + implement `detect(df)` which receives the topic-specific DataFrame and + returns a list of Anomaly instances. + """ + + name: str = "rule" + topic: str = "" + severity: str = "warn" + + def __init__(self, subject: str = "AUV000") -> None: + self.subject = subject + + def bind(self, subject: str) -> "Rule": + self.subject = subject + return self + + @abstractmethod + def detect(self, df: pd.DataFrame) -> list[Anomaly]: + ... + + def _make( + self, + ts: float, + value: float | None, + context: dict, + ) -> Anomaly: + return Anomaly( + rule=self.name, + severity=self.severity, + timestamp=float(ts), + subject=self.subject, + topic=self.topic, + value=value, + context=context, + ) diff --git a/src/cosma_log_analyzer/rules/battery_low.py b/src/cosma_log_analyzer/rules/battery_low.py new file mode 100644 index 0000000..799cdec --- /dev/null +++ b/src/cosma_log_analyzer/rules/battery_low.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import os + +import pandas as pd + +from ..ingest import TOPIC_BATTERY +from ..models import Anomaly +from .base import Rule + + +class BatteryLowRule(Rule): + """Fire when voltage < threshold for more than `min_duration_s`.""" + + name = "battery_low" + topic = TOPIC_BATTERY + severity = "critical" + + def __init__( + self, + subject: str = "AUV000", + min_voltage_v: float | None = None, + min_duration_s: float = 5.0, + ) -> None: + super().__init__(subject) + if min_voltage_v is None: + min_voltage_v = float(os.environ.get("BATTERY_LOW_V", 13.5)) + self.min_voltage_v = min_voltage_v + self.min_duration_s = min_duration_s + + def detect(self, df: pd.DataFrame) -> list[Anomaly]: + if df.empty: + return [] + below = df["voltage_v"] < self.min_voltage_v + if not below.any(): + return [] + run_id = (~below).cumsum() + anomalies: list[Anomaly] = [] + for rid, group in df[below].groupby(run_id[below]): + duration = float(group["ts"].iloc[-1] - group["ts"].iloc[0]) + if duration < self.min_duration_s: + continue + fire_row = None + for _, row in group.iterrows(): + if row["ts"] - group["ts"].iloc[0] >= self.min_duration_s: + fire_row = row + break + if fire_row is None: + fire_row = group.iloc[-1] + anomalies.append( + self._make( + ts=float(fire_row["ts"]), + value=float(fire_row["voltage_v"]), + context={ + "min_voltage_v": self.min_voltage_v, + "min_duration_s": self.min_duration_s, + "run_start_ts": float(group["ts"].iloc[0]), + "below_duration_s": duration, + }, + ) + ) + return anomalies diff --git a/src/cosma_log_analyzer/rules/imu_outliers.py b/src/cosma_log_analyzer/rules/imu_outliers.py new file mode 100644 index 0000000..d916310 --- /dev/null +++ b/src/cosma_log_analyzer/rules/imu_outliers.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import numpy as np +import pandas as pd + +from ..ingest import TOPIC_IMU +from ..models import Anomaly +from .base import Rule + + +class ImuOutliersRule(Rule): + """Fire when instant accel magnitude deviates > z_thresh rolling sigmas.""" + + name = "imu_outliers" + topic = TOPIC_IMU + severity = "warn" + + def __init__( + self, + subject: str = "AUV000", + window_s: float = 10.0, + z_thresh: float = 3.0, + ) -> None: + super().__init__(subject) + self.window_s = window_s + self.z_thresh = z_thresh + + def detect(self, df: pd.DataFrame) -> list[Anomaly]: + if df.empty or len(df) < 10: + return [] + mag = np.sqrt(df["ax"] ** 2 + df["ay"] ** 2 + df["az"] ** 2) + dt = float(df["ts"].diff().median() or 0.02) + window = max(5, int(self.window_s / dt)) + baseline_mean = mag.rolling(window, min_periods=window // 2).mean() + baseline_std = mag.rolling(window, min_periods=window // 2).std() + z = (mag - baseline_mean) / baseline_std.replace(0, np.nan) + mask = z.abs() > self.z_thresh + anomalies: list[Anomaly] = [] + for idx in df.index[mask.fillna(False)]: + anomalies.append( + self._make( + ts=float(df.at[idx, "ts"]), + value=float(mag.iloc[idx]), + context={ + "window_s": self.window_s, + "z_score": float(z.iloc[idx]), + "baseline_mean": float(baseline_mean.iloc[idx]), + "baseline_std": float(baseline_std.iloc[idx]), + }, + ) + ) + return anomalies diff --git a/src/cosma_log_analyzer/rules/usbl_distance_spike.py b/src/cosma_log_analyzer/rules/usbl_distance_spike.py new file mode 100644 index 0000000..197f26c --- /dev/null +++ b/src/cosma_log_analyzer/rules/usbl_distance_spike.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import os + +import pandas as pd + +from ..ingest import TOPIC_USBL +from ..models import Anomaly +from .base import Rule + + +class UsblDistanceSpikeRule(Rule): + """Fire when |Δdistance| > spike_m within less than max_dt_s.""" + + name = "usbl_distance_spike" + topic = TOPIC_USBL + severity = "warn" + + def __init__( + self, + subject: str = "AUV000", + spike_m: float | None = None, + max_dt_s: float = 1.0, + ) -> None: + super().__init__(subject) + if spike_m is None: + spike_m = float(os.environ.get("USBL_DIST_SPIKE_M", 50.0)) + self.spike_m = spike_m + self.max_dt_s = max_dt_s + + def detect(self, df: pd.DataFrame) -> list[Anomaly]: + if df.empty or len(df) < 2: + return [] + dt = df["ts"].diff() + ddist = df["distance_m"].diff().abs() + mask = (ddist > self.spike_m) & (dt < self.max_dt_s) + anomalies: list[Anomaly] = [] + for idx in df.index[mask.fillna(False)]: + anomalies.append( + self._make( + ts=float(df.at[idx, "ts"]), + value=float(ddist.iloc[idx]), + context={ + "delta_m": float(ddist.iloc[idx]), + "dt_s": float(dt.iloc[idx]), + "spike_m": self.spike_m, + }, + ) + ) + return anomalies diff --git a/src/cosma_log_analyzer/rules/usbl_snr_low.py b/src/cosma_log_analyzer/rules/usbl_snr_low.py new file mode 100644 index 0000000..26fac42 --- /dev/null +++ b/src/cosma_log_analyzer/rules/usbl_snr_low.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import os + +import pandas as pd + +from ..ingest import TOPIC_USBL +from ..models import Anomaly +from .base import Rule + + +class UsblSnrLowRule(Rule): + """Fire when SNR stays below threshold for `consec` consecutive samples.""" + + name = "usbl_snr_low" + topic = TOPIC_USBL + severity = "warn" + + def __init__( + self, + subject: str = "AUV000", + min_snr_db: float | None = None, + consec: int = 3, + ) -> None: + super().__init__(subject) + if min_snr_db is None: + min_snr_db = float(os.environ.get("USBL_SNR_LOW", 5.0)) + self.min_snr_db = min_snr_db + self.consec = consec + + def detect(self, df: pd.DataFrame) -> list[Anomaly]: + if df.empty or len(df) < self.consec: + return [] + low = df["snr_db"] < self.min_snr_db + run = low.astype(int).groupby((~low).cumsum()).cumsum() + anomalies: list[Anomaly] = [] + fired_run = -1 + runs_id = (~low).cumsum() + for idx in df.index: + if run.iloc[idx] >= self.consec and runs_id.iloc[idx] != fired_run: + fired_run = int(runs_id.iloc[idx]) + anomalies.append( + self._make( + ts=float(df.at[idx, "ts"]), + value=float(df.at[idx, "snr_db"]), + context={ + "min_snr_db": self.min_snr_db, + "consec": self.consec, + }, + ) + ) + return anomalies diff --git a/src/cosma_log_analyzer/rules/watchdog_imu.py b/src/cosma_log_analyzer/rules/watchdog_imu.py new file mode 100644 index 0000000..610b633 --- /dev/null +++ b/src/cosma_log_analyzer/rules/watchdog_imu.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import os + +import pandas as pd + +from ..ingest import TOPIC_IMU +from ..models import Anomaly +from .base import Rule + + +class WatchdogImuRule(Rule): + """Fire when the gap between two IMU messages exceeds `max_gap_s`.""" + + name = "watchdog_imu" + topic = TOPIC_IMU + severity = "critical" + + def __init__(self, subject: str = "AUV000", max_gap_s: float | None = None) -> None: + super().__init__(subject) + if max_gap_s is None: + max_gap_s = float(os.environ.get("WATCHDOG_IMU_S", 2.0)) + self.max_gap_s = max_gap_s + + def detect(self, df: pd.DataFrame) -> list[Anomaly]: + if df.empty or len(df) < 2: + return [] + gaps = df["ts"].diff() + mask = gaps > self.max_gap_s + anomalies: list[Anomaly] = [] + for idx in df.index[mask.fillna(False)]: + anomalies.append( + self._make( + ts=float(df.at[idx, "ts"]), + value=float(gaps.iloc[idx]), + context={ + "gap_s": float(gaps.iloc[idx]), + "max_gap_s": self.max_gap_s, + "prev_ts": float(df.at[idx - 1, "ts"]), + }, + ) + ) + return anomalies