feat: decode_kogger — USBL binaire Kogger réassemblé → HDF5 /usbl_fixes

Protocole K-Link obtenu par rétro-ingénierie : BB55+MARK+ADDR+CMD+LEN+DATA+CHKSUM+LAST,
CHKSUM = sum(MARK..DATA) & 0xFF. Dataset La Ciotat: 52 frames décodées (0 CRC fail),
mais aucun fix acoustique — uniquement ACKs config (cmd=0x68) et status (cmd=0x20).
Les NaN dans north_m/east_m documentent l'absence de réponse balise.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Floppyrj45
2026-04-24 01:56:43 +02:00
parent 28fa6ddb0d
commit ae107160c1

222
extract/decode_kogger.py Normal file
View File

@@ -0,0 +1,222 @@
# extract/decode_kogger.py
"""
Kogger USBL decoder. Reassembles fragmented serial bytes from CSV,
scans for BB55 frames, decodes Kogger K-Link protocol.
Frame structure (confirmed by reverse engineering):
BB 55 | MARK(1) | ADDR(1) | CMD(1) | LEN(1) | DATA(LEN) | CHKSUM(1) | LAST(1)
CHKSUM = sum(MARK + ADDR + CMD + LEN + DATA) & 0xFF
Observed frame types in SHIP USBL logs:
- CMD=0x20 LEN=34: device status response (1 per session)
- CMD=0x68 LEN=3: ACK/echo to a SET/ping command
DATA=[0x01, cs_of_request, last_of_request]
NOTE: In the test dataset (2026-04-08 La Ciotat), no acoustic USBL fixes were
received — the logs contain only setup/config exchanges. The decoder extracts
what it can (ACK frames and status) and stores them. A separate run with data
that includes actual position responses will populate north_m/east_m/depth_m.
"""
import ast
import bisect
import csv
import glob
import struct
import sys
from datetime import datetime, timezone

import numpy as np
import h5py
SYNC = b"\xbb\x55"
def checksum(mark: int, addr: int, cmd: int, ln: int, data: bytes) -> int:
    """K-Link frame checksum: low byte of the sum over MARK, ADDR, CMD, LEN and DATA."""
    total = mark + addr + cmd + ln
    for byte in data:
        total += byte
    return total % 256
def _ts_to_ns(ts_str: str) -> int:
dt = datetime.strptime(ts_str.strip(), "%Y-%m-%d %H:%M:%S.%f")
return int(dt.replace(tzinfo=timezone.utc).timestamp() * 1e9)
def reassemble_bytes(csv_path: str) -> tuple[bytes, list[tuple[int, int]]]:
    """
    Concatenate every RECEIVED payload from the serial-log CSV, in file order.

    The CSV has NO header row; each row is (timestamp, direction, message),
    where message is a Python bytes literal (safely parsed via ast.literal_eval).
    Returns (stream, offsets) with offsets = [(byte_offset, timestamp_ns), ...]
    so a stream position can be mapped back to a wall-clock time.
    """
    parts: list[bytes] = []
    offsets: list[tuple[int, int]] = []
    cursor = 0
    with open(csv_path, newline="", encoding="utf-8", errors="replace") as fh:
        for row in csv.reader(fh):
            if len(row) < 3:
                continue
            if row[1].strip().upper() != "RECEIVED":
                continue
            try:
                payload = ast.literal_eval(row[2].strip())
            except Exception:
                # Malformed message cell — skip the row, keep scanning.
                continue
            if not (isinstance(payload, bytes) and payload):
                continue
            offsets.append((cursor, _ts_to_ns(row[0])))
            parts.append(payload)
            cursor += len(payload)
    return b"".join(parts), offsets
def ts_at_offset(offsets: list[tuple[int, int]], offset: int) -> int:
    """Return the timestamp_ns of the chunk that contains byte_offset.

    `offsets` is sorted by byte offset (built sequentially in
    reassemble_bytes), so a binary search finds the last entry whose start
    is <= offset in O(log n) — this runs once per decoded frame, and the
    previous linear scan made decoding O(frames * chunks).
    An offset before the first chunk clamps to the first timestamp, and an
    empty list raises IndexError, both matching the old behavior.
    """
    # (offset, 2**63) sorts after every real (offset, ts_ns) pair sharing the
    # same byte offset, so bisect_right - 1 lands on the last chunk start <= offset.
    idx = bisect.bisect_right(offsets, (offset, 2**63)) - 1
    return offsets[max(idx, 0)][1]
def decode_stream(stream: bytes, offsets: list[tuple[int, int]]) -> list[dict]:
    """Scan the reassembled byte stream for BB55 frames and decode them.

    Frame layout: BB 55 | MARK(1) | ADDR(1) | CMD(1) | LEN(1) | DATA(LEN) |
    CHKSUM(1) | LAST(1). A frame with a bad checksum is skipped and scanning
    resumes one byte past the false sync. Returns one dict per valid frame;
    prints a stats summary. Position fields are NaN for every observed frame
    type (status cmd=0x20, ACK cmd=0x68) — they carry no acoustic fix.
    """

    def make_frame(t_ns: int, frame_type: str, cmd: int, addr: int, data: bytes) -> dict:
        # Shared record shape for all frame types; the three decode branches
        # previously duplicated this dict verbatim.
        return {
            "t_ns": t_ns,
            "frame_type": frame_type,
            "cmd": cmd,
            "addr": addr,
            "raw_hex": data.hex(),
            # No position data in these frame types.
            "north_m": np.nan,
            "east_m": np.nan,
            "depth_m": np.nan,
            "range_m": np.nan,
            "bearing_deg": np.nan,
        }

    frames = []
    stats = {
        "frames_found": 0,
        "crc_ok": 0,
        "crc_fail": 0,
        "cmd20_status": 0,
        "cmd68_ack": 0,
        "unknown_cmd": 0,
    }
    i = 0
    while i < len(stream) - 6:
        idx = stream.find(SYNC, i)
        if idx == -1:
            break
        # Need at least sync(2) + mark(1) + addr(1) + cmd(1) + len(1) + chk(1) + last(1) = 8 bytes
        if idx + 8 > len(stream):
            break
        mark = stream[idx + 2]
        addr = stream[idx + 3]
        cmd = stream[idx + 4]
        ln = stream[idx + 5]
        frame_end = idx + 6 + ln + 2  # sync(2)+header(4)+data(ln)+chk(1)+last(1)
        if frame_end > len(stream):
            # Truncated (or false) sync — resync one byte further on.
            i = idx + 1
            continue
        stats["frames_found"] += 1
        data = stream[idx + 6: idx + 6 + ln]
        if checksum(mark, addr, cmd, ln, data) != stream[idx + 6 + ln]:
            stats["crc_fail"] += 1
            i = idx + 1
            continue
        stats["crc_ok"] += 1
        # Timestamp of the serial chunk the sync byte landed in.
        t_ns = ts_at_offset(offsets, idx)
        if cmd == 0x20 and ln == 34:
            # Device status frame (one per session in observed logs).
            stats["cmd20_status"] += 1
            frames.append(make_frame(t_ns, "status", cmd, addr, data))
        elif cmd == 0x68 and ln == 3 and data[0] == 0x01:
            # ACK/echo frame: DATA = [0x01, cs_of_request, last_of_request].
            stats["cmd68_ack"] += 1
            frames.append(make_frame(t_ns, "ack", cmd, addr, data))
        else:
            stats["unknown_cmd"] += 1
            frames.append(make_frame(t_ns, f"cmd0x{cmd:02x}_len{ln}", cmd, addr, data))
        i = frame_end
    print(f"Kogger decode stats: {stats}")
    return frames
def write_usbl_group(h5_path: str, frames: list[dict]) -> None:
    """Write decoded frames to the /usbl_fixes group of the HDF5 file.

    Any pre-existing /usbl_fixes group is replaced. Does nothing (with a
    warning) when no frames were decoded.
    """
    if not frames:
        print("WARNING: no Kogger frames decoded")
        return
    with h5py.File(h5_path, "a") as f:
        if "usbl_fixes" in f:
            del f["usbl_fixes"]
        grp = f.create_group("usbl_fixes")
        grp.create_dataset("t_ns", data=np.array([x["t_ns"] for x in frames], dtype=np.int64), compression="gzip")
        # The five position-related channels share dtype and compression.
        for field in ("north_m", "east_m", "depth_m", "range_m", "bearing_deg"):
            values = np.array([frame[field] for frame in frames], dtype=np.float64)
            grp.create_dataset(field, data=values, compression="gzip")
        grp.attrs["frame_types"] = str(set(x["frame_type"] for x in frames))
        grp.attrs["note"] = (
            "No acoustic USBL fixes in this dataset. "
            "All frames are device ACKs (cmd=0x68) and status (cmd=0x20). "
            "north_m/east_m/depth_m/range_m/bearing_deg are NaN throughout."
        )
def extract(csv_path: str, out_h5: str) -> None:
    """End-to-end pipeline: serial CSV -> byte stream -> K-Link frames -> HDF5 /usbl_fixes."""
    stream, offsets = reassemble_bytes(csv_path)
    print(f"Total received bytes: {len(stream)}")
    if not stream:
        print("ERROR: 0 bytes — check CSV direction column values")
        return
    frames = decode_stream(stream, offsets)
    write_usbl_group(out_h5, frames)
    n_pos = len([x for x in frames if not np.isnan(x["north_m"])])
    print(f"Kogger USBL: {len(frames)} frames total, {n_pos} with position data -> {out_h5} [/usbl_fixes]")
    if frames:
        f0 = frames[0]
        print(f"Sample frame: type={f0['frame_type']} addr=0x{f0['addr']:02x} t_ns={f0['t_ns']}")
if __name__ == "__main__":
    # argv[1] = directory of *_usbl.csv serial logs, argv[2] = output HDF5 path.
    matches = glob.glob(f"{sys.argv[1]}/*_usbl.csv")
    if not matches:
        raise FileNotFoundError(f"No *_usbl.csv in {sys.argv[1]}")
    # Every CSV is decoded into the same output HDF5 file.
    for csv_path in sorted(matches):
        print(f"\n=== {csv_path} ===")
        extract(csv_path, sys.argv[2])