diff --git a/scripts/stitch.py b/scripts/stitch.py new file mode 100644 index 0000000..6355dab --- /dev/null +++ b/scripts/stitch.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +"""Merge multiple PLY point clouds from lingbot-map jobs into a single PLY. + +Each PLY is in its own local reference frame. This script: + 1. Loads all input PLYs as Open3D PointClouds. + 2. Runs voxel downsampling + FPFH feature extraction on each. + 3. Uses RANSAC global registration between adjacent pairs to find initial + alignment (useful when cameras don't share a scene). + 4. Refines with ICP (point-to-plane). + 5. Merges all aligned clouds and saves the result. + +Usage: + python3 stitch.py out.ply input1.ply input2.ply [input3.ply ...] + python3 stitch.py out.ply ~/cosma-qc-frames/job_*/reconstruction.ply + python3 stitch.py out.ply --voxel 0.02 --no-ransac *.ply + +The first PLY is the reference frame; all others are aligned to it. + +Requires: open3d (pip install open3d) +""" +from __future__ import annotations + +import argparse +import sys +from pathlib import Path + +import numpy as np + + +def load_ply(path: str): + import open3d as o3d + pcd = o3d.io.read_point_cloud(path) + if len(pcd.points) == 0: + raise ValueError(f"Empty point cloud: {path}") + return pcd + + +def preprocess(pcd, voxel_size: float): + import open3d as o3d + pcd_down = pcd.voxel_down_sample(voxel_size) + pcd_down.estimate_normals( + o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30) + ) + fpfh = o3d.pipelines.registration.compute_fpfh_feature( + pcd_down, + o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100), + ) + return pcd_down, fpfh + + +def ransac_registration(src_down, dst_down, src_fpfh, dst_fpfh, voxel_size: float): + import open3d as o3d + dist_thr = voxel_size * 1.5 + result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching( + src_down, dst_down, src_fpfh, dst_fpfh, + mutual_filter=True, + max_correspondence_distance=dist_thr, + estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False), + ransac_n=4, + checkers=[ + o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9), + o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(dist_thr), + ], + criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(4_000_000, 500), + ) + return result.transformation + + +def icp_refine(src, dst, init_transform, voxel_size: float): + import open3d as o3d + result = o3d.pipelines.registration.registration_icp( + src, dst, + max_correspondence_distance=voxel_size * 0.4, + init=init_transform, + estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPlane(), + ) + return result.transformation + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("output", type=Path, help="Output merged PLY") + ap.add_argument("inputs", nargs="+", type=Path, help="Input PLY files") + ap.add_argument("--voxel", type=float, default=0.05, + help="Voxel size for downsampling / feature extraction (default 0.05 = 5 cm)") + ap.add_argument("--no-ransac", action="store_true", + help="Skip RANSAC global registration (use if clouds share common poses)") + ap.add_argument("--icp-only", action="store_true", + help="Use identity as init transform and refine with ICP only") + ap.add_argument("--merge-voxel", type=float, default=0.02, + help="Final voxel downsampling on merged cloud (0 = no downsample)") + args = ap.parse_args() + + try: + import open3d as o3d + except ImportError: + sys.exit("open3d not found. Install: pip install open3d") + + if len(args.inputs) < 2: + sys.exit("Need at least 2 input PLY files.") + + print(f"Loading {len(args.inputs)} PLYs...") + clouds = [load_ply(str(p)) for p in args.inputs] + for p, c in zip(args.inputs, clouds): + print(f" {p.name}: {len(c.points):,} pts") + + # Reference = first cloud + merged = clouds[0] + ref_down, ref_fpfh = preprocess(clouds[0], args.voxel) + + for i, src_pcd in enumerate(clouds[1:], start=1): + print(f"\nAligning {args.inputs[i].name} → {args.inputs[0].name}...") + src_down, src_fpfh = preprocess(src_pcd, args.voxel) + + if args.icp_only or args.no_ransac: + init_tf = np.eye(4) + else: + print(" RANSAC global registration...") + init_tf = ransac_registration(src_down, ref_down, src_fpfh, ref_fpfh, args.voxel) + + print(" ICP refinement...") + src_pcd_for_icp = src_pcd.voxel_down_sample(args.voxel * 0.2) + src_pcd_for_icp.estimate_normals( + o3d.geometry.KDTreeSearchParamHybrid(radius=args.voxel, max_nn=30) + ) + ref_for_icp = merged.voxel_down_sample(args.voxel * 0.2) + ref_for_icp.estimate_normals( + o3d.geometry.KDTreeSearchParamHybrid(radius=args.voxel, max_nn=30) + ) + final_tf = icp_refine(src_pcd_for_icp, ref_for_icp, init_tf, args.voxel) + + src_pcd.transform(final_tf) + merged = merged + src_pcd + print(f" Merged total: {len(merged.points):,} pts") + + if args.merge_voxel > 0: + print(f"\nFinal downsample (voxel={args.merge_voxel})...") + merged = merged.voxel_down_sample(args.merge_voxel) + print(f"Final cloud: {len(merged.points):,} pts") + + args.output.parent.mkdir(parents=True, exist_ok=True) + o3d.io.write_point_cloud(str(args.output), merged) + print(f"\nSaved → {args.output}") + + +if __name__ == "__main__": + main()