"""Build gzipped JSON signal payloads (meta + LTTB-downsampled series) for USV / AUV sorties."""

import csv
import gzip
import json
import math
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import numpy as np

from .config import LTTB_MAX_PTS

# Fields extracted from the long-format navigation log; all other rows are ignored.
_NAV_FIELDS = frozenset(
    {"Yaw", "Heading", "Roll", "Pitch", "BattVoltage", "gps_fix", "Armed", "Mode", "M1", "M2"}
)

# Accepted timestamp layouts, tried in order (fractional seconds first).
_TS_FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")

# A bare numeric "t" above this is taken to be epoch *milliseconds*: epoch
# seconds stay below 1e11 until roughly the year 5138, while epoch
# milliseconds have exceeded it since 1973. (The previous 1e9 cut-off also
# matched present-day epoch seconds, ~1.7e9, and wrongly divided them.)
_EPOCH_MS_THRESHOLD = 1e11


def _lttb(points: list[dict], max_pts: int) -> list[dict]:
    """Downsample *points* with Largest-Triangle-Three-Buckets (LTTB).

    Each point is a ``{"t": ..., "v": ...}`` dict; the series is assumed (not
    checked) to be ordered by ``t``. Non-numeric values (e.g. mode strings)
    count as 0.0 when scoring triangles, but the original dicts are returned
    unchanged.

    Args:
        points: Series to downsample.
        max_pts: Target number of output points.

    Returns:
        ``points`` itself when it has at most ``max_pts`` entries (or at most
        two). Otherwise a new list that always keeps the first and last
        sample: exactly ``max_pts`` points, or just the two endpoints when
        ``max_pts < 3``.
    """
    n = len(points)
    if n <= max_pts or n <= 2:
        return points
    if max_pts < 3:
        # No room for interior buckets; also avoids dividing by zero below.
        return [points[0], points[-1]]

    ts = np.array([p["t"] for p in points], dtype=float)
    vs = np.array(
        [p["v"] if isinstance(p["v"], (int, float)) else 0.0 for p in points],
        dtype=float,
    )
    bucket_size = (n - 2) / (max_pts - 2)

    out = [points[0]]
    a = 0  # index of the point picked in the previous bucket (triangle vertex A)
    for i in range(max_pts - 2):
        # The mean of the *next* bucket is triangle vertex C.
        avg_start = int((i + 1) * bucket_size) + 1
        avg_end = min(int((i + 2) * bucket_size) + 1, n)
        avg_t = ts[avg_start:avg_end].mean()
        avg_v = vs[avg_start:avg_end].mean()

        # Candidates for vertex B come from the current bucket.
        rng_start = int(i * bucket_size) + 1
        rng_end = min(int((i + 1) * bucket_size) + 1, n)
        seg_t = ts[rng_start:rng_end]
        seg_v = vs[rng_start:rng_end]

        at, av = ts[a], vs[a]
        # |(B - A) x (C - A)| is twice the triangle area; the 1/2 factor does
        # not change which candidate wins.
        areas = np.abs((seg_t - at) * (avg_v - av) - (seg_v - av) * (avg_t - at))
        # NaN areas never win (as with a strict ">" scan starting at -1), and
        # argmax returns the first maximum, so ties keep the earliest point.
        best = rng_start + int(np.argmax(np.where(np.isnan(areas), -1.0, areas)))
        out.append(points[best])
        a = best
    out.append(points[-1])
    return out


def _ts_to_epoch(ts_str: str, default: float | None = 0.0) -> float | None:
    """Parse ``'YYYY-MM-DD HH:MM:SS[.ffffff]'`` as UTC and return epoch seconds.

    Args:
        ts_str: Timestamp text; surrounding whitespace is ignored.
        default: Returned when ``ts_str`` is not a string or matches none of
            the accepted layouts. Defaults to ``0.0`` (the historical
            behaviour); the readers in this module pass ``None`` so they can
            skip unparseable rows instead of placing them at 1970-01-01.

    Returns:
        Epoch seconds as a float, or ``default``.
    """
    if not isinstance(ts_str, str):
        return default
    text = ts_str.strip()
    for fmt in _TS_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return parsed.replace(tzinfo=timezone.utc).timestamp()
    return default


def _read_nav_log(nav_log_path: Path) -> dict[str, list[dict]]:
    """Read a long-format ``navigation_log.csv`` into per-signal point lists.

    The file is a header row followed by ``timestamp, field, value`` rows;
    only fields in ``_NAV_FIELDS`` are kept. Numeric values become floats;
    anything else is kept as a stripped string (e.g. ``Mode``). Rows with an
    unparseable timestamp or a NaN value are skipped, matching how
    ``_read_usbl_csv`` drops NaN readings.

    Returns:
        ``{field: [{"t": epoch_seconds, "v": value}, ...]}`` in file order.
    """
    signals: dict[str, list[dict]] = {}
    # utf-8-sig transparently strips a leading BOM if present.
    with open(nav_log_path, newline="", encoding="utf-8-sig") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        for row in reader:
            if len(row) < 3:
                continue
            ts_str, field, val = row[0], row[1], row[2]
            if field not in _NAV_FIELDS:
                continue
            t = _ts_to_epoch(ts_str, default=None)
            if t is None:
                continue
            try:
                v: Any = float(val)
            except ValueError:
                v = val.strip()
            else:
                if math.isnan(v):
                    continue
            signals.setdefault(field, []).append({"t": t, "v": v})
    return signals


def _parse_optional_float(text: str | None) -> float | None:
    """Return *text* as a float, or ``None`` when it is blank, malformed, or NaN."""
    if not text:
        return None
    try:
        value = float(text)
    except ValueError:
        return None
    return None if math.isnan(value) else value


