feat: USBL heading rotation + poids RTK pour fixes acoustiques
This commit is contained in:
@@ -29,7 +29,8 @@ def parse_nav_log(source: Union[str, IO]) -> list[dict]:
|
||||
val = row.get("value", "").strip()
|
||||
if ts not in buf:
|
||||
buf[ts] = {"ts_str": ts, "easting": None, "northing": None,
|
||||
"utm_num": None, "utm_let": None, "rtk_status": 0}
|
||||
"utm_num": None, "utm_let": None, "rtk_status": 0,
|
||||
"heading_deg": None, "fix_type_raw": None}
|
||||
if data == "Easting":
|
||||
buf[ts]["easting"] = float(val)
|
||||
elif data == "Northing":
|
||||
@@ -38,17 +39,44 @@ def parse_nav_log(source: Union[str, IO]) -> list[dict]:
|
||||
buf[ts]["utm_num"] = str(int(float(val)))
|
||||
elif data == "UTM_letter":
|
||||
buf[ts]["utm_let"] = val
|
||||
elif data.lower() in ("heading", "hdg", "cog", "course", "yaw",
|
||||
"yaw_deg", "heading_deg", "course_deg"):
|
||||
try:
|
||||
buf[ts]["heading_deg"] = float(val)
|
||||
except ValueError:
|
||||
pass
|
||||
elif data.lower() in ("fix_type", "gps_fix_type", "rtk_type",
|
||||
"fix_status", "gps_status"):
|
||||
try:
|
||||
buf[ts]["fix_type_raw"] = int(float(val))
|
||||
except ValueError:
|
||||
pass
|
||||
rows = []
|
||||
for r in buf.values():
|
||||
if r["easting"] is None or r["northing"] is None:
|
||||
continue
|
||||
zone = (r["utm_num"] or "") + (r["utm_let"] or "")
|
||||
# Map ArduPilot fix_type (0-6) → our 3-level RTK status
|
||||
# 0=NoGPS,1=NoFix,2=2D,3=3D,4=DGPS → 0 (3D/none)
|
||||
# 5=RTK_Float → 1, 6=RTK_Fixed → 2
|
||||
raw_fix = r.get("fix_type_raw")
|
||||
if raw_fix is not None:
|
||||
if raw_fix >= 6:
|
||||
rtk = 2
|
||||
elif raw_fix == 5:
|
||||
rtk = 1
|
||||
else:
|
||||
rtk = 0
|
||||
else:
|
||||
rtk = r["rtk_status"] # default 0
|
||||
|
||||
rows.append({
|
||||
"t_ns": _ts_to_ns(r["ts_str"]),
|
||||
"easting": r["easting"],
|
||||
"northing": r["northing"],
|
||||
"utm_zone": zone,
|
||||
"rtk_status": r["rtk_status"],
|
||||
"t_ns": _ts_to_ns(r["ts_str"]),
|
||||
"easting": r["easting"],
|
||||
"northing": r["northing"],
|
||||
"utm_zone": zone,
|
||||
"rtk_status": rtk,
|
||||
"heading_deg": r.get("heading_deg"), # float or None
|
||||
})
|
||||
return sorted(rows, key=lambda r: r["t_ns"])
|
||||
finally:
|
||||
@@ -61,14 +89,20 @@ def write_usv_gps_group(h5_path: str, rows: list[dict]) -> None:
|
||||
e = np.array([r["easting"] for r in rows], dtype=np.float64)
|
||||
n = np.array([r["northing"] for r in rows], dtype=np.float64)
|
||||
rtk = np.array([r["rtk_status"] for r in rows], dtype=np.int8)
|
||||
hdg = np.array(
|
||||
[r["heading_deg"] if r["heading_deg"] is not None else np.nan
|
||||
for r in rows],
|
||||
dtype=np.float64,
|
||||
)
|
||||
with h5py.File(h5_path, "a") as f:
|
||||
if "usv_gps" in f:
|
||||
del f["usv_gps"]
|
||||
grp = f.create_group("usv_gps")
|
||||
grp.create_dataset("t_ns", data=t, compression="gzip")
|
||||
grp.create_dataset("easting", data=e, compression="gzip")
|
||||
grp.create_dataset("northing", data=n, compression="gzip")
|
||||
grp.create_dataset("rtk_status", data=rtk, compression="gzip")
|
||||
grp.create_dataset("t_ns", data=t, compression="gzip")
|
||||
grp.create_dataset("easting", data=e, compression="gzip")
|
||||
grp.create_dataset("northing", data=n, compression="gzip")
|
||||
grp.create_dataset("rtk_status", data=rtk, compression="gzip")
|
||||
grp.create_dataset("heading_deg", data=hdg, compression="gzip")
|
||||
grp.attrs["utm_zone"] = rows[0]["utm_zone"] if rows else ""
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user