# extract/decode_kogger.py
"""
Kogger USBL decoder. Reassembles fragmented serial bytes from CSV,
scans for BB55 frames, decodes Kogger K-Link protocol.

Frame structure (confirmed by reverse engineering):
    BB 55 | MARK(1) | ADDR(1) | CMD(1) | LEN(1) | DATA(LEN) | CHKSUM(1) | LAST(1)
    CHKSUM = sum(MARK + ADDR + CMD + LEN + DATA) & 0xFF

Observed frame types in SHIP USBL logs:
    - CMD=0x20 LEN=34: device status response (1 per session)
    - CMD=0x68 LEN=3: ACK/echo to a SET/ping command
      DATA=[0x01, cs_of_request, last_of_request]

NOTE: In the test dataset (2026-04-08 La Ciotat), no acoustic USBL fixes were
received -- the logs contain only setup/config exchanges. The decoder extracts
what it can (ACK frames and status) and stores them. A separate run with data
that includes actual position responses will populate north_m/east_m/depth_m.
"""
import ast
import csv
import glob
import sys
from bisect import bisect_right
from datetime import datetime, timezone

import numpy as np

SYNC = b"\xbb\x55"

# Epoch reference for exact integer nanosecond conversion. datetime.timestamp()
# returns a float whose 53-bit mantissa cannot represent a modern epoch at ns
# resolution, so `timestamp() * 1e9` silently loses precision.
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def checksum(mark: int, addr: int, cmd: int, ln: int, data: bytes) -> int:
    """8-bit additive checksum over MARK, ADDR, CMD, LEN and all DATA bytes."""
    return (mark + addr + cmd + ln + sum(data)) & 0xFF


def _ts_to_ns(ts_str: str) -> int:
    """Parse 'YYYY-mm-dd HH:MM:SS.ffffff' (assumed UTC) to integer ns since epoch.

    Uses exact timedelta integer arithmetic instead of float `timestamp()*1e9`,
    which rounds away the low digits of a 2026-era nanosecond timestamp.
    """
    dt = datetime.strptime(ts_str.strip(), "%Y-%m-%d %H:%M:%S.%f")
    delta = dt.replace(tzinfo=timezone.utc) - _EPOCH
    return (delta.days * 86_400 + delta.seconds) * 1_000_000_000 + delta.microseconds * 1_000


def reassemble_bytes(csv_path: str) -> tuple[bytes, list[tuple[int, int]]]:
    """
    Concatenate all RECEIVED bytes in order.
    Returns (stream_bytes, [(byte_offset, timestamp_ns), ...]) for timestamp lookup.
    CSV has NO header row: columns are timestamp, direction, message.
    """
    chunks: list[tuple[int, bytes]] = []
    with open(csv_path, newline="", encoding="utf-8", errors="replace") as f:
        for row in csv.reader(f):
            if len(row) < 3:
                continue
            ts_str, direction, message = row[0], row[1], row[2]
            if direction.strip().upper() != "RECEIVED":
                continue
            try:
                # Message column holds a Python bytes repr (e.g. "b'\\xbbU...'").
                # literal_eval is safe here: it evaluates literals only, never code.
                raw = ast.literal_eval(message.strip())
            except Exception:
                # Best-effort: skip malformed rows rather than abort the file.
                continue
            if isinstance(raw, bytes) and len(raw) > 0:
                chunks.append((_ts_to_ns(ts_str), raw))

    # Build the byte stream and the sorted (offset, ts) index in one pass.
    offsets: list[tuple[int, int]] = []
    pos = 0
    for ts, chunk in chunks:
        offsets.append((pos, ts))
        pos += len(chunk)
    stream = b"".join(chunk for _, chunk in chunks)
    return stream, offsets


def ts_at_offset(offsets: list[tuple[int, int]], offset: int) -> int:
    """Return timestamp_ns of the chunk that contains byte_offset.

    `offsets` is sorted by byte offset, so a binary search replaces the former
    O(n) linear scan (this runs once per decoded frame). Offsets below the
    first entry clamp to the first timestamp, matching the old behavior.
    Raises IndexError on an empty list (as before).
    """
    # (offset, +inf) sorts after every (offset, ts) tuple with the same offset,
    # so bisect_right - 1 lands on the last chunk starting at or before `offset`.
    pos = bisect_right(offsets, (offset, float("inf"))) - 1
    return offsets[max(pos, 0)][1]


def _frame_record(t_ns: int, frame_type: str, cmd: int, addr: int, data: bytes) -> dict:
    """Build one decoded-frame record. Position fields stay NaN: none of the
    frame types seen so far (status/ACK) carry acoustic position data."""
    return {
        "t_ns": t_ns,
        "frame_type": frame_type,
        "cmd": cmd,
        "addr": addr,
        "raw_hex": data.hex(),
        "north_m": np.nan,
        "east_m": np.nan,
        "depth_m": np.nan,
        "range_m": np.nan,
        "bearing_deg": np.nan,
    }


