Files
kogger-transpondeur-continu/driver/test/test_auv.py
Poulpe 9a158f5c5f Initial: ContinuousTransponder wrapper for Kogger USBL
High-level Python wrapper around the upstream cosma-tech/kogger_acousticAntenna
driver. Configures a Kogger acoustic antenna as a permanent slave transponder
in a single start() call: address filter, echo filter, optional TDMA sync slot,
permanent response window, and Python callbacks for each ping received.

No modification to the upstream driver — only composes existing public methods
in the right order. Snapshot of upstream driver included read-only under driver/
for reference.

Includes:
- transponder_continu.py (302 lines): the wrapper class + CLI
- examples/auv_slave.py (79 lines): usage example with logging
- README.md: design rationale, usage, multi-AUV TDMA, watchdog, hardware wiring
- driver/: snapshot of cosma-tech/kogger_acousticAntenna at commit 1b539f9
  ('Add index slot for multi pinger', 2025-03-11)

Built for the Cosma context (one USV master + N AUV slaves), following the design
conversation in Discord #ping-pong-ping (2026-04-27). See poulpe/ping-pong-ping
on Gitea for the interactive demo of the protocol.
2026-04-27 22:08:44 +00:00

179 lines
6.5 KiB
Python
Executable File

#! /usr/bin/env python
# Test script for the AUV: responds to every ping received and saves the received state.
import time
import sys
import json
from loguru import logger
import os
import datetime
# Add the parent directory (resolved from this file's location) to the Python
# path so the driver module can be imported.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
try:
    import kogger_protocol_driver
except ImportError as exc:
    # Include the underlying error: an ImportError can also be raised by a
    # missing dependency of the driver, not only by a missing driver file.
    logger.critical(
        f"Failed to import kogger_protocol_driver ({exc}). "
        "Make sure kogger_protocol_driver.py is in the parent directory."
    )
    sys.exit(1)
# --- Script Configuration ---
# Logging level passed to the driver: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
LOG_LEVEL = "INFO"
# One log file per run, named after the start time. The path is relative to
# the current working directory.
timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
LOG_FILE = f"log/{timestamp}_log_auv.log"
# Default serial port if not provided via command line.
# Other port paths that were tried here:
#   "/dev/ttyUSB0"
#   "/dev/serial/by-path/platform-fd500000.pcie-pci-0000:01:00.0-usb-0:1.3:1.0"
DEFAULT_SERIAL_PORT = "/dev/ttyAMA4"
DEFAULT_SERIAL_SPEED = 921600
# Id of the last message seen by the callbacks; -1 until one has been received.
response_received = -1
# Driver instance shared with the callbacks; assigned in main().
antenna = None
def print_message(message):
    """
    Default callback: log a received message and remember its id.

    This function is a callback that will be executed for each message
    received from the antenna that is not a direct response to a command.
    The message is pretty-printed to the log, then its "id" entry is stored
    in the module-level ``response_received``.

    Args:
        message: The message handed over by the driver. It is indexed with
            ``message["id"]``, so a dict-like object is expected.
    """
    global response_received
    # Serialise the message directly instead of round-tripping its repr()
    # through string replacements, which fails on values such as None, True,
    # bytes or text containing quotes. default=str keeps the dump best-effort
    # for values json cannot encode natively.
    try:
        pretty = json.dumps(message, indent=2, default=str)
    except (TypeError, ValueError):
        pretty = str(message)
    logger.info(pretty)
    try:
        response_received = message["id"]
        logger.info("resp="+str(response_received))
    except Exception as e:
        # Best effort: a message without a usable "id" is only logged.
        logger.info("exception="+str(e))
def test_callback(message):
    """
    Callback registered in main() for USBL solution messages.

    Fetches the current USBL data from the shared antenna, stores the id of
    the received message in ``response_received``, logs the fetched data and
    returns the message unchanged.
    """
    global response_received
    usbl_data = antenna.get_usbl_data()
    response_received = message["id"]
    logger.info("test!!!!!!" + str(usbl_data))
    return message
def check_all_getters(antenna):
    """
    Call a fixed selection of getter methods on the antenna and print the results.

    Each getter is looked up by name and called with its listed arguments.
    A failure (including a missing method) is printed and the remaining
    getters are still tried.
    """
    print("\n--- Checking all getter methods ---")
    # (method_name, args_tuple) pairs; entries are commented out to disable them.
    selected_getters = (
        # ('get_timestamp', ()),
        # ('get_distance', (0,)),
        # ('get_distance', (1,)),
        # ('get_chart_data', ()),
        ('get_attitude', (0,)),  # !!OK
        # ('get_attitude', (1,)),
        # ('get_temperature', ()),
        # ('get_dataset_config', (0,)),
        # ('get_distance_setup', ()),
        # ('get_chart_setup', ()),
        # ('get_transceiver_settings', ()),
        # ('get_sound_speed', ()),
        # ('get_uart_config', (1, 0)),  # uart_id=1, version=0
        # ('get_uart_config', (1, 1)),  # uart_id=1, version=1
        ('get_version_info', ()),  # !! OK
        ('get_mark_status', ()),  # !! OK
        # ('get_diagnostics', ()),
        # ('get_navigation_data', ()),
        # ('get_dvl_velocity_data', ()),
        # ('get_signal_encoder_data', ()),
        # ('get_signal_decoder_data', ()),
        # ('get_auto_response_timeout', ()),
        # ('get_auto_response_filter', ()),
        # ('get_auto_response_payload', ()),
        ('get_usbl_solution', ()),
    )
    for getter_name, getter_args in selected_getters:
        try:
            getter = getattr(antenna, getter_name)
            print(f"Calling {getter_name}{getter_args}...")
            print(f"Result: {getter(*getter_args)}")
        except Exception as err:
            print(f"An error occurred while calling {getter_name}: {err}")
        print("-" * 20)
        # Short pause so the device gets a moment between commands.
        time.sleep(0.01)
    print("--- Finished checking all getter methods ---\n")
def main():
    """
    Connect to the Kogger antenna, set up auto-response and keep it running.

    The serial port is taken from the first command-line argument, falling
    back to DEFAULT_SERIAL_PORT. After connecting, a selection of getters is
    checked once, the auto-response filter, timeout and payload are set, and
    the callbacks are registered. The function then checks
    ``response_received`` once per second and, whenever it changed, sends the
    new value as the auto-response payload.

    The loop only ends through an exception (e.g. Ctrl-C). The antenna is
    disconnected in every case. The process exits with status 1 if the
    connection cannot be established.
    """
    global antenna, response_received

    # Determine the serial port to use
    if len(sys.argv) > 1:
        serial_port = sys.argv[1]
    else:
        serial_port = DEFAULT_SERIAL_PORT
        print(f"No serial port provided. Using default: {serial_port}")

    # Instantiate the driver
    antenna = kogger_protocol_driver.KoggerSBPDevice(
        serial_port,
        DEFAULT_SERIAL_SPEED,
        default_timeout=0.02,
        log_level=LOG_LEVEL,
        log_file=LOG_FILE,
    )

    try:
        # Connect to the antenna
        if not antenna.connect():
            print(f"Failed to connect to the antenna on port {serial_port}", file=sys.stderr)
            sys.exit(1)
        print(f"Successfully connected to the antenna on {serial_port}.")

        # Perform a one-time check of all getter functions
        check_all_getters(antenna)

        result = antenna.set_auto_response_filter(0)
        logger.info("set_auto_response_filter(0)="+str(result))
        result = antenna.set_auto_response_timeout(0xffffffff)
        logger.info("set_auto_response_timeout(0xffffffff)="+str(result))
        result = antenna.set_auto_response_payload(0xff)
        logger.info("set_auto_response_payload(0xff)="+str(result))

        antenna.register_precallback(kogger_protocol_driver.ID_USBL_SOLUTION, antenna.callback_usbl_solution)
        antenna.register_callback(kogger_protocol_driver.ID_USBL_SOLUTION, test_callback)
        antenna.register_default_callback(print_message)

        last_sent = -1
        while True:
            # Read the global once per iteration so that the comparison, the
            # payload sent and the log line all refer to the same value even
            # if a callback updates it in the meantime.
            # NOTE(review): callbacks presumably run on a driver thread - confirm.
            current = response_received
            logger.info("response_received="+str(current))
            if last_sent != current:
                last_sent = current
                try:
                    result = antenna.set_auto_response_payload(current)
                except Exception as e:
                    # Not a bare "except": Ctrl-C / SystemExit must still
                    # reach the outer handlers instead of triggering the
                    # fallback call.
                    logger.warning("set_auto_response_payload("+str(current)+") failed: "+str(e))
                    result = antenna.set_auto_response_payload(0xff)
                logger.info("antenna.set_auto_response_payload("+str(current)+")="+str(result))
            time.sleep(1)
        # Nothing placed after the loop above would ever run: it is only left
        # through an exception.
    except KeyboardInterrupt:
        print("\nExiting...")
    except Exception as e:
        print(f"An error occurred: {e}", file=sys.stderr)
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.basename(exc_tb.tb_frame.f_code.co_filename)
        print(f"type={exc_type}, fname={fname}, line={exc_tb.tb_lineno}")
    finally:
        # Ensure the connection is closed gracefully
        print("Disconnecting from the antenna.")
        antenna.disconnect()


if __name__ == "__main__":
    main()