#!/usr/bin/env python3 """Check temporal alignment between MCAP AUV, USV PWM, and USBL data.""" import json, os, sys from datetime import datetime, timezone def fmt(ms): if ms == 0: return 'N/A' return datetime.fromtimestamp(ms/1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S') def load(path): with open(path) as f: return json.load(f) base = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output') sources = {} # MCAP signals mcap_path = os.path.join(base, 'mcap_signals.json') if os.path.exists(mcap_path): d = load(mcap_path) n = len(d.get('depth',[])) + len(d.get('pwm_auv',{}).get('samples',[])) + len(d.get('state',[])) sources['MCAP AUV'] = {'t_min': d['t_min_utc_ms'], 't_max': d['t_max_utc_ms'], 'n': n} else: print(f"MISSING: {mcap_path}") # USV PWM usv_path = os.path.join(base, 'usv_pwm.json') if os.path.exists(usv_path): d = load(usv_path) n = sum(len(v) for v in d.get('M',{}).values()) + sum(len(v) for v in d.get('RC',{}).values()) sources['USV PWM'] = {'t_min': d['t_min_utc_ms'], 't_max': d['t_max_utc_ms'], 'n': n} else: print(f"MISSING: {usv_path}") # USBL usbl_path = os.path.join(base, 'usbl.json') if os.path.exists(usbl_path): d = load(usbl_path) pts = d.get('points', []) if pts: t_vals = [p['t_ms'] for p in pts] sources['USBL'] = {'t_min': min(t_vals), 't_max': max(t_vals), 'n': len(pts)} else: sources['USBL'] = {'t_min': 0, 't_max': 0, 'n': 0} else: print(f"MISSING: {usbl_path}") print(f"\n{'Source':<12} | {'t_min UTC':<20} | {'t_max UTC':<20} | {'n_pts':>6}") print('-' * 68) for name, s in sources.items(): print(f"{name:<12} | {fmt(s['t_min']):<20} | {fmt(s['t_max']):<20} | {s['n']:>6}") # Overlap MCAP vs USV if 'MCAP AUV' in sources and 'USV PWM' in sources: mcap = sources['MCAP AUV'] usv = sources['USV PWM'] overlap_ms = min(mcap['t_max'], usv['t_max']) - max(mcap['t_min'], usv['t_min']) print(f"\nMCAP t_min: {fmt(mcap['t_min'])} UTC") print(f"USV t_min: {fmt(usv['t_min'])} UTC") diff_min = (mcap['t_min'] - usv['t_min']) / 60000 print(f"t_min diff: {diff_min:+.1f} min (MCAP vs USV)") if overlap_ms > 60000: print(f"OK - overlap: {overlap_ms//1000} s") elif overlap_ms < 0: print(f"WARNING: no overlap! gap = {-overlap_ms//1000} s") else: print(f"SUSPECT: overlap <60s: {overlap_ms//1000} s") if __name__ == '__main__': pass