def decode_stream(stream: bytes, offsets: list[tuple[int, int]]) -> list[dict]:
    """Scan byte stream for BB55 frames and decode them.

    Resynchronizes one byte past a bad sync marker on checksum failure or
    truncation; skips the whole frame on success. Prints a stats summary.
    """
    frames: list[dict] = []
    stats = {
        "frames_found": 0,
        "crc_ok": 0,
        "crc_fail": 0,
        "cmd20_status": 0,
        "cmd68_ack": 0,
        "unknown_cmd": 0,
    }
    i = 0
    while i < len(stream) - 6:
        idx = stream.find(SYNC, i)
        if idx == -1:
            break

        # Minimum frame: sync(2) + mark(1) + addr(1) + cmd(1) + len(1) + chk(1) + last(1) = 8 bytes
        if idx + 8 > len(stream):
            break

        mark, addr, cmd, ln = stream[idx + 2:idx + 6]
        frame_end = idx + 6 + ln + 2  # sync(2)+header(4)+data(ln)+chk(1)+last(1)

        if frame_end > len(stream):
            # Truncated (or false) sync -- rescan from the next byte.
            i = idx + 1
            continue

        stats["frames_found"] += 1
        data = stream[idx + 6: idx + 6 + ln]

        if checksum(mark, addr, cmd, ln, data) != stream[idx + 6 + ln]:
            stats["crc_fail"] += 1
            i = idx + 1
            continue

        stats["crc_ok"] += 1
        t_ns = ts_at_offset(offsets, idx)

        if cmd == 0x20 and ln == 34:
            # Device status frame.
            stats["cmd20_status"] += 1
            frames.append(_frame_record(t_ns, "status", cmd, addr, data))
        elif cmd == 0x68 and ln == 3 and data[0] == 0x01:
            # ACK/echo frame: DATA = [0x01, cs_of_request, last_of_request].
            stats["cmd68_ack"] += 1
            frames.append(_frame_record(t_ns, "ack", cmd, addr, data))
        else:
            # Unknown command: keep it, tagged by cmd/len for later analysis.
            stats["unknown_cmd"] += 1
            frames.append(_frame_record(t_ns, f"cmd0x{cmd:02x}_len{ln}", cmd, addr, data))

        i = frame_end

    print(f"Kogger decode stats: {stats}")
    return frames


def write_usbl_group(h5_path: str, frames: list[dict]) -> None:
    """Write decoded frames to /usbl_fixes in the HDF5 file.

    Replaces any existing /usbl_fixes group wholesale -- callers merging
    several CSVs must accumulate frames first and call this ONCE.
    """
    if not frames:
        print("WARNING: no Kogger frames decoded")
        return
    # Local import: decoding (the bulk of this module) stays usable on
    # machines without h5py installed.
    import h5py

    def _col(key: str, dtype) -> np.ndarray:
        # One output column, gathered from the frame records.
        return np.array([x[key] for x in frames], dtype=dtype)

    with h5py.File(h5_path, "a") as f:
        if "usbl_fixes" in f:
            del f["usbl_fixes"]
        grp = f.create_group("usbl_fixes")
        grp.create_dataset("t_ns", data=_col("t_ns", np.int64), compression="gzip")
        for key in ("north_m", "east_m", "depth_m", "range_m", "bearing_deg"):
            grp.create_dataset(key, data=_col(key, np.float64), compression="gzip")
        # sorted() so the attr string is deterministic across runs
        # (set iteration order varies with hash randomization).
        grp.attrs["frame_types"] = str(sorted({x["frame_type"] for x in frames}))
        # Only claim "no fixes" when it is actually true of THIS dataset.
        if not np.count_nonzero(~np.isnan(_col("north_m", np.float64))):
            grp.attrs["note"] = (
                "No acoustic USBL fixes in this dataset. "
                "All frames are device ACKs (cmd=0x68) and status (cmd=0x20). "
                "north_m/east_m/depth_m/range_m/bearing_deg are NaN throughout."
            )


def decode_csv(csv_path: str) -> "list[dict] | None":
    """Reassemble and decode one CSV.

    Returns the decoded frame list, or None when the CSV yielded zero
    RECEIVED bytes (distinct from "bytes present but no valid frames").
    """
    stream, offsets = reassemble_bytes(csv_path)
    print(f"Total received bytes: {len(stream)}")
    if len(stream) == 0:
        print("ERROR: 0 bytes — check CSV direction column values")
        return None
    return decode_stream(stream, offsets)


def extract(csv_path: str, out_h5: str) -> None:
    """Decode one CSV and write its frames to out_h5:/usbl_fixes.

    NOTE: replaces the group; to merge several CSVs use decode_csv() per file
    and a single write_usbl_group() call (as __main__ does).
    """
    frames = decode_csv(csv_path)
    if frames is None:
        return
    write_usbl_group(out_h5, frames)
    n_pos = sum(1 for x in frames if not np.isnan(x["north_m"]))
    print(f"Kogger USBL: {len(frames)} frames total, {n_pos} with position data -> {out_h5} [/usbl_fixes]")
    if frames:
        f0 = frames[0]
        print(f"Sample frame: type={f0['frame_type']} addr=0x{f0['addr']:02x} t_ns={f0['t_ns']}")


if __name__ == "__main__":
    usbl_files = glob.glob(f"{sys.argv[1]}/*_usbl.csv")
    if not usbl_files:
        raise FileNotFoundError(f"No *_usbl.csv in {sys.argv[1]}")
    # BUGFIX: write_usbl_group replaces /usbl_fixes wholesale, so calling
    # extract() once per file kept only the LAST file's frames. Accumulate
    # frames from every file, then write the group exactly once.
    all_frames: list[dict] = []
    for csv_path in sorted(usbl_files):
        print(f"\n=== {csv_path} ===")
        file_frames = decode_csv(csv_path)
        if file_frames:
            all_frames.extend(file_frames)
    write_usbl_group(sys.argv[2], all_frames)
    n_pos = sum(1 for x in all_frames if not np.isnan(x["north_m"]))
    print(f"Kogger USBL: {len(all_frames)} frames total, {n_pos} with position data -> {sys.argv[2]} [/usbl_fixes]")