#! /usr/bin/env python import os import sys import time import io # For BytesIO to simulate file reading for the mock from loguru import logger import threading # Ensure the kogger_protocol_driver.py is in the Python path # If it's in the same directory, this is usually fine. # Otherwise, you might need to adjust sys.path: # sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # If driver is one dir up try: from kogger_protocol_driver import KoggerSBPDevice, ID_TIMESTAMP, ID_ATTITUDE, ID_UART, ID_FLASH, RESP_OK, RESP_ERR_KEY except ImportError: logger.critical("Failed to import KoggerSBPDevice. Make sure kogger_protocol_driver.py is in the same directory or Python path.") sys.exit(1) # --- Loguru Setup for the test script --- logger.remove() logger.add(sys.stderr, level="DEBUG", format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}") # --- Content for test_messages.bin --- # Frame 1: Timestamp response (12345 ms) frame1_ts_resp = b'\xBB\x55\x00\x01\x01\x04\x39\x30\x00\x00\x49\x95' # Frame 2: Unsolicited Attitude data frame2_att_unsol = b'\xBB\x55\x00\x01\x04\x06\xE8\x03\xF4\x01\x38\xFF\xC8\x28' # Frame 3: RESP_OK to a hypothetical SET_UART_CONFIG command (ID 0x18), orig_chk A0B0 frame3_uart_resp_ok = b'\xBB\x55\x00\x81\x18\x03\x01\xA0\xB0\x2D\x24' # Frame 4: RESP_ERR_KEY to a hypothetical SAVE_SETTINGS_TO_FLASH command (ID 0x23), orig_chk C1D1 frame4_flash_resp_err = b'\xBB\x55\x00\x81\x23\x03\x07\xC1\xD1\xB0\x0D' # Frame 5: Another unsolicited Timestamp (67890 ms) frame5_ts_unsol = b'\xBB\x55\x00\x01\x01\x04\x32\x09\x01\x00\x42\xCD' # Concatenate frames to simulate the binary file content TEST_BINARY_DATA = frame1_ts_resp + frame2_att_unsol + frame3_uart_resp_ok + frame4_flash_resp_err + frame5_ts_unsol TEST_BINARY_FILE_PATH = "test_messages.bin" class MockSerial: """ A mock serial port that reads from a predefined byte stream (simulating a file) and logs data written to it. """ def __init__(self, port, baudrate, timeout=0.1, binary_data=b""): self.port = port self.baudrate = baudrate self.timeout = timeout # Read timeout self._is_open = False self.binary_stream = io.BytesIO(binary_data) # Use BytesIO to simulate file reading self.written_data = bytearray() # To capture data written by the driver # For KoggerSBPDevice, serial_read_timeout is passed to its __init__ # and used for its internal serial.Serial object. # Our mock uses its own 'timeout' for its 'read' method. # The driver's serial_read_timeout on KoggerSBPDevice will configure the *real* serial port # if it were used. Here, it's less critical for the mock itself, but good to have. logger.info(f"MockSerial initialized for port '{port}' at {baudrate} baud with {len(binary_data)} bytes of test data.") def open(self): self._is_open = True logger.info(f"MockSerial port {self.port} opened.") def close(self): self._is_open = False logger.info(f"MockSerial port {self.port} closed.") self.binary_stream.close() @property def is_open(self): return self._is_open @property def in_waiting(self): if not self._is_open: return 0 current_pos = self.binary_stream.tell() # Get the total number of bytes in the stream buffer # For BytesIO, getbuffer().nbytes gives total size total_size = self.binary_stream.getbuffer().nbytes return total_size - current_pos def read(self, size=1): if not self._is_open: raise serial.SerialException("Port not open") # Simulate timeout behavior: if no data, return empty bytes after timeout # For this mock, if we reach EOF, read() returns b''. # The driver's reader thread has its own loop and small reads. data = self.binary_stream.read(size) if data: logger.debug(f"MockSerial: read {len(data)} bytes: {data.hex().upper()}") # else: # logger.debug(f"MockSerial: read attempt, no data (EOF or simulated timeout)") return data def write(self, data): if not self.is_open: raise serial.SerialException("Port not open") logger.info(f"MockSerial: driver wrote {len(data)} bytes: {data.hex().upper()}") self.written_data.extend(data) return len(data) def reset_input_buffer(self): logger.info("MockSerial: reset_input_buffer called (clearing mock written_data for this example).") # In a real scenario, this clears hardware buffers. Here, it's less meaningful # unless the driver specifically expects some state change. # The driver's internal _receive_buffer will still exist. def reset_output_buffer(self): logger.info("MockSerial: reset_output_buffer called.") # Clears OS transmit buffer. No direct equivalent for simple mock. # --- Test Callbacks --- received_unsolicited_attitude = None received_unsolicited_timestamps = [] def attitude_callback(frame): global received_unsolicited_attitude logger.success(f"[ATTITUDE CALLBACK] Received frame ID: {frame['id']:#02x}") if frame['id'] == ID_ATTITUDE and frame.get('checksum_ok'): payload = frame['payload'] yaw, pitch, roll = struct.unpack(' 0 and 67890 in received_unsolicited_timestamps: logger.success(f" PASS: Second unsolicited timestamp (67890ms) received: {received_unsolicited_timestamps}") elif len(received_unsolicited_timestamps) > 0 : logger.warning(f" WARN: Unsolicited timestamp(s) received, but 67890ms not found. Got: {received_unsolicited_timestamps}") else: logger.error(" FAIL: Second unsolicited timestamp (67890ms) NOT received via callback.") # --- Check remaining data in mock port --- remaining_bytes = mock_port.in_waiting if remaining_bytes == 0: logger.success(f"\n--- All test data consumed from mock port. ---") else: logger.warning(f"\n--- {remaining_bytes} bytes remaining in mock port's data stream. ---") logger.debug(f"Remaining data: {mock_port.read(remaining_bytes).hex().upper()}") # Cleanup logger.info("Stopping driver and closing mock port...") driver.disconnect() # This will stop the thread and close the mock port # if os.path.exists(TEST_BINARY_FILE_PATH): # os.remove(TEST_BINARY_FILE_PATH) # logger.info(f"Removed test data file: {TEST_BINARY_FILE_PATH}") if __name__ == "__main__": run_tests() logger.info("Test script finished.")