#!/usr/bin/env python3 """ parse_kogger_usbl.py — Decode Kogger USBL raw CSV files (SBP protocol) Protocol spec: Frame: BB 55 | ROUTE | MODE | ID | LENGTH | PAYLOAD[LENGTH] | CHKSUM1 | CHKSUM2 Checksum: Fletcher-16 over (ROUTE + MODE + ID + LENGTH + PAYLOAD) Frame ID 0x65 = ID_USBL_SOLUTION Struct (packed, little-endian): id(U1) role(U1) watermark(U2) timestamp_us(S8) ping_counter(U4) carrier_counter(S8) distance_m(F4) distance_unc(F4) azimuth_deg(F4) azimuth_unc(F4) elevation_deg(F4) elevation_unc(F4) snr(F4) x_m(F4) y_m(F4) latitude_deg(D8) longitude_deg(D8) depth_m(F4) usbl_yaw(F4) usbl_pitch(F4) usbl_roll(F4) usbl_latitude(D8) usbl_longitude(D8) last_iTOW(U4) beacon_n(F4) beacon_e(F4) [+ 32 bytes extra NaN padding observed in firmware v2] Timestamp assignment: timestamp from the last RECEIVED packet before the frame sync byte. Usage: python3 parse_kogger_usbl.py FILE1.csv [FILE2.csv ...] -o combined_usbl.csv """ import ast import csv import io import os import struct import sys import collections import math SYNC = b"\xbb\x55" ID_USBL_SOLUTION = 0x65 USBL_FMT = ' len(buf): continue route = buf[pos+2] mode = buf[pos+3] frame_id = buf[pos+4] length = buf[pos+5] if pos + 6 + length + 2 > len(buf): continue payload = buf[pos+6:pos+6+length] chk1_a = buf[pos+6+length] chk2_a = buf[pos+6+length+1] c1, c2 = fletcher16(buf[pos+2:pos+6+length]) if c1 != chk1_a or c2 != chk2_a: continue valid_total += 1 frame_id_counter[frame_id] += 1 if frame_id != ID_USBL_SOLUTION: continue # Get timestamp: last ts_offset entry before this position ts = ts_offsets[0][1] if ts_offsets else "" for off, t in ts_offsets: if off <= pos: ts = t else: break if len(payload) < USBL_FMT_SIZE: continue fields = struct.unpack_from(USBL_FMT, payload) rec = { 'Timestamp': ts, 'usbl_id': fields[0], 'usbl_role': fields[1], 'usbl_timestamp_us': fields[3], 'ping_counter': fields[4], 'Dist': fields[6], 'dist_unc': fields[7], 'Azimuth': fields[8], 'azimuth_unc': fields[9], 'Elev': fields[10], 'elev_unc': fields[11], 'SNR': fields[12], 'x_m': fields[13], 'y_m': fields[14], 'usbl_lat_computed': fields[15], 'usbl_lon_computed': fields[16], 'depth_m': fields[17], 'usbl_yaw': fields[18], 'usbl_pitch': fields[19], 'usbl_roll': fields[20], 'source_file': os.path.basename(csv_file), } usbl_records.append(rec) return usbl_records, frame_id_counter, valid_total, len(positions) def main(): import argparse parser = argparse.ArgumentParser(description='Decode Kogger USBL raw CSV files') parser.add_argument('files', nargs='+', help='Input *_usbl.csv files') parser.add_argument('-o', '--output', default='combined_usbl.csv', help='Output CSV') args = parser.parse_args() all_records = [] total_sync = 0 total_valid = 0 global_id_counter = collections.Counter() for csv_file in args.files: print("Processing: %s" % csv_file) records, id_counter, valid, n_sync = parse_usbl_csv(csv_file) all_records.extend(records) total_sync += n_sync total_valid += valid global_id_counter.update(id_counter) print(" Sync markers: %d Valid frames: %d USBL records: %d" % (n_sync, valid, len(records))) print("\n=== Summary ===") print("Total sync markers (BB55): %d" % total_sync) print("Total valid frames: %d" % total_valid) print("Total USBL_SOLUTION records: %d" % len(all_records)) print("\nFrame ID histogram:") for fid, cnt in sorted(global_id_counter.items(), key=lambda x: -x[1]): name = "USBL_SOLUTION" if fid == 0x65 else ("USBL_CONTROL" if fid == 0x68 else "UNKNOWN") print(" ID=0x%02x(%3d) %-15s : %d frames" % (fid, fid, name, cnt)) if all_records: dists = [r['Dist'] for r in all_records if not math.isnan(r['Dist'])] azs = [r['Azimuth'] for r in all_records if not math.isnan(r['Azimuth'])] snrs = [r['SNR'] for r in all_records if not math.isnan(r['SNR'])] if dists: dists_sorted = sorted(dists) n = len(dists_sorted) median = dists_sorted[n//2] print("\nDist (m): min=%.2f median=%.2f max=%.2f" % (min(dists), median, max(dists))) if azs: print("Azimuth : min=%.2f max=%.2f" % (min(azs), max(azs))) if snrs: print("SNR : min=%.2f max=%.2f" % (min(snrs), max(snrs))) # Write output CSV with open(args.output, 'w', newline='') as f: writer = csv.writer(f) writer.writerow(['Timestamp', 'Dist', 'Azimuth', 'Elev', 'SNR', 'FrameID', 'x_m', 'y_m', 'depth_m', 'dist_unc', 'azimuth_unc', 'elev_unc', 'usbl_yaw', 'usbl_pitch', 'usbl_roll', 'source_file']) for r in all_records: writer.writerow([ r['Timestamp'], '' if math.isnan(r['Dist']) else '%.4f' % r['Dist'], '' if math.isnan(r['Azimuth']) else '%.4f' % r['Azimuth'], '' if math.isnan(r['Elev']) else '%.4f' % r['Elev'], '' if math.isnan(r['SNR']) else '%.4f' % r['SNR'], '0x65', '' if math.isnan(r['x_m']) else '%.4f' % r['x_m'], '' if math.isnan(r['y_m']) else '%.4f' % r['y_m'], '' if math.isnan(r['depth_m']) else '%.4f' % r['depth_m'], '%.4f' % r['dist_unc'], '%.4f' % r['azimuth_unc'], '%.4f' % r['elev_unc'], '%.4f' % r['usbl_yaw'], '%.4f' % r['usbl_pitch'], '%.4f' % r['usbl_roll'], r['source_file'], ]) print("\nOutput: %s (%d records)" % (args.output, len(all_records))) if __name__ == '__main__': main()