def _read_usbl_csv(usbl_csv_path: Path) -> dict[str, list[dict]]:
    """Read ``combined_usbl.csv`` into ``usbl_dist`` / ``usbl_angle`` series.

    Uses the ``Timestamp``, ``Dist`` and ``Azimuth`` columns. Rows whose
    timestamp is missing or unparseable are skipped. ``Dist`` and ``Azimuth``
    are parsed independently, so a bad value in one column does not discard
    the other; blank, malformed, and NaN values are dropped.
    """
    dist_pts: list[dict] = []
    az_pts: list[dict] = []
    # utf-8-sig: a BOM would otherwise be glued onto the first header name and
    # every row["Timestamp"] lookup would miss.
    with open(usbl_csv_path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            # .get(): short rows yield None values; absent columns yield None.
            t = _ts_to_epoch(row.get("Timestamp"), default=None)
            if t is None:
                continue
            d = _parse_optional_float(row.get("Dist"))
            if d is not None:
                dist_pts.append({"t": t, "v": d})
            az = _parse_optional_float(row.get("Azimuth"))
            if az is not None:
                az_pts.append({"t": t, "v": az})
    return {"usbl_dist": dist_pts, "usbl_angle": az_pts}


def _mcap_points(series: list) -> list[dict]:
    """Convert raw ``mcap_signals.json`` samples to ``{"t": seconds, "v": value}`` points.

    The time comes from ``t_ms`` (always milliseconds) or else ``t`` (seconds,
    unless large enough to be epoch milliseconds). The value comes from ``v``,
    else ``value``, else 0. Samples without a finite numeric timestamp are
    skipped.
    """
    pts: list[dict] = []
    for item in series:
        from_ms = "t_ms" in item
        raw_t = item["t_ms"] if from_ms else item.get("t")
        try:
            t = float(raw_t)
        except (TypeError, ValueError):
            continue
        if not math.isfinite(t):
            continue
        if from_ms or t > _EPOCH_MS_THRESHOLD:
            t /= 1000.0  # ms -> s
        pts.append({"t": t, "v": item.get("v", item.get("value", 0))})
    return pts


def _read_mcap_signals(mcap_json_path: Path) -> dict[str, list[dict]]:
    """Read ``mcap_signals.json`` into depth, motor (M1-M6), state and attitude series.

    Recognised top-level keys: ``depth``; ``pwm_auv`` (a mapping of motor
    name to series, stored under the lower-cased name); ``state`` (stored as
    ``arm_status``); and the optional ``pitch``, ``roll``, ``yaw``,
    ``altitude``, ``battery_v``, ``obstacle_dist``. Missing keys are skipped.
    """
    with open(mcap_json_path, encoding="utf-8") as f:
        data = json.load(f)

    signals: dict[str, list[dict]] = {}
    if "depth" in data:
        signals["depth"] = _mcap_points(data["depth"])
    if "pwm_auv" in data:
        for m_key, series in data["pwm_auv"].items():
            signals[m_key.lower()] = _mcap_points(series)
    if "state" in data:
        signals["arm_status"] = _mcap_points(data["state"])
    # New signals added by extended extract_mcap_signals.py
    for key in ("pitch", "roll", "yaw", "altitude", "battery_v", "obstacle_dist"):
        if key in data:
            signals[key] = _mcap_points(data[key])
    return signals


def _write_signals_gz(
    signals: dict[str, list[dict]],
    output_path: Path,
    *,
    sortie_id: str,
    date: str,
    vehicle: str,
) -> None:
    """Downsample every series and write ``{"meta", "signals"}`` as gzipped JSON.

    ``meta.t_start`` / ``meta.t_end`` span the raw (pre-downsampling) samples
    of all signals, or are 0 when there are none. Each series is reduced with
    ``_lttb`` to at most ``LTTB_MAX_PTS`` points. Parent directories of
    ``output_path`` are created as needed.
    """
    t_all = [p["t"] for pts in signals.values() for p in pts]
    meta = {
        "sortie": sortie_id,
        "date": date,
        "vehicle": vehicle,
        "t_start": min(t_all, default=0),
        "t_end": max(t_all, default=0),
    }
    downsampled = {k: _lttb(v, LTTB_MAX_PTS) for k, v in signals.items()}
    payload = json.dumps({"meta": meta, "signals": downsampled}).encode()
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with gzip.open(output_path, "wb") as f:
        f.write(payload)


def write_usv_json(
    nav_log_path: Path,
    usbl_csv_path: Path | None,
    output_path: Path,
    sortie_id: str,
    date: str,
) -> None:
    """Write the USV signal payload for one sortie to ``output_path`` (gzipped JSON).

    Signals come from the navigation log, plus ``usbl_dist`` / ``usbl_angle``
    when ``usbl_csv_path`` is given and exists. ``meta.vehicle`` is ``"USV"``.
    """
    signals = _read_nav_log(nav_log_path)
    if usbl_csv_path and usbl_csv_path.exists():
        signals.update(_read_usbl_csv(usbl_csv_path))
    _write_signals_gz(signals, output_path, sortie_id=sortie_id, date=date, vehicle="USV")


def write_auv_json(
    mcap_json_path: Path,
    output_path: Path,
    auv_id: str,
    sortie_id: str,
    date: str,
) -> None:
    """Write the AUV signal payload read from ``mcap_json_path`` to ``output_path`` (gzipped JSON).

    ``meta.vehicle`` is set to ``auv_id``.
    """
    signals = _read_mcap_signals(mcap_json_path)
    _write_signals_gz(signals, output_path, sortie_id=sortie_id, date=date, vehicle=auv_id)