feat: Kogger USBL decoder + nav merge
- tools/parse_kogger_usbl.py: decode SBP protocol (ID=0x65 USBL_SOLUTION) from raw *_usbl.csv files, output combined_usbl.csv with Dist/Az/Elev/SNR - tools/merge_nav_usbl.py: merge USBL data with navigation_log.csv, interpolate USV lat/lon/heading, compute AUV absolute position (azimuth relative to USV heading convention) - vendor/Kogger-Protocol: SBP spec reference (submodule) - 69-sttropez: 13986 USBL records decoded, avg USV-AUV dist 39m
This commit is contained in:
319
tools/merge_nav_usbl.py
Normal file
319
tools/merge_nav_usbl.py
Normal file
@@ -0,0 +1,319 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
merge_nav_usbl.py — Merge USBL decoded data with USV navigation log
|
||||
|
||||
Inputs:
|
||||
--usbl : combined_usbl.csv (output of parse_kogger_usbl.py)
|
||||
--nav-dir: directory containing *_navigation_log.csv files
|
||||
--output : output CSV (default: combined_nav_usbl.csv)
|
||||
|
||||
Nav log format: timestamp,data,value (long format)
|
||||
data=Lat → latitude_deg
|
||||
data=Lon → longitude_deg
|
||||
data=Heading → heading_deg
|
||||
|
||||
Interpolation: for each USBL timestamp, find nearest nav point within 1s window.
|
||||
If multiple sessions, use session matching by timestamp overlap.
|
||||
|
||||
AUV position calculation:
|
||||
azimuth_deg from USBL is RELATIVE to USV heading (yaw) based on USBL hardware mounting.
|
||||
AUV bearing from North = (USV heading + azimuth_deg) mod 360
|
||||
Horizontal dist = dist_m * cos(elevation_deg * pi/180)
|
||||
|
||||
Note: if azimuth is already absolute (referenced to North), do NOT add heading.
|
||||
Check: usbl_yaw in payload should match nav Heading if relative azimuth.
|
||||
We use RELATIVE convention (add USV heading) — documented here.
|
||||
|
||||
Geodetic forward (haversine):
|
||||
Using flat-earth approximation valid for distances < 500m:
|
||||
dlat = horiz_dist * cos(bearing) / R_earth
|
||||
dlon = horiz_dist * sin(bearing) / (R_earth * cos(lat))
|
||||
|
||||
R_earth = 6371000 m
|
||||
"""
|
||||
|
||||
import csv
|
||||
import io
|
||||
import sys
|
||||
import math
|
||||
import os
|
||||
|
||||
R_EARTH = 6371000.0 # meters
|
||||
|
||||
|
||||
def parse_nav_log(nav_file):
|
||||
"""
|
||||
Parse navigation_log.csv into:
|
||||
- nav_points: sorted list of (timestamp_str, lat, lon) from Lat+Lon entries
|
||||
- heading_series: sorted list of (timestamp_str, heading_deg) from Heading entries
|
||||
|
||||
Lat/Lon and Heading have different timestamps so must be interpolated separately.
|
||||
Returns (nav_points, heading_series).
|
||||
"""
|
||||
lat_by_ts = {}
|
||||
lon_by_ts = {}
|
||||
heading_series_raw = []
|
||||
|
||||
with open(nav_file) as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
data = row.get('data', '')
|
||||
ts = row.get('timestamp', '')
|
||||
try:
|
||||
val_raw = row.get('value', '') or ''
|
||||
if not val_raw:
|
||||
continue
|
||||
val = float(val_raw)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
if data == 'Lat':
|
||||
lat_by_ts[ts] = val
|
||||
elif data == 'Lon':
|
||||
lon_by_ts[ts] = val
|
||||
elif data == 'Heading':
|
||||
heading_series_raw.append((ts, val))
|
||||
|
||||
# Build nav_points: timestamps where both Lat and Lon appear (same ts)
|
||||
nav_points = []
|
||||
for ts in sorted(set(lat_by_ts.keys()) & set(lon_by_ts.keys())):
|
||||
nav_points.append((ts, lat_by_ts[ts], lon_by_ts[ts]))
|
||||
|
||||
# heading_series sorted by timestamp
|
||||
heading_series = sorted(heading_series_raw, key=lambda x: x[0])
|
||||
|
||||
return nav_points, heading_series
|
||||
|
||||
|
||||
def ts_to_seconds(ts_str):
|
||||
"""Convert '2026-03-24 09:29:05.230392' to float seconds since epoch (approx)."""
|
||||
# Simple: parse date+time, compute offset
|
||||
try:
|
||||
date_part, time_part = ts_str.strip().split(' ', 1)
|
||||
y, mo, d = date_part.split('-')
|
||||
parts = time_part.split(':')
|
||||
h, m = int(parts[0]), int(parts[1])
|
||||
s_str = parts[2]
|
||||
s = float(s_str)
|
||||
# Days since fixed epoch (don't need absolute, just relative diffs)
|
||||
total = (int(y)*365 + int(mo)*30 + int(d)) * 86400 + h*3600 + m*60 + s
|
||||
return total
|
||||
except Exception:
|
||||
return 0.0
|
||||
|
||||
|
||||
def find_nearest(ts_sec, series_sec, series, max_gap=1.0):
|
||||
"""Find nearest entry in sorted series within max_gap seconds."""
|
||||
best_idx = -1
|
||||
best_dt = float('inf')
|
||||
for i, (s_sec, entry) in enumerate(zip(series_sec, series)):
|
||||
dt = abs(s_sec - ts_sec)
|
||||
if dt < best_dt:
|
||||
best_dt = dt
|
||||
best_idx = i
|
||||
elif s_sec > ts_sec + max_gap:
|
||||
break
|
||||
if best_idx >= 0 and best_dt <= max_gap:
|
||||
return series[best_idx], best_dt
|
||||
return None, None
|
||||
|
||||
|
||||
def geodetic_forward(lat_deg, lon_deg, bearing_deg, dist_m):
|
||||
"""
|
||||
Compute destination point given start lat/lon, bearing (deg from North), distance (m).
|
||||
Flat-earth approximation valid for dist < 500m.
|
||||
"""
|
||||
bearing_rad = math.radians(bearing_deg)
|
||||
lat_rad = math.radians(lat_deg)
|
||||
dlat = dist_m * math.cos(bearing_rad) / R_EARTH
|
||||
dlon = dist_m * math.sin(bearing_rad) / (R_EARTH * math.cos(lat_rad))
|
||||
auv_lat = lat_deg + math.degrees(dlat)
|
||||
auv_lon = lon_deg + math.degrees(dlon)
|
||||
return auv_lat, auv_lon
|
||||
|
||||
|
||||
def haversine_dist(lat1, lon1, lat2, lon2):
|
||||
"""Distance in meters between two lat/lon points."""
|
||||
phi1, phi2 = math.radians(lat1), math.radians(lat2)
|
||||
dphi = math.radians(lat2 - lat1)
|
||||
dlam = math.radians(lon2 - lon1)
|
||||
a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlam/2)**2
|
||||
return 2 * R_EARTH * math.asin(math.sqrt(a))
|
||||
|
||||
|
||||
def find_nav_file_for_session(usbl_file, nav_dir):
|
||||
"""Match nav file by common timestamp prefix."""
|
||||
base = os.path.basename(usbl_file)
|
||||
# e.g. 2026-03-24_09-28-44_USV003_usbl.csv -> 2026-03-24_09-28-44_USV003
|
||||
prefix = base.replace('_usbl.csv', '')
|
||||
nav_candidate = os.path.join(nav_dir, prefix + '_navigation_log.csv')
|
||||
if os.path.exists(nav_candidate):
|
||||
return nav_candidate
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--usbl', required=True, help='combined_usbl.csv')
|
||||
parser.add_argument('--nav-dir', required=True, help='Directory with *_navigation_log.csv')
|
||||
parser.add_argument('--output', default='combined_nav_usbl.csv')
|
||||
parser.add_argument('--max-gap', type=float, default=1.0, help='Max timestamp gap in seconds')
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load USBL records
|
||||
usbl_records = []
|
||||
with open(args.usbl) as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
usbl_records.append(row)
|
||||
|
||||
print("USBL records loaded: %d" % len(usbl_records))
|
||||
|
||||
# Group by source_file
|
||||
from collections import defaultdict
|
||||
by_source = defaultdict(list)
|
||||
for rec in usbl_records:
|
||||
by_source[rec.get('source_file', '')].append(rec)
|
||||
|
||||
# Load nav files
|
||||
nav_data = {} # source_file -> (nav_points, nav_points_sec, heading_series, heading_sec)
|
||||
nav_dir = args.nav_dir
|
||||
|
||||
for source_file in by_source.keys():
|
||||
# Try to match nav file
|
||||
prefix = source_file.replace('_usbl.csv', '')
|
||||
nav_file = os.path.join(nav_dir, prefix + '_navigation_log.csv')
|
||||
if not os.path.exists(nav_file):
|
||||
# Try to find by scanning directory
|
||||
matched = None
|
||||
for fn in os.listdir(nav_dir):
|
||||
if prefix in fn and 'navigation_log' in fn:
|
||||
matched = os.path.join(nav_dir, fn)
|
||||
break
|
||||
if matched is None:
|
||||
print("WARNING: no nav file found for %s" % source_file)
|
||||
nav_data[source_file] = ([], [], [], [])
|
||||
continue
|
||||
nav_file = matched
|
||||
nav_points, heading_series = parse_nav_log(nav_file)
|
||||
nav_points_sec = [ts_to_seconds(pt[0]) for pt in nav_points]
|
||||
heading_sec = [ts_to_seconds(h[0]) for h in heading_series]
|
||||
nav_data[source_file] = (nav_points, nav_points_sec, heading_series, heading_sec)
|
||||
print("Nav loaded for %s: %d pos points, %d heading points" % (
|
||||
source_file, len(nav_points), len(heading_series)))
|
||||
|
||||
# Process and write output
|
||||
output_rows = []
|
||||
stats_match = 0
|
||||
stats_nomatch = 0
|
||||
|
||||
for source_file, records in by_source.items():
|
||||
nav_points, nav_points_sec, heading_series, heading_sec = nav_data.get(
|
||||
source_file, ([], [], [], []))
|
||||
|
||||
for rec in records:
|
||||
ts_str = rec.get('Timestamp', '')
|
||||
ts_sec = ts_to_seconds(ts_str)
|
||||
|
||||
dist_str = rec.get('Dist', '')
|
||||
azimuth_str = rec.get('Azimuth', '')
|
||||
elev_str = rec.get('Elev', '')
|
||||
snr_str = rec.get('SNR', '')
|
||||
|
||||
try:
|
||||
dist = float(dist_str) if dist_str else float('nan')
|
||||
azimuth = float(azimuth_str) if azimuth_str else float('nan')
|
||||
elev = float(elev_str) if elev_str else float('nan')
|
||||
snr = float(snr_str) if snr_str else float('nan')
|
||||
except ValueError:
|
||||
dist, azimuth, elev, snr = float('nan'), float('nan'), float('nan'), float('nan')
|
||||
|
||||
nav_pt, dt = find_nearest(ts_sec, nav_points_sec, nav_points, args.max_gap)
|
||||
hdg_pt, _ = find_nearest(ts_sec, heading_sec, heading_series, args.max_gap)
|
||||
|
||||
if nav_pt is None:
|
||||
stats_nomatch += 1
|
||||
lat_usv, lon_usv, heading_usv = float('nan'), float('nan'), float('nan')
|
||||
auv_lat, auv_lon = float('nan'), float('nan')
|
||||
else:
|
||||
stats_match += 1
|
||||
_, lat_usv, lon_usv = nav_pt
|
||||
heading_usv = hdg_pt[1] if hdg_pt is not None else float('nan')
|
||||
|
||||
# Calculate AUV absolute position
|
||||
# Azimuth from USBL is relative to USV heading (yaw convention)
|
||||
# AUV bearing from North = (USV Heading + azimuth_deg) mod 360
|
||||
if not (math.isnan(dist) or math.isnan(azimuth) or
|
||||
math.isnan(lat_usv) or math.isnan(heading_usv)):
|
||||
horiz_dist = dist * math.cos(math.radians(elev)) if not math.isnan(elev) else dist
|
||||
abs_bearing = (heading_usv + azimuth) % 360
|
||||
auv_lat, auv_lon = geodetic_forward(lat_usv, lon_usv, abs_bearing, horiz_dist)
|
||||
else:
|
||||
auv_lat, auv_lon = float('nan'), float('nan')
|
||||
|
||||
output_rows.append({
|
||||
'Timestamp': ts_str,
|
||||
'lat': '%.7f' % lat_usv if not math.isnan(lat_usv) else '',
|
||||
'lon': '%.7f' % lon_usv if not math.isnan(lon_usv) else '',
|
||||
'Heading': '%.2f' % heading_usv if not math.isnan(heading_usv) else '',
|
||||
'Dist': '%.4f' % dist if not math.isnan(dist) else '',
|
||||
'Azimuth': '%.4f' % azimuth if not math.isnan(azimuth) else '',
|
||||
'Elev': '%.4f' % elev if not math.isnan(elev) else '',
|
||||
'SNR': '%.4f' % snr if not math.isnan(snr) else '',
|
||||
'auv_lat': '%.7f' % auv_lat if not math.isnan(auv_lat) else '',
|
||||
'auv_lon': '%.7f' % auv_lon if not math.isnan(auv_lon) else '',
|
||||
'nav_dt_s': '%.3f' % dt if dt is not None else '',
|
||||
})
|
||||
|
||||
print("\n=== Merge stats ===")
|
||||
print("Matched: %d No nav match: %d" % (stats_match, stats_nomatch))
|
||||
|
||||
with open(args.output, 'w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(['Timestamp', 'lat', 'lon', 'Heading', 'Dist', 'Azimuth', 'Elev', 'SNR',
|
||||
'auv_lat', 'auv_lon', 'nav_dt_s'])
|
||||
for row in output_rows:
|
||||
writer.writerow([
|
||||
row['Timestamp'], row['lat'], row['lon'], row['Heading'],
|
||||
row['Dist'], row['Azimuth'], row['Elev'], row['SNR'],
|
||||
row['auv_lat'], row['auv_lon'], row['nav_dt_s']
|
||||
])
|
||||
|
||||
print("Output: %s (%d rows)" % (args.output, len(output_rows)))
|
||||
|
||||
# Sample output
|
||||
if output_rows:
|
||||
print("\n=== Sample (first 5 rows) ===")
|
||||
print("Timestamp,lat,lon,Heading,Dist,Azimuth,Elev,SNR,auv_lat,auv_lon")
|
||||
for row in output_rows[:5]:
|
||||
print("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s" % (
|
||||
row['Timestamp'], row['lat'], row['lon'], row['Heading'],
|
||||
row['Dist'], row['Azimuth'], row['Elev'], row['SNR'],
|
||||
row['auv_lat'], row['auv_lon']))
|
||||
|
||||
# AUV position stats
|
||||
auv_lats = [float(r['auv_lat']) for r in output_rows if r['auv_lat']]
|
||||
auv_lons = [float(r['auv_lon']) for r in output_rows if r['auv_lon']]
|
||||
usv_lats = [float(r['lat']) for r in output_rows if r['lat']]
|
||||
usv_lons = [float(r['lon']) for r in output_rows if r['lon']]
|
||||
|
||||
if auv_lats and usv_lats:
|
||||
print("\n=== Position stats ===")
|
||||
print("AUV lat range: %.6f - %.6f" % (min(auv_lats), max(auv_lats)))
|
||||
print("AUV lon range: %.6f - %.6f" % (min(auv_lons), max(auv_lons)))
|
||||
print("USV lat range: %.6f - %.6f" % (min(usv_lats), max(usv_lats)))
|
||||
|
||||
# Average USV-AUV distance
|
||||
dists = []
|
||||
for r in output_rows:
|
||||
if r['lat'] and r['auv_lat']:
|
||||
d = haversine_dist(float(r['lat']), float(r['lon']),
|
||||
float(r['auv_lat']), float(r['auv_lon']))
|
||||
dists.append(d)
|
||||
if dists:
|
||||
print("Avg USV-AUV dist: %.2f m (min=%.2f max=%.2f)" % (
|
||||
sum(dists)/len(dists), min(dists), max(dists)))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
242
tools/parse_kogger_usbl.py
Normal file
242
tools/parse_kogger_usbl.py
Normal file
@@ -0,0 +1,242 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
parse_kogger_usbl.py — Decode Kogger USBL raw CSV files (SBP protocol)
|
||||
|
||||
Protocol spec:
|
||||
Frame: BB 55 | ROUTE | MODE | ID | LENGTH | PAYLOAD[LENGTH] | CHKSUM1 | CHKSUM2
|
||||
Checksum: Fletcher-16 over (ROUTE + MODE + ID + LENGTH + PAYLOAD)
|
||||
|
||||
Frame ID 0x65 = ID_USBL_SOLUTION
|
||||
Struct (packed, little-endian):
|
||||
id(U1) role(U1) watermark(U2)
|
||||
timestamp_us(S8) ping_counter(U4) carrier_counter(S8)
|
||||
distance_m(F4) distance_unc(F4)
|
||||
azimuth_deg(F4) azimuth_unc(F4)
|
||||
elevation_deg(F4) elevation_unc(F4)
|
||||
snr(F4)
|
||||
x_m(F4) y_m(F4) latitude_deg(D8) longitude_deg(D8) depth_m(F4)
|
||||
usbl_yaw(F4) usbl_pitch(F4) usbl_roll(F4)
|
||||
usbl_latitude(D8) usbl_longitude(D8) last_iTOW(U4)
|
||||
beacon_n(F4) beacon_e(F4)
|
||||
[+ 32 bytes extra NaN padding observed in firmware v2]
|
||||
|
||||
Timestamp assignment: timestamp from the last RECEIVED packet before the frame sync byte.
|
||||
|
||||
Usage:
|
||||
python3 parse_kogger_usbl.py FILE1.csv [FILE2.csv ...] -o combined_usbl.csv
|
||||
"""
|
||||
|
||||
import ast
|
||||
import csv
|
||||
import io
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
import collections
|
||||
import math
|
||||
|
||||
SYNC = b"\xbb\x55"
|
||||
ID_USBL_SOLUTION = 0x65
|
||||
|
||||
USBL_FMT = '<BBHqIq' + 'f'*7 + 'ff' + 'dd' + 'f' + 'fff' + 'dd' + 'I' + 'ff'
|
||||
USBL_FMT_SIZE = struct.calcsize(USBL_FMT)
|
||||
|
||||
|
||||
def parse_bytes_field(field):
|
||||
"""Parse b'...' Python literal from CSV field."""
|
||||
field = field.strip()
|
||||
if not (field.startswith("b'") or field.startswith('b"')):
|
||||
return b""
|
||||
try:
|
||||
result = ast.literal_eval(field)
|
||||
if isinstance(result, str):
|
||||
result = result.encode('latin-1')
|
||||
return result
|
||||
except Exception:
|
||||
return b""
|
||||
|
||||
|
||||
def fletcher16(data):
|
||||
c1, c2 = 0, 0
|
||||
for byte in data:
|
||||
c1 = (c1 + byte) & 0xFF
|
||||
c2 = (c2 + c1) & 0xFF
|
||||
return c1, c2
|
||||
|
||||
|
||||
def parse_usbl_csv(csv_file):
|
||||
"""
|
||||
Parse a raw USBL CSV file, reconstruct byte stream, decode SBP frames.
|
||||
Returns list of dicts with decoded USBL_SOLUTION data.
|
||||
"""
|
||||
with open(csv_file) as f:
|
||||
content = f.read()
|
||||
|
||||
buf = b""
|
||||
ts_offsets = [] # (byte_offset_in_buf, timestamp_str)
|
||||
|
||||
reader = csv.reader(io.StringIO(content))
|
||||
for row in reader:
|
||||
if len(row) < 3:
|
||||
continue
|
||||
ts, direction, raw = row[0], row[1], row[2]
|
||||
if direction != "RECEIVED":
|
||||
continue
|
||||
b = parse_bytes_field(raw)
|
||||
if b:
|
||||
off = len(buf)
|
||||
buf += b
|
||||
ts_offsets.append((off, ts))
|
||||
|
||||
# Find all BB55 sync positions
|
||||
positions = []
|
||||
i = 0
|
||||
while True:
|
||||
pos = buf.find(SYNC, i)
|
||||
if pos == -1:
|
||||
break
|
||||
positions.append(pos)
|
||||
i = pos + 1
|
||||
|
||||
frame_id_counter = collections.Counter()
|
||||
usbl_records = []
|
||||
valid_total = 0
|
||||
|
||||
for pos in positions:
|
||||
if pos + 6 > len(buf):
|
||||
continue
|
||||
route = buf[pos+2]
|
||||
mode = buf[pos+3]
|
||||
frame_id = buf[pos+4]
|
||||
length = buf[pos+5]
|
||||
if pos + 6 + length + 2 > len(buf):
|
||||
continue
|
||||
payload = buf[pos+6:pos+6+length]
|
||||
chk1_a = buf[pos+6+length]
|
||||
chk2_a = buf[pos+6+length+1]
|
||||
|
||||
c1, c2 = fletcher16(buf[pos+2:pos+6+length])
|
||||
if c1 != chk1_a or c2 != chk2_a:
|
||||
continue
|
||||
|
||||
valid_total += 1
|
||||
frame_id_counter[frame_id] += 1
|
||||
|
||||
if frame_id != ID_USBL_SOLUTION:
|
||||
continue
|
||||
|
||||
# Get timestamp: last ts_offset entry before this position
|
||||
ts = ts_offsets[0][1] if ts_offsets else ""
|
||||
for off, t in ts_offsets:
|
||||
if off <= pos:
|
||||
ts = t
|
||||
else:
|
||||
break
|
||||
|
||||
if len(payload) < USBL_FMT_SIZE:
|
||||
continue
|
||||
|
||||
fields = struct.unpack_from(USBL_FMT, payload)
|
||||
rec = {
|
||||
'Timestamp': ts,
|
||||
'usbl_id': fields[0],
|
||||
'usbl_role': fields[1],
|
||||
'usbl_timestamp_us': fields[3],
|
||||
'ping_counter': fields[4],
|
||||
'Dist': fields[6],
|
||||
'dist_unc': fields[7],
|
||||
'Azimuth': fields[8],
|
||||
'azimuth_unc': fields[9],
|
||||
'Elev': fields[10],
|
||||
'elev_unc': fields[11],
|
||||
'SNR': fields[12],
|
||||
'x_m': fields[13],
|
||||
'y_m': fields[14],
|
||||
'usbl_lat_computed': fields[15],
|
||||
'usbl_lon_computed': fields[16],
|
||||
'depth_m': fields[17],
|
||||
'usbl_yaw': fields[18],
|
||||
'usbl_pitch': fields[19],
|
||||
'usbl_roll': fields[20],
|
||||
'source_file': os.path.basename(csv_file),
|
||||
}
|
||||
usbl_records.append(rec)
|
||||
|
||||
return usbl_records, frame_id_counter, valid_total, len(positions)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description='Decode Kogger USBL raw CSV files')
|
||||
parser.add_argument('files', nargs='+', help='Input *_usbl.csv files')
|
||||
parser.add_argument('-o', '--output', default='combined_usbl.csv', help='Output CSV')
|
||||
args = parser.parse_args()
|
||||
|
||||
all_records = []
|
||||
total_sync = 0
|
||||
total_valid = 0
|
||||
global_id_counter = collections.Counter()
|
||||
|
||||
for csv_file in args.files:
|
||||
print("Processing: %s" % csv_file)
|
||||
records, id_counter, valid, n_sync = parse_usbl_csv(csv_file)
|
||||
all_records.extend(records)
|
||||
total_sync += n_sync
|
||||
total_valid += valid
|
||||
global_id_counter.update(id_counter)
|
||||
print(" Sync markers: %d Valid frames: %d USBL records: %d" % (n_sync, valid, len(records)))
|
||||
|
||||
print("\n=== Summary ===")
|
||||
print("Total sync markers (BB55): %d" % total_sync)
|
||||
print("Total valid frames: %d" % total_valid)
|
||||
print("Total USBL_SOLUTION records: %d" % len(all_records))
|
||||
print("\nFrame ID histogram:")
|
||||
for fid, cnt in sorted(global_id_counter.items(), key=lambda x: -x[1]):
|
||||
name = "USBL_SOLUTION" if fid == 0x65 else ("USBL_CONTROL" if fid == 0x68 else "UNKNOWN")
|
||||
print(" ID=0x%02x(%3d) %-15s : %d frames" % (fid, fid, name, cnt))
|
||||
|
||||
if all_records:
|
||||
dists = [r['Dist'] for r in all_records if not math.isnan(r['Dist'])]
|
||||
azs = [r['Azimuth'] for r in all_records if not math.isnan(r['Azimuth'])]
|
||||
snrs = [r['SNR'] for r in all_records if not math.isnan(r['SNR'])]
|
||||
if dists:
|
||||
dists_sorted = sorted(dists)
|
||||
n = len(dists_sorted)
|
||||
median = dists_sorted[n//2]
|
||||
print("\nDist (m): min=%.2f median=%.2f max=%.2f" % (min(dists), median, max(dists)))
|
||||
if azs:
|
||||
print("Azimuth : min=%.2f max=%.2f" % (min(azs), max(azs)))
|
||||
if snrs:
|
||||
print("SNR : min=%.2f max=%.2f" % (min(snrs), max(snrs)))
|
||||
|
||||
# Write output CSV
|
||||
with open(args.output, 'w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(['Timestamp', 'Dist', 'Azimuth', 'Elev', 'SNR', 'FrameID',
|
||||
'x_m', 'y_m', 'depth_m', 'dist_unc', 'azimuth_unc', 'elev_unc',
|
||||
'usbl_yaw', 'usbl_pitch', 'usbl_roll', 'source_file'])
|
||||
for r in all_records:
|
||||
writer.writerow([
|
||||
r['Timestamp'],
|
||||
'' if math.isnan(r['Dist']) else '%.4f' % r['Dist'],
|
||||
'' if math.isnan(r['Azimuth']) else '%.4f' % r['Azimuth'],
|
||||
'' if math.isnan(r['Elev']) else '%.4f' % r['Elev'],
|
||||
'' if math.isnan(r['SNR']) else '%.4f' % r['SNR'],
|
||||
'0x65',
|
||||
'' if math.isnan(r['x_m']) else '%.4f' % r['x_m'],
|
||||
'' if math.isnan(r['y_m']) else '%.4f' % r['y_m'],
|
||||
'' if math.isnan(r['depth_m']) else '%.4f' % r['depth_m'],
|
||||
'%.4f' % r['dist_unc'],
|
||||
'%.4f' % r['azimuth_unc'],
|
||||
'%.4f' % r['elev_unc'],
|
||||
'%.4f' % r['usbl_yaw'],
|
||||
'%.4f' % r['usbl_pitch'],
|
||||
'%.4f' % r['usbl_roll'],
|
||||
r['source_file'],
|
||||
])
|
||||
|
||||
print("\nOutput: %s (%d records)" % (args.output, len(all_records)))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user