#!/usr/bin/env python from loguru import logger import os import time import csv from datetime import datetime import struct # 0:Nothing, 1:Only important, 2:All PRINT_LEVEL = 1 # PATH File for kogger log # AUV log #KOGGER_FILE = "2025-09-01_11-40-00_kogger_log_auv.csv" # USV log KOGGER_FILE = "2025-09-01_11-40-02_kogger_log_usv.csv" # --- Frame parsing logic for simulation flexibility --- SYNC1 = 0xBB SYNC2 = 0x55 ID_USBL_CONTROL = 0x68 TYPE_SETTING = 2 def _parse_sim_frame(frame_bytes): """A simple parser for simulation matching flexibility.""" if not isinstance(frame_bytes, bytes) or len(frame_bytes) < 8: return None if frame_bytes[0] != SYNC1 or frame_bytes[1] != SYNC2: return None try: s1, s2, route, mode, cmd_id, length = struct.unpack_from('> 3) & 0x07 return { 'id': cmd_id, 'type': msg_type, 'version': version, 'payload': frame_bytes[6:6+length] } except (struct.error, IndexError): return None # --- End of parsing logic --- class simu_serial(): is_open = True name = "" log_messages = [] log_idx = 0 in_waiting = 1 kogger_next = None received_write = 0 start_sim_time = None start_log_time = None def _print(text, level): if level <= PRINT_LEVEL: logger.debug(text) def _parse_and_merge_csv(filename): messages = [] script_dir = os.path.dirname(__file__) file_path = os.path.join(script_dir, filename) if not os.path.exists(file_path): # Fallback to current dir if not found next to script file_path = filename with open(file_path, 'r') as f: reader = csv.reader(f) for row in reader: ts_str, msg_type, msg_content = row timestamp = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S.%f') try: byte_msg = eval(msg_content) except Exception as e: simu_serial._print(f"Could not eval message content: {msg_content}, error: {e}", 1) byte_msg = msg_content.encode() messages.append({'timestamp': timestamp, 'type': msg_type, 'msg': byte_msg}) merged = [] i = 0 while i < len(messages): msg = messages[i] if msg['type'] == 'RECEIVED' and msg['msg'] == b'\xbb': full_msg = b'\xbb' ts = msg['timestamp'] i += 1 while i < len(messages) and messages[i]['type'] == 'RECEIVED' and messages[i]['msg'] != b'\xbb': full_msg += messages[i]['msg'] i += 1 merged.append({'timestamp': ts, 'type': 'RECEIVED', 'msg': full_msg}) else: merged.append(msg) i += 1 return merged def Serial(port, baudrate=921600, timeout=1): logger.error("!!!!!!Init kogger in simulation mode!!!!!!!") simu_serial.name = port simu_serial.log_messages = simu_serial._parse_and_merge_csv(KOGGER_FILE) if not simu_serial.log_messages: logger.error(f"No messages loaded from {KOGGER_FILE}") return simu_serial simu_serial.log_idx = 0 simu_serial.start_sim_time = time.time() simu_serial.start_log_time = simu_serial.log_messages[0]['timestamp'] return simu_serial def close(): simu_serial._print("Close kogger", 1) return True def read(size=1): if simu_serial.received_write == 1: line = simu_serial.kogger_next simu_serial.kogger_next = None simu_serial.received_write = 0 simu_serial._print(f"read (from write): {line}", 2) return line if simu_serial.log_idx >= len(simu_serial.log_messages): simu_serial._print("End of log file.", 1) time.sleep(1) return b'' next_msg = simu_serial.log_messages[simu_serial.log_idx] log_time_elapsed = (next_msg['timestamp'] - simu_serial.start_log_time).total_seconds() sim_time_elapsed = time.time() - simu_serial.start_sim_time if sim_time_elapsed < log_time_elapsed: sleep_duration = log_time_elapsed - sim_time_elapsed if sleep_duration > 0: time.sleep(sleep_duration) # Re-check current message as time has passed if simu_serial.log_idx >= len(simu_serial.log_messages): return b'' next_msg = simu_serial.log_messages[simu_serial.log_idx] if next_msg['type'] == 'RECEIVED': simu_serial.log_idx += 1 simu_serial._print(f"read (unsolicited): {next_msg['msg']}", 2) return next_msg['msg'] else: # SENT return b'' def write(text): simu_serial._print(f"Write: {text}", 2) incoming_frame_info = _parse_sim_frame(text) search_idx = simu_serial.log_idx while search_idx < len(simu_serial.log_messages): log_entry = simu_serial.log_messages[search_idx] if log_entry['type'] == 'SENT': match = False # Check for flexible match on acoustic ping (USBL_CONTROL with state) if (incoming_frame_info and incoming_frame_info['id'] == ID_USBL_CONTROL and incoming_frame_info['type'] == TYPE_SETTING and incoming_frame_info['version'] == 1): log_frame_info = _parse_sim_frame(log_entry['msg']) if (log_frame_info and log_frame_info['id'] == ID_USBL_CONTROL and log_frame_info['type'] == TYPE_SETTING and log_frame_info['version'] == 1): simu_serial._print(f"Flexible match for USBL_CONTROL message. Incoming: {text.hex()}, Logged: {log_entry['msg'].hex()}", 2) match = True # Fallback to exact match for all other messages if not match and log_entry['msg'] == text: match = True if match: # Sync time log_time_elapsed = (log_entry['timestamp'] - simu_serial.start_log_time).total_seconds() simu_serial.start_sim_time = time.time() - log_time_elapsed response_idx = search_idx + 1 while response_idx < len(simu_serial.log_messages): if simu_serial.log_messages[response_idx]['type'] == 'RECEIVED': simu_serial.kogger_next = simu_serial.log_messages[response_idx]['msg'] simu_serial.received_write = 1 # Advance log time to response time log_time_elapsed_resp = (simu_serial.log_messages[response_idx]['timestamp'] - simu_serial.start_log_time).total_seconds() sim_time_elapsed = time.time() - simu_serial.start_sim_time if sim_time_elapsed < log_time_elapsed_resp: sleep_duration = log_time_elapsed_resp - sim_time_elapsed if sleep_duration > 0: time.sleep(sleep_duration) simu_serial.log_idx = response_idx + 1 return response_idx += 1 logger.warning(f"No RECEIVED message found after SENT: {text}") simu_serial.log_idx = search_idx + 1 return search_idx += 1 logger.error(f"Unexpected SENT message: {text} from index {simu_serial.log_idx}") def reset_output_buffer(): pass