#! /usr/bin/env python import tkinter as tk from tkinter import ttk, scrolledtext, messagebox, filedialog import threading import queue import sys import os import time # For delays in "Get All" # Ensure kogger_protocol_driver.py is accessible try: from loguru import logger from kogger_protocol_driver import KoggerSBPDevice, KEY_CONFIRM from kogger_protocol_driver import ( ID_TIMESTAMP, ID_DIST, ID_ATTITUDE, ID_TEMP, ID_UART, ID_FLASH, ID_BOOT, ID_DATASET, ID_DIST_SETUP, ID_CHART_SETUP, ID_TRANSC, ID_SND_SPD, ID_VERSION, ID_MARK, ID_DIAG, ID_NAV, ID_DVL_VEL, ID_IMU_SETUP, ID_UPDATE ) except ImportError as e: try: import logging _crit_logger = logging.getLogger(__name__) _crit_handler = logging.StreamHandler(sys.stderr) _crit_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') _crit_handler.setFormatter(_crit_formatter) _crit_logger.addHandler(_crit_handler) _crit_logger.setLevel(logging.CRITICAL) _crit_logger.critical(f"Failed to import necessary modules: {e}. Make sure 'kogger_protocol_driver.py' is in the same directory or your PYTHONPATH, and 'loguru' is installed.") except ImportError: print(f"CRITICAL ERROR: Failed to import necessary modules: {e}. Make sure 'kogger_protocol_driver.py' is in the same directory or your PYTHONPATH, and 'loguru' is installed.") sys.exit(1) class TkinterLogHandler: """A handler for Loguru that redirects log messages to a Tkinter Text widget.""" def __init__(self, text_widget): """ Initializes the log handler. :param text_widget: The Tkinter Text widget to which logs will be written. """ self.text_widget = text_widget self.queue = queue.Queue() self.text_widget.after(100, self._process_log_queue) def write(self, message): """ Called by Loguru to write a log message. Adds message to an internal queue. :param message: The log message string. """ self.queue.put(message) def _process_log_queue(self): """Processes messages from the log queue and inserts them into the Text widget.""" try: while not self.queue.empty(): message = self.queue.get_nowait() if self.text_widget.winfo_exists(): self.text_widget.configure(state=tk.NORMAL) self.text_widget.insert(tk.END, message) self.text_widget.see(tk.END) self.text_widget.configure(state=tk.DISABLED) if self.text_widget.winfo_exists(): self.text_widget.after(100, self._process_log_queue) except Exception as e: print(f"Error in TkinterLogHandler: {e}") class KoggerGuiApp: """Main application class for the Kogger SBP Control Panel GUI.""" def __init__(self, root_window): """ Initializes the main application window and its components. :param root_window: The main Tkinter window (tk.Tk instance). """ self.root = root_window self.root.title("Kogger SBP Control Panel") self.root.geometry("950x750") self.driver = None self.command_queue = queue.Queue() # For results from driver threads to GUI self.unsolicited_log_queue = queue.Queue() # For unsolicited messages to GUI log self.log_text_widget = scrolledtext.ScrolledText(self.root, state=tk.DISABLED, height=10, wrap=tk.WORD, font=("Consolas", 9)) self.log_text_widget.pack(side=tk.BOTTOM, fill=tk.X, padx=5, pady=5) self.configure_logging() self.connection_frame = ttk.LabelFrame(self.root, text="Connection") self.connection_frame.pack(side=tk.TOP, fill=tk.X, padx=5, pady=5) self.notebook = ttk.Notebook(self.root) self.notebook.pack(expand=True, fill="both", padx=5, pady=5) self.tab_measurements = ttk.Frame(self.notebook) self.tab_device_settings = ttk.Frame(self.notebook) self.tab_data_settings = ttk.Frame(self.notebook) self.tab_system = ttk.Frame(self.notebook) self.tab_navigation = ttk.Frame(self.notebook) self.notebook.add(self.tab_measurements, text="Measurements & Actions") self.notebook.add(self.tab_device_settings, text="Device Settings") self.notebook.add(self.tab_data_settings, text="Data Config") self.notebook.add(self.tab_system, text="System & Flash") self.notebook.add(self.tab_navigation, text="Navigation") self._create_connection_widgets() self._create_measurement_widgets() self._create_device_settings_widgets() self._create_data_settings_widgets() self._create_system_widgets() self._create_navigation_widgets() self.root.protocol("WM_DELETE_WINDOW", self._on_closing) self.root.after(100, self._process_command_queue) self.root.after(100, self._process_unsolicited_log_queue) def configure_logging(self): """Configures Loguru to output to console and the GUI's log widget.""" logger.remove() logger.add(sys.stderr, level="DEBUG", format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}") if self.log_text_widget.winfo_exists(): log_handler = TkinterLogHandler(self.log_text_widget) logger.add(log_handler, format="{time:HH:mm:ss} | {level: <7} | {message}", level="INFO") logger.info("GUI Initialized. Logging configured.") def _create_connection_widgets(self): """Creates widgets for the connection panel (port, baud, connect button).""" ttk.Label(self.connection_frame, text="Port:").grid(row=0, column=0, padx=2, pady=2, sticky="w") self.port_var = tk.StringVar(value="/dev/ttyUSB0") ttk.Entry(self.connection_frame, textvariable=self.port_var, width=15).grid(row=0, column=1, padx=2, pady=2) ttk.Label(self.connection_frame, text="Baud:").grid(row=0, column=2, padx=2, pady=2, sticky="w") self.baud_var = tk.StringVar(value="115200") ttk.Entry(self.connection_frame, textvariable=self.baud_var, width=10).grid(row=0, column=3, padx=2, pady=2) ttk.Label(self.connection_frame, text="Addr:").grid(row=0, column=4, padx=2, pady=2, sticky="w") self.addr_var = tk.StringVar(value="0") ttk.Entry(self.connection_frame, textvariable=self.addr_var, width=3).grid(row=0, column=5, padx=2, pady=2) self.connect_button = ttk.Button(self.connection_frame, text="Connect", command=self._toggle_connection) self.connect_button.grid(row=0, column=6, padx=5, pady=2) self.connection_status_var = tk.StringVar(value="Status: Disconnected") ttk.Label(self.connection_frame, textvariable=self.connection_status_var).grid(row=0, column=7, padx=5, pady=2, sticky="w") def _create_labeled_entry(self, parent, label_text, default_value="", width=10): """ Helper to create a labeled entry field. :param parent: The parent widget. :param label_text: Text for the label. :param default_value: Default value for the entry. :param width: Width of the entry field. :return: Tuple (frame_containing_label_and_entry, string_var_for_entry, entry_widget). """ frame = ttk.Frame(parent) ttk.Label(frame, text=label_text).pack(side=tk.LEFT, padx=(0,2)) var = tk.StringVar(value=default_value) entry = ttk.Entry(frame, textvariable=var, width=width) entry.pack(side=tk.LEFT) return frame, var, entry def _create_result_display(self, parent, label_text, width=50): """ Helper to create a display area (label + read-only entry) for command results. :param parent: The parent widget. :param label_text: Text for the label describing the result. :param width: Width of the result display entry. :return: Tuple (frame_containing_label_and_display, string_var_for_result_display). """ frame = ttk.Frame(parent) ttk.Label(frame, text=label_text, anchor="w").pack(side=tk.LEFT, padx=(0, 2)) var = tk.StringVar(value="N/A") entry = ttk.Entry(frame, textvariable=var, state="readonly", width=width) entry.pack(side=tk.LEFT, fill=tk.X, expand=True) return frame, var def _create_measurement_widgets(self): """Creates widgets for the 'Measurements & Actions' tab, including the 'Get All' button.""" parent = self.tab_measurements row_idx = 0 self.get_all_button = ttk.Button(parent, text="🔄 Get All Available Data", command=self._get_all_data_sequentially) self.get_all_button.grid(row=row_idx, column=0, columnspan=2, sticky="ew", padx=5, pady=10) row_idx += 1 ttk.Separator(parent, orient='horizontal').grid(row=row_idx, column=0, columnspan=2, sticky='ew', pady=5) row_idx +=1 # Timestamp f, self.ts_result_var = self._create_result_display(parent, "Timestamp (ms):") f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Timestamp", command=lambda: self._run_driver_command(self.driver.get_timestamp, "Timestamp", self.ts_result_var)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2) row_idx += 1 # Distance v0 f, self.dist_v0_result_var = self._create_result_display(parent, "Distance v0 (mm):") f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Distance (v0)", command=lambda: self._run_driver_command(self.driver.get_distance, "Distance v0", self.dist_v0_result_var, version=0)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2) row_idx += 1 # Distance v1 f, self.dist_v1_result_var = self._create_result_display(parent, "Distance v1 (full):") f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Distance (v1)", command=lambda: self._run_driver_command(self.driver.get_distance, "Distance v1", self.dist_v1_result_var, version=1)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2) row_idx += 1 # Attitude v0 f, self.att_v0_result_var = self._create_result_display(parent, "Attitude v0 (Euler):") f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Attitude (v0)", command=lambda: self._run_driver_command(self.driver.get_attitude, "Attitude v0", self.att_v0_result_var, version=0)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2) row_idx += 1 # Attitude v1 f, self.att_v1_result_var = self._create_result_display(parent, "Attitude v1 (Quat):") f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Attitude (v1)", command=lambda: self._run_driver_command(self.driver.get_attitude, "Attitude v1", self.att_v1_result_var, version=1)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2) row_idx += 1 # Temperature f, self.temp_result_var = self._create_result_display(parent, "Temperature (°C):") f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Temperature", command=lambda: self._run_driver_command(self.driver.get_temperature, "Temperature", self.temp_result_var)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2) row_idx += 1 # Chart Data f, self.chart_result_var = self._create_result_display(parent, "Chart Info:", width=60) f.grid(row=row_idx, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Chart Data", command=lambda: self._run_driver_command(self.driver.get_chart_data, "Chart Data", self.chart_result_var)).grid(row=row_idx, column=0, sticky="w", padx=5, pady=2) row_idx += 1 def _create_device_settings_widgets(self): """Creates widgets for the 'Device Settings' tab (UART, Transceiver, Sound Speed).""" parent = self.tab_device_settings current_row = 0 # UART Config uart_frame = ttk.LabelFrame(parent, text="UART Configuration (ID 0x18)") uart_frame.grid(row=current_row, column=0, columnspan=3, sticky="ew", padx=5, pady=5, ipady=5) current_row +=1 f_uart_id, self.uart_id_var, _ = self._create_labeled_entry(uart_frame, "UART ID:", "1", 3) f_uart_id.grid(row=0, column=0, padx=2, pady=2, sticky="w") f_uart_baud, self.uart_baud_var, _ = self._create_labeled_entry(uart_frame, "Set Baudrate:", "115200") f_uart_baud.grid(row=1, column=0, padx=2, pady=2, sticky="w") f_uart_addr, self.uart_new_addr_var, _ = self._create_labeled_entry(uart_frame, "Set New Dev Addr:", "0", 3) f_uart_addr.grid(row=2, column=0, padx=2, pady=2, sticky="w") ttk.Button(uart_frame, text="Set Baudrate (v0)", command=self._set_uart_baud).grid(row=1, column=1, padx=5, pady=2, sticky="ew") ttk.Button(uart_frame, text="Set Dev Address (v1)", command=self._set_uart_dev_addr).grid(row=2, column=1, padx=5, pady=2, sticky="ew") f_uart_get_v, self.uart_get_ver_var, _ = self._create_labeled_entry(uart_frame, "Get Info for Version (0/1):", "0", 2) f_uart_get_v.grid(row=3, column=0, padx=2, pady=2, sticky="w") ttk.Button(uart_frame, text="Get UART Config", command=self._get_uart_config).grid(row=3, column=1, padx=5, pady=2, sticky="ew") f_uart_res, self.uart_get_result_var = self._create_result_display(uart_frame, "UART Get Result:") f_uart_res.grid(row=4, column=0, columnspan=2, sticky="ew", padx=5, pady=2) # Transceiver Settings transc_frame = ttk.LabelFrame(parent, text="Transceiver Settings (ID 0x14)") transc_frame.grid(row=current_row, column=0, columnspan=3, sticky="ew", padx=5, pady=5, ipady=5) current_row += 1 f_freq, self.transc_freq_var, _ = self._create_labeled_entry(transc_frame, "Freq (kHz):", "675") f_freq.grid(row=0, column=0) f_pulse, self.transc_pulse_var, _ = self._create_labeled_entry(transc_frame, "Pulse Count:", "10") f_pulse.grid(row=0, column=1) self.transc_boost_var = tk.BooleanVar(value=True) ttk.Checkbutton(transc_frame, text="Boost Enabled", variable=self.transc_boost_var).grid(row=0, column=2) ttk.Button(transc_frame, text="Set Transceiver Cfg", command=self._set_transceiver_settings).grid(row=1, column=0, columnspan=3, sticky="ew") f_transc_get, self.transc_get_result_var = self._create_result_display(transc_frame, "Current Transc:") f_transc_get.grid(row=2, column=0, columnspan=2, sticky="ew") ttk.Button(transc_frame, text="Get Transceiver Cfg", command=lambda: self._run_driver_command(self.driver.get_transceiver_settings, "Transceiver Settings", self.transc_get_result_var)).grid(row=2, column=2, sticky="ew") # Sound Speed Settings snd_spd_frame = ttk.LabelFrame(parent, text="Sound Speed (ID 0x15)") snd_spd_frame.grid(row=current_row, column=0, columnspan=3, sticky="ew", padx=5, pady=5, ipady=5) current_row += 1 f_sspd, self.sspd_var, _ = self._create_labeled_entry(snd_spd_frame, "Speed (mm/s):", "1500000") f_sspd.grid(row=0, column=0) ttk.Button(snd_spd_frame, text="Set Sound Speed", command=self._set_sound_speed).grid(row=0, column=1) f_sspd_get, self.sspd_get_result_var = self._create_result_display(snd_spd_frame, "Current Speed:") f_sspd_get.grid(row=1, column=0, sticky="ew") ttk.Button(snd_spd_frame, text="Get Sound Speed", command=lambda: self._run_driver_command(self.driver.get_sound_speed, "Sound Speed", self.sspd_get_result_var)).grid(row=1, column=1) def _create_data_settings_widgets(self): """Creates widgets for the 'Data Config' tab (Dataset, Distance Setup, Chart Setup).""" parent = self.tab_data_settings current_row = 0 dataset_frame = ttk.LabelFrame(parent, text="Dataset Configuration (ID 0x10)") dataset_frame.grid(row=current_row, column=0, sticky="ew", padx=5, pady=5, ipady=5) current_row += 1 f_ds_ch, self.ds_ch_id_var, _ = self._create_labeled_entry(dataset_frame, "Ch ID (0-2):", "0", 3) f_ds_ch.grid(row=0, column=0) f_ds_period, self.ds_period_var, _ = self._create_labeled_entry(dataset_frame, "Period (ms):", "0") f_ds_period.grid(row=0, column=1) f_ds_mask, self.ds_mask_var, _ = self._create_labeled_entry(dataset_frame, "Mask (hex):", "0x0") f_ds_mask.grid(row=0, column=2) ttk.Button(dataset_frame, text="Set Dataset Cfg", command=self._set_dataset_config).grid(row=1, column=0, columnspan=3, sticky="ew") f_ds_get_ch, self.ds_get_ch_id_var, _ = self._create_labeled_entry(dataset_frame, "Get Ch ID:", "0", 3) f_ds_get_ch.grid(row=2, column=0) f_ds_get, self.ds_get_result_var = self._create_result_display(dataset_frame, "Current Dataset Cfg:") f_ds_get.grid(row=3, column=0, columnspan=2, sticky="ew") ttk.Button(dataset_frame, text="Get Dataset Cfg", command=self._get_dataset_config).grid(row=2, column=1, sticky="ew") distsetup_frame = ttk.LabelFrame(parent, text="Distance Setup (ID 0x11)") distsetup_frame.grid(row=current_row, column=0, sticky="ew", padx=5, pady=5, ipady=5) current_row += 1 f_distoff, self.distsetup_offset_var, _ = self._create_labeled_entry(distsetup_frame, "Start Offset (mm):", "0") f_distoff.grid(row=0, column=0) f_distmax, self.distsetup_max_var, _ = self._create_labeled_entry(distsetup_frame, "Max Dist (mm):", "50000") f_distmax.grid(row=0, column=1) ttk.Button(distsetup_frame, text="Set Distance Setup", command=self._set_distance_setup).grid(row=0, column=2) f_distsetup_get, self.distsetup_get_result_var = self._create_result_display(distsetup_frame, "Current Dist Setup:") f_distsetup_get.grid(row=1, column=0, columnspan=2, sticky="ew") ttk.Button(distsetup_frame, text="Get Distance Setup", command=lambda: self._run_driver_command(self.driver.get_distance_setup, "Distance Setup", self.distsetup_get_result_var)).grid(row=1, column=2) chartsetup_frame = ttk.LabelFrame(parent, text="Chart Setup (ID 0x12)") chartsetup_frame.grid(row=current_row, column=0, sticky="ew", padx=5, pady=5, ipady=5) current_row += 1 f_cs_count, self.cs_count_var, _ = self._create_labeled_entry(chartsetup_frame, "Sample Count:", "1000") f_cs_count.grid(row=0,column=0) f_cs_resol, self.cs_resol_var, _ = self._create_labeled_entry(chartsetup_frame, "Sample Resol (mm):", "10") f_cs_resol.grid(row=0,column=1) f_cs_offset, self.cs_offset_var, _ = self._create_labeled_entry(chartsetup_frame, "Sample Offset:", "0") f_cs_offset.grid(row=0,column=2) ttk.Button(chartsetup_frame, text="Set Chart Setup", command=self._set_chart_setup).grid(row=1, column=0, columnspan=3, sticky="ew") f_cs_get, self.cs_get_result_var = self._create_result_display(chartsetup_frame, "Current Chart Setup:") f_cs_get.grid(row=2, column=0, columnspan=2, sticky="ew") ttk.Button(chartsetup_frame, text="Get Chart Setup", command=lambda: self._run_driver_command(self.driver.get_chart_setup, "Chart Setup", self.cs_get_result_var)).grid(row=2, column=2) def _create_system_widgets(self): """Creates widgets for the 'System & Flash' tab.""" parent = self.tab_system current_row = 0 f_ver, self.ver_info_result_var = self._create_result_display(parent, "Version Info:", width=70) f_ver.grid(row=current_row, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Version Info", command=lambda: self._run_driver_command(self.driver.get_version_info, "Version Info", self.ver_info_result_var)).grid(row=current_row, column=0, sticky="w", padx=5, pady=2) current_row += 1 mark_frame = ttk.LabelFrame(parent, text="Device Mark (ID 0x21)") mark_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5) current_row += 1 self.mark_enable_var = tk.BooleanVar() ttk.Checkbutton(mark_frame, text="Enable Mark (via ID_MARK SETTING)", variable=self.mark_enable_var).pack(side=tk.LEFT) ttk.Button(mark_frame, text="Set Mark", command=self._set_mark).pack(side=tk.LEFT, padx=5) f_mark_get, self.mark_get_result_var = self._create_result_display(mark_frame, "Mark Status:") f_mark_get.pack(side=tk.LEFT, fill=tk.X, expand=True, padx=5) ttk.Button(mark_frame, text="Get Mark Status", command=lambda: self._run_driver_command(self.driver.get_mark_status, "Mark Status", self.mark_get_result_var)).pack(side=tk.LEFT, padx=5) f_diag, self.diag_result_var = self._create_result_display(parent, "Diagnostics:", width=70) f_diag.grid(row=current_row, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Diagnostics", command=lambda: self._run_driver_command(self.driver.get_diagnostics, "Diagnostics", self.diag_result_var)).grid(row=current_row, column=0, sticky="w", padx=5, pady=2) current_row += 1 flash_frame = ttk.LabelFrame(parent, text="Flash Operations (ID 0x23)") flash_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5) current_row += 1 ttk.Button(flash_frame, text="Save Settings to Flash", command=lambda: self._run_driver_command(self.driver.save_settings_to_flash, "Save Flash")).grid(row=0, column=0, padx=5, pady=2) ttk.Button(flash_frame, text="Restore Settings from Flash", command=lambda: self._run_driver_command(self.driver.restore_settings_from_flash, "Restore Flash")).grid(row=0, column=1, padx=5, pady=2) ttk.Button(flash_frame, text="Erase Flash Settings", command=lambda: self._run_driver_command(self.driver.erase_flash_settings, "Erase Flash")).grid(row=0, column=2, padx=5, pady=2) boot_frame = ttk.LabelFrame(parent, text="Boot Operations (ID 0x24)") boot_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5) current_row += 1 ttk.Button(boot_frame, text="Reboot Device", command=self._reboot_device).grid(row=0, column=0, padx=5, pady=2) ttk.Button(boot_frame, text="Run FW from Bootloader", command=lambda: self._run_driver_command(self.driver.run_firmware_from_bootloader, "Run FW")).grid(row=0, column=1, padx=5, pady=2) imu_frame = ttk.LabelFrame(parent, text="IMU Calibration (ID 0x1B - In Dev)") imu_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5) current_row +=1 self.imu_gyro_var = tk.BooleanVar() ttk.Checkbutton(imu_frame, text="Calibrate Gyro", variable=self.imu_gyro_var).pack(side=tk.LEFT, padx=5) self.imu_accel_var = tk.BooleanVar() ttk.Checkbutton(imu_frame, text="Calibrate Accel", variable=self.imu_accel_var).pack(side=tk.LEFT, padx=5) ttk.Button(imu_frame, text="Start IMU Calibration", command=self._set_imu_calibration).pack(side=tk.LEFT, padx=5) fw_frame = ttk.LabelFrame(parent, text="Firmware Update (ID 0x25 - Simplified)") fw_frame.grid(row=current_row, column=0, columnspan=2, sticky="ew", padx=5, pady=5) current_row += 1 f_fw_pkt, self.fw_pkt_num_var, _ = self._create_labeled_entry(fw_frame, "Packet #:", "1", 5) f_fw_pkt.pack(side=tk.LEFT) f_fw_data, self.fw_data_var, _ = self._create_labeled_entry(fw_frame, "Data (hex):", "", 40) f_fw_data.pack(side=tk.LEFT) ttk.Button(fw_frame, text="Upload FW Chunk", command=self._upload_fw_chunk).pack(side=tk.LEFT, padx=5) def _create_navigation_widgets(self): """Creates widgets for the 'Navigation' tab.""" parent = self.tab_navigation current_row = 0 f_nav, self.nav_result_var = self._create_result_display(parent, "Navigation Data:", width=60) f_nav.grid(row=current_row, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get Nav Data", command=lambda: self._run_driver_command(self.driver.get_navigation_data, "Nav Data", self.nav_result_var)).grid(row=current_row, column=0, sticky="w", padx=5, pady=2) current_row += 1 f_dvl, self.dvl_result_var = self._create_result_display(parent, "DVL Velocity:", width=70) f_dvl.grid(row=current_row, column=1, sticky="ew", padx=5, pady=2) ttk.Button(parent, text="Get DVL Velocity", command=lambda: self._run_driver_command(self.driver.get_dvl_velocity_data, "DVL Velocity", self.dvl_result_var)).grid(row=current_row, column=0, sticky="w", padx=5, pady=2) current_row += 1 # --- Driver Interaction & Threading --- def _toggle_connection(self): """Toggles the device connection state (connect or disconnect).""" if self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open: self._disconnect_driver() else: self._connect_driver() def _connect_driver(self): """Initiates connection to the device in a separate thread.""" port = self.port_var.get() if not port: messagebox.showerror("Error", "Serial port cannot be empty.") return try: baud = int(self.baud_var.get()) addr = int(self.addr_var.get()) except ValueError: messagebox.showerror("Error", "Baud rate and Address must be integers.") return self.driver = KoggerSBPDevice(port, baud, addr, default_timeout=1.5) self.driver.register_default_callback(self._handle_unsolicited_from_driver) self.connection_status_var.set("Status: Connecting...") self.connect_button.config(text="Connecting...", state=tk.DISABLED) threading.Thread(target=self._connect_thread_target, daemon=True).start() def _connect_thread_target(self): """Target function for the connection thread. Handles actual connection attempt.""" if self.driver.connect(): self.command_queue.put(("status_update", ("Status: Connected", "green", "Disconnect"))) else: self.command_queue.put(("status_update", ("Status: Failed to Connect", "red", "Connect"))) self.driver = None def _disconnect_driver(self): """Initiates disconnection from the device in a separate thread.""" if self.driver: logger.info("GUI: Disconnecting driver...") if hasattr(self.driver, 'unregister_default_callback'): self.driver.unregister_default_callback() self.connect_button.config(text="Disconnecting...", state=tk.DISABLED) threading.Thread(target=self._disconnect_thread_target, daemon=True).start() else: logger.info("GUI: Driver not initialized, cannot disconnect.") def _disconnect_thread_target(self): """Target function for the disconnection thread.""" if self.driver: self.driver.disconnect() self.driver = None self.command_queue.put(("status_update", ("Status: Disconnected", "black", "Connect"))) def _run_driver_command(self, driver_method_ref, command_name, result_var_target=None, *args, **kwargs): """ Executes a given driver method in a separate thread. Posts its result to the GUI's command queue for UI updates. :param driver_method_ref: The method of the `KoggerSBPDevice` instance to call. :param command_name: A descriptive name for the command (for logging/display). :param result_var_target: The Tkinter StringVar to update with the result. :param args: Positional arguments for the driver method. :param kwargs: Keyword arguments for the driver method. """ if not (self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open): logger.warning(f"Cannot run '{command_name}': Not connected.") messagebox.showwarning("Not Connected", "Please connect to the device first.") if result_var_target: result_var_target.set("Error: Not Connected") return logger.info(f"GUI: Queuing execution for: {command_name} with args: {args}, kwargs: {kwargs}") if result_var_target: result_var_target.set("Executing...") thread = threading.Thread(target=self._command_thread_target, args=(driver_method_ref, command_name, result_var_target) + args, kwargs=kwargs, daemon=True) thread.start() def _command_thread_target(self, driver_method_ref, command_name, result_var_target, *args, **kwargs): """ Target function for the thread that executes a single driver command. Posts the result or error to the GUI's command queue. """ try: result = driver_method_ref(*args, **kwargs) self.command_queue.put((command_name, result, result_var_target)) except Exception as e: logger.error(f"Exception in driver command thread for '{command_name}': {e}", exc_info=True) self.command_queue.put((command_name, f"Exception: {type(e).__name__}", result_var_target)) def _process_command_queue(self): """Processes results and status updates from the command queue to update the GUI.""" try: while True: message_type, data, target_var = self.command_queue.get_nowait() if message_type == "status_update": status_text, _, button_text = data self.connection_status_var.set(status_text) self.connect_button.config(text=button_text, state=tk.NORMAL) logger.info(f"GUI: Connection status updated to '{status_text}'") elif message_type == "get_all_finished": if hasattr(self, 'get_all_button'): # Check if button exists self.get_all_button.config(state=tk.NORMAL) logger.info("GUI: 'Get All Available Data' sequence finished.") elif message_type == "set_fetching_status": # New message type for "Get All" # data is command_name_desc, target_var is the actual StringVar command_name_desc = data if target_var and isinstance(target_var, tk.StringVar): target_var.set("Fetching...") logger.debug(f"GUI: Set fetching status for {command_name_desc}") else: logger.warning(f"GUI: Invalid target_var for set_fetching_status of {command_name_desc}") else: # Assumed to be a command result command_name = message_type result = data result_var_target = target_var logger.info(f"GUI: Received result for '{command_name}': {str(result)[:100]}") if result_var_target and isinstance(result_var_target, tk.StringVar): display_str = "N/A" if isinstance(result, dict): items = [] for k, v_item in result.items(): if isinstance(v_item, bytes): items.append(f"{k}: {v_item.hex().upper()[:20]}{'...' if len(v_item)>10 else ''}") elif isinstance(v_item, float): items.append(f"{k}: {v_item:.3f}") else: items.append(f"{k}: {v_item}") display_str = ", ".join(items) if len(display_str) > 70: display_str = display_str[:67] + "..." elif isinstance(result, bool): display_str = "Success" if result else "Failed/False" elif result is None: display_str = "No data / Failed / Timeout" elif isinstance(result, (str, bytes)): display_str = result if isinstance(result, str) else result.hex().upper() else: display_str = str(result) result_var_target.set(display_str) if result is True: logger.success(f"Command '{command_name}' reported success (True).") elif result is False: logger.warning(f"Command '{command_name}' reported failure (False).") elif result is None: logger.warning(f"Command '{command_name}' returned None (no data or error).") except queue.Empty: pass except Exception as e: logger.error(f"Error processing command queue: {e}", exc_info=True) finally: if self.root.winfo_exists(): self.root.after(100, self._process_command_queue) def _handle_unsolicited_from_driver(self, frame): """ Puts unsolicited messages (received by driver's callback) into a GUI-thread-safe queue. """ self.unsolicited_log_queue.put(frame) def _process_unsolicited_log_queue(self): """Processes unsolicited messages from its queue and logs them via Loguru (to Tkinter widget).""" try: while True: frame = self.unsolicited_log_queue.get_nowait() logger.info(f"[UNSOLICITED In GUI] ID: {frame['id']:#02x}, Mode: {frame['mode']:#02x}, Payload: {frame['payload'].hex().upper()}") except queue.Empty: pass finally: if self.root.winfo_exists(): self.root.after(100, self._process_unsolicited_log_queue) def _on_closing(self): """Handles the event when the main window is closed by the user.""" logger.info("Application closing initiated by user...") if self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open: logger.info("Attempting to disconnect driver on close...") self._disconnect_driver() # This is now threaded, main window might close before it finishes self.root.destroy() logger.info("Application window destroyed.") # This log might go to console only # --- Specific Command Handlers for SET operations & "Get All" --- def _set_uart_baud(self): """Handles 'Set UART Baudrate' button click. Parses inputs and calls driver.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: uart_id = int(self.uart_id_var.get()) baud = int(self.uart_baud_var.get()) self._run_driver_command(self.driver.set_uart_config, "Set UART Baudrate", uart_id=uart_id, baudrate=baud, new_dev_address=None) except ValueError: messagebox.showerror("Input Error", "UART ID and Baudrate must be integers.") def _set_uart_dev_addr(self): """Handles 'Set UART Device Address' button click. Parses inputs and calls driver.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: uart_id = int(self.uart_id_var.get()) addr = int(self.uart_new_addr_var.get()) if not (0 <= addr <= 15): messagebox.showerror("Input Error", "Device Address must be between 0 and 15.") return self._run_driver_command(self.driver.set_uart_config, "Set UART Device Address", uart_id=uart_id, new_dev_address=addr) except ValueError: messagebox.showerror("Input Error", "UART ID and New Address must be integers.") def _get_uart_config(self): """Handles 'Get UART Config' button click. Parses inputs and calls driver.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: uart_id = int(self.uart_id_var.get()) version_str = self.uart_get_ver_var.get() version = int(version_str) if version_str else 0 # Default to version 0 if empty if version not in [0,1]: messagebox.showerror("Input Error", "Version must be 0 or 1.") return self._run_driver_command(self.driver.get_uart_config, f"Get UART Config v{version}", self.uart_get_result_var, uart_id=uart_id, version=version) except ValueError: messagebox.showerror("Input Error", "UART ID and Version must be integers.") def _set_transceiver_settings(self): """Handles 'Set Transceiver Config' button click.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: freq = int(self.transc_freq_var.get()) pulse = int(self.transc_pulse_var.get()) boost = self.transc_boost_var.get() self._run_driver_command(self.driver.set_transceiver_settings, "Set Transceiver Settings", frequency_khz=freq, pulse_count=pulse, boost_enabled=boost) except ValueError: messagebox.showerror("Input Error", "Frequency and Pulse Count must be integers.") def _set_sound_speed(self): """Handles 'Set Sound Speed' button click.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: speed = int(self.sspd_var.get()) self._run_driver_command(self.driver.set_sound_speed, "Set Sound Speed", sound_speed_mm_s=speed) except ValueError: messagebox.showerror("Input Error", "Sound speed must be an integer.") def _set_dataset_config(self): """Handles 'Set Dataset Config' button click.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: ch_id = int(self.ds_ch_id_var.get()) period = int(self.ds_period_var.get()) mask_str = self.ds_mask_var.get() mask = int(mask_str, 16) if mask_str.lower().startswith("0x") else int(mask_str) self._run_driver_command(self.driver.set_dataset_config, "Set Dataset Config", channel_id=ch_id, channel_period_ms=period, channel_mask=mask) except ValueError: messagebox.showerror("Input Error", "Channel ID, Period, and Mask must be valid integers (Mask can be hex '0x...').") def _get_dataset_config(self): """Handles 'Get Dataset Config' button click.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: ch_id_str = self.ds_get_ch_id_var.get() ch_id = int(ch_id_str) if ch_id_str else 0 # Default to 0 if empty self._run_driver_command(self.driver.get_dataset_config, "Get Dataset Config", self.ds_get_result_var, channel_id_to_request=ch_id) except ValueError: messagebox.showerror("Input Error", "Channel ID must be an integer.") def _set_distance_setup(self): """Handles 'Set Distance Setup' button click.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: offset = int(self.distsetup_offset_var.get()) max_dist = int(self.distsetup_max_var.get()) self._run_driver_command(self.driver.set_distance_setup, "Set Distance Setup", start_offset_mm=offset, max_distance_mm=max_dist) except ValueError: messagebox.showerror("Input Error", "Offset and Max Distance must be integers.") def _set_chart_setup(self): """Handles 'Set Chart Setup' button click.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: count = int(self.cs_count_var.get()) resol = int(self.cs_resol_var.get()) offset = int(self.cs_offset_var.get()) self._run_driver_command(self.driver.set_chart_setup, "Set Chart Setup", sample_count=count, sample_resolution_mm=resol, sample_offset_num=offset) except ValueError: messagebox.showerror("Input Error", "Sample Count, Resolution, and Offset must be integers.") def _set_mark(self): """Handles 'Set Mark' button click.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return enable = self.mark_enable_var.get() if enable: self._run_driver_command(self.driver.set_mark, "Set Mark (Request Enable)", result_var_target=self.mark_get_result_var, enable_mark=True) else: logger.warning("GUI: 'Set Mark' (disable) requested via ID_MARK checkbutton. Protocol docs suggest ID_MARK SETTING command is for setting the mark. Disabling the mark is usually done by host sending a normal command with the MODE.Mark bit cleared.") messagebox.showinfo("Info", "The ID_MARK SETTING command primarily sets the mark. To clear it, the host typically sends any frame with the mark bit cleared in its MODE field.") def _reboot_device(self): """Handles 'Reboot Device' button click with confirmation.""" if not (self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open): messagebox.showwarning("Not Connected", "Please connect to the device first.") return if messagebox.askyesno("Confirm Reboot", "Are you sure you want to reboot the device? This will disconnect."): logger.info("GUI: Initiating device reboot...") self.connect_button.config(text="Rebooting...", state=tk.DISABLED) threading.Thread(target=self._reboot_thread_target, daemon=True).start() def _reboot_thread_target(self): """Target function for the reboot operation thread.""" success = False if self.driver: success = self.driver.reboot_device() if success: # driver.reboot_device() calls disconnect internally self.command_queue.put(("status_update", ("Status: Rebooted. Disconnected.", "black", "Connect"))) else: self.command_queue.put(("status_update", ("Status: Reboot Failed", "red", "Connect"))) def _set_imu_calibration(self): """Handles 'Start IMU Calibration' button click.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return gyro = self.imu_gyro_var.get() accel = self.imu_accel_var.get() if not gyro and not accel: messagebox.showinfo("IMU Calibration", "No calibration type selected (Gyro or Accelerometer).") return self._run_driver_command(self.driver.set_imu_calibration, "IMU Calibration", calibrate_gyro=gyro, calibrate_accelerometer=accel) def _upload_fw_chunk(self): """Handles 'Upload FW Chunk' button click.""" if not self.driver: messagebox.showerror("Error", "Not connected!"); return try: pkt_num_str = self.fw_pkt_num_var.get() if not pkt_num_str: messagebox.showerror("Input Error", "Packet number is required."); return pkt_num = int(pkt_num_str) data_hex = self.fw_data_var.get() if not data_hex: messagebox.showerror("Input Error", "Firmware data (hex) cannot be empty.") return data_bytes = bytes.fromhex(data_hex) self._run_driver_command(self.driver.upload_firmware_update_chunk, "Upload FW Chunk", packet_number=pkt_num, update_data_chunk=data_bytes) except ValueError: messagebox.showerror("Input Error", "Packet number must be int, data must be valid hex string (e.g., AABBCCDD).") except Exception as e: messagebox.showerror("Error", f"Failed to prepare FW chunk: {e}") def _get_all_data_sequentially(self): """ Handles the 'Get All Available Data' button click. Initiates a sequence of all implemented 'get_*' commands in a separate thread. """ if not (self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open): messagebox.showwarning("Not Connected", "Please connect to the device first.") return logger.info("GUI: Starting 'Get All Available Data' sequence...") if hasattr(self, 'get_all_button'): # Ensure button exists self.get_all_button.config(state=tk.DISABLED) threading.Thread(target=self._get_all_data_thread_target, daemon=True).start() def _get_all_data_thread_target(self): """ Target function for the 'Get All Data' thread. Sequentially calls driver methods and posts results to the command queue. """ # Ensure all these result_vars are defined in your _create_..._widgets methods # Default UART ID and Dataset Channel ID for "Get All" uart_id_for_get_all = int(self.uart_id_var.get() or "1") dataset_ch_id_for_get_all = int(self.ds_get_ch_id_var.get() or "0") commands_to_run_config = [ # (Driver Method, "Command Name for Log", Target StringVar, (args_tuple), {kwargs_dict}) (self.driver.get_timestamp, "Timestamp (All)", self.ts_result_var, (), {}), (self.driver.get_distance, "Distance v0 (All)", self.dist_v0_result_var, (0,), {}), (self.driver.get_distance, "Distance v1 (All)", self.dist_v1_result_var, (1,), {}), (self.driver.get_chart_data, "Chart Data (All)", self.chart_result_var, (), {}), (self.driver.get_attitude, "Attitude v0 (All)", self.att_v0_result_var, (0,), {}), (self.driver.get_attitude, "Attitude v1 (All)", self.att_v1_result_var, (1,), {}), (self.driver.get_temperature, "Temperature (All)", self.temp_result_var, (), {}), (self.driver.get_dataset_config, "Dataset Cfg (All)", self.ds_get_result_var, (dataset_ch_id_for_get_all,), {}), (self.driver.get_distance_setup, "Distance Setup (All)", self.distsetup_get_result_var, (), {}), (self.driver.get_chart_setup, "Chart Setup (All)", self.cs_get_result_var, (), {}), (self.driver.get_transceiver_settings, "Transceiver Cfg (All)", self.transc_get_result_var, (), {}), (self.driver.get_sound_speed, "Sound Speed (All)", self.sspd_get_result_var, (), {}), (self.driver.get_uart_config, "UART Cfg v0 (All)", self.uart_get_result_var, (uart_id_for_get_all, 0), {}), (self.driver.get_uart_config, "UART Cfg v1 (All)", self.uart_get_result_var, (uart_id_for_get_all, 1), {}), (self.driver.get_version_info, "Version Info (All)", self.ver_info_result_var, (), {}), (self.driver.get_mark_status, "Mark Status (All)", self.mark_get_result_var, (), {}), (self.driver.get_diagnostics, "Diagnostics (All)", self.diag_result_var, (), {}), (self.driver.get_navigation_data, "Nav Data (All)", self.nav_result_var, (), {}), (self.driver.get_dvl_velocity_data, "DVL Velocity (All)", self.dvl_result_var, (), {}), ] for method_ref, cmd_name, res_var, args, kwargs in commands_to_run_config: if hasattr(self.root, '_already_closed') and self.root._already_closed: # Check if window is closing logger.info("'Get All' sequence aborted as window is closing.") break if not (self.driver and self.driver.serial_conn and hasattr(self.driver.serial_conn, 'is_open') and self.driver.serial_conn.is_open): logger.warning("'Get All' sequence aborted: Not connected.") break logger.info(f"GUI 'Get All': Requesting '{cmd_name}'...") # Send a message to GUI to update status to "Fetching..." for this specific res_var self.command_queue.put(("set_fetching_status", cmd_name, res_var)) actual_result = None try: # This is the blocking call to the driver method. # The driver method itself uses _execute_command which handles its own threading for send/receive. # This _get_all_data_thread_target thread will wait here until method_ref completes. actual_result = method_ref(*args, **kwargs) except Exception as e: logger.error(f"Exception during 'Get All' for '{cmd_name}': {e}", exc_info=True) actual_result = f"Exception: {type(e).__name__}" # Put the actual result (or error string) onto the queue for GUI update self.command_queue.put((cmd_name, actual_result, res_var)) time.sleep(0.35) # Pause slightly longer between actual device commands # Signal completion to re-enable the button self.command_queue.put(("get_all_finished", None, None)) if __name__ == "__main__": main_window = tk.Tk() app = KoggerGuiApp(main_window) try: main_window.mainloop() except Exception as e: # This is a last resort if mainloop itself or _on_closing has an issue print(f"CRITICAL GUI ERROR during mainloop or close: {e}") if 'logger' in globals() or 'logger' in locals(): # Check if logger might exist try: logger.critical(f"Tkinter mainloop crashed or error during close: {e}", exc_info=True) except: # logger itself might fail if stderr is redirected weirdly on crash print("Logger failed during critical GUI error.") # It's often good practice to re-raise or sys.exit(1) after such an error # raise