# fuse/fuse_trajectory.py """ Umeyama-based trajectory fusion. Aligns lingbot local poses to world coordinates using USBL/GPS absolute fixes. Gracefully handles missing lingbot poses or no absolute AUV fixes. """ import sys from pathlib import Path import numpy as np import h5py def umeyama(src: np.ndarray, dst: np.ndarray, weights: np.ndarray | None = None) -> tuple[float, np.ndarray, np.ndarray]: """ Weighted Umeyama: find scale, R, t such that dst ≈ scale * R @ src + t src, dst : (N, 3) float64 weights : (N,) non-negative Raises ValueError if N < 3. """ N = len(src) if N < 3: raise ValueError(f"Umeyama requires at least 3 point pairs, got {N}") if weights is None: weights = np.ones(N, dtype=np.float64) w = weights / weights.sum() mu_s = (w[:, None] * src).sum(0) mu_d = (w[:, None] * dst).sum(0) src_c, dst_c = src - mu_s, dst - mu_d cov = (dst_c * w[:, None]).T @ src_c U, D, Vt = np.linalg.svd(cov) S = np.eye(3) if np.linalg.det(U) * np.linalg.det(Vt) < 0: S[2, 2] = -1 R = U @ S @ Vt var_s = (w * np.sum(src_c ** 2, axis=1)).sum() scale = float(np.sum(D * np.diag(S)) / var_s) if var_s > 0 else 1.0 t = mu_d - scale * R @ mu_s return scale, R, t def umeyama_2d(src: np.ndarray, dst: np.ndarray, weights: np.ndarray | None = None) -> tuple[float, np.ndarray, np.ndarray]: """ 2D Umeyama: XY alignment only. src, dst : (N, 2) float64 Returns scale, R (2x2), t (2,) """ N = len(src) if N < 2: raise ValueError(f"umeyama_2d requires at least 2 point pairs, got {N}") if weights is None: weights = np.ones(N, dtype=np.float64) w = weights / weights.sum() mu_s = (w[:, None] * src).sum(0) mu_d = (w[:, None] * dst).sum(0) src_c, dst_c = src - mu_s, dst - mu_d cov = (dst_c * w[:, None]).T @ src_c U, D, Vt = np.linalg.svd(cov) S = np.eye(2) if np.linalg.det(U) * np.linalg.det(Vt) < 0: S[1, 1] = -1 R2 = U @ S @ Vt var_s = (w * np.sum(src_c ** 2, axis=1)).sum() scale = float(np.sum(D * np.diag(S)) / var_s) if var_s > 0 else 1.0 t2 = mu_d - scale * R2 @ mu_s return scale, R2, t2 def fuse(fixes_h5: str, poses_npz: str, out_h5: str, outlier_sigma: float = 2.0) -> None: # 1. Load fixes with h5py.File(fixes_h5, "r") as f: usv_e = f["usv_gps/easting"][:] usv_n = f["usv_gps/northing"][:] usv_t = f["usv_gps/t_ns"][:] utm_zone = f["usv_gps"].attrs.get("utm_zone", "31T") auv_lat = f["auv_mcap/lat"][:] auv_lon = f["auv_mcap/lon"][:] auv_dep = f["auv_mcap/depth_m"][:] auv_t = f["auv_mcap/t_ns"][:] auv_alt_m = f["auv_mcap/altitude_m"][:] if "auv_mcap/altitude_m" in f else None usbl_n = f["usbl_fixes/north_m"][:] if "usbl_fixes" in f else np.array([]) usbl_e = f["usbl_fixes/east_m"][:] if "usbl_fixes" in f else np.array([]) usbl_d = f["usbl_fixes/depth_m"][:] if "usbl_fixes" in f else np.array([]) usbl_t = f["usbl_fixes/t_ns"][:] if "usbl_fixes" in f else np.array([], dtype=np.int64) # 2. Check for lingbot poses poses_path = Path(poses_npz) if not poses_path.exists(): print(f"WARNING: lingbot poses not found at {poses_npz}") print("Saving sources only — re-run after Plan 2 generates lingbot_poses.npz") with h5py.File(out_h5, "w") as f: f.attrs["status"] = "no_lingbot_poses" f.attrs["utm_zone"] = utm_zone return # 3. Load lingbot poses data = np.load(poses_npz) poses_34 = data["poses"] # (N, 3, 4) pose_t = data["timestamps_ns"] # (N,) int64 ling_xyz = poses_34[:, :3, 3] # camera positions in local frame # 4. Build absolute AUV reference points has_usbl_fixes = (len(usbl_n) > 0 and not np.all(usbl_n == 0) and not np.all(usbl_e == 0)) has_auv_gps = not np.all(auv_lat == 0.0) if not has_usbl_fixes and not has_auv_gps: print("WARNING: No absolute AUV position fixes available.") print("Cannot perform Umeyama alignment — saving local lingbot trajectory only.") _save_local_only(out_h5, pose_t, poses_34, ling_xyz, utm_zone) return # 5. Build world reference: interpolate AUV absolute pos at lingbot timestamps if has_auv_gps: from pyproj import Transformer zone_num = int("".join(c for c in utm_zone if c.isdigit())) hemi = "north" if utm_zone[-1].upper() >= "N" else "south" proj = f"+proj=utm +zone={zone_num} +{hemi} +ellps=WGS84" tr = Transformer.from_crs("EPSG:4326", proj, always_xy=True) auv_x, auv_y = tr.transform(auv_lon, auv_lat) auv_xyz_world = np.column_stack([auv_x, auv_y, -auv_dep]) ref_t = auv_t else: # USBL relative offsets + USV GPS → AUV absolute usv_e_i = np.interp(usbl_t.astype(float), usv_t.astype(float), usv_e) usv_n_i = np.interp(usbl_t.astype(float), usv_t.astype(float), usv_n) auv_xyz_world = np.column_stack([usv_e_i + usbl_e, usv_n_i + usbl_n, -usbl_d]) ref_t = usbl_t # 6. Match lingbot timestamps → world reference points src_pts, dst_pts, weights = [], [], [] for i, pt in enumerate(pose_t): idx = np.argmin(np.abs(ref_t - pt)) if np.abs(ref_t[idx] - pt) > 5e9: # > 5s gap → skip continue src_pts.append(ling_xyz[i]) dst_pts.append(auv_xyz_world[idx]) weights.append(1.0) if len(src_pts) < 3: print(f"WARNING: Only {len(src_pts)} correspondences (need ≥3). Saving local only.") _save_local_only(out_h5, pose_t, poses_34, ling_xyz, utm_zone) return src = np.array(src_pts) dst = np.array(dst_pts) w = np.array(weights) # 7. Choose alignment strategy: 2D when altitude data available, else 3D has_altitude = (auv_alt_m is not None and len(auv_alt_m) > 0 and not np.all(auv_alt_m == 0)) if has_altitude: # 2D Umeyama (XY only) + pressure depth for Z src_xy = src[:, :2] dst_xy = dst[:, :2] scale_xy, R2, t2 = umeyama_2d(src_xy, dst_xy, w) residuals_2d = np.linalg.norm( (scale_xy * (R2 @ src_xy.T).T + t2) - dst_xy, axis=1) sigma = residuals_2d.std() mask = (residuals_2d < outlier_sigma * sigma) if sigma > 0 \ else np.ones(len(src_xy), dtype=bool) print(f"2D Umeyama: {len(src_xy)} total, {mask.sum()} after outlier rejection " f"(sigma_xy={sigma:.3f}m, scale_xy={scale_xy:.4f})") scale_xy, R2, t2 = umeyama_2d(src_xy[mask], dst_xy[mask], w[mask]) # Expand R2 to 3x3 (identity for Z) R3 = np.eye(3) R3[:2, :2] = R2 t3 = np.array([t2[0], t2[1], 0.0]) # Transform lingbot poses; override Z from pressure sensor at each timestamp N_poses = len(pose_t) poses_world = np.zeros((N_poses, 4, 4)) poses_world[:, 3, 3] = 1.0 for i in range(N_poses): R_local = poses_34[i, :3, :3] t_local = poses_34[i, :3, 3] poses_world[i, :3, :3] = R3 @ R_local t_xy = scale_xy * R2 @ t_local[:2] + t2 # Z from pressure depth (interpolated at lingbot timestamp) z_world = float(np.interp( float(pose_t[i]), auv_t.astype(float), auv_dep.astype(float))) poses_world[i, :3, 3] = [t_xy[0], t_xy[1], z_world] xyz_world = poses_world[:, :3, 3] residuals_2d_refined = np.linalg.norm( (scale_xy * (R2 @ src_xy[mask].T).T + t2) - dst_xy[mask], axis=1) rmse_m = float(residuals_2d_refined.mean()) scale_out = scale_xy R_out = R3 t_out = t3 print(f"2D fusion OK: scale_xy={scale_xy:.4f} RMSE_xy={rmse_m:.3f}m -> {out_h5}") else: # 3D Umeyama fallback (original logic) scale, R, t = umeyama(src, dst, w) residuals = np.linalg.norm((scale * (R @ src.T).T + t) - dst, axis=1) sigma = residuals.std() mask = (residuals < outlier_sigma * sigma) if sigma > 0 \ else np.ones(len(src), dtype=bool) print(f"3D Umeyama: {len(src)} total, {mask.sum()} after outlier rejection " f"(sigma={sigma:.3f}m)") scale, R, t = umeyama(src[mask], dst[mask], w[mask]) N_poses = len(pose_t) poses_world = np.zeros((N_poses, 4, 4)) poses_world[:, 3, 3] = 1.0 for i in range(N_poses): R_local = poses_34[i, :3, :3] t_local = poses_34[i, :3, 3] poses_world[i, :3, :3] = R @ R_local poses_world[i, :3, 3] = scale * R @ t_local + t xyz_world = poses_world[:, :3, 3] residuals_final = residuals rmse_m = float(residuals[mask].mean()) scale_out = scale R_out = R t_out = t print(f"3D fusion OK: scale={scale:.4f} RMSE={rmse_m:.3f}m -> {out_h5}") # 9. Write trajectory_world.h5 with h5py.File(out_h5, "w") as f: f.attrs["status"] = "aligned" f.attrs["utm_zone"] = utm_zone f.attrs["fusion_mode"] = "2d" if has_altitude else "3d" al = f.create_group("alignment") al.attrs["scale"] = scale_out al.attrs["rmse_m"] = rmse_m al.attrs["n_correspondences"] = int(mask.sum()) al.create_dataset("R", data=R_out) al.create_dataset("t", data=t_out) pw = f.create_group("poses_world") pw.create_dataset("t_ns", data=pose_t, compression="gzip") pw.create_dataset("x_m", data=xyz_world[:, 0], compression="gzip") pw.create_dataset("y_m", data=xyz_world[:, 1], compression="gzip") pw.create_dataset("z_m", data=xyz_world[:, 2], compression="gzip") pw.create_dataset("T_4x4", data=poses_world, compression="gzip") def _save_local_only(out_h5: str, pose_t, poses_34, ling_xyz, utm_zone): with h5py.File(out_h5, "w") as f: f.attrs["status"] = "local_only" f.attrs["utm_zone"] = utm_zone pw = f.create_group("poses_world") pw.create_dataset("t_ns", data=pose_t, compression="gzip") pw.create_dataset("x_m", data=ling_xyz[:, 0], compression="gzip") pw.create_dataset("y_m", data=ling_xyz[:, 1], compression="gzip") pw.create_dataset("z_m", data=ling_xyz[:, 2], compression="gzip") n = len(poses_34); p44 = np.zeros((n,4,4)); p44[:,:3,:] = poses_34; p44[:,3,3] = 1.0; pw.create_dataset("T_4x4", data=p44, compression="gzip") print(f"Saved local lingbot trajectory (no world alignment) → {out_h5}") if __name__ == "__main__": fuse(sys.argv[1], sys.argv[2], sys.argv[3])