diff --git a/pipeline_runner/processor.py b/pipeline_runner/processor.py new file mode 100644 index 0000000..6f31bf9 --- /dev/null +++ b/pipeline_runner/processor.py @@ -0,0 +1,174 @@ +import csv +import gzip +import json +import math +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +import numpy as np + +from .config import LTTB_MAX_PTS + + +def _lttb(points: list[dict], max_pts: int) -> list[dict]: + """Largest Triangle Three Buckets downsampling.""" + n = len(points) + if n <= max_pts: + return points + ts = np.array([p["t"] for p in points], dtype=float) + vs = np.array([p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points], dtype=float) + bucket_size = (n - 2) / (max_pts - 2) + out = [points[0]] + a = 0 + for i in range(max_pts - 2): + avg_start = int((i + 1) * bucket_size) + 1 + avg_end = min(int((i + 2) * bucket_size) + 1, n) + avg_t = np.mean(ts[avg_start:avg_end]) + avg_v = np.mean(vs[avg_start:avg_end]) + rng_start = int(i * bucket_size) + 1 + rng_end = min(int((i + 1) * bucket_size) + 1, n) + max_area = -1.0 + best = rng_start + at, av = ts[a], vs[a] + for j in range(rng_start, rng_end): + area = abs((at - avg_t) * (vs[j] - av) - (ts[j] - at) * (avg_v - av)) * 0.5 + if area > max_area: + max_area = area + best = j + out.append(points[best]) + a = best + out.append(points[-1]) + return out + + +def _ts_to_epoch(ts_str: str) -> float: + """Parse 'YYYY-MM-DD HH:MM:SS.ffffff' → epoch seconds.""" + for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"): + try: + dt = datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc) + return dt.timestamp() + except ValueError: + continue + return 0.0 + + +def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]: + """Read long-format navigation_log.csv → dict of signal arrays.""" + signals: dict[str, list] = {} + wanted = {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix", + "Armed", "Mode", "M1", "M2"} + with open(nav_log_path, newline="", encoding="utf-8") as f: + reader = csv.reader(f) + next(reader, None) # skip header + for row in reader: + if len(row) < 3: + continue + ts_str, field, val = row[0], row[1], row[2] + if field not in wanted: + continue + t = _ts_to_epoch(ts_str) + try: + v: Any = float(val) + except ValueError: + v = val.strip() + signals.setdefault(field, []).append({"t": t, "v": v}) + return signals + + +def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]: + """Read combined_usbl.csv → Dist and Azimuth signal arrays.""" + dist_pts, az_pts = [], [] + with open(usbl_csv_path, newline="", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + try: + t = _ts_to_epoch(row["Timestamp"]) + d = float(row["Dist"]) if row["Dist"] else None + az = float(row["Azimuth"]) if row["Azimuth"] else None + except (KeyError, ValueError): + continue + if d is not None and not math.isnan(d): + dist_pts.append({"t": t, "v": d}) + if az is not None and not math.isnan(az): + az_pts.append({"t": t, "v": az}) + return {"usbl_dist": dist_pts, "usbl_angle": az_pts} + + +def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]: + """Read mcap_signals.json → depth, motors M1-M6, state signals.""" + with open(mcap_json_path) as f: + data = json.load(f) + signals: dict[str, list[dict]] = {} + + def _unpack(key: str, series: list): + pts = [] + for item in series: + t = item.get("t_ms", item.get("t", 0)) + if isinstance(t, (int, float)) and t > 1e9: + t = t / 1000.0 # ms → s + pts.append({"t": float(t), "v": item.get("v", item.get("value", 0))}) + signals[key] = pts + + if "depth" in data: + _unpack("depth", data["depth"]) + if "pwm_auv" in data: + for m_key, series in data["pwm_auv"].items(): + _unpack(m_key.lower(), series) + if "state" in data: + _unpack("arm_status", data["state"]) + # New signals added by extended extract_mcap_signals.py + for key in ("pitch", "roll", "yaw", "altitude", "battery_v", "obstacle_dist"): + if key in data: + _unpack(key, data[key]) + return signals + + +def write_usv_json( + nav_log_path: Path, + usbl_csv_path: Path | None, + output_path: Path, + sortie_id: str, + date: str, +) -> None: + signals = _read_nav_log(nav_log_path) + if usbl_csv_path and usbl_csv_path.exists(): + signals.update(_read_usbl_csv(usbl_csv_path)) + + t_all = [p["t"] for pts in signals.values() for p in pts if pts] + meta = { + "sortie": sortie_id, + "date": date, + "vehicle": "USV", + "t_start": min(t_all) if t_all else 0, + "t_end": max(t_all) if t_all else 0, + } + + downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()} + payload = json.dumps({"meta": meta, "signals": downsampled}).encode() + output_path.parent.mkdir(parents=True, exist_ok=True) + with gzip.open(output_path, "wb") as f: + f.write(payload) + + +def write_auv_json( + mcap_json_path: Path, + output_path: Path, + auv_id: str, + sortie_id: str, + date: str, +) -> None: + signals = _read_mcap_signals(mcap_json_path) + t_all = [p["t"] for pts in signals.values() for p in pts if pts] + meta = { + "sortie": sortie_id, + "date": date, + "vehicle": auv_id, + "t_start": min(t_all) if t_all else 0, + "t_end": max(t_all) if t_all else 0, + } + downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()} + payload = json.dumps({"meta": meta, "signals": downsampled}).encode() + output_path.parent.mkdir(parents=True, exist_ok=True) + with gzip.open(output_path, "wb") as f: + f.write(payload)