Files
cosma-nav-tools/pipeline_runner/processor.py

175 lines
5.9 KiB
Python

import csv
import gzip
import json
import math
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import numpy as np
from .config import LTTB_MAX_PTS
def _lttb(points: list[dict], max_pts: int) -> list[dict]:
    """Downsample a series with Largest-Triangle-Three-Buckets (LTTB).

    The first and last points are always kept. The interior is split into
    ``max_pts - 2`` buckets. From each bucket, the kept point is the one
    forming the largest triangle with the previously kept point and the
    mean of the next bucket.

    Args:
        points: Samples as ``{"t": ..., "v": ...}`` dicts. Presumably sorted
            by ``t`` — TODO confirm with callers. Non-numeric ``v`` values
            count as ``0.0`` for the area computation only; the selected
            dicts are returned unchanged.
        max_pts: Maximum number of points to return.

    Returns:
        ``points`` itself when it has at most ``max_pts`` entries. Otherwise,
        a new list of at most ``max_pts`` of the original dicts, in order.
    """
    n = len(points)
    if n <= max_pts:
        return points
    if max_pts < 3:
        # No room for interior buckets (max_pts - 2 would be zero or
        # negative below), so keep at most the two endpoints.
        return [points[0], points[-1]][: max(max_pts, 0)]

    ts = np.array([p["t"] for p in points], dtype=float)
    vs = np.array(
        [p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points],
        dtype=float,
    )
    bucket_size = (n - 2) / (max_pts - 2)

    out = [points[0]]
    a = 0  # index of the most recently selected point
    for i in range(max_pts - 2):
        # Mean of the *next* bucket: the third vertex of the triangle.
        avg_start = int((i + 1) * bucket_size) + 1
        avg_end = min(int((i + 2) * bucket_size) + 1, n)
        avg_t = ts[avg_start:avg_end].mean()
        avg_v = vs[avg_start:avg_end].mean()

        # Candidate points in the current bucket.
        rng_start = int(i * bucket_size) + 1
        rng_end = min(int((i + 1) * bucket_size) + 1, n)
        cand_t = ts[rng_start:rng_end]
        cand_v = vs[rng_start:rng_end]

        # Twice the area of triangle (a, candidate, next-bucket mean), from
        # the 2-D cross product (cand - a) x (mean - a). The 1/2 factor is
        # dropped because it cannot change which candidate wins.
        at, av = ts[a], vs[a]
        areas = np.abs((cand_t - at) * (avg_v - av) - (avg_t - at) * (cand_v - av))
        # argmax picks the first maximum, i.e. the earliest candidate on ties.
        best = rng_start + int(np.argmax(areas))
        out.append(points[best])
        a = best

    out.append(points[-1])
    return out
