feat: scaffold cosma-log-analyzer with 5 deterministic rules + fake MCAP e2e test
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
127
tests/fixtures/generate_fake_mcap.py
vendored
Normal file
127
tests/fixtures/generate_fake_mcap.py
vendored
Normal file
@@ -0,0 +1,127 @@
|
||||
"""Generate a synthetic MCAP file for AUV log analyzer testing.
|
||||
|
||||
Scenarios baked in (relative to t0):
|
||||
- IMU 50 Hz over 60 s.
|
||||
- IMU outlier at t=30 s (accel magnitude spike, ~6x baseline).
|
||||
- IMU gap: no messages between t=45 s and t=48 s (watchdog fires).
|
||||
- USBL 2 Hz over 60 s, nominal SNR=12 dB; 5 s window at t=20 s with SNR=2.
|
||||
- USBL distance jump of 100 m at t=40 s.
|
||||
- Battery 1 Hz, linearly from 16.0 V to 12.0 V over 60 s.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
import random
|
||||
from pathlib import Path
|
||||
|
||||
from mcap.writer import Writer
|
||||
|
||||
IMU_TOPIC = "/mavros/imu/data"
|
||||
USBL_TOPIC = "/usbl_reading/usbl_solution"
|
||||
BATTERY_TOPIC = "/mavros/battery"
|
||||
|
||||
T0_NS = 1_700_000_000_000_000_000 # 2023-11-14 22:13:20 UTC, stable ref
|
||||
|
||||
|
||||
def _ns(seconds_from_t0: float) -> int:
|
||||
return T0_NS + int(seconds_from_t0 * 1e9)
|
||||
|
||||
|
||||
def _register(writer: Writer, topic: str) -> int:
|
||||
schema_id = writer.register_schema(
|
||||
name=f"{topic.strip('/').replace('/', '_')}_json",
|
||||
encoding="jsonschema",
|
||||
data=b"{}",
|
||||
)
|
||||
return writer.register_channel(
|
||||
topic=topic, message_encoding="json", schema_id=schema_id
|
||||
)
|
||||
|
||||
|
||||
def _write_message(writer: Writer, channel_id: int, t_s: float, payload: dict, seq: int) -> None:
|
||||
data = json.dumps(payload).encode("utf-8")
|
||||
ns = _ns(t_s)
|
||||
writer.add_message(
|
||||
channel_id=channel_id,
|
||||
log_time=ns,
|
||||
data=data,
|
||||
publish_time=ns,
|
||||
sequence=seq,
|
||||
)
|
||||
|
||||
|
||||
def generate(path: Path, seed: int = 42) -> Path:
|
||||
"""Write a synthetic MCAP to `path`. Returns `path`."""
|
||||
rng = random.Random(seed)
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(path, "wb") as fh:
|
||||
w = Writer(fh)
|
||||
w.start(profile="", library="cosma-fake")
|
||||
imu_ch = _register(w, IMU_TOPIC)
|
||||
usbl_ch = _register(w, USBL_TOPIC)
|
||||
bat_ch = _register(w, BATTERY_TOPIC)
|
||||
|
||||
seq = 0
|
||||
# --- IMU 50Hz, with outlier at t=30s and gap 45-48s
|
||||
dt = 0.02 # 50 Hz
|
||||
t = 0.0
|
||||
while t <= 60.0:
|
||||
if 45.0 < t < 48.0: # watchdog gap (skip publishing)
|
||||
t += dt
|
||||
continue
|
||||
if abs(t - 30.0) < 1e-6: # outlier single sample
|
||||
ax, ay, az = 30.0, 30.0, 30.0
|
||||
else:
|
||||
ax = rng.gauss(0.0, 0.05)
|
||||
ay = rng.gauss(0.0, 0.05)
|
||||
az = rng.gauss(9.81, 0.05)
|
||||
payload = {
|
||||
"linear_acceleration": {"x": ax, "y": ay, "z": az},
|
||||
"angular_velocity": {
|
||||
"x": rng.gauss(0.0, 0.001),
|
||||
"y": rng.gauss(0.0, 0.001),
|
||||
"z": rng.gauss(0.0, 0.001),
|
||||
},
|
||||
}
|
||||
_write_message(w, imu_ch, t, payload, seq)
|
||||
seq += 1
|
||||
t += dt
|
||||
|
||||
# --- USBL 2Hz, SNR low 20-25s, distance spike at t=40s
|
||||
t = 0.0
|
||||
last_dist = 100.0
|
||||
while t <= 60.0:
|
||||
if 20.0 <= t < 25.0:
|
||||
snr = 2.0
|
||||
else:
|
||||
snr = 12.0 + rng.gauss(0.0, 0.2)
|
||||
if abs(t - 40.0) < 0.01:
|
||||
dist = last_dist + 100.0 # 100m jump
|
||||
else:
|
||||
dist = last_dist + rng.gauss(0.0, 0.5)
|
||||
last_dist = dist
|
||||
payload = {"distance_m": dist, "snr_db": snr}
|
||||
_write_message(w, usbl_ch, t, payload, seq)
|
||||
seq += 1
|
||||
t += 0.5
|
||||
|
||||
# --- Battery 1Hz, 16.0V -> 12.0V linear over 60s
|
||||
for i in range(61):
|
||||
t = float(i)
|
||||
v = 16.0 - (4.0 * t / 60.0)
|
||||
payload = {"voltage_v": v}
|
||||
_write_message(w, bat_ch, t, payload, seq)
|
||||
seq += 1
|
||||
|
||||
w.finish()
|
||||
return path
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
import sys
|
||||
|
||||
out = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("fake.mcap")
|
||||
generate(out)
|
||||
print(f"Wrote {out}")
|
||||
Reference in New Issue
Block a user