Files
Poulpe 9a158f5c5f Initial: ContinuousTransponder wrapper for Kogger USBL
High-level Python wrapper around the upstream cosma-tech/kogger_acousticAntenna
driver. Configures a Kogger acoustic antenna as a permanent slave transponder
in a single start() call: address filter, echo filter, optional TDMA sync slot,
permanent response window, and Python callbacks for each ping received.

No modification to the upstream driver — only composes existing public methods
in the right order. Snapshot of upstream driver included read-only under driver/
for reference.

Includes:
- transponder_continu.py (302 lines): the wrapper class + CLI
- examples/auv_slave.py (79 lines): usage example with logging
- README.md: design rationale, usage, multi-AUV TDMA, watchdog, hardware wiring
- driver/: snapshot of cosma-tech/kogger_acousticAntenna at commit 1b539f9
  ('Add index slot for multi pinger', 2025-03-11)

Built for the Cosma context (USV master + N AUV slaves), following the design
conversation in Discord #ping-pong-ping (2026-04-27). See poulpe/ping-pong-ping
on Gitea for the interactive demo of the protocol.
2026-04-27 22:08:44 +00:00

184 lines
6.5 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#! /usr/bin/env python
# Test script for the USV side: pings the AUV once per second and logs whatever the antenna reports back.
import time
import sys
import tty
import termios
import select
import json
import os
from loguru import logger
import datetime
# Add the parent directory to the Python path to find the driver module
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from kogger_protocol_driver import KoggerSBPDevice
# --- Script Configuration ---
# Log level handed to the driver: "DEBUG", "INFO", "WARNING", "ERROR" or "CRITICAL".
LOG_LEVEL = "INFO"
# One log file per run, named after the moment this module was loaded.
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
LOG_FILE = f"log/{timestamp}_log_usv.log"
# Serial port used when none is given on the command line.
DEFAULT_SERIAL_PORT = "/dev/ttyAMA4"
#DEFAULT_SERIAL_PORT = "/dev/serial/by-path/platform-fd500000.pcie-pci-0000:01:00.0-usb-0:1.3:1.0"
# Serial speed passed to the driver together with the port.
DEFAULT_SERIAL_SPEED = 921600
def print_message(message):
    """
    Log a message received from the antenna, pretty-printed as JSON when possible.

    This function is a callback that will be executed for each message
    received from the antenna that is not a direct response to a command.
    It never raises on an unparseable message: the text is logged as-is
    instead, so a single odd message cannot break the caller.

    Args:
        message: The object delivered by the driver. Only its ``str()`` form
            is relied upon (presumably a Python dict repr -- confirm against
            the driver).
    """
    text = str(message)
    try:
        # Legacy conversion: quote bare ``nan`` and turn single quotes into
        # double quotes so that a simple dict repr becomes valid JSON.
        parsed = json.loads(text.replace("nan", "'nan'").replace("'", '"'))
    except ValueError:
        # json.JSONDecodeError is a ValueError. The textual conversion breaks
        # on True/False/None, on apostrophes inside values and on "nan"
        # appearing inside a longer word; fall back instead of raising.
        rendered = text
        if isinstance(message, (dict, list)):
            try:
                rendered = json.dumps(message, indent=2, default=str)
            except (TypeError, ValueError):
                # Unsupported keys or circular references: keep the raw text.
                rendered = text
    else:
        rendered = json.dumps(parsed, indent=2)
    logger.info("printer:" + rendered)
def is_data():
    """
    Tell whether stdin currently has something to read, without blocking.

    Returns:
        True when select() reports stdin as readable, False otherwise.
    """
    # A timeout of 0 turns select() into a pure poll.
    readable, writable, exceptional = select.select([sys.stdin], [], [], 0)
    return readable == [sys.stdin] and not writable and not exceptional
def check_all_getters(antenna):
    """
    Run every enabled get_* call on the antenna and print what comes back.

    A failing getter is reported on stdout and does not stop the remaining
    ones from being tried.

    Args:
        antenna: Object exposing the get_* methods named in the table below.
    """
    print("\n--- Checking all getter methods ---")
    # Table of (method_name, positional_args). Entries that are commented out
    # are skipped; uncomment one to include it in the check.
    enabled_getters = (
        #('get_timestamp', ()),
        #('get_distance', (0,)),
        #('get_distance', (1,)),
        #('get_chart_data', ()),
        #('get_attitude', (0,)), #!!OK
        #('get_attitude', (1,)),
        #('get_temperature', ()),
        #('get_dataset_config', (0,)),
        #('get_distance_setup', ()),
        #('get_chart_setup', ()),
        #('get_transceiver_settings', ()),
        #('get_sound_speed', ()),
        #('get_uart_config', (1, 0)), # uart_id=1, version=0
        #('get_uart_config', (1, 1)), # uart_id=1, version=1
        ('get_version_info', ()),  #!! OK
        #('get_mark_status', ()), #!! OK
        #('get_diagnostics', ()),
        #('get_navigation_data', ()),
        #('get_dvl_velocity_data', ()),
        #('get_signal_encoder_data', ()),
        #('get_signal_decoder_data', ()),
        #('get_auto_response_timeout', ()),
        #('get_auto_response_filter', ()),
        #('get_auto_response_payload', ()),
        #('get_usbl_solution', ()),
    )
    separator = "-" * 20
    for getter_name, call_args in enabled_getters:
        try:
            getter = getattr(antenna, getter_name)
            print(f"Calling {getter_name}{call_args}...")
            outcome = getter(*call_args)
            print(f"Result: {outcome}")
        except Exception as exc:
            print(f"An error occurred while calling {getter_name}: {exc}")
        print(separator)
        # Give the device a moment between commands.
        time.sleep(0.01)
    print("--- Finished checking all getter methods ---\n")
