Files
Poulpe 9a158f5c5f Initial: ContinuousTransponder wrapper for Kogger USBL
High-level Python wrapper around the upstream cosma-tech/kogger_acousticAntenna
driver. Configures a Kogger acoustic antenna as a permanent slave transponder
in a single start() call: address filter, echo filter, optional TDMA sync slot,
permanent response window, and Python callbacks for each ping received.

No modification to the upstream driver — only composes existing public methods
in the right order. Snapshot of upstream driver included read-only under driver/
for reference.

Includes:
- transponder_continu.py (302 lines): the wrapper class + CLI
- examples/auv_slave.py (79 lines): usage example with logging
- README.md: design rationale, usage, multi-AUV TDMA, watchdog, hardware wiring
- driver/: snapshot of cosma-tech/kogger_acousticAntenna at commit 1b539f9
  ('Add index slot for multi pinger', 2025-03-11)

Built for the Cosma context (one USV master + N AUV slaves), following the design
conversation in Discord #ping-pong-ping (2026-04-27). See poulpe/ping-pong-ping
on Gitea for the interactive demo of the protocol.
2026-04-27 22:08:44 +00:00

204 lines
6.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
A script to parse Kogger SB protocol logs and convert UsblSolution messages
directly to a CSV file.
"""
import argparse
import ast
import csv
import struct
import sys
# Based on the Kogger SB protocol specification
# Lookup table from the numeric message ID found in a frame header to its
# symbolic name.
#
# The decimal spellings 102, 103 and 121 denote the same integers as 0x66,
# 0x67 and 0x79, so each ID is listed exactly once: repeating them as extra
# "alias" keys in the literal only overwrote the entry with an identical
# value. Lookups by either spelling resolve to the same entry.
ID_MAP = {
    0x01: "ID_TIMESTAMP",
    0x02: "ID_DIST",
    0x03: "ID_CHART",
    0x04: "ID_ATTITUDE",
    0x05: "ID_TEMP",
    0x10: "ID_DATASET",
    0x11: "ID_DIST_SETUP",
    0x12: "ID_CHART_SETUP",
    0x14: "ID_TRANSC",
    0x15: "ID_SND_SPD",
    0x18: "ID_UART",
    0x1B: "ID_IMU_SETUP",
    0x20: "ID_VERSION",
    0x21: "ID_MARK",
    0x22: "ID_DIAG",
    0x23: "ID_FLASH",
    0x24: "ID_BOOT",
    0x25: "ID_UPDATE",
    0x64: "ID_NAV",
    0x65: "ID_USBL_SOLUTION",
    0x66: "ID_SIGNAL_ENCODER",  # 102
    0x67: "ID_SIGNAL_DECODER",  # 103
    0x68: "ID_USBL_CONTROL",
    0x79: "ID_DVL_VEL",  # 121
}
def parse_route(route_byte):
    """Extract the device address from a ROUTE byte.

    Only the four least-significant bits are read; the upper bits are ignored.

    Returns:
        A dict with the single key ``"dev_address"``.
    """
    return {"dev_address": route_byte & 0x0F}
def parse_mode(mode_byte):
    """Split a MODE byte into its type and version bit fields.

    Returns:
        A dict with ``"type_val"`` (bits 0-1 of the byte) and ``"version"``
        (bits 3-5 of the byte). Bit 2 and bits 6-7 are not read.
    """
    fields = {}
    fields["type_val"] = mode_byte & 0x03
    fields["version"] = (mode_byte >> 3) & 0x07
    return fields
def parse_payload_structured(msg_id, mode, payload):
    """Turn a raw payload into a small ``{"type": ..., "data": ...}`` dict.

    Args:
        msg_id: Numeric message ID, looked up in ``ID_MAP``.
        mode: Dict holding at least a ``"version"`` entry (only read when the
            ID maps to ``ID_USBL_SOLUTION``).
        payload: The payload bytes of the frame.

    Returns:
        * For an ``ID_USBL_SOLUTION`` message with version 0 and a 152-byte
          payload: type ``"UsblSolution"`` and a dict of pre-formatted strings
          (``ID``, ``Dist``, ``Azimuth``, ``Elev``, ``SNR``).
        * For the same message with any other payload size, or if unpacking
          fails: type ``"Error"`` and a human-readable description.
        * For everything else: the symbolic name (or ``"Unknown"``) and the
          payload as a hex string.
    """
    msg_name = ID_MAP.get(msg_id)
    is_usbl_v0 = msg_name == "ID_USBL_SOLUTION" and mode['version'] == 0

    # Anything that is not a version-0 UsblSolution is passed through as hex.
    if not is_usbl_v0:
        return {"type": msg_name or "Unknown", "data": payload.hex()}

    if len(payload) != 152:
        return {"type": "Error", "data": f"UsblSolution payload wrong size ({len(payload)} bytes)"}

    try:
        # Little-endian, unpadded layout; its total size is 152 bytes.
        fields = struct.unpack('<BBHqIqfffffffffddffffddIff8f', payload)
        solution = {
            "ID": f"{fields[0]}",
            "Dist": f"{fields[6]:.2f}",
            "Azimuth": f"{fields[8]:.2f}",
            "Elev": f"{fields[10]:.2f}",
            "SNR": f"{fields[12]:.1f}"
        }
    except struct.error as e:
        return {"type": "Error", "data": f"Failed to unpack UsblSolution: {e}"}
    return {"type": "UsblSolution", "data": solution}
def parse_message_structured(data):
    """Decode one complete binary frame into a dictionary.

    The frame is expected to consist of a 6-byte header (two sync bytes
    0xBB 0x55, route, mode, message ID, payload length), the payload, and two
    trailing checksum bytes. The checksum bytes are only counted towards the
    frame length; their value is not inspected here.

    Args:
        data: The bytes of exactly one frame.

    Returns:
        ``{"error": <text>}`` when the frame is shorter than 8 bytes, has the
        wrong sync bytes, or its size disagrees with the length byte;
        otherwise ``{"msg_id": ..., "payload_parsed": ...}`` where the latter
        is the result of ``parse_payload_structured``.
    """
    header_size = 6
    checksum_size = 2

    if len(data) < header_size + checksum_size:
        return {"error": "Message too short"}

    try:
        header = struct.unpack('<6B', data[:header_size])
    except struct.error:
        return {"error": "Could not unpack header"}
    sync1, sync2, _route_byte, mode_byte, msg_id, length = header

    if (sync1, sync2) != (0xBB, 0x55):
        return {"error": "Invalid sync bytes"}

    payload_end = header_size + length
    if payload_end + checksum_size != len(data):
        return {"error": f"Length mismatch. Header says {length}, but packet is {len(data)}"}

    return {
        "msg_id": msg_id,
        "payload_parsed": parse_payload_structured(
            msg_id, parse_mode(mode_byte), data[header_size:payload_end]
        ),
    }
def process_log_to_csv(input_file, output_file):
    """Read a Kogger log, reassemble its frames and write UsblSolution data to a CSV.

    The input is read with ``csv.reader``. Only rows with exactly three
    columns (timestamp, direction, data) are used, and the data column must
    be the ``repr`` of a ``bytes`` object; other rows are skipped silently.
    Chunks whose direction is ``"RECEIVED"`` are concatenated into a buffer
    (which is reset on every change of direction), complete frames are cut
    out of that buffer and handed to ``parse_message_structured``. For every
    decoded ``UsblSolution`` one output row ``[timestamp, key, value]`` is
    written per field, using the timestamp of the input row that completed
    the frame. No header row is written.

    Args:
        input_file: Path of the log file to read.
        output_file: Path of the CSV file to create (overwritten if present).

    Raises:
        SystemExit: With status 1, after printing a message to stderr, when
            a file is not found or any other exception occurs.
    """
    sync = b'\xbbU'  # frame marker: 0xBB 0x55
    reassembly_buffer = b''
    last_direction = None
    try:
        with open(input_file, 'r', newline='') as infile, \
                open(output_file, 'w', newline='') as outfile:
            csv_writer = csv.writer(outfile)
            for row in csv.reader(infile):
                if len(row) != 3:
                    continue
                timestamp, direction, data_str = row

                # literal_eval only evaluates Python literals, so a log line
                # cannot execute code; anything that is not bytes is skipped.
                try:
                    data_bytes = ast.literal_eval(data_str)
                except (ValueError, SyntaxError, TypeError):
                    continue
                if not isinstance(data_bytes, bytes):
                    continue

                # A change of direction interrupts the stream: drop any
                # partially received frame.
                if direction != last_direction:
                    reassembly_buffer = b''
                    last_direction = direction
                if direction != "RECEIVED":
                    continue

                reassembly_buffer += data_bytes
                while len(reassembly_buffer) >= 8:  # Minimum message size
                    if not reassembly_buffer.startswith(sync):
                        sync_pos = reassembly_buffer.find(sync)
                        if sync_pos == -1:
                            # No marker in the buffer. Keep a lone trailing
                            # 0xBB: it may be the first half of a marker
                            # whose 0x55 arrives with the next chunk.
                            if reassembly_buffer.endswith(sync[:1]):
                                reassembly_buffer = sync[:1]
                            else:
                                reassembly_buffer = b''
                            break
                        reassembly_buffer = reassembly_buffer[sync_pos:]
                        if len(reassembly_buffer) < 6:
                            # Header not complete yet; wait for more data.
                            break

                    payload_len = reassembly_buffer[5]
                    full_msg_len = 6 + payload_len + 2  # header + payload + checksum
                    if len(reassembly_buffer) < full_msg_len:
                        # Frame not complete yet; wait for more data.
                        break

                    message_to_parse = reassembly_buffer[:full_msg_len]
                    parsed_data = parse_message_structured(message_to_parse)
                    if (parsed_data and not parsed_data.get("error") and
                            isinstance(parsed_data.get('payload_parsed'), dict)):
                        payload_info = parsed_data['payload_parsed']
                        if payload_info.get('type') == 'UsblSolution':
                            for key, value in payload_info['data'].items():
                                csv_writer.writerow([timestamp, key, value])
                    # The frame is consumed whether or not it produced output.
                    reassembly_buffer = reassembly_buffer[full_msg_len:]
    except FileNotFoundError:
        print(f"Error: File not found at {input_file}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"An unexpected error occurred: {e}", file=sys.stderr)
        sys.exit(1)
def default_output_path(input_file):
    """Return the output path used when none is given on the command line.

    "_csv" is inserted between the file name and its extension, e.g.
    ``log.txt`` becomes ``log_csv.txt``. A name without an extension simply
    gets "_csv" appended, and dots in directory names are left alone.
    """
    import os  # local import: only this helper needs it

    root, ext = os.path.splitext(input_file)
    return f"{root}_csv{ext}"


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Convert Kogger SB protocol logs directly to a CSV file, extracting UsblSolution data."
    )
    parser.add_argument("input_file", help="path of the Kogger log to read")
    parser.add_argument(
        "output_file",
        nargs="?",
        default=None,
        help="path of the CSV to write (default: <input name>_csv.<input extension>)",
    )
    args = parser.parse_args()

    input_file = args.input_file
    # Fall back to a name derived from the input when no output is given.
    output_file = args.output_file or default_output_path(input_file)
    print("Generate new csv file :" + str(output_file))
    process_log_to_csv(input_file, output_file)