Files
Poulpe 9a158f5c5f Initial: ContinuousTransponder wrapper for Kogger USBL
High-level Python wrapper around the upstream cosma-tech/kogger_acousticAntenna
driver. Configures a Kogger acoustic antenna as a permanent slave transponder
in a single start() call: address filter, echo filter, optional TDMA sync slot,
permanent response window, and Python callbacks for each ping received.

No modification to the upstream driver — only composes existing public methods
in the right order. Snapshot of upstream driver included read-only under driver/
for reference.

Includes:
- transponder_continu.py (302 lines): the wrapper class + CLI
- examples/auv_slave.py (79 lines): usage example with logging
- README.md: design rationale, usage, multi-AUV TDMA, watchdog, hardware wiring
- driver/: snapshot of cosma-tech/kogger_acousticAntenna at commit 1b539f9
  ('Add index slot for multi pinger', 2025-03-11)

Built for Cosma context (USV master + N AUVs slaves) following the design
conversation in Discord #ping-pong-ping (2026-04-27). See poulpe/ping-pong-ping
on Gitea for the interactive demo of the protocol.
2026-04-27 22:08:44 +00:00

842 lines
49 KiB
Python
Executable File

#! /usr/bin/env python
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox, filedialog
import threading
import queue
import sys
import os
import time # For delays in "Get All"
# Ensure kogger_protocol_driver.py is accessible
try:
from loguru import logger
from kogger_protocol_driver import KoggerSBPDevice, KEY_CONFIRM
from kogger_protocol_driver import (
ID_TIMESTAMP, ID_DIST, ID_ATTITUDE, ID_TEMP, ID_UART, ID_FLASH, ID_BOOT,
ID_DATASET, ID_DIST_SETUP, ID_CHART_SETUP, ID_TRANSC, ID_SND_SPD,
ID_VERSION, ID_MARK, ID_DIAG, ID_NAV, ID_DVL_VEL, ID_IMU_SETUP, ID_UPDATE
)
except ImportError as e:
try:
import logging
_crit_logger = logging.getLogger(__name__)
_crit_handler = logging.StreamHandler(sys.stderr)
_crit_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
_crit_handler.setFormatter(_crit_formatter)
_crit_logger.addHandler(_crit_handler)
_crit_logger.setLevel(logging.CRITICAL)
_crit_logger.critical(f"Failed to import necessary modules: {e}. Make sure 'kogger_protocol_driver.py' is in the same directory or your PYTHONPATH, and 'loguru' is installed.")
except ImportError:
print(f"CRITICAL ERROR: Failed to import necessary modules: {e}. Make sure 'kogger_protocol_driver.py' is in the same directory or your PYTHONPATH, and 'loguru' is installed.")
sys.exit(1)
class TkinterLogHandler:
"""A handler for Loguru that redirects log messages to a Tkinter Text widget."""
def __init__(self, text_widget):
"""
Initializes the log handler.
:param text_widget: The Tkinter Text widget to which logs will be written.
"""
self.text_widget = text_widget
self.queue = queue.Queue()
self.text_widget.after(100, self._process_log_queue)
def write(self, message):
"""
Called by Loguru to write a log message. Adds message to an internal queue.
:param message: The log message string.
"""
self.queue.put(message)
def _process_log_queue(self):
"""Processes messages from the log queue and inserts them into the Text widget."""
try:
while not self.queue.empty():
message = self.queue.get_nowait()
if self.text_widget.winfo_exists():
self.text_widget.configure(state=tk.NORMAL)
self.text_widget.insert(tk.END, message)
self.text_widget.see(tk.END)
self.text_widget.configure(state=tk.DISABLED)
if self.text_widget.winfo_exists():
self.text_widget.after(100, self._process_log_queue)
except Exception as e:
print(f"Error in TkinterLogHandler: {e}")
class KoggerGuiApp:
"""Main application class for the Kogger SBP Control Panel GUI."""
def __init__(self, root_window):
"""
Initializes the main application window and its components.
:param root_window: The main Tkinter window (tk.Tk instance).
"""
self.root = root_window
self.root.title("Kogger SBP Control Panel")
self.root.geometry("950x750")
self.driver = None
self.command_queue = queue.Queue() # For results from driver threads to GUI
self.unsolicited_log_queue = queue.Queue() # For unsolicited messages to GUI log
self.log_text_widget = scrolledtext.ScrolledText(self.root, state=tk.DISABLED, height=10, wrap=tk.WORD, font=("Consolas", 9))
self.log_text_widget.pack(side=tk.BOTTOM, fill=tk.X, padx=5, pady=5)
self.configure_logging()
self.connection_frame = ttk.LabelFrame(self.root, text="Connection")
self.connection_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5)
self.notebook = ttk.Notebook(self.root)
self.notebook.pack(expand=True, fill="both", padx=5, pady=5)
self.tab_measurements = ttk.Frame(self.notebook)
self.tab_device_settings = ttk.Frame(self.notebook)
self.tab_data_settings = ttk.Frame(self.notebook)
self.tab_system = ttk.Frame(self.notebook)
self.tab_navigation = ttk.Frame(self.notebook)
self.notebook.add(self.tab_measurements, text="Measurements & Actions")
self.notebook.add(self.tab_device_settings, text="Device Settings")
self.notebook.add(self.tab_data_settings, text="Data Config")
self.notebook.add(self.tab_system, text="System & Flash")
self.notebook.add(self.tab_navigation, text="Navigation")
self._create_connection_widgets()
self._create_measurement_widgets()
self._create_device_settings_widgets()
self._create_data_settings_widgets()
self._create_system_widgets()
self._create_navigation_widgets()
self.root.protocol("WM_DELETE_WINDOW", self._on_closing)
self.root.after(100, self._process_command_queue)
self.root.after(100, self._process_unsolicited_log_queue)
def configure_logging(self):
"""Configures Loguru to output to console and the GUI's log widget."""
logger.remove()
logger.add(sys.stderr, level="DEBUG",
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>")
if self.log_text_widget.winfo_exists():
log_handler = TkinterLogHandler(self.log_text_widget)
logger.add(log_handler, format="{time:HH:mm:ss} | {level: <7} | {message}", level="INFO")
logger.info("GUI Initialized. Logging configured.")
def _create_connection_widgets(self):
"""Creates widgets for the connection panel (port, baud, connect button)."""
ttk.Label(self.connection_frame, text="Port:").grid(row=0, column=0, padx=2, pady=2, sticky="w")
self.port_var = tk.StringVar(value="/dev/ttyUSB0")
ttk.Entry(self.connection_frame, textvariable=self.port_var, width=15).grid(row=0, column=1, padx=2, pady=2)
ttk.Label(self.connection_frame, text="Baud:").grid(row=0, column=2, padx=2, pady=2, sticky="w")
self.baud_var = tk.StringVar(value="115200")
ttk.Entry(self.connection_frame, textvariable=self.baud_var, width=10).grid(row=0, column=3, padx=2, pady=2)
ttk.Label(self.connection_frame, text="Addr:").grid(row=0, column=4, padx=2, pady=2, sticky="w")
self.addr_var = tk.StringVar(value="0")
ttk.Entry(self.connection_frame, textvariable=self.addr_var, width=3).grid(row=0, column=5, padx=2, pady=2)
self.connect_button = ttk.Button(self.connection_frame, text="Connect", command=self._toggle_connection)
self.connect_button.grid(row=0, column=6, padx=5, pady=2)
self.connection_status_var = tk.StringVar(value="Status: Disconnected")
ttk.Label(self.connection_frame, textvariable=self.connection_status_var).grid(row=0, column=7, padx=5, pady=2, sticky="w")
def _create_labeled_entry(self, parent, label_text, default_value="", width=10):
"""
Helper to create a labeled entry field.
:param parent: The parent widget.
:param label_text: Text for the label.
:param default_value: Default value for the entry.
:param width: Width of the entry field.
:return: Tuple (frame_containing_label_and_entry, string_var_for_entry, entry_widget).
"""
frame = ttk.Frame(parent)
ttk.Label(frame, text=label_text).pack(side=tk.LEFT, padx=(0,2))
var = tk.StringVar(value=default_value)
entry = ttk.Entry(frame, textvariable=var, width=width)
entry.pack(side=tk.LEFT)
return frame, var, entry
def _create_result_display(self, parent, label_text, width=50):
"""
Helper to create a display area (label + read-only entry) for command results.
:param parent: The parent widget.
:param label_text: Text for the label describing the result.
:param width: Width of the result display entry.
:return: Tuple (frame_containing_label_and_display, string_var_for_result_display).
"""
frame = ttk.Frame(parent)
ttk.Label(frame, text=label_text, anchor="w").pack(side=tk.LEFT, padx=(0, 2))
var = tk.StringVar(value="N/A")
entry = ttk.Entry(frame, textvariable=var, state="readonly", width=width)
entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
return frame, var
def _create_measurement_widgets(self):
"""Creates widgets for the 'Measurements & Actions' tab, including the 'Get All' button."""
parent = self.tab_measurements
row_idx = 0
self.get_all_button = ttk.Button(parent, text="🔄 Get All Available Data", command=self._get_all_data_sequentially)
self.get_all_button.grid(row=row_idx, column=0, columnspan=2, sticky="ew", padx=5, pady=10)
row_idx += 1
ttk.Separator(parent, orient='horizontal').grid(row=row_idx, column=0, columnspan=2, sticky='ew', pady=5)
row_idx +=1
# Timestamp
f, self.ts_result_var = self._create_result_display(parent, "Timestamp (ms):")
f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Timestamp", command=lambda: self._run_driver_command(self.driver.get_timestamp, "Timestamp", self.ts_result_var)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2)
row_idx += 1
# Distance v0
f, self.dist_v0_result_var = self._create_result_display(parent, "Distance v0 (mm):")
f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Distance (v0)", command=lambda: self._run_driver_command(self.driver.get_distance, "Distance v0", self.dist_v0_result_var, version=0)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2)
row_idx += 1
# Distance v1
f, self.dist_v1_result_var = self._create_result_display(parent, "Distance v1 (full):")
f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Distance (v1)", command=lambda: self._run_driver_command(self.driver.get_distance, "Distance v1", self.dist_v1_result_var, version=1)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2)
row_idx += 1
# Attitude v0
f, self.att_v0_result_var = self._create_result_display(parent, "Attitude v0 (Euler):")
f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Attitude (v0)", command=lambda: self._run_driver_command(self.driver.get_attitude, "Attitude v0", self.att_v0_result_var, version=0)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2)
row_idx += 1
# Attitude v1
f, self.att_v1_result_var = self._create_result_display(parent, "Attitude v1 (Quat):")
f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Attitude (v1)", command=lambda: self._run_driver_command(self.driver.get_attitude, "Attitude v1", self.att_v1_result_var, version=1)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2)
row_idx += 1
# Temperature
f, self.temp_result_var = self._create_result_display(parent, "Temperature (°C):")
f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Temperature", command=lambda: self._run_driver_command(self.driver.get_temperature, "Temperature", self.temp_result_var)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2)
row_idx += 1
# Chart Data
f, self.chart_result_var = self._create_result_display(parent, "Chart Info:", width=60)
f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Chart Data", command=lambda: self._run_driver_command(self.driver.get_chart_data, "Chart Data", self.chart_result_var)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2)
row_idx += 1
def _create_device_settings_widgets(self):
"""Creates widgets for the 'Device Settings' tab (UART, Transceiver, Sound Speed)."""
parent = self.tab_device_settings
current_row = 0
# UART Config
uart_frame = ttk.LabelFrame(parent, text="UART Configuration (ID 0x18)")
uart_frame.grid(row=current_row, column=0, columnspan=3, sticky="ew", padx=5, pady=5, ipady=5)
current_row +=1
f_uart_id, self.uart_id_var, _ = self._create_labeled_entry(uart_frame, "UART ID:", "1", 3)
f_uart_id.grid(row=0, column=0, padx=2, pady=2, sticky="w")
f_uart_baud, self.uart_baud_var, _ = self._create_labeled_entry(uart_frame, "Set Baudrate:", "115200")
f_uart_baud.grid(row=1, column=0, padx=2, pady=2, sticky="w")
f_uart_addr, self.uart_new_addr_var, _ = self._create_labeled_entry(uart_frame, "Set New Dev Addr:", "0", 3)
f_uart_addr.grid(row=2, column=0, padx=2, pady=2, sticky="w")
ttk.Button(uart_frame, text="Set Baudrate (v0)", command=self._set_uart_baud).grid(row=1, column=1, padx=5, pady=2, sticky="ew")
ttk.Button(uart_frame, text="Set Dev Address (v1)", command=self._set_uart_dev_addr).grid(row=2, column=1, padx=5, pady=2, sticky="ew")
f_uart_get_v, self.uart_get_ver_var, _ = self._create_labeled_entry(uart_frame, "Get Info for Version (0/1):", "0", 2)
f_uart_get_v.grid(row=3, column=0, padx=2, pady=2, sticky="w")
ttk.Button(uart_frame, text="Get UART Config", command=self._get_uart_config).grid(row=3, column=1, padx=5, pady=2, sticky="ew")
f_uart_res, self.uart_get_result_var = self._create_result_display(uart_frame, "UART Get Result:")
f_uart_res.grid(row=4, column=0, columnspan=2, sticky="ew", padx=5, pady=2)
# Transceiver Settings
transc_frame = ttk.LabelFrame(parent, text="Transceiver Settings (ID 0x14)")
transc_frame.grid(row=current_row, column=0, columnspan=3, sticky="ew", padx=5, pady=5, ipady=5)
current_row += 1
f_freq, self.transc_freq_var, _ = self._create_labeled_entry(transc_frame, "Freq (kHz):", "675")
f_freq.grid(row=0, column=0)
f_pulse, self.transc_pulse_var, _ = self._create_labeled_entry(transc_frame, "Pulse Count:", "10")
f_pulse.grid(row=0, column=1)
self.transc_boost_var = tk.BooleanVar(value=True)
ttk.Checkbutton(transc_frame, text="Boost Enabled", variable=self.transc_boost_var).grid(row=0, column=2)
ttk.Button(transc_frame, text="Set Transceiver Cfg", command=self._set_transceiver_settings).grid(row=1, column=0, columnspan=3, sticky="ew")
f_transc_get, self.transc_get_result_var = self._create_result_display(transc_frame, "Current Transc:")
f_transc_get.grid(row=2, column=0, columnspan=2, sticky="ew")
ttk.Button(transc_frame, text="Get Transceiver Cfg", command=lambda: self._run_driver_command(self.driver.get_transceiver_settings, "Transceiver Settings", self.transc_get_result_var)).grid(row=2, column=2, sticky="ew")
# Sound Speed Settings
snd_spd_frame = ttk.LabelFrame(parent, text="Sound Speed (ID 0x15)")
snd_spd_frame.grid(row=current_row, column=0, columnspan=3, sticky="ew", padx=5, pady=5, ipady=5)
current_row += 1
f_sspd, self.sspd_var, _ = self._create_labeled_entry(snd_spd_frame, "Speed (mm/s):", "1500000")
f_sspd.grid(row=0, column=0)
ttk.Button(snd_spd_frame, text="Set Sound Speed", command=self._set_sound_speed).grid(row=0, column=1)
f_sspd_get, self.sspd_get_result_var = self._create_result_display(snd_spd_frame, "Current Speed:")
f_sspd_get.grid(row=1, column=0, sticky="ew")
ttk.Button(snd_spd_frame, text="Get Sound Speed", command=lambda: self._run_driver_command(self.driver.get_sound_speed, "Sound Speed", self.sspd_get_result_var)).grid(row=1, column=1)
def _create_data_settings_widgets(self):
"""Creates widgets for the 'Data Config' tab (Dataset, Distance Setup, Chart Setup)."""
parent = self.tab_data_settings
current_row = 0
dataset_frame = ttk.LabelFrame(parent, text="Dataset Configuration (ID 0x10)")
dataset_frame.grid(row=current_row, column=0, sticky="ew", padx=5, pady=5, ipady=5)
current_row += 1
f_ds_ch, self.ds_ch_id_var, _ = self._create_labeled_entry(dataset_frame, "Ch ID (0-2):", "0", 3)
f_ds_ch.grid(row=0, column=0)
f_ds_period, self.ds_period_var, _ = self._create_labeled_entry(dataset_frame, "Period (ms):", "0")
f_ds_period.grid(row=0, column=1)
f_ds_mask, self.ds_mask_var, _ = self._create_labeled_entry(dataset_frame, "Mask (hex):", "0x0")
f_ds_mask.grid(row=0, column=2)
ttk.Button(dataset_frame, text="Set Dataset Cfg", command=self._set_dataset_config).grid(row=1, column=0, columnspan=3, sticky="ew")
f_ds_get_ch, self.ds_get_ch_id_var, _ = self._create_labeled_entry(dataset_frame, "Get Ch ID:", "0", 3)
f_ds_get_ch.grid(row=2, column=0)
f_ds_get, self.ds_get_result_var = self._create_result_display(dataset_frame, "Current Dataset Cfg:")
f_ds_get.grid(row=3, column=0, columnspan=2, sticky="ew")
ttk.Button(dataset_frame, text="Get Dataset Cfg", command=self._get_dataset_config).grid(row=2, column=1, sticky="ew")
distsetup_frame = ttk.LabelFrame(parent, text="Distance Setup (ID 0x11)")
distsetup_frame.grid(row=current_row, column=0, sticky="ew", padx=5, pady=5, ipady=5)
current_row += 1
f_distoff, self.distsetup_offset_var, _ = self._create_labeled_entry(distsetup_frame, "Start Offset (mm):", "0")
f_distoff.grid(row=0, column=0)
f_distmax, self.distsetup_max_var, _ = self._create_labeled_entry(distsetup_frame, "Max Dist (mm):", "50000")
f_distmax.grid(row=0, column=1)
ttk.Button(distsetup_frame, text="Set Distance Setup", command=self._set_distance_setup).grid(row=0, column=2)
f_distsetup_get, self.distsetup_get_result_var = self._create_result_display(distsetup_frame, "Current Dist Setup:")
f_distsetup_get.grid(row=1, column=0, columnspan=2, sticky="ew")
ttk.Button(distsetup_frame, text="Get Distance Setup", command=lambda: self._run_driver_command(self.driver.get_distance_setup, "Distance Setup", self.distsetup_get_result_var)).grid(row=1, column=2)
chartsetup_frame = ttk.LabelFrame(parent, text="Chart Setup (ID 0x12)")
chartsetup_frame.grid(row=current_row, column=0, sticky="ew", padx=5, pady=5, ipady=5)
current_row += 1
f_cs_count, self.cs_count_var, _ = self._create_labeled_entry(chartsetup_frame, "Sample Count:", "1000")
f_cs_count.grid(row=0,column=0)
f_cs_resol, self.cs_resol_var, _ = self._create_labeled_entry(chartsetup_frame, "Sample Resol (mm):", "10")
f_cs_resol.grid(row=0,column=1)
f_cs_offset, self.cs_offset_var, _ = self._create_labeled_entry(chartsetup_frame, "Sample Offset:", "0")
f_cs_offset.grid(row=0,column=2)
ttk.Button(chartsetup_frame, text="Set Chart Setup", command=self._set_chart_setup).grid(row=1, column=0, columnspan=3, sticky="ew")
f_cs_get, self.cs_get_result_var = self._create_result_display(chartsetup_frame, "Current Chart Setup:")
f_cs_get.grid(row=2, column=0, columnspan=2, sticky="ew")
ttk.Button(chartsetup_frame, text="Get Chart Setup", command=lambda: self._run_driver_command(self.driver.get_chart_setup, "Chart Setup", self.cs_get_result_var)).grid(row=2, column=2)
def _create_system_widgets(self):
"""Creates widgets for the 'System & Flash' tab."""
parent = self.tab_system
current_row = 0
f_ver, self.ver_info_result_var = self._create_result_display(parent, "Version Info:", width=70)
f_ver.grid(row=current_row, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Version Info", command=lambda: self._run_driver_command(self.driver.get_version_info, "Version Info", self.ver_info_result_var)).grid(row=current_row, column=0, sticky="w", padx=5, pady=2)
current_row += 1
mark_frame = ttk.LabelFrame(parent, text="Device Mark (ID 0x21)")
mark_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5)
current_row += 1
self.mark_enable_var = tk.BooleanVar()
ttk.Checkbutton(mark_frame, text="Enable Mark (via ID_MARK SETTING)", variable=self.mark_enable_var).pack(side=tk.LEFT)
ttk.Button(mark_frame, text="Set Mark", command=self._set_mark).pack(side=tk.LEFT, padx=5)
f_mark_get, self.mark_get_result_var = self._create_result_display(mark_frame, "Mark Status:")
f_mark_get.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5)
ttk.Button(mark_frame, text="Get Mark Status", command=lambda: self._run_driver_command(self.driver.get_mark_status, "Mark Status", self.mark_get_result_var)).pack(side=tk.LEFT, padx=5)
f_diag, self.diag_result_var = self._create_result_display(parent, "Diagnostics:", width=70)
f_diag.grid(row=current_row, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Diagnostics", command=lambda: self._run_driver_command(self.driver.get_diagnostics, "Diagnostics", self.diag_result_var)).grid(row=current_row, column=0, sticky="w", padx=5, pady=2)
current_row += 1
flash_frame = ttk.LabelFrame(parent, text="Flash Operations (ID 0x23)")
flash_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5)
current_row += 1
ttk.Button(flash_frame, text="Save Settings to Flash", command=lambda: self._run_driver_command(self.driver.save_settings_to_flash, "Save Flash")).grid(row=0, column=0, padx=5, pady=2)
ttk.Button(flash_frame, text="Restore Settings from Flash", command=lambda: self._run_driver_command(self.driver.restore_settings_from_flash, "Restore Flash")).grid(row=0, column=1, padx=5, pady=2)
ttk.Button(flash_frame, text="Erase Flash Settings", command=lambda: self._run_driver_command(self.driver.erase_flash_settings, "Erase Flash")).grid(row=0, column=2, padx=5, pady=2)
boot_frame = ttk.LabelFrame(parent, text="Boot Operations (ID 0x24)")
boot_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5)
current_row += 1
ttk.Button(boot_frame, text="Reboot Device", command=self._reboot_device).grid(row=0, column=0, padx=5, pady=2)
ttk.Button(boot_frame, text="Run FW from Bootloader", command=lambda: self._run_driver_command(self.driver.run_firmware_from_bootloader, "Run FW")).grid(row=0, column=1, padx=5, pady=2)
imu_frame = ttk.LabelFrame(parent, text="IMU Calibration (ID 0x1B - In Dev)")
imu_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5)
current_row +=1
self.imu_gyro_var = tk.BooleanVar()
ttk.Checkbutton(imu_frame, text="Calibrate Gyro", variable=self.imu_gyro_var).pack(side=tk.LEFT, padx=5)
self.imu_accel_var = tk.BooleanVar()
ttk.Checkbutton(imu_frame, text="Calibrate Accel", variable=self.imu_accel_var).pack(side=tk.LEFT, padx=5)
ttk.Button(imu_frame, text="Start IMU Calibration", command=self._set_imu_calibration).pack(side=tk.LEFT, padx=5)
fw_frame = ttk.LabelFrame(parent, text="Firmware Update (ID 0x25 - Simplified)")
fw_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5)
current_row += 1
f_fw_pkt, self.fw_pkt_num_var, _ = self._create_labeled_entry(fw_frame, "Packet #:", "1", 5)
f_fw_pkt.pack(side=tk.LEFT)
f_fw_data, self.fw_data_var, _ = self._create_labeled_entry(fw_frame, "Data (hex):", "", 40)
f_fw_data.pack(side=tk.LEFT)
ttk.Button(fw_frame, text="Upload FW Chunk", command=self._upload_fw_chunk).pack(side=tk.LEFT, padx=5)
def _create_navigation_widgets(self):
"""Creates widgets for the 'Navigation' tab."""
parent = self.tab_navigation
current_row = 0
f_nav, self.nav_result_var = self._create_result_display(parent, "Navigation Data:", width=60)
f_nav.grid(row=current_row, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get Nav Data", command=lambda: self._run_driver_command(self.driver.get_navigation_data, "Nav Data", self.nav_result_var)).grid(row=current_row, column=0, sticky="w", padx=5, pady=2)
current_row += 1
f_dvl, self.dvl_result_var = self._create_result_display(parent, "DVL Velocity:", width=70)
f_dvl.grid(row=current_row, column=1, sticky="ew", padx=5, pady=2)
ttk.Button(parent, text="Get DVL Velocity", command=lambda: self._run_driver_command(self.driver.get_dvl_velocity_data, "DVL Velocity", self.dvl_result_var)).grid(row=current_row, column=0, sticky="w", padx=5, pady=2)
current_row += 1
# --- Driver Interaction & Threading ---
def _toggle_connection(self):
"""Toggles the device connection state (connect or disconnect)."""
if self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open:
self._disconnect_driver()
else:
self._connect_driver()
def _connect_driver(self):
"""Initiates connection to the device in a separate thread."""
port = self.port_var.get()
if not port:
messagebox.showerror("Error", "Serial port cannot be empty.")
return
try:
baud = int(self.baud_var.get())
addr = int(self.addr_var.get())
except ValueError:
messagebox.showerror("Error", "Baud rate and Address must be integers.")
return
self.driver = KoggerSBPDevice(port, baud, addr, default_timeout=1.5)
self.driver.register_default_callback(self._handle_unsolicited_from_driver)
self.connection_status_var.set("Status: Connecting...")
self.connect_button.config(text="Connecting...", state=tk.DISABLED)
threading.Thread(target=self._connect_thread_target, daemon=True).start()
def _connect_thread_target(self):
"""Target function for the connection thread. Handles actual connection attempt."""
if self.driver.connect():
self.command_queue.put(("status_update", ("Status: Connected", "green", "Disconnect")))
else:
self.command_queue.put(("status_update", ("Status: Failed to Connect", "red", "Connect")))
self.driver = None
def _disconnect_driver(self):
"""Initiates disconnection from the device in a separate thread."""
if self.driver:
logger.info("GUI: Disconnecting driver...")
if hasattr(self.driver, 'unregister_default_callback'):
self.driver.unregister_default_callback()
self.connect_button.config(text="Disconnecting...", state=tk.DISABLED)
threading.Thread(target=self._disconnect_thread_target, daemon=True).start()
else:
logger.info("GUI: Driver not initialized, cannot disconnect.")
def _disconnect_thread_target(self):
"""Target function for the disconnection thread."""
if self.driver:
self.driver.disconnect()
self.driver = None
self.command_queue.put(("status_update", ("Status: Disconnected", "black", "Connect")))
def _run_driver_command(self, driver_method_ref, command_name, result_var_target=None, *args, **kwargs):
"""
Executes a given driver method in a separate thread.
Posts its result to the GUI's command queue for UI updates.
:param driver_method_ref: The method of the `KoggerSBPDevice` instance to call.
:param command_name: A descriptive name for the command (for logging/display).
:param result_var_target: The Tkinter StringVar to update with the result.
:param args: Positional arguments for the driver method.
:param kwargs: Keyword arguments for the driver method.
"""
if not (self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open):
logger.warning(f"Cannot run '{command_name}': Not connected.")
messagebox.showwarning("Not Connected", "Please connect to the device first.")
if result_var_target: result_var_target.set("Error: Not Connected")
return
logger.info(f"GUI: Queuing execution for: {command_name} with args: {args}, kwargs: {kwargs}")
if result_var_target: result_var_target.set("Executing...")
thread = threading.Thread(target=self._command_thread_target,
args=(driver_method_ref, command_name, result_var_target) + args,
kwargs=kwargs, daemon=True)
thread.start()
def _command_thread_target(self, driver_method_ref, command_name, result_var_target, *args, **kwargs):
"""
Target function for the thread that executes a single driver command.
Posts the result or error to the GUI's command queue.
"""
try:
result = driver_method_ref(*args, **kwargs)
self.command_queue.put((command_name, result, result_var_target))
except Exception as e:
logger.error(f"Exception in driver command thread for '{command_name}': {e}", exc_info=True)
self.command_queue.put((command_name, f"Exception: {type(e).__name__}", result_var_target))
def _process_command_queue(self):
"""Processes results and status updates from the command queue to update the GUI."""
try:
while True:
message_type, data, target_var = self.command_queue.get_nowait()
if message_type == "status_update":
status_text, _, button_text = data
self.connection_status_var.set(status_text)
self.connect_button.config(text=button_text, state=tk.NORMAL)
logger.info(f"GUI: Connection status updated to '{status_text}'")
elif message_type == "get_all_finished":
if hasattr(self, 'get_all_button'): # Check if button exists
self.get_all_button.config(state=tk.NORMAL)
logger.info("GUI: 'Get All Available Data' sequence finished.")
elif message_type == "set_fetching_status": # New message type for "Get All"
# data is command_name_desc, target_var is the actual StringVar
command_name_desc = data
if target_var and isinstance(target_var, tk.StringVar):
target_var.set("Fetching...")
logger.debug(f"GUI: Set fetching status for {command_name_desc}")
else:
logger.warning(f"GUI: Invalid target_var for set_fetching_status of {command_name_desc}")
else: # Assumed to be a command result
command_name = message_type
result = data
result_var_target = target_var
logger.info(f"GUI: Received result for '{command_name}': {str(result)[:100]}")
if result_var_target and isinstance(result_var_target, tk.StringVar):
display_str = "N/A"
if isinstance(result, dict):
items = []
for k, v_item in result.items():
if isinstance(v_item, bytes): items.append(f"{k}: {v_item.hex().upper()[:20]}{'...' if len(v_item)>10 else ''}")
elif isinstance(v_item, float): items.append(f"{k}: {v_item:.3f}")
else: items.append(f"{k}: {v_item}")
display_str = ", ".join(items)
if len(display_str) > 70: display_str = display_str[:67] + "..."
elif isinstance(result, bool): display_str = "Success" if result else "Failed/False"
elif result is None: display_str = "No data / Failed / Timeout"
elif isinstance(result, (str, bytes)): display_str = result if isinstance(result, str) else result.hex().upper()
else: display_str = str(result)
result_var_target.set(display_str)
if result is True: logger.success(f"Command '{command_name}' reported success (True).")
elif result is False: logger.warning(f"Command '{command_name}' reported failure (False).")
elif result is None: logger.warning(f"Command '{command_name}' returned None (no data or error).")
except queue.Empty:
pass
except Exception as e:
logger.error(f"Error processing command queue: {e}", exc_info=True)
finally:
if self.root.winfo_exists():
self.root.after(100, self._process_command_queue)
def _handle_unsolicited_from_driver(self, frame):
""" Puts unsolicited messages (received by driver's callback) into a GUI-thread-safe queue. """
self.unsolicited_log_queue.put(frame)
def _process_unsolicited_log_queue(self):
"""Processes unsolicited messages from its queue and logs them via Loguru (to Tkinter widget)."""
try:
while True:
frame = self.unsolicited_log_queue.get_nowait()
logger.info(f"[UNSOLICITED In GUI] ID: {frame['id']:#02x}, Mode: {frame['mode']:#02x}, Payload: {frame['payload'].hex().upper()}")
except queue.Empty:
pass
finally:
if self.root.winfo_exists():
self.root.after(100, self._process_unsolicited_log_queue)
def _on_closing(self):
"""Handles the event when the main window is closed by the user."""
logger.info("Application closing initiated by user...")
if self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open:
logger.info("Attempting to disconnect driver on close...")
self._disconnect_driver() # This is now threaded, main window might close before it finishes
self.root.destroy()
logger.info("Application window destroyed.") # This log might go to console only
# --- Specific Command Handlers for SET operations & "Get All" ---
def _set_uart_baud(self):
"""Handles 'Set UART Baudrate' button click. Parses inputs and calls driver."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
uart_id = int(self.uart_id_var.get())
baud = int(self.uart_baud_var.get())
self._run_driver_command(self.driver.set_uart_config, "Set UART Baudrate",
uart_id=uart_id, baudrate=baud, new_dev_address=None)
except ValueError: messagebox.showerror("Input Error", "UART ID and Baudrate must be integers.")
def _set_uart_dev_addr(self):
"""Handles 'Set UART Device Address' button click. Parses inputs and calls driver."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
uart_id = int(self.uart_id_var.get())
addr = int(self.uart_new_addr_var.get())
if not (0 <= addr <= 15):
messagebox.showerror("Input Error", "Device Address must be between 0 and 15.")
return
self._run_driver_command(self.driver.set_uart_config, "Set UART Device Address",
uart_id=uart_id, new_dev_address=addr)
except ValueError: messagebox.showerror("Input Error", "UART ID and New Address must be integers.")
def _get_uart_config(self):
"""Handles 'Get UART Config' button click. Parses inputs and calls driver."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
uart_id = int(self.uart_id_var.get())
version_str = self.uart_get_ver_var.get()
version = int(version_str) if version_str else 0 # Default to version 0 if empty
if version not in [0,1]:
messagebox.showerror("Input Error", "Version must be 0 or 1.")
return
self._run_driver_command(self.driver.get_uart_config, f"Get UART Config v{version}", self.uart_get_result_var,
uart_id=uart_id, version=version)
except ValueError: messagebox.showerror("Input Error", "UART ID and Version must be integers.")
def _set_transceiver_settings(self):
"""Handles 'Set Transceiver Config' button click."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
freq = int(self.transc_freq_var.get())
pulse = int(self.transc_pulse_var.get())
boost = self.transc_boost_var.get()
self._run_driver_command(self.driver.set_transceiver_settings, "Set Transceiver Settings",
frequency_khz=freq, pulse_count=pulse, boost_enabled=boost)
except ValueError: messagebox.showerror("Input Error", "Frequency and Pulse Count must be integers.")
def _set_sound_speed(self):
"""Handles 'Set Sound Speed' button click."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
speed = int(self.sspd_var.get())
self._run_driver_command(self.driver.set_sound_speed, "Set Sound Speed", sound_speed_mm_s=speed)
except ValueError: messagebox.showerror("Input Error", "Sound speed must be an integer.")
def _set_dataset_config(self):
"""Handles 'Set Dataset Config' button click."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
ch_id = int(self.ds_ch_id_var.get())
period = int(self.ds_period_var.get())
mask_str = self.ds_mask_var.get()
mask = int(mask_str, 16) if mask_str.lower().startswith("0x") else int(mask_str)
self._run_driver_command(self.driver.set_dataset_config, "Set Dataset Config",
channel_id=ch_id, channel_period_ms=period, channel_mask=mask)
except ValueError: messagebox.showerror("Input Error", "Channel ID, Period, and Mask must be valid integers (Mask can be hex '0x...').")
def _get_dataset_config(self):
"""Handles 'Get Dataset Config' button click."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
ch_id_str = self.ds_get_ch_id_var.get()
ch_id = int(ch_id_str) if ch_id_str else 0 # Default to 0 if empty
self._run_driver_command(self.driver.get_dataset_config, "Get Dataset Config", self.ds_get_result_var,
channel_id_to_request=ch_id)
except ValueError: messagebox.showerror("Input Error", "Channel ID must be an integer.")
def _set_distance_setup(self):
"""Handles 'Set Distance Setup' button click."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
offset = int(self.distsetup_offset_var.get())
max_dist = int(self.distsetup_max_var.get())
self._run_driver_command(self.driver.set_distance_setup, "Set Distance Setup",
start_offset_mm=offset, max_distance_mm=max_dist)
except ValueError: messagebox.showerror("Input Error", "Offset and Max Distance must be integers.")
def _set_chart_setup(self):
"""Handles 'Set Chart Setup' button click."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
count = int(self.cs_count_var.get())
resol = int(self.cs_resol_var.get())
offset = int(self.cs_offset_var.get())
self._run_driver_command(self.driver.set_chart_setup, "Set Chart Setup",
sample_count=count, sample_resolution_mm=resol, sample_offset_num=offset)
except ValueError: messagebox.showerror("Input Error", "Sample Count, Resolution, and Offset must be integers.")
def _set_mark(self):
"""Handles 'Set Mark' button click."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
enable = self.mark_enable_var.get()
if enable:
self._run_driver_command(self.driver.set_mark, "Set Mark (Request Enable)", result_var_target=self.mark_get_result_var, enable_mark=True)
else:
logger.warning("GUI: 'Set Mark' (disable) requested via ID_MARK checkbutton. Protocol docs suggest ID_MARK SETTING command is for setting the mark. Disabling the mark is usually done by host sending a normal command with the MODE.Mark bit cleared.")
messagebox.showinfo("Info", "The ID_MARK SETTING command primarily sets the mark. To clear it, the host typically sends any frame with the mark bit cleared in its MODE field.")
def _reboot_device(self):
"""Handles 'Reboot Device' button click with confirmation."""
if not (self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open):
messagebox.showwarning("Not Connected", "Please connect to the device first.")
return
if messagebox.askyesno("Confirm Reboot", "Are you sure you want to reboot the device? This will disconnect."):
logger.info("GUI: Initiating device reboot...")
self.connect_button.config(text="Rebooting...", state=tk.DISABLED)
threading.Thread(target=self._reboot_thread_target, daemon=True).start()
def _reboot_thread_target(self):
"""Target function for the reboot operation thread."""
success = False
if self.driver:
success = self.driver.reboot_device()
if success: # driver.reboot_device() calls disconnect internally
self.command_queue.put(("status_update", ("Status: Rebooted. Disconnected.", "black", "Connect")))
else:
self.command_queue.put(("status_update", ("Status: Reboot Failed", "red", "Connect")))
def _set_imu_calibration(self):
"""Handles 'Start IMU Calibration' button click."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
gyro = self.imu_gyro_var.get()
accel = self.imu_accel_var.get()
if not gyro and not accel:
messagebox.showinfo("IMU Calibration", "No calibration type selected (Gyro or Accelerometer).")
return
self._run_driver_command(self.driver.set_imu_calibration, "IMU Calibration",
calibrate_gyro=gyro, calibrate_accelerometer=accel)
def _upload_fw_chunk(self):
"""Handles 'Upload FW Chunk' button click."""
if not self.driver: messagebox.showerror("Error", "Not connected!"); return
try:
pkt_num_str = self.fw_pkt_num_var.get()
if not pkt_num_str: messagebox.showerror("Input Error", "Packet number is required."); return
pkt_num = int(pkt_num_str)
data_hex = self.fw_data_var.get()
if not data_hex:
messagebox.showerror("Input Error", "Firmware data (hex) cannot be empty.")
return
data_bytes = bytes.fromhex(data_hex)
self._run_driver_command(self.driver.upload_firmware_update_chunk, "Upload FW Chunk",
packet_number=pkt_num, update_data_chunk=data_bytes)
except ValueError:
messagebox.showerror("Input Error", "Packet number must be int, data must be valid hex string (e.g., AABBCCDD).")
except Exception as e:
messagebox.showerror("Error", f"Failed to prepare FW chunk: {e}")
def _get_all_data_sequentially(self):
"""
Handles the 'Get All Available Data' button click.
Initiates a sequence of all implemented 'get_*' commands in a separate thread.
"""
if not (self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open):
messagebox.showwarning("Not Connected", "Please connect to the device first.")
return
logger.info("GUI: Starting 'Get All Available Data' sequence...")
if hasattr(self, 'get_all_button'): # Ensure button exists
self.get_all_button.config(state=tk.DISABLED)
threading.Thread(target=self._get_all_data_thread_target, daemon=True).start()
def _get_all_data_thread_target(self):
"""
Target function for the 'Get All Data' thread.
Sequentially calls driver methods and posts results to the command queue.
"""
# Ensure all these result_vars are defined in your _create_..._widgets methods
# Default UART ID and Dataset Channel ID for "Get All"
uart_id_for_get_all = int(self.uart_id_var.get() or "1")
dataset_ch_id_for_get_all = int(self.ds_get_ch_id_var.get() or "0")
commands_to_run_config = [
# (Driver Method, "Command Name for Log", Target StringVar, (args_tuple), {kwargs_dict})
(self.driver.get_timestamp, "Timestamp (All)", self.ts_result_var, (), {}),
(self.driver.get_distance, "Distance v0 (All)", self.dist_v0_result_var, (0,), {}),
(self.driver.get_distance, "Distance v1 (All)", self.dist_v1_result_var, (1,), {}),
(self.driver.get_chart_data, "Chart Data (All)", self.chart_result_var, (), {}),
(self.driver.get_attitude, "Attitude v0 (All)", self.att_v0_result_var, (0,), {}),
(self.driver.get_attitude, "Attitude v1 (All)", self.att_v1_result_var, (1,), {}),
(self.driver.get_temperature, "Temperature (All)", self.temp_result_var, (), {}),
(self.driver.get_dataset_config, "Dataset Cfg (All)", self.ds_get_result_var, (dataset_ch_id_for_get_all,), {}),
(self.driver.get_distance_setup, "Distance Setup (All)", self.distsetup_get_result_var, (), {}),
(self.driver.get_chart_setup, "Chart Setup (All)", self.cs_get_result_var, (), {}),
(self.driver.get_transceiver_settings, "Transceiver Cfg (All)", self.transc_get_result_var, (), {}),
(self.driver.get_sound_speed, "Sound Speed (All)", self.sspd_get_result_var, (), {}),
(self.driver.get_uart_config, "UART Cfg v0 (All)", self.uart_get_result_var, (uart_id_for_get_all, 0), {}),
(self.driver.get_uart_config, "UART Cfg v1 (All)", self.uart_get_result_var, (uart_id_for_get_all, 1), {}),
(self.driver.get_version_info, "Version Info (All)", self.ver_info_result_var, (), {}),
(self.driver.get_mark_status, "Mark Status (All)", self.mark_get_result_var, (), {}),
(self.driver.get_diagnostics, "Diagnostics (All)", self.diag_result_var, (), {}),
(self.driver.get_navigation_data, "Nav Data (All)", self.nav_result_var, (), {}),
(self.driver.get_dvl_velocity_data, "DVL Velocity (All)", self.dvl_result_var, (), {}),
]
for method_ref, cmd_name, res_var, args, kwargs in commands_to_run_config:
if hasattr(self.root, '_already_closed') and self.root._already_closed: # Check if window is closing
logger.info("'Get All' sequence aborted as window is closing.")
break
if not (self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open):
logger.warning("'Get All' sequence aborted: Not connected.")
break
logger.info(f"GUI 'Get All': Requesting '{cmd_name}'...")
# Send a message to GUI to update status to "Fetching..." for this specific res_var
self.command_queue.put(("set_fetching_status", cmd_name, res_var))
actual_result = None
try:
# This is the blocking call to the driver method.
# The driver method itself uses _execute_command which handles its own threading for send/receive.
# This _get_all_data_thread_target thread will wait here until method_ref completes.
actual_result = method_ref(*args, **kwargs)
except Exception as e:
logger.error(f"Exception during 'Get All' for '{cmd_name}': {e}", exc_info=True)
actual_result = f"Exception: {type(e).__name__}"
# Put the actual result (or error string) onto the queue for GUI update
self.command_queue.put((cmd_name, actual_result, res_var))
time.sleep(0.35) # Pause slightly longer between actual device commands
# Signal completion to re-enable the button
self.command_queue.put(("get_all_finished", None, None))
if __name__ == "__main__":
main_window = tk.Tk()
app = KoggerGuiApp(main_window)
try:
main_window.mainloop()
except Exception as e:
# This is a last resort if mainloop itself or _on_closing has an issue
print(f"CRITICAL GUI ERROR during mainloop or close: {e}")
if 'logger' in globals() or 'logger' in locals(): # Check if logger might exist
try:
logger.critical(f"Tkinter mainloop crashed or error during close: {e}", exc_info=True)
except: # logger itself might fail if stderr is redirected weirdly on crash
print("Logger failed during critical GUI error.")
# It's often good practice to re-raise or sys.exit(1) after such an error
# raise