def _resolve_serial_port(argv):
    """Pick the serial port from the command line, falling back to the default."""
    if len(argv) <= 1:
        print(f"No serial port provided. Using default: {DEFAULT_SERIAL_PORT}")
        return DEFAULT_SERIAL_PORT
    requested = argv[1]
    if requested.startswith("/dev/"):
        return requested
    # Anything else is used as a ttyUSB suffix, e.g. "0" -> "/dev/ttyUSB0".
    return "/dev/ttyUSB" + str(requested)


def main():
    """
    Connect to the Kogger antenna and send an acoustic ping once per second.

    The optional first command-line argument selects the serial port: either
    a full "/dev/..." path or a suffix appended to "/dev/ttyUSB". Without it,
    DEFAULT_SERIAL_PORT is used.

    While running, a digit typed on stdin changes the data number sent with
    each ping and 'q' quits. Messages from the antenna are logged through
    print_message(). The antenna is always disconnected on the way out.

    Raises:
        SystemExit: With status 1 when the connection cannot be established.
    """
    serial_port = _resolve_serial_port(sys.argv)

    # Instantiate the driver
    antenna = KoggerSBPDevice(
        serial_port,
        DEFAULT_SERIAL_SPEED,
        default_timeout=0.02,
        log_level=LOG_LEVEL,
        log_file=LOG_FILE,
    )
    try:
        # Connect to the antenna. sys.exit() raises SystemExit, so the
        # finally clause below still runs.
        if not antenna.connect():
            print(f"Failed to connect to the antenna on port {serial_port}", file=sys.stderr)
            sys.exit(1)
        print(f"Successfully connected to the antenna on {serial_port}.")

        # Perform a one-time check of the enabled getter functions
        check_all_getters(antenna)

        # Auto-response settings. The meaning of these values is defined by
        # the driver/device -- NOTE(review): confirm against the driver docs.
        result = antenna.set_auto_response_filter(0)
        logger.info("set_auto_response_filter(0)="+str(result))
        result = antenna.set_auto_response_timeout(0xffffffff)
        logger.info("set_auto_response_timeout(0xffffffff)="+str(result))
        result = antenna.set_auto_response_payload(0xff)
        logger.info("set_auto_response_payload(0xff)="+str(result))

        antenna.register_default_callback(print_message)
        print("\nPress a number (0-9) to send an acoustic ping. Press 'q' to quit.")

        data_number = 255
        last_ping_time = 0  # epoch 0, so the first loop iteration pings at once
        # Every frame is one character wide so the prompt does not shift.
        spinner_frames = ['|', '/', '-', '\\']
        spinner_index = 0
        status_mark = ""
        stdin_open = True

        while True:
            # Handle keyboard input
            if stdin_open and is_data():
                char = sys.stdin.read(1)
                if not char:
                    # EOF (stdin closed or redirected): stop polling stdin
                    # instead of failing on the empty string below.
                    stdin_open = False
                    logger.warning("stdin reached EOF; keyboard control disabled, pinging continues.")
                elif char.isdecimal():
                    # isdecimal() only accepts characters that int() can convert.
                    data_number = int(char)
                    print(f"\r\n--> Switched data number to: {data_number}")
                elif char.lower() == 'q':
                    print("\r\nQuitting...")
                    break
                elif char == '\x03':  # Ctrl+C
                    raise KeyboardInterrupt

            # Send ping every 1 second
            current_time = time.time()
            if current_time - last_ping_time >= 1:
                last_ping_time = current_time
                result = antenna.send_acoustic_ping(data_number)
                # Comparison kept as in the original: the driver's return
                # type is not visible from this script.
                if result == True:
                    spinner_index = (spinner_index + 1) % len(spinner_frames)
                    status_mark = spinner_frames[spinner_index]
                else:
                    status_mark = "!"
                # Update the prompt on the same line
                prompt = (
                    f"\r{status_mark}--> Pinging with {data_number}."
                    " Press 0-9+ENTER to change, 'q'+ENTER to quit."
                )
                sys.stdout.write(prompt)
                sys.stdout.flush()

            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"An error occurred: {e}", file=sys.stderr)
    finally:
        # Ensure the connection is closed gracefully
        print("Disconnecting from the antenna.")
        antenna.disconnect()
# Start the interactive ping loop only when this file is executed as a script.
if __name__ == "__main__":
    main()