# extract/decode_kogger.py
"""
Kogger USBL decoder.

Reassembles fragmented serial bytes from CSV, scans for BB55 frames, decodes
Kogger K-Link protocol.

Frame structure (confirmed by reverse engineering):
    BB 55 | MARK(1) | ADDR(1) | CMD(1) | LEN(1) | DATA(LEN) | CHKSUM(1) | LAST(1)
    CHKSUM = sum(MARK + ADDR + CMD + LEN + DATA) & 0xFF

Observed frame types in SHIP USBL logs:
  - CMD=0x20 LEN=34: device status response (1 per session)
  - CMD=0x68 LEN=3:  ACK/echo to a SET/ping command
        DATA=[0x01, cs_of_request, last_of_request]

NOTE: In the test dataset (2026-04-08 La Ciotat), no acoustic USBL fixes were
received — the logs contain only setup/config exchanges. The decoder extracts
what it can (ACK frames and status) and stores them. A separate run with data
that includes actual position responses will populate north_m/east_m/depth_m.
"""
import ast
import bisect
import csv
import glob
import struct
import sys
from datetime import datetime, timedelta, timezone

import numpy as np

try:
    import h5py
except ImportError:
    # Only write_usbl_group() needs HDF5; decoding stays usable without it.
    h5py = None

SYNC = b"\xbb\x55"

# Reference instant for integer (lossless) epoch arithmetic.
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Accepted timestamp layouts. The second covers timestamps that fall exactly
# on a whole second, which str(datetime) prints without a fractional part.
_TS_FORMATS = ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S")

_HEADER_LEN = 6      # sync(2) + mark(1) + addr(1) + cmd(1) + len(1)
_TRAILER_LEN = 2     # chksum(1) + last(1)
_MIN_FRAME_LEN = _HEADER_LEN + _TRAILER_LEN

# Per-frame position fields; NaN whenever a frame carries no acoustic fix.
_POSITION_FIELDS = ("north_m", "east_m", "depth_m", "range_m", "bearing_deg")


def checksum(mark: int, addr: int, cmd: int, ln: int, data: bytes) -> int:
    """Return the 8-bit additive checksum over the header fields and payload."""
    return (mark + addr + cmd + ln + sum(data)) & 0xFF


