#!/usr/bin/env python3 """ A script to parse Kogger SB protocol logs and convert UsblSolution messages directly to a CSV file. """ import argparse import ast import csv import struct import sys # Based on the Kogger SB protocol specification ID_MAP = { 0x01: "ID_TIMESTAMP", 0x02: "ID_DIST", 0x03: "ID_CHART", 0x04: "ID_ATTITUDE", 0x05: "ID_TEMP", 0x10: "ID_DATASET", 0x11: "ID_DIST_SETUP", 0x12: "ID_CHART_SETUP", 0x14: "ID_TRANSC", 0x15: "ID_SND_SPD", 0x18: "ID_UART", 0x1B: "ID_IMU_SETUP", 0x20: "ID_VERSION", 0x21: "ID_MARK", 0x22: "ID_DIAG", 0x23: "ID_FLASH", 0x24: "ID_BOOT", 0x25: "ID_UPDATE", 0x64: "ID_NAV", 0x65: "ID_USBL_SOLUTION", 0x66: "ID_SIGNAL_ENCODER", 0x67: "ID_SIGNAL_DECODER", 0x68: "ID_USBL_CONTROL", 0x79: "ID_DVL_VEL", # Aliases 102: "ID_SIGNAL_ENCODER", 103: "ID_SIGNAL_DECODER", 121: "ID_DVL_VEL", } def parse_route(route_byte): """Parses the ROUTE byte.""" dev_address = route_byte & 0b1111 return {"dev_address": dev_address} def parse_mode(mode_byte): """Parses the MODE byte.""" type_val = mode_byte & 0b11 version = (mode_byte >> 3) & 0b111 return { "type_val": type_val, "version": version, } def parse_payload_structured(msg_id, mode, payload): """ Parses the payload and returns a structured dictionary for UsblSolution, or a simple representation for other types. """ msg_name = ID_MAP.get(msg_id) if msg_name == "ID_USBL_SOLUTION" and mode['version'] == 0: if len(payload) == 152: try: fmt = '= 8: # Minimum message size if not reassembly_buffer.startswith(b'\xbbU'): sync_pos = reassembly_buffer.find(b'\xbbU') if sync_pos == -1: reassembly_buffer = b'' break else: reassembly_buffer = reassembly_buffer[sync_pos:] if len(reassembly_buffer) < 6: break payload_len = reassembly_buffer[5] full_msg_len = 6 + payload_len + 2 # header + payload + checksum if len(reassembly_buffer) >= full_msg_len: message_to_parse = reassembly_buffer[:full_msg_len] parsed_data = parse_message_structured(message_to_parse) if (parsed_data and not parsed_data.get("error") and isinstance(parsed_data.get('payload_parsed'), dict)): payload_info = parsed_data['payload_parsed'] if payload_info.get('type') == 'UsblSolution': solution_data = payload_info['data'] for key, value in solution_data.items(): csv_writer.writerow([timestamp, key, value]) reassembly_buffer = reassembly_buffer[full_msg_len:] else: break except FileNotFoundError: print(f"Error: File not found at {input_file}", file=sys.stderr) sys.exit(1) except Exception as e: print(f"An unexpected error occurred: {e}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": parser = argparse.ArgumentParser( description="Convert Kogger SB protocol logs directly to a CSV file, extracting UsblSolution data." ) input_file = sys.argv[1] if sys.argv == 3: output_file = sys.argv[2] else: name, extension = input_file.rsplit('.', 1) # Add "_diff" to the name and then add the extension back output_file = f"{name}_csv.{extension}" print("Generate new csv file :"+str(output_file)) process_log_to_csv(input_file, output_file)