def _ts_to_epoch(ts_str: str) -> float:
"""Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds."""
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
try:
dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc)
return dt.timestamp()
except ValueError:
continue
return 0.0
def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]:
    """Read a long-format navigation log CSV into per-signal point lists.

    Each data row is ``timestamp, field, value[, ...]``. The first row is a
    header and is skipped. Only the fields in ``wanted`` are kept. Values
    that parse as floats are stored as floats; anything else is stored as
    the stripped string.

    Rows are dropped when they have:

    * fewer than three columns;
    * a timestamp ``_ts_to_epoch`` cannot parse (it returns ``0.0``);
    * a non-finite numeric value (NaN/inf), which ``json.dumps`` would
      emit as non-standard JSON tokens.

    Returns:
        Mapping of field name to a list of ``{"t": epoch_s, "v": value}``
        dicts, in file order.
    """
    wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix",
              "Armed", "Mode", "M1", "M2"}
    signals: dict[str, list[dict]] = {}
    with open(nav_log_path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        for row in reader:
            if len(row) < 3:
                continue
            ts_str, field, raw = row[0], row[1].strip(), row[2]
            if field not in wanted:
                continue
            t = _ts_to_epoch(ts_str)
            if t == 0.0:
                # Unparseable timestamp: keeping it would place the sample at
                # 1970 and pull the file's t_start down to 0.
                continue
            try:
                v: Any = float(raw)
            except ValueError:
                v = raw.strip()
            else:
                if not math.isfinite(v):
                    continue
            signals.setdefault(field, []).append({"t": t, "v": v})
    return signals
def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]:
    """Read a combined USBL CSV into ``usbl_dist`` and ``usbl_angle`` series.

    The file is read with ``csv.DictReader``, using the ``Timestamp``,
    ``Dist`` and ``Azimuth`` columns. Distance and azimuth are parsed
    independently, so a row with one bad cell still contributes the other.

    The following are skipped:

    * empty, malformed or non-finite (NaN/inf) values;
    * rows whose timestamp is missing (e.g. short rows);
    * rows whose timestamp ``_ts_to_epoch`` cannot parse (it returns
      ``0.0``).

    Returns:
        ``{"usbl_dist": [...], "usbl_angle": [...]}``. Each list holds
        ``{"t": epoch_s, "v": float}`` dicts in file order.
    """

    def _finite_or_none(raw: str | None) -> float | None:
        # Parse one numeric cell; None when empty, malformed or non-finite.
        if not raw:
            return None
        try:
            value = float(raw)
        except ValueError:
            return None
        return value if math.isfinite(value) else None

    dist_pts: list[dict] = []
    az_pts: list[dict] = []
    # utf-8-sig strips a leading BOM, if present. Otherwise the first header
    # would be read as "\ufeffTimestamp" and no row would have "Timestamp".
    with open(usbl_csv_path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            ts = row.get("Timestamp")
            if not ts:
                # Missing column, or a short row that DictReader padded with None.
                continue
            t = _ts_to_epoch(ts)
            if t == 0.0:
                continue
            d = _finite_or_none(row.get("Dist"))
            az = _finite_or_none(row.get("Azimuth"))
            if d is not None:
                dist_pts.append({"t": t, "v": d})
            if az is not None:
                az_pts.append({"t": t, "v": az})
    return {"usbl_dist": dist_pts, "usbl_angle": az_pts}
def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]:
    """Read an MCAP-extracted signals JSON file into per-signal point lists.

    Recognised top-level keys and the names they are stored under:

    * ``depth`` -> ``depth``
    * ``pwm_auv`` (mapping of motor name to series) -> lower-cased motor name
    * ``state`` -> ``arm_status``
    * ``pitch``, ``roll``, ``yaw``, ``altitude``, ``battery_v``,
      ``obstacle_dist`` -> same name

    Each series item carries its time as ``t_ms`` or ``t``. Its value comes
    from ``v`` or ``value``; if both are missing, the value is ``0``.

    Times are normalised to seconds:

    * ``t_ms`` is always divided by 1000;
    * a bare ``t`` is divided by 1000 only when it exceeds 1e11, which marks
      epoch milliseconds rather than epoch seconds;
    * a missing time becomes ``0.0``.

    Returns:
        Mapping of signal name to a list of ``{"t": seconds, "v": value}``.
    """
    with open(mcap_json_path, encoding="utf-8") as f:
        data = json.load(f)

    def _to_points(series: list) -> list[dict]:
        # Normalise one series of {t_ms|t, v|value} items to {t (s), v}.
        pts = []
        for item in series:
            if "t_ms" in item:
                t = float(item["t_ms"]) / 1000.0
            else:
                t = float(item.get("t", 0))
                # Epoch seconds are ~1.7e9 today and epoch ms ~1.7e12.
                # 1e11 separates them for any date between 1973 and 5138.
                if t > 1e11:
                    t /= 1000.0
            pts.append({"t": t, "v": item.get("v", item.get("value", 0))})
        return pts

    signals: dict[str, list[dict]] = {}
    if "depth" in data:
        signals["depth"] = _to_points(data["depth"])
    for motor, series in (data.get("pwm_auv") or {}).items():
        signals[motor.lower()] = _to_points(series)
    if "state" in data:
        signals["arm_status"] = _to_points(data["state"])
    # Signals added by the extended extract_mcap_signals.py.
    for key in ("pitch", "roll", "yaw", "altitude", "battery_v", "obstacle_dist"):
        if key in data:
            signals[key] = _to_points(data[key])
    return signals
def _time_bounds(signals: dict[str, list[dict]]) -> tuple[float, float]:
    """Return the earliest and latest ``t`` across all signals, or (0, 0) if empty."""
    t_all = [p["t"] for pts in signals.values() for p in pts]
    if not t_all:
        return 0, 0
    return min(t_all), max(t_all)


def _write_signals_gz(
    signals: dict[str, list[dict]],
    output_path: Path,
    sortie_id: str,
    date: str,
    vehicle: str,
) -> None:
    """Downsample *signals* and write them, with a meta header, as gzipped JSON.

    ``meta.t_start``/``meta.t_end`` come from the full-resolution data. Each
    signal is reduced to at most ``LTTB_MAX_PTS`` points.

    The payload is written to a sibling ``<name>.tmp`` file and then renamed
    over *output_path*. A reader therefore never sees a half-written file:
    ``os.replace`` is atomic on POSIX, and the temp file is in the same
    directory. The temp file is removed if anything fails.
    """
    t_start, t_end = _time_bounds(signals)
    meta = {
        "sortie": sortie_id,
        "date": date,
        "vehicle": vehicle,
        "t_start": t_start,
        "t_end": t_end,
    }
    downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
    payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = output_path.with_name(output_path.name + ".tmp")
    try:
        tmp_path.write_bytes(gzip.compress(payload))
        tmp_path.replace(output_path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise


def write_usv_json(
    nav_log_path: Path,
    usbl_csv_path: Path | None,
    output_path: Path,
    sortie_id: str,
    date: str,
) -> None:
    """Write the gzipped USV sortie JSON from the nav log and optional USBL CSV.

    USBL series (``usbl_dist``/``usbl_angle``) are merged in only when
    *usbl_csv_path* is given and exists. ``meta.vehicle`` is ``"USV"``.
    """
    signals = _read_nav_log(nav_log_path)
    if usbl_csv_path and usbl_csv_path.exists():
        signals.update(_read_usbl_csv(usbl_csv_path))
    _write_signals_gz(signals, output_path, sortie_id, date, "USV")


def write_auv_json(
    mcap_json_path: Path,
    output_path: Path,
    auv_id: str,
    sortie_id: str,
    date: str,
) -> None:
    """Write the gzipped AUV sortie JSON from an MCAP-extracted signals file.

    ``meta.vehicle`` is set to *auv_id*.
    """
    signals = _read_mcap_signals(mcap_json_path)
    _write_signals_gz(signals, output_path, sortie_id, date, auv_id)