Files
cosma-nav-tools/tools/check_sync.py

72 lines
2.4 KiB
Python

#!/usr/bin/env python3
"""Check temporal alignment between MCAP AUV, USV PWM, and USBL data."""
import json, os, sys
from datetime import datetime, timezone
def fmt(ms):
    """Format an epoch timestamp given in milliseconds as a UTC date-time string.

    Args:
        ms: Milliseconds since the Unix epoch, or 0 / None when no timestamp
            is available.

    Returns:
        The time rendered as 'YYYY-MM-DD HH:MM:SS' in UTC, or 'N/A' when
        ``ms`` is 0 or None.
    """
    # 0 is the "no data" placeholder; None covers a JSON null timestamp, which
    # would otherwise raise TypeError in the division below.
    if ms is None or ms == 0:
        return 'N/A'
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
def load(path):
    """Read the JSON document stored at ``path`` and return the decoded object.

    Args:
        path: Filesystem path of the JSON file to read.

    Returns:
        The Python object produced by ``json.load`` for the file content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Decode explicitly as UTF-8 instead of the platform's locale encoding, so
    # non-ASCII content is read the same way on every system.
    with open(path, encoding='utf-8') as f:
        return json.load(f)
# Directory holding this script; the JSON inputs are read from the 'output'
# directory that sits beside it (i.e. under this script's parent directory).
_script_dir = os.path.dirname(os.path.abspath(__file__))
base = os.path.join(os.path.dirname(_script_dir), 'output')
# Per-source summary, filled in below: display name -> time range and count.
sources = dict()
# MCAP signals: summarise mcap_signals.json, or report that it is missing.
mcap_path = os.path.join(base, 'mcap_signals.json')
if not os.path.exists(mcap_path):
    print(f"MISSING: {mcap_path}")
else:
    d = load(mcap_path)
    # Total entries across the 'depth' list, the 'pwm_auv' -> 'samples' list
    # and the 'state' list; any absent key counts as empty.
    n = sum(
        len(chunk)
        for chunk in (
            d.get('depth', []),
            d.get('pwm_auv', {}).get('samples', []),
            d.get('state', []),
        )
    )
    sources['MCAP AUV'] = {
        't_min': d['t_min_utc_ms'],
        't_max': d['t_max_utc_ms'],
        'n': n,
    }
# USV PWM: summarise usv_pwm.json, or report that it is missing.
usv_path = os.path.join(base, 'usv_pwm.json')
if not os.path.exists(usv_path):
    print(f"MISSING: {usv_path}")
else:
    d = load(usv_path)
    # Count every entry of every series stored under the 'M' and 'RC'
    # mappings; a missing mapping contributes nothing.
    n = sum(
        len(series)
        for group in ('M', 'RC')
        for series in d.get(group, {}).values()
    )
    sources['USV PWM'] = {
        't_min': d['t_min_utc_ms'],
        't_max': d['t_max_utc_ms'],
        'n': n,
    }
# USBL: summarise usbl.json, or report that it is missing.
usbl_path = os.path.join(base, 'usbl.json')
if not os.path.exists(usbl_path):
    print(f"MISSING: {usbl_path}")
else:
    d = load(usbl_path)
    pts = d.get('points', [])
    if not pts:
        # No points: record zeros, which fmt() renders as 'N/A'.
        sources['USBL'] = {'t_min': 0, 't_max': 0, 'n': 0}
    else:
        # The time range is taken from the 't_ms' field of each point.
        t_vals = [p['t_ms'] for p in pts]
        sources['USBL'] = {
            't_min': min(t_vals),
            't_max': max(t_vals),
            'n': len(pts),
        }
# Summary table: one row per loaded source, sharing one column layout with
# the header row.
_row = "{:<12} | {:<20} | {:<20} | {:>6}"
print("\n" + _row.format('Source', 't_min UTC', 't_max UTC', 'n_pts'))
print('-' * 68)
for name in sources:
    s = sources[name]
    print(_row.format(name, fmt(s['t_min']), fmt(s['t_max']), s['n']))
# Overlap MCAP vs USV: only evaluated when both sources were loaded.
if 'MCAP AUV' in sources and 'USV PWM' in sources:
    mcap = sources['MCAP AUV']
    usv = sources['USV PWM']
    # Length in ms of the intersection of the two [t_min, t_max] windows;
    # negative when the windows are disjoint, in which case its magnitude is
    # the gap between them.
    overlap_ms = min(mcap['t_max'], usv['t_max']) - max(mcap['t_min'], usv['t_min'])
    print(f"\nMCAP t_min: {fmt(mcap['t_min'])} UTC")
    print(f"USV t_min: {fmt(usv['t_min'])} UTC")
    # Signed start offset in minutes: positive when MCAP starts after USV.
    diff_min = (mcap['t_min'] - usv['t_min']) / 60000
    print(f"t_min diff: {diff_min:+.1f} min (MCAP vs USV)")
    # The threshold is inclusive: an overlap of exactly 60 s is not "<60s",
    # so it must not fall through to the SUSPECT branch.
    if overlap_ms >= 60000:
        print(f"OK - overlap: {overlap_ms//1000} s")
    elif overlap_ms < 0:
        print(f"WARNING: no overlap! gap = {-overlap_ms//1000} s")
    else:
        print(f"SUSPECT: overlap <60s: {overlap_ms//1000} s")
# Entry-point guard kept as a deliberate no-op: all of the statements above
# run at module level, so nothing is left to do here.
if __name__ == '__main__':
    pass