""" process.py — Pipeline complet streaming moulin-mapper. Lit le flux JSONL ligne par ligne (streaming), applique dead-reckoning + scan-matching ICP, exporte les résultats. Usage : python3 process.py [--input ../data/sim/run_L.jsonl] [--truth ../data/sim/run_L_truth.csv] python3 stream_replay.py | python3 process.py --stdin """ import argparse import csv import json import os import sys import numpy as np from typing import List, Tuple, Optional # Assure imports depuis pipeline/ sys.path.insert(0, os.path.dirname(__file__)) from slam import ( DeadReckoning, SweepAccumulator, sweep_to_world_points, icp_2d, IncrementalMap, LoopClosureDetector, apply_loop_closure, ) ROV_Z = 1.5 # profondeur fixe ROV (m) — sera remplacé par depth réel ROV_ALT = 0.5 # hauteur fond (m) — sera remplacé par altitude réelle # --------------------------------------------------------------------------- # Lecture JSONL streaming # --------------------------------------------------------------------------- def stream_jsonl(source): """ Générateur : lit le flux JSONL ligne par ligne depuis un fichier ou stdin. Chaque appel yield un dict Python. """ for line in source: line = line.strip() if not line: continue try: yield json.loads(line) except json.JSONDecodeError: continue # --------------------------------------------------------------------------- # Chargement vérité terrain (optionnel) # --------------------------------------------------------------------------- def load_truth(truth_path: str) -> np.ndarray: """Retourne array (N, 5) : t, x, y, heading_deg, z""" rows = [] with open(truth_path) as f: reader = csv.DictReader(f) for row in reader: rows.append([float(row["t"]), float(row["x"]), float(row["y"]), float(row["heading_deg"]), float(row["z"])]) return np.array(rows) def interpolate_truth(truth: np.ndarray, t: float) -> np.ndarray: """Interpole la vérité terrain à l'instant t.""" idx = np.searchsorted(truth[:, 0], t) idx = min(idx, len(truth) - 1) return truth[idx] # --------------------------------------------------------------------------- # Export PLY ASCII (nuage 3D) # --------------------------------------------------------------------------- def write_ply(path: str, map2d: np.ndarray, depth: float, altitude: float) -> int: """ Extrude le contour 2D entre le fond et la surface → nuage 3D ASCII PLY. Retourne le nombre de points. """ pts = [] z_surface = -depth # z=0 à la surface, profondeur = négatif si on va vers le bas z_bottom = -(depth + altitude) for x, y in map2d: pts.append((x, y, z_surface)) pts.append((x, y, z_bottom)) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "w") as f: f.write("ply\nformat ascii 1.0\n") f.write(f"element vertex {len(pts)}\n") f.write("property float x\nproperty float y\nproperty float z\n") f.write("end_header\n") for px, py, pz in pts: f.write(f"{px:.4f} {py:.4f} {pz:.4f}\n") return len(pts) # --------------------------------------------------------------------------- # Pipeline principal # --------------------------------------------------------------------------- def run_pipeline(source, truth_path: Optional[str] = None, out_dir: str = "../data/out"): os.makedirs(out_dir, exist_ok=True) # Composants SLAM dr = DeadReckoning() sweep_acc = SweepAccumulator(steps=400) global_map = IncrementalMap(voxel_size=0.05) loop_detector = LoopClosureDetector(min_time=15.0, dist_thresh=0.8) # Trajectoire estimée : dead-reckoning + corrigée traj_dr: List[Tuple] = [] # (t, x, y) traj_corr: List[Tuple] = [] # (t, x, y) # Offset ICP cumulatif : déplacement total appliqué au-dessus du DR # Maintenu entre les sweeps pour que la pose corrigée reste cohérente icp_offset = np.array([0.0, 0.0]) # Accumulation erreurs pour RMS dr_errors = [] sm_errors = [] truth = None if truth_path and os.path.exists(truth_path): truth = load_truth(truth_path) print(f"Vérité terrain chargée : {len(truth)} points") n_sweeps = 0 n_loop_closures = 0 sweep_poses = [] # (sweep_idx, t, x_corr, y_corr) x_dr, y_dr, h_dr = 0.0, 0.0, 0.0 last_depth = ROV_Z last_altitude = ROV_ALT for record in stream_jsonl(source): t = record["t"] hdg = record["heading"] alt = record["altitude"] depth_r = record["depth"] ax = record["ax"] ay = record["ay"] az = record["az"] gz = record.get("gz", 0.0) vf = record.get("vf", None) # vitesse forward (optionnel) vl = record.get("vl", 0.0) ping_angle = record["ping360_angle"] ping_dist = record["ping360_distance"] if alt > 0.01: last_altitude = alt if depth_r > 0.01: last_depth = depth_r # --- Dead-reckoning --- x_dr, y_dr, h_dr = dr.update(t, hdg, ax, ay, az, gz=gz, vf=vf, vl=vl) # Pose corrigée = pose DR + offset ICP cumulatif # L'offset est mis à jour à chaque sweep ICP réussi pose_dr = np.array([x_dr, y_dr]) pose_corr_cur = pose_dr + icp_offset # --- Sweep Ping360 --- # On passe la pose CORRIGÉE au sweep pour générer des points # cohérents avec la carte globale déjà corrigée pose_for_sweep = (pose_corr_cur[0], pose_corr_cur[1], h_dr) completed = sweep_acc.add_beam(ping_angle, ping_dist, pose_for_sweep) if completed is not None: n_sweeps += 1 # Points murs depuis le sweep (en coordonnées pose_corr) sweep_pts = sweep_to_world_points(completed) if len(sweep_pts) >= 5: map_arr = global_map.get_array() if len(map_arr) >= 10: # ICP : recaler sweep sur carte → donne correction résiduelle # La correction est petite car la pose déjà corrigée dx, dy, dtheta, converged = icp_2d( sweep_pts, map_arr, max_iter=30, tol=1e-4, max_dist=0.5 ) if converged: # Mise à jour de l'offset cumulatif icp_offset = icp_offset + np.array([dx, dy]) pose_corr_cur = pose_dr + icp_offset # Ajouter les points du sweep à la carte globale # (avec la pose corrigée) global_map.add_points(sweep_pts + icp_offset) # Enregistrer la pose pour la trajectoire traj_dr.append((t, x_dr, y_dr)) traj_corr.append((t, pose_corr_cur[0], pose_corr_cur[1])) sweep_poses.append((n_sweeps - 1, t, pose_corr_cur[0], pose_corr_cur[1])) # Vérification erreurs vs vérité if truth is not None: gt = interpolate_truth(truth, t) gt_xy = gt[1:3] dr_errors.append(float(np.linalg.norm(pose_dr - gt_xy))) sm_errors.append(float(np.linalg.norm(pose_corr_cur - gt_xy))) # Détection fermeture de boucle loop_detector.add_pose(t, pose_corr_cur[0], pose_corr_cur[1], n_sweeps - 1) match_idx = loop_detector.check(t, pose_corr_cur[0], pose_corr_cur[1]) if match_idx is not None and match_idx < len(sweep_poses) - 3: n_loop_closures += 1 _, ox, oy = traj_corr[match_idx] correction = np.array([ox - pose_corr_cur[0], oy - pose_corr_cur[1]]) * 0.5 # Redistribuer la correction linéairement for i in range(match_idx, len(traj_corr)): alpha = (i - match_idx) / max(1, len(traj_corr) - 1 - match_idx) t2, x2, y2 = traj_corr[i] traj_corr[i] = (t2, x2 + alpha * correction[0], y2 + alpha * correction[1]) icp_offset = icp_offset + correction loop_detector.history.clear() # --------------------------------------------------------------------------- # Export résultats # --------------------------------------------------------------------------- # trajectory.csv traj_path = os.path.join(out_dir, "trajectory.csv") with open(traj_path, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["t", "x_dr", "y_dr", "x_corr", "y_corr"]) n = min(len(traj_dr), len(traj_corr)) for i in range(n): t2, xd, yd = traj_dr[i] _, xc, yc = traj_corr[i] writer.writerow([f"{t2:.4f}", f"{xd:.4f}", f"{yd:.4f}", f"{xc:.4f}", f"{yc:.4f}"]) # map_2d.csv map_arr = global_map.get_array() map_path = os.path.join(out_dir, "map_2d.csv") with open(map_path, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["x", "y"]) for x2, y2 in map_arr: writer.writerow([f"{x2:.4f}", f"{y2:.4f}"]) # cloud.ply ply_path = os.path.join(out_dir, "cloud.ply") n_ply_pts = write_ply(ply_path, map_arr, last_depth, last_altitude) # --------------------------------------------------------------------------- # Stats # --------------------------------------------------------------------------- print("\n=== Résultats pipeline ===") print(f"Sweeps traités : {n_sweeps}") print(f"Fermetures de boucle : {n_loop_closures}") print(f"Points carte 2D : {len(global_map)}") print(f"Points nuage 3D (PLY) : {n_ply_pts}") print(f"Taille cloud.ply : {os.path.getsize(ply_path)/1e3:.1f} Ko") if dr_errors and sm_errors: rms_dr = float(np.sqrt(np.mean(np.array(dr_errors)**2))) rms_sm = float(np.sqrt(np.mean(np.array(sm_errors)**2))) ratio = rms_sm / rms_dr if rms_dr > 0 else float("inf") print(f"\nRMS erreur dead-reckoning : {rms_dr:.3f} m") print(f"RMS erreur scan-matching : {rms_sm:.3f} m") print(f"Amélioration : {ratio:.2f}x (DR/SM={rms_dr/rms_sm:.1f}x)") if rms_sm < rms_dr: print("OK — scan-matching améliore la position vs dead-reckoning") else: print("AVERTISSEMENT — scan-matching n'améliore pas (données insuffisantes?)") else: print("\nPas de vérité terrain → pas de calcul RMS") print(f"\nFichiers exportés dans {out_dir}/") print(f" {traj_path}") print(f" {map_path}") print(f" {ply_path}") # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": parser = argparse.ArgumentParser(description="Pipeline SLAM moulin-mapper") parser.add_argument("--input", default="../data/sim/run_L.jsonl", help="Fichier JSONL en entrée") parser.add_argument("--truth", default="../data/sim/run_L_truth.csv", help="Fichier vérité terrain (optionnel)") parser.add_argument("--out-dir", default="../data/out") parser.add_argument("--stdin", action="store_true", help="Lire depuis stdin au lieu d'un fichier") args = parser.parse_args() if args.stdin: print("Lecture depuis stdin (streaming)...") run_pipeline(sys.stdin, truth_path=None, out_dir=args.out_dir) else: truth_p = args.truth if os.path.exists(args.truth) else None with open(args.input) as f: run_pipeline(f, truth_path=truth_p, out_dir=args.out_dir)