#!/usr/bin/env python3 """Merge multiple PLY point clouds from lingbot-map jobs into a single PLY. Each PLY is in its own local reference frame. This script: 1. Loads all input PLYs as Open3D PointClouds. 2. Runs voxel downsampling + FPFH feature extraction on each. 3. Uses RANSAC global registration between adjacent pairs to find initial alignment (useful when cameras don't share a scene). 4. Refines with ICP (point-to-plane). 5. Merges all aligned clouds and saves the result. Usage: python3 stitch.py out.ply input1.ply input2.ply [input3.ply ...] python3 stitch.py out.ply ~/cosma-qc-frames/job_*/reconstruction.ply python3 stitch.py out.ply --voxel 0.02 --no-ransac *.ply The first PLY is the reference frame; all others are aligned to it. Requires: open3d (pip install open3d) """ from __future__ import annotations import argparse import sys from pathlib import Path import numpy as np def load_ply(path: str): import open3d as o3d pcd = o3d.io.read_point_cloud(path) if len(pcd.points) == 0: raise ValueError(f"Empty point cloud: {path}") return pcd def preprocess(pcd, voxel_size: float): import open3d as o3d pcd_down = pcd.voxel_down_sample(voxel_size) pcd_down.estimate_normals( o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30) ) fpfh = o3d.pipelines.registration.compute_fpfh_feature( pcd_down, o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100), ) return pcd_down, fpfh def ransac_registration(src_down, dst_down, src_fpfh, dst_fpfh, voxel_size: float): import open3d as o3d dist_thr = voxel_size * 1.5 result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching( src_down, dst_down, src_fpfh, dst_fpfh, mutual_filter=True, max_correspondence_distance=dist_thr, estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False), ransac_n=4, checkers=[ o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9), o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(dist_thr), ], criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(4_000_000, 500), ) return result.transformation def icp_refine(src, dst, init_transform, voxel_size: float): import open3d as o3d result = o3d.pipelines.registration.registration_icp( src, dst, max_correspondence_distance=voxel_size * 0.4, init=init_transform, estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPlane(), ) return result.transformation def main(): ap = argparse.ArgumentParser() ap.add_argument("output", type=Path, help="Output merged PLY") ap.add_argument("inputs", nargs="+", type=Path, help="Input PLY files") ap.add_argument("--voxel", type=float, default=0.05, help="Voxel size for downsampling / feature extraction (default 0.05 = 5 cm)") ap.add_argument("--no-ransac", action="store_true", help="Skip RANSAC global registration (use if clouds share common poses)") ap.add_argument("--icp-only", action="store_true", help="Use identity as init transform and refine with ICP only") ap.add_argument("--merge-voxel", type=float, default=0.02, help="Final voxel downsampling on merged cloud (0 = no downsample)") args = ap.parse_args() try: import open3d as o3d except ImportError: sys.exit("open3d not found. Install: pip install open3d") if len(args.inputs) < 2: sys.exit("Need at least 2 input PLY files.") print(f"Loading {len(args.inputs)} PLYs...") clouds = [load_ply(str(p)) for p in args.inputs] for p, c in zip(args.inputs, clouds): print(f" {p.name}: {len(c.points):,} pts") # Reference = first cloud merged = clouds[0] ref_down, ref_fpfh = preprocess(clouds[0], args.voxel) for i, src_pcd in enumerate(clouds[1:], start=1): print(f"\nAligning {args.inputs[i].name} → {args.inputs[0].name}...") src_down, src_fpfh = preprocess(src_pcd, args.voxel) if args.icp_only or args.no_ransac: init_tf = np.eye(4) else: print(" RANSAC global registration...") init_tf = ransac_registration(src_down, ref_down, src_fpfh, ref_fpfh, args.voxel) print(" ICP refinement...") src_pcd_for_icp = src_pcd.voxel_down_sample(args.voxel * 0.2) src_pcd_for_icp.estimate_normals( o3d.geometry.KDTreeSearchParamHybrid(radius=args.voxel, max_nn=30) ) ref_for_icp = merged.voxel_down_sample(args.voxel * 0.2) ref_for_icp.estimate_normals( o3d.geometry.KDTreeSearchParamHybrid(radius=args.voxel, max_nn=30) ) final_tf = icp_refine(src_pcd_for_icp, ref_for_icp, init_tf, args.voxel) src_pcd.transform(final_tf) merged = merged + src_pcd print(f" Merged total: {len(merged.points):,} pts") if args.merge_voxel > 0: print(f"\nFinal downsample (voxel={args.merge_voxel})...") merged = merged.voxel_down_sample(args.merge_voxel) print(f"Final cloud: {len(merged.points):,} pts") args.output.parent.mkdir(parents=True, exist_ok=True) o3d.io.write_point_cloud(str(args.output), merged) print(f"\nSaved → {args.output}") if __name__ == "__main__": main()