"""Generate a synthetic MCAP file for AUV log analyzer testing. Scenarios baked in (relative to t0): - IMU 50 Hz over 60 s. - IMU outlier at t=30 s (accel magnitude spike, ~6x baseline). - IMU gap: no messages between t=45 s and t=48 s (watchdog fires). - USBL 2 Hz over 60 s, nominal SNR=12 dB; 5 s window at t=20 s with SNR=2. - USBL distance jump of 100 m at t=40 s. - Battery 1 Hz, linearly from 16.0 V to 12.0 V over 60 s. """ from __future__ import annotations import json import math import random from pathlib import Path from mcap.writer import Writer IMU_TOPIC = "/mavros/imu/data" USBL_TOPIC = "/usbl_reading/usbl_solution" BATTERY_TOPIC = "/mavros/battery" T0_NS = 1_700_000_000_000_000_000 # 2023-11-14 22:13:20 UTC, stable ref def _ns(seconds_from_t0: float) -> int: return T0_NS + int(seconds_from_t0 * 1e9) def _register(writer: Writer, topic: str) -> int: schema_id = writer.register_schema( name=f"{topic.strip('/').replace('/', '_')}_json", encoding="jsonschema", data=b"{}", ) return writer.register_channel( topic=topic, message_encoding="json", schema_id=schema_id ) def _write_message(writer: Writer, channel_id: int, t_s: float, payload: dict, seq: int) -> None: data = json.dumps(payload).encode("utf-8") ns = _ns(t_s) writer.add_message( channel_id=channel_id, log_time=ns, data=data, publish_time=ns, sequence=seq, ) def generate(path: Path, seed: int = 42) -> Path: """Write a synthetic MCAP to `path`. Returns `path`.""" rng = random.Random(seed) path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "wb") as fh: w = Writer(fh) w.start(profile="", library="cosma-fake") imu_ch = _register(w, IMU_TOPIC) usbl_ch = _register(w, USBL_TOPIC) bat_ch = _register(w, BATTERY_TOPIC) seq = 0 # --- IMU 50Hz, with outlier at t=30s and gap 45-48s dt = 0.02 # 50 Hz t = 0.0 while t <= 60.0: if 45.0 < t < 48.0: # watchdog gap (skip publishing) t += dt continue if abs(t - 30.0) < 1e-6: # outlier single sample ax, ay, az = 30.0, 30.0, 30.0 else: ax = rng.gauss(0.0, 0.05) ay = rng.gauss(0.0, 0.05) az = rng.gauss(9.81, 0.05) payload = { "linear_acceleration": {"x": ax, "y": ay, "z": az}, "angular_velocity": { "x": rng.gauss(0.0, 0.001), "y": rng.gauss(0.0, 0.001), "z": rng.gauss(0.0, 0.001), }, } _write_message(w, imu_ch, t, payload, seq) seq += 1 t += dt # --- USBL 2Hz, SNR low 20-25s, distance spike at t=40s t = 0.0 last_dist = 100.0 while t <= 60.0: if 20.0 <= t < 25.0: snr = 2.0 else: snr = 12.0 + rng.gauss(0.0, 0.2) if abs(t - 40.0) < 0.01: dist = last_dist + 100.0 # 100m jump else: dist = last_dist + rng.gauss(0.0, 0.5) last_dist = dist payload = {"distance_m": dist, "snr_db": snr} _write_message(w, usbl_ch, t, payload, seq) seq += 1 t += 0.5 # --- Battery 1Hz, 16.0V -> 12.0V linear over 60s for i in range(61): t = float(i) v = 16.0 - (4.0 * t / 60.0) payload = {"voltage_v": v} _write_message(w, bat_ch, t, payload, seq) seq += 1 w.finish() return path if __name__ == "__main__": # pragma: no cover import sys out = Path(sys.argv[1]) if len(sys.argv) > 1 else Path("fake.mcap") generate(out) print(f"Wrote {out}")