feat: add 5 deterministic rules (IMU outliers/watchdog, USBL SNR/spike, battery_low)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
47
src/cosma_log_analyzer/rules/base.py
Normal file
47
src/cosma_log_analyzer/rules/base.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from ..models import Anomaly
|
||||
|
||||
|
||||
class Rule(ABC):
|
||||
"""Base class for detection rules.
|
||||
|
||||
Subclasses declare class attributes `name`, `topic`, `severity`, then
|
||||
implement `detect(df)` which receives the topic-specific DataFrame and
|
||||
returns a list of Anomaly instances.
|
||||
"""
|
||||
|
||||
name: str = "rule"
|
||||
topic: str = ""
|
||||
severity: str = "warn"
|
||||
|
||||
def __init__(self, subject: str = "AUV000") -> None:
|
||||
self.subject = subject
|
||||
|
||||
def bind(self, subject: str) -> "Rule":
|
||||
self.subject = subject
|
||||
return self
|
||||
|
||||
@abstractmethod
|
||||
def detect(self, df: pd.DataFrame) -> list[Anomaly]:
|
||||
...
|
||||
|
||||
def _make(
|
||||
self,
|
||||
ts: float,
|
||||
value: float | None,
|
||||
context: dict,
|
||||
) -> Anomaly:
|
||||
return Anomaly(
|
||||
rule=self.name,
|
||||
severity=self.severity,
|
||||
timestamp=float(ts),
|
||||
subject=self.subject,
|
||||
topic=self.topic,
|
||||
value=value,
|
||||
context=context,
|
||||
)
|
||||
Reference in New Issue
Block a user