#! /usr/bin/env python # Test to run AUV. Will respond to all ping received and save the state received import time import sys import json from loguru import logger import os import datetime # Add the parent directory to the Python path to find the driver module sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) try: #from kogger_protocol_driver import KoggerSBPDevice, setup_logging import kogger_protocol_driver except ImportError: logger.critical("Failed to import KoggerSBPDevice. Make sure kogger_protocol_driver.py is in the parent directory.") sys.exit(1) # --- Script Configuration --- # Set the desired logging level: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" LOG_LEVEL = "INFO" timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") LOG_FILE = "log/"+str(timestamp)+"_log_auv.log" # Default serial port if not provided via command line #DEFAULT_SERIAL_PORT = "/dev/ttyUSB0" DEFAULT_SERIAL_PORT = "/dev/serial/by-path/platform-fd500000.pcie-pci-0000:01:00.0-usb-0:1.3:1.0" DEFAULT_SERIAL_PORT = "/dev/ttyAMA4" DEFAULT_SERIAL_SPEED = 921600 response_received = -1 antenna = None def print_message(message): """ This function is a callback that will be executed for each message received from the antenna that is not a direct response to a command. """ parsed = json.loads(str(message).replace("nan", "'nan'").replace("'",'"')) logger.info(json.dumps(parsed, indent=2)) try: global response_received response_received = message["id"] logger.info("resp="+str(response_received)) except Exception as e: logger.info("exception="+str(e)) pass def test_callback(message): global antenna data=antenna.get_usbl_data() global response_received response_received = message["id"] logger.info("test!!!!!!"+str(data)) return message def check_all_getters(antenna): """ Calls all get_* methods on the antenna object and prints the results. """ print("\n--- Checking all getter methods ---") # A list of all getter methods to call. # Some methods require arguments, which are provided as tuples. # (method_name, args_tuple) getters_to_test = [ #('get_timestamp', ()), #('get_distance', (0,)), #('get_distance', (1,)), #('get_chart_data', ()), ('get_attitude', (0,)), #!!OK #('get_attitude', (1,)), #('get_temperature', ()), #('get_dataset_config', (0,)), #('get_distance_setup', ()), #('get_chart_setup', ()), #('get_transceiver_settings', ()), #('get_sound_speed', ()), #('get_uart_config', (1, 0)), # uart_id=1, version=0 #('get_uart_config', (1, 1)), # uart_id=1, version=1 ('get_version_info', ()), #!! OK ('get_mark_status', ()), #!! OK #('get_diagnostics', ()), #('get_navigation_data', ()), #('get_dvl_velocity_data', ()), #('get_signal_encoder_data', ()), #('get_signal_decoder_data', ()), #('get_auto_response_timeout', ()), #('get_auto_response_filter', ()), #('get_auto_response_payload', ()), ('get_usbl_solution', ()), ] for method_name, args in getters_to_test: try: method = getattr(antenna, method_name) print(f"Calling {method_name}{args}...") result = method(*args) print(f"Result: {result}") except Exception as e: print(f"An error occurred while calling {method_name}: {e}") print("-" * 20) time.sleep(0.01) # Give the device a moment between commands print("--- Finished checking all getter methods ---\n") def main(): """ Main function to connect to the Kogger antenna and listen for messages. """ # Determine the serial port to use if len(sys.argv) > 1: serial_port = sys.argv[1] else: serial_port = DEFAULT_SERIAL_PORT print(f"No serial port provided. Using default: {serial_port}") # Instantiate the driver global antenna antenna = kogger_protocol_driver.KoggerSBPDevice(serial_port, DEFAULT_SERIAL_SPEED, default_timeout=0.02, log_level=LOG_LEVEL, log_file=LOG_FILE) try: # Connect to the antenna if not antenna.connect(): print(f"Failed to connect to the antenna on port {serial_port}", file=sys.stderr) sys.exit(1) print(f"Successfully connected to the antenna on {serial_port}.") # Perform a one-time check of all getter functions check_all_getters(antenna) result = antenna.set_auto_response_filter(0) logger.info("set_auto_response_filter(0)="+str(result)) result = antenna.set_auto_response_timeout(0xffffffff) logger.info("set_auto_response_timeout(0xffffffff)="+str(result)) result = antenna.set_auto_response_payload(0xff) logger.info("set_auto_response_payload(0xff)="+str(result)) antenna.register_precallback(kogger_protocol_driver.ID_USBL_SOLUTION, antenna.callback_usbl_solution) antenna.register_callback(kogger_protocol_driver.ID_USBL_SOLUTION, test_callback) antenna.register_default_callback(print_message) response_received_old = -1 global response_received while True: logger.info("response_received="+str(response_received)) if response_received_old != response_received: response_received_old = response_received try: result = antenna.set_auto_response_payload(response_received) except: result = antenna.set_auto_response_payload(0xff) logger.info("antenna.set_auto_response_payload("+str(response_received)+")="+str(result)) time.sleep(1) for i in range(10000): #result = antenna.send_acoustic_ping(i%9) result = antenna.send_acoustic_ping(0) print("sent "+str(i%9)+"result="+str(result)) time.sleep(1) exit() except KeyboardInterrupt: print("\nExiting...") except Exception as e: print(f"An error occurred: {e}", file=sys.stderr) exc_type, exc_obj, exc_tb = sys.exc_info() fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] print(f"type={exc_type}, fname={fname}, line={exc_tb.tb_lineno}") finally: # Ensure the connection is closed gracefully print("Disconnecting from the antenna.") antenna.disconnect() if __name__ == "__main__": main()