297 lines
12 KiB
Python
297 lines
12 KiB
Python
# fuse/fuse_trajectory.py
|
|
"""
|
|
Umeyama-based trajectory fusion.
|
|
Aligns lingbot local poses to world coordinates using USBL/GPS absolute fixes.
|
|
Gracefully handles missing lingbot poses or no absolute AUV fixes.
|
|
"""
|
|
import sys
|
|
from pathlib import Path
|
|
import numpy as np
|
|
import h5py
|
|
|
|
|
|
def umeyama(src: np.ndarray, dst: np.ndarray,
|
|
weights: np.ndarray | None = None) -> tuple[float, np.ndarray, np.ndarray]:
|
|
"""
|
|
Weighted Umeyama: find scale, R, t such that dst ≈ scale * R @ src + t
|
|
src, dst : (N, 3) float64
|
|
weights : (N,) non-negative
|
|
Raises ValueError if N < 3.
|
|
"""
|
|
N = len(src)
|
|
if N < 3:
|
|
raise ValueError(f"Umeyama requires at least 3 point pairs, got {N}")
|
|
if weights is None:
|
|
weights = np.ones(N, dtype=np.float64)
|
|
w = weights / weights.sum()
|
|
mu_s = (w[:, None] * src).sum(0)
|
|
mu_d = (w[:, None] * dst).sum(0)
|
|
src_c, dst_c = src - mu_s, dst - mu_d
|
|
cov = (dst_c * w[:, None]).T @ src_c
|
|
U, D, Vt = np.linalg.svd(cov)
|
|
S = np.eye(3)
|
|
if np.linalg.det(U) * np.linalg.det(Vt) < 0:
|
|
S[2, 2] = -1
|
|
R = U @ S @ Vt
|
|
var_s = (w * np.sum(src_c ** 2, axis=1)).sum()
|
|
scale = float(np.sum(D * np.diag(S)) / var_s) if var_s > 0 else 1.0
|
|
t = mu_d - scale * R @ mu_s
|
|
return scale, R, t
|
|
|
|
|
|
def umeyama_2d(src: np.ndarray, dst: np.ndarray,
|
|
weights: np.ndarray | None = None) -> tuple[float, np.ndarray, np.ndarray]:
|
|
"""
|
|
2D Umeyama: XY alignment only.
|
|
src, dst : (N, 2) float64
|
|
Returns scale, R (2x2), t (2,)
|
|
"""
|
|
N = len(src)
|
|
if N < 2:
|
|
raise ValueError(f"umeyama_2d requires at least 2 point pairs, got {N}")
|
|
if weights is None:
|
|
weights = np.ones(N, dtype=np.float64)
|
|
w = weights / weights.sum()
|
|
mu_s = (w[:, None] * src).sum(0)
|
|
mu_d = (w[:, None] * dst).sum(0)
|
|
src_c, dst_c = src - mu_s, dst - mu_d
|
|
cov = (dst_c * w[:, None]).T @ src_c
|
|
U, D, Vt = np.linalg.svd(cov)
|
|
S = np.eye(2)
|
|
if np.linalg.det(U) * np.linalg.det(Vt) < 0:
|
|
S[1, 1] = -1
|
|
R2 = U @ S @ Vt
|
|
var_s = (w * np.sum(src_c ** 2, axis=1)).sum()
|
|
scale = float(np.sum(D * np.diag(S)) / var_s) if var_s > 0 else 1.0
|
|
t2 = mu_d - scale * R2 @ mu_s
|
|
return scale, R2, t2
|
|
|
|
|
|
def fuse(fixes_h5: str, poses_npz: str, out_h5: str,
|
|
outlier_sigma: float = 2.0) -> None:
|
|
|
|
# 1. Load fixes
|
|
with h5py.File(fixes_h5, "r") as f:
|
|
usv_e = f["usv_gps/easting"][:]
|
|
usv_n = f["usv_gps/northing"][:]
|
|
usv_t = f["usv_gps/t_ns"][:]
|
|
utm_zone = f["usv_gps"].attrs.get("utm_zone", "31T")
|
|
|
|
auv_lat = f["auv_mcap/lat"][:]
|
|
auv_lon = f["auv_mcap/lon"][:]
|
|
auv_dep = f["auv_mcap/depth_m"][:]
|
|
auv_t = f["auv_mcap/t_ns"][:]
|
|
auv_alt_m = f["auv_mcap/altitude_m"][:] if "auv_mcap/altitude_m" in f else None
|
|
|
|
usbl_n = f["usbl_fixes/north_m"][:] if "usbl_fixes" in f else np.array([])
|
|
usbl_e = f["usbl_fixes/east_m"][:] if "usbl_fixes" in f else np.array([])
|
|
usbl_d = f["usbl_fixes/depth_m"][:] if "usbl_fixes" in f else np.array([])
|
|
usbl_t = f["usbl_fixes/t_ns"][:] if "usbl_fixes" in f else np.array([], dtype=np.int64)
|
|
usbl_bearing = f["usbl_fixes/bearing_deg"][:] if "usbl_fixes" in f else np.array([])
|
|
usbl_range_m = f["usbl_fixes/range_m"][:] if "usbl_fixes" in f else np.array([])
|
|
usv_rtk = f["usv_gps/rtk_status"][:]
|
|
usv_hdg_raw = f["usv_gps/heading_deg"][:] if "usv_gps/heading_deg" in f else np.full(len(usv_t), np.nan)
|
|
|
|
# 2. Check for lingbot poses
|
|
poses_path = Path(poses_npz)
|
|
if not poses_path.exists():
|
|
print(f"WARNING: lingbot poses not found at {poses_npz}")
|
|
print("Saving sources only — re-run after Plan 2 generates lingbot_poses.npz")
|
|
with h5py.File(out_h5, "w") as f:
|
|
f.attrs["status"] = "no_lingbot_poses"
|
|
f.attrs["utm_zone"] = utm_zone
|
|
return
|
|
|
|
# 3. Load lingbot poses
|
|
data = np.load(poses_npz)
|
|
poses_34 = data["poses"] # (N, 3, 4)
|
|
pose_t = data["timestamps_ns"] # (N,) int64
|
|
ling_xyz = poses_34[:, :3, 3] # camera positions in local frame
|
|
|
|
# 4. Build absolute AUV reference points
|
|
usbl_weights = np.array([]) # populated only if USBL path used
|
|
has_usbl_fixes = (len(usbl_bearing) > 0
|
|
and not np.all(np.isnan(usbl_bearing))
|
|
and not np.all(np.isnan(usbl_range_m)))
|
|
|
|
has_auv_gps = not np.all(auv_lat == 0.0)
|
|
|
|
if not has_usbl_fixes and not has_auv_gps:
|
|
print("WARNING: No absolute AUV position fixes available.")
|
|
print("Cannot perform Umeyama alignment — saving local lingbot trajectory only.")
|
|
_save_local_only(out_h5, pose_t, poses_34, ling_xyz, utm_zone)
|
|
return
|
|
|
|
# 5. Build world reference: interpolate AUV absolute pos at lingbot timestamps
|
|
if has_auv_gps:
|
|
from pyproj import Transformer
|
|
zone_num = int("".join(c for c in utm_zone if c.isdigit()))
|
|
hemi = "north" if utm_zone[-1].upper() >= "N" else "south"
|
|
proj = f"+proj=utm +zone={zone_num} +{hemi} +ellps=WGS84"
|
|
tr = Transformer.from_crs("EPSG:4326", proj, always_xy=True)
|
|
auv_x, auv_y = tr.transform(auv_lon, auv_lat)
|
|
auv_xyz_world = np.column_stack([auv_x, auv_y, -auv_dep])
|
|
ref_t = auv_t
|
|
else:
|
|
# USBL bearing+range → world AUV position using USV heading
|
|
usbl_t_f = usbl_t.astype(float)
|
|
usv_t_f = usv_t.astype(float)
|
|
usv_e_i = np.interp(usbl_t_f, usv_t_f, usv_e)
|
|
usv_n_i = np.interp(usbl_t_f, usv_t_f, usv_n)
|
|
usv_rtk_i = np.interp(usbl_t_f, usv_t_f, usv_rtk.astype(float))
|
|
|
|
has_heading = not np.all(np.isnan(usv_hdg_raw))
|
|
if has_heading:
|
|
usv_hdg_i = np.interp(usbl_t_f, usv_t_f, usv_hdg_raw)
|
|
else:
|
|
print("WARNING: no USV heading in HDF5 — using bearing as world-frame offset (inaccurate)")
|
|
usv_hdg_i = np.zeros(len(usbl_t))
|
|
|
|
# Horizontal range (remove depth component)
|
|
range_h = np.sqrt(np.maximum(0.0, usbl_range_m ** 2 - usbl_d ** 2))
|
|
angle_rad = np.deg2rad(usv_hdg_i + usbl_bearing)
|
|
auv_e_abs = usv_e_i + range_h * np.sin(angle_rad)
|
|
auv_n_abs = usv_n_i + range_h * np.cos(angle_rad)
|
|
auv_xyz_world = np.column_stack([auv_e_abs, auv_n_abs, -usbl_d])
|
|
ref_t = usbl_t
|
|
|
|
# RTK quality weights for USBL correspondences
|
|
rtk_weight_map = np.array([0.05, 0.3, 1.0]) # index 0=3D, 1=float, 2=fix
|
|
usbl_weights = rtk_weight_map[np.clip(np.round(usv_rtk_i).astype(int), 0, 2)]
|
|
|
|
# 6. Match lingbot timestamps → world reference points
|
|
src_pts, dst_pts, weights = [], [], []
|
|
for i, pt in enumerate(pose_t):
|
|
idx = np.argmin(np.abs(ref_t - pt))
|
|
if np.abs(ref_t[idx] - pt) > 5e9: # > 5s gap → skip
|
|
continue
|
|
src_pts.append(ling_xyz[i])
|
|
dst_pts.append(auv_xyz_world[idx])
|
|
# Weight: USBL path uses RTK-based weight; GPS path uses 1.0
|
|
if has_usbl_fixes and not has_auv_gps:
|
|
weights.append(float(usbl_weights[idx]))
|
|
else:
|
|
weights.append(1.0)
|
|
|
|
if len(src_pts) < 3:
|
|
print(f"WARNING: Only {len(src_pts)} correspondences (need ≥3). Saving local only.")
|
|
_save_local_only(out_h5, pose_t, poses_34, ling_xyz, utm_zone)
|
|
return
|
|
|
|
src = np.array(src_pts)
|
|
dst = np.array(dst_pts)
|
|
w = np.array(weights)
|
|
|
|
# 7. Choose alignment strategy: 2D when altitude data available, else 3D
|
|
has_altitude = (auv_alt_m is not None and len(auv_alt_m) > 0
|
|
and not np.all(auv_alt_m == 0))
|
|
|
|
if has_altitude:
|
|
# 2D Umeyama (XY only) + pressure depth for Z
|
|
src_xy = src[:, :2]
|
|
dst_xy = dst[:, :2]
|
|
scale_xy, R2, t2 = umeyama_2d(src_xy, dst_xy, w)
|
|
|
|
residuals_2d = np.linalg.norm(
|
|
(scale_xy * (R2 @ src_xy.T).T + t2) - dst_xy, axis=1)
|
|
sigma = residuals_2d.std()
|
|
mask = (residuals_2d < outlier_sigma * sigma) if sigma > 0 \
|
|
else np.ones(len(src_xy), dtype=bool)
|
|
print(f"2D Umeyama: {len(src_xy)} total, {mask.sum()} after outlier rejection "
|
|
f"(sigma_xy={sigma:.3f}m, scale_xy={scale_xy:.4f})")
|
|
scale_xy, R2, t2 = umeyama_2d(src_xy[mask], dst_xy[mask], w[mask])
|
|
|
|
# Expand R2 to 3x3 (identity for Z)
|
|
R3 = np.eye(3)
|
|
R3[:2, :2] = R2
|
|
t3 = np.array([t2[0], t2[1], 0.0])
|
|
|
|
# Transform lingbot poses; override Z from pressure sensor at each timestamp
|
|
N_poses = len(pose_t)
|
|
poses_world = np.zeros((N_poses, 4, 4))
|
|
poses_world[:, 3, 3] = 1.0
|
|
|
|
for i in range(N_poses):
|
|
R_local = poses_34[i, :3, :3]
|
|
t_local = poses_34[i, :3, 3]
|
|
poses_world[i, :3, :3] = R3 @ R_local
|
|
t_xy = scale_xy * R2 @ t_local[:2] + t2
|
|
# Z from pressure depth (interpolated at lingbot timestamp)
|
|
z_world = float(np.interp(
|
|
float(pose_t[i]), auv_t.astype(float), auv_dep.astype(float)))
|
|
poses_world[i, :3, 3] = [t_xy[0], t_xy[1], z_world]
|
|
|
|
xyz_world = poses_world[:, :3, 3]
|
|
residuals_2d_refined = np.linalg.norm(
|
|
(scale_xy * (R2 @ src_xy[mask].T).T + t2) - dst_xy[mask], axis=1)
|
|
rmse_m = float(residuals_2d_refined.mean())
|
|
scale_out = scale_xy
|
|
R_out = R3
|
|
t_out = t3
|
|
print(f"2D fusion OK: scale_xy={scale_xy:.4f} RMSE_xy={rmse_m:.3f}m -> {out_h5}")
|
|
|
|
else:
|
|
# 3D Umeyama fallback (original logic)
|
|
scale, R, t = umeyama(src, dst, w)
|
|
residuals = np.linalg.norm((scale * (R @ src.T).T + t) - dst, axis=1)
|
|
sigma = residuals.std()
|
|
mask = (residuals < outlier_sigma * sigma) if sigma > 0 \
|
|
else np.ones(len(src), dtype=bool)
|
|
print(f"3D Umeyama: {len(src)} total, {mask.sum()} after outlier rejection "
|
|
f"(sigma={sigma:.3f}m)")
|
|
scale, R, t = umeyama(src[mask], dst[mask], w[mask])
|
|
|
|
N_poses = len(pose_t)
|
|
poses_world = np.zeros((N_poses, 4, 4))
|
|
poses_world[:, 3, 3] = 1.0
|
|
for i in range(N_poses):
|
|
R_local = poses_34[i, :3, :3]
|
|
t_local = poses_34[i, :3, 3]
|
|
poses_world[i, :3, :3] = R @ R_local
|
|
poses_world[i, :3, 3] = scale * R @ t_local + t
|
|
|
|
xyz_world = poses_world[:, :3, 3]
|
|
residuals_final = residuals
|
|
rmse_m = float(residuals[mask].mean())
|
|
scale_out = scale
|
|
R_out = R
|
|
t_out = t
|
|
print(f"3D fusion OK: scale={scale:.4f} RMSE={rmse_m:.3f}m -> {out_h5}")
|
|
|
|
# 9. Write trajectory_world.h5
|
|
with h5py.File(out_h5, "w") as f:
|
|
f.attrs["status"] = "aligned"
|
|
f.attrs["utm_zone"] = utm_zone
|
|
f.attrs["fusion_mode"] = "2d" if has_altitude else "3d"
|
|
|
|
al = f.create_group("alignment")
|
|
al.attrs["scale"] = scale_out
|
|
al.attrs["rmse_m"] = rmse_m
|
|
al.attrs["n_correspondences"] = int(mask.sum())
|
|
al.create_dataset("R", data=R_out)
|
|
al.create_dataset("t", data=t_out)
|
|
|
|
pw = f.create_group("poses_world")
|
|
pw.create_dataset("t_ns", data=pose_t, compression="gzip")
|
|
pw.create_dataset("x_m", data=xyz_world[:, 0], compression="gzip")
|
|
pw.create_dataset("y_m", data=xyz_world[:, 1], compression="gzip")
|
|
pw.create_dataset("z_m", data=xyz_world[:, 2], compression="gzip")
|
|
pw.create_dataset("T_4x4", data=poses_world, compression="gzip")
|
|
|
|
|
|
def _save_local_only(out_h5: str, pose_t, poses_34, ling_xyz, utm_zone):
|
|
with h5py.File(out_h5, "w") as f:
|
|
f.attrs["status"] = "local_only"
|
|
f.attrs["utm_zone"] = utm_zone
|
|
pw = f.create_group("poses_world")
|
|
pw.create_dataset("t_ns", data=pose_t, compression="gzip")
|
|
pw.create_dataset("x_m", data=ling_xyz[:, 0], compression="gzip")
|
|
pw.create_dataset("y_m", data=ling_xyz[:, 1], compression="gzip")
|
|
pw.create_dataset("z_m", data=ling_xyz[:, 2], compression="gzip")
|
|
n = len(poses_34); p44 = np.zeros((n,4,4)); p44[:,:3,:] = poses_34; p44[:,3,3] = 1.0; pw.create_dataset("T_4x4", data=p44, compression="gzip")
|
|
print(f"Saved local lingbot trajectory (no world alignment) → {out_h5}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
fuse(sys.argv[1], sys.argv[2], sys.argv[3])
|