def _ts_to_ns(ts_str: str) -> int:
    """
    Convert a 'YYYY-mm-dd HH:MM:SS[.ffffff]' string to integer nanoseconds
    since the Unix epoch. The timestamp is interpreted as UTC.

    Integer timedelta arithmetic is used on purpose: going through
    ``float * 1e9`` cannot represent present-day epoch nanoseconds exactly
    and silently corrupts the microsecond digits.

    Raises ValueError if the string matches none of the accepted layouts.
    """
    text = ts_str.strip()
    for fmt in _TS_FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        elapsed = parsed.replace(tzinfo=timezone.utc) - _EPOCH
        # timedelta // timedelta yields an exact int number of microseconds.
        return (elapsed // timedelta(microseconds=1)) * 1000
    raise ValueError(f"unrecognised timestamp {ts_str!r}")


def reassemble_bytes(csv_path: str) -> tuple[bytes, list[tuple[int, int]]]:
    """
    Concatenate all RECEIVED bytes in order.

    Returns (stream_bytes, [(byte_offset, timestamp_ns), ...]) for timestamp
    lookup. CSV has NO header row: columns are timestamp, direction, message.

    Rows with fewer than three columns, rows whose direction is not RECEIVED
    (case-insensitive), and rows whose message is not a non-empty Python
    bytes literal are skipped. A malformed timestamp on an otherwise valid
    row raises ValueError.
    """
    chunks: list[tuple[int, bytes]] = []
    with open(csv_path, newline="", encoding="utf-8", errors="replace") as f:
        for row in csv.reader(f):
            if len(row) < 3:
                continue
            ts_str, direction, message = row[0], row[1], row[2]
            if direction.strip().upper() != "RECEIVED":
                continue
            try:
                raw = ast.literal_eval(message.strip())
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                # Not a parseable literal: best-effort skip of this row.
                continue
            if isinstance(raw, bytes) and raw:
                chunks.append((_ts_to_ns(ts_str), raw))

    offsets: list[tuple[int, int]] = []
    pos = 0
    for ts, chunk in chunks:
        offsets.append((pos, ts))
        pos += len(chunk)
    return b"".join(chunk for _, chunk in chunks), offsets


def ts_at_offset(offsets: list[tuple[int, int]], offset: int) -> int:
    """
    Return timestamp_ns of the chunk that contains byte_offset.

    ``offsets`` must be sorted by byte offset, as produced by
    reassemble_bytes(). An offset before the first chunk maps to the first
    chunk's timestamp. Raises IndexError when ``offsets`` is empty.
    """
    # Rightmost entry whose start offset is <= offset; +inf makes an entry
    # with an equal start offset sort before the probe.
    idx = bisect.bisect_right(offsets, (offset, float("inf"))) - 1
    return offsets[max(idx, 0)][1]


def _make_frame(t_ns: int, frame_type: str, cmd: int, addr: int, data: bytes) -> dict:
    """Build one decoded-frame record with all position fields set to NaN."""
    frame = {
        "t_ns": t_ns,
        "frame_type": frame_type,
        "cmd": cmd,
        "addr": addr,
        "raw_hex": data.hex(),
    }
    # None of the frame types decoded so far embed acoustic position data.
    frame.update(dict.fromkeys(_POSITION_FIELDS, np.nan))
    return frame


def decode_stream(stream: bytes, offsets: list[tuple[int, int]]) -> list[dict]:
    """
    Scan byte stream for BB55 frames and decode them.

    Every checksum-valid frame becomes one dict with keys t_ns, frame_type,
    cmd, addr, raw_hex and the five position fields (currently always NaN).
    Candidates that fail the checksum or run past the end of the stream are
    treated as false syncs and scanning resumes one byte later. Decode
    statistics are printed to stdout.
    """
    frames = []
    stats = {
        "frames_found": 0,
        "crc_ok": 0,
        "crc_fail": 0,
        "cmd20_status": 0,
        "cmd68_ack": 0,
        "unknown_cmd": 0,
    }
    total = len(stream)
    i = 0
    while i < total - 6:
        idx = stream.find(SYNC, i)
        # Stop when no sync remains or the shortest possible frame
        # (LEN=0, 8 bytes) would not fit.
        if idx == -1 or idx + _MIN_FRAME_LEN > total:
            break

        mark, addr, cmd, ln = stream[idx + 2: idx + _HEADER_LEN]
        data_start = idx + _HEADER_LEN
        chk_pos = data_start + ln
        frame_end = chk_pos + _TRAILER_LEN
        if frame_end > total:
            # Declared length overruns the stream: false sync or a truncated
            # tail. Keep scanning in case a real frame starts inside it.
            i = idx + 1
            continue

        stats["frames_found"] += 1
        data = stream[data_start:chk_pos]
        if checksum(mark, addr, cmd, ln, data) != stream[chk_pos]:
            stats["crc_fail"] += 1
            i = idx + 1
            continue
        stats["crc_ok"] += 1

        if cmd == 0x20 and ln == 34:
            # Device status frame; no position data.
            stats["cmd20_status"] += 1
            frame_type = "status"
        elif cmd == 0x68 and ln == 3 and data[0] == 0x01:
            # ACK/echo frame: DATA = [0x01, cs_of_request, last_of_request]
            # No acoustic position data embedded here.
            stats["cmd68_ack"] += 1
            frame_type = "ack"
        else:
            stats["unknown_cmd"] += 1
            frame_type = f"cmd0x{cmd:02x}_len{ln}"

        frames.append(_make_frame(ts_at_offset(offsets, idx), frame_type, cmd, addr, data))
        i = frame_end

    print(f"Kogger decode stats: {stats}")
    return frames


def write_usbl_group(h5_path: str, frames: list[dict]) -> None:
    """
    Write decoded frames to the /usbl_fixes group of an HDF5 file.

    Any existing /usbl_fixes group is replaced. With no frames, a warning is
    printed and the file is left untouched. Raises ImportError if h5py is not
    installed.
    """
    if not frames:
        print("WARNING: no Kogger frames decoded")
        return
    if h5py is None:
        raise ImportError("h5py is required to write the usbl_fixes group")

    with h5py.File(h5_path, "a") as f:
        if "usbl_fixes" in f:
            del f["usbl_fixes"]
        grp = f.create_group("usbl_fixes")
        grp.create_dataset(
            "t_ns",
            data=np.array([x["t_ns"] for x in frames], dtype=np.int64),
            compression="gzip",
        )
        for field in _POSITION_FIELDS:
            grp.create_dataset(
                field,
                data=np.array([x[field] for x in frames], dtype=np.float64),
                compression="gzip",
            )

        # Same "{'a', 'b'}" text as str(set(...)), but sorted so the attribute
        # does not vary between runs with string-hash randomisation.
        frame_types = sorted({x["frame_type"] for x in frames})
        grp.attrs["frame_types"] = "{" + ", ".join(repr(t) for t in frame_types) + "}"

        has_fix = any(
            not np.isnan(x[field]) for x in frames for field in _POSITION_FIELDS
        )
        if has_fix:
            grp.attrs["note"] = (
                "Position fields are NaN for frames that carry no acoustic fix."
            )
        else:
            grp.attrs["note"] = (
                "No acoustic USBL fixes in this dataset. "
                "All frames are device ACKs (cmd=0x68) and status (cmd=0x20). "
                "north_m/east_m/depth_m/range_m/bearing_deg are NaN throughout."
            )


def _load_stream(csv_path: str) -> tuple[bytes, list[tuple[int, int]]]:
    """Reassemble one CSV and report the byte count (and an error if zero)."""
    stream, offsets = reassemble_bytes(csv_path)
    print(f"Total received bytes: {len(stream)}")
    if not stream:
        print("ERROR: 0 bytes — check CSV direction column values")
    return stream, offsets


def _print_summary(frames: list[dict], out_h5: str) -> None:
    """Print frame totals and, when available, a sample of the first frame."""
    n_pos = sum(1 for x in frames if not np.isnan(x["north_m"]))
    print(f"Kogger USBL: {len(frames)} frames total, {n_pos} with position data -> {out_h5} [/usbl_fixes]")
    if frames:
        f0 = frames[0]
        print(f"Sample frame: type={f0['frame_type']} addr=0x{f0['addr']:02x} t_ns={f0['t_ns']}")


def extract(csv_path: str, out_h5: str) -> None:
    """
    Decode a single USBL CSV log and write its frames to ``out_h5``.

    Replaces any existing /usbl_fixes group in the output file. Does nothing
    beyond printing an error when the CSV contains no RECEIVED bytes.
    """
    stream, offsets = _load_stream(csv_path)
    if not stream:
        return
    frames = decode_stream(stream, offsets)
    write_usbl_group(out_h5, frames)
    _print_summary(frames, out_h5)


def main(argv=None) -> None:
    """
    Command-line entry point: ``decode_kogger.py <input_dir> <output.h5>``.

    Decodes every ``*_usbl.csv`` in the input directory (in sorted filename
    order) and writes the frames of ALL files into a single /usbl_fixes
    group. Frames are aggregated before writing because write_usbl_group()
    replaces the group, so writing per file would keep only the last file.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if len(args) != 2:
        raise SystemExit("usage: decode_kogger.py <input_dir> <output.h5>")
    in_dir, out_h5 = args

    usbl_files = glob.glob(f"{in_dir}/*_usbl.csv")
    if not usbl_files:
        raise FileNotFoundError(f"No *_usbl.csv in {in_dir}")

    all_frames: list[dict] = []
    for csv_path in sorted(usbl_files):
        print(f"\n=== {csv_path} ===")
        stream, offsets = _load_stream(csv_path)
        if stream:
            all_frames.extend(decode_stream(stream, offsets))

    write_usbl_group(out_h5, all_frames)
    _print_summary(all_frames, out_h5)


if __name__ == "__main__":
    main()