""" slam_incremental.py — SLAM incrémental alimenté enregistrement par enregistrement. Adapté depuis pipeline/slam.py pour une utilisation serveur (pas de batch). Appel: une seule méthode add_record(rec) par message reçu. Quand un sweep est complété → ICP + fermeture de boucle auto. """ import numpy as np from scipy.spatial import cKDTree from typing import List, Optional, Tuple, Dict # --------------------------------------------------------------------------- # Dead-reckoning # --------------------------------------------------------------------------- class DeadReckoning: def __init__(self, alpha_magneto: float = 0.3, tau: float = 2.0): self.x = 0.0 self.y = 0.0 self.h_gyro = 0.0 self.heading_deg = 0.0 self.vf = 0.0 self.vl = 0.0 self.t_prev = None self.alpha = alpha_magneto self.tau = tau def reset(self): self.x = 0.0 self.y = 0.0 self.h_gyro = 0.0 self.heading_deg = 0.0 self.vf = 0.0 self.vl = 0.0 self.t_prev = None def update(self, t: float, heading_deg: float, ax: float, ay: float, gz: float, vf: float = None, vl: float = 0.0) -> Tuple[float, float, float]: if self.t_prev is None: self.t_prev = t self.h_gyro = heading_deg self.heading_deg = heading_deg return self.x, self.y, self.heading_deg dt = t - self.t_prev if dt <= 0: return self.x, self.y, self.heading_deg self.t_prev = t self.h_gyro = (self.h_gyro + np.rad2deg(gz) * dt) % 360 dh = ((heading_deg - self.h_gyro + 180) % 360) - 180 self.h_gyro = (self.h_gyro + self.alpha * dh) % 360 self.heading_deg = self.h_gyro if vf is not None: self.vf = vf self.vl = vl else: decay = np.exp(-dt / self.tau) self.vf = self.vf * decay + ax * dt self.vl = 0.0 h_rad = np.deg2rad(self.heading_deg) vx = self.vf * np.sin(h_rad) - self.vl * np.cos(h_rad) vy = self.vf * np.cos(h_rad) + self.vl * np.sin(h_rad) self.x += vx * dt self.y += vy * dt return self.x, self.y, self.heading_deg # --------------------------------------------------------------------------- # Accumulation de sweep # --------------------------------------------------------------------------- class SweepAccumulator: def __init__(self): self.current_sweep: List[Dict] = [] self.last_angle = None self.sweeps_done = 0 def reset(self): self.current_sweep = [] self.last_angle = None self.sweeps_done = 0 def add_beam(self, angle_deg: float, distance: float, pose: Tuple[float, float, float]) -> Optional[List[Dict]]: beam = {"angle_deg": angle_deg, "distance": distance, "pose": pose} if (self.last_angle is not None and self.last_angle > 180 and angle_deg < 90): completed = self.current_sweep.copy() self.current_sweep = [beam] self.last_angle = angle_deg self.sweeps_done += 1 if len(completed) >= 20: return completed return None self.current_sweep.append(beam) self.last_angle = angle_deg return None def sweep_to_world_points(sweep: List[Dict]) -> np.ndarray: points = [] for beam in sweep: d = beam["distance"] if d <= 0.01: continue angle_rel = beam["angle_deg"] px, py, hdg = beam["pose"] angle_world = (hdg + angle_rel) % 360.0 a_rad = np.deg2rad(angle_world) wx = px + d * np.sin(a_rad) wy = py + d * np.cos(a_rad) points.append([wx, wy]) if not points: return np.zeros((0, 2)) return np.array(points) # --------------------------------------------------------------------------- # ICP 2D # --------------------------------------------------------------------------- def icp_2d( source_pts: np.ndarray, map_pts: np.ndarray, max_iter: int = 20, tol: float = 1e-4, max_dist: float = 0.5, ) -> Tuple[float, float, float, bool]: if len(source_pts) < 5 or len(map_pts) < 5: return 0.0, 0.0, 0.0, False src = source_pts.copy() tree = cKDTree(map_pts) dx_total = 0.0 dy_total = 0.0 dtheta_total = 0.0 for _ in range(max_iter): dists, idxs = tree.query(src, k=1, workers=-1) mask = dists < max_dist if mask.sum() < 5: return dx_total, dy_total, dtheta_total, False src_m = src[mask] dst_m = map_pts[idxs[mask]] c_src = src_m.mean(axis=0) c_dst = dst_m.mean(axis=0) A = src_m - c_src B = dst_m - c_dst H = A.T @ B U, S, Vt = np.linalg.svd(H) R = Vt.T @ U.T if np.linalg.det(R) < 0: Vt[-1, :] *= -1 R = Vt.T @ U.T t = c_dst - R @ c_src src = (R @ src.T).T + t dx_total += t[0] dy_total += t[1] dtheta_total += np.arctan2(R[1, 0], R[0, 0]) if np.abs(t).max() < tol and abs(np.arctan2(R[1, 0], R[0, 0])) < tol: return dx_total, dy_total, dtheta_total, True return dx_total, dy_total, dtheta_total, True # --------------------------------------------------------------------------- # Carte incrémentale # --------------------------------------------------------------------------- class IncrementalMap: def __init__(self, voxel_size: float = 0.05): self.points: List[np.ndarray] = [] self.voxel_size = voxel_size self._voxel_set = set() def reset(self): self.points = [] self._voxel_set = set() def add_points(self, pts: np.ndarray) -> int: """Retourne le nombre de nouveaux points ajoutés.""" added = 0 for pt in pts: key = (int(pt[0] / self.voxel_size), int(pt[1] / self.voxel_size)) if key not in self._voxel_set: self._voxel_set.add(key) self.points.append(pt) added += 1 return added def get_array(self) -> np.ndarray: if not self.points: return np.zeros((0, 2)) return np.array(self.points) def __len__(self): return len(self.points) # --------------------------------------------------------------------------- # Fermeture de boucle # --------------------------------------------------------------------------- class LoopClosureDetector: def __init__(self, min_time: float = 15.0, dist_thresh: float = 0.8): self.min_time = min_time self.dist_thresh = dist_thresh self.history: List[Tuple[float, float, float, int]] = [] def reset(self): self.history = [] def add_pose(self, t: float, x: float, y: float, sweep_idx: int) -> None: self.history.append((t, x, y, sweep_idx)) def check(self, t: float, x: float, y: float) -> Optional[int]: for (t_old, x_old, y_old, idx_old) in self.history: if t - t_old < self.min_time: continue dist = np.sqrt((x - x_old)**2 + (y - y_old)**2) if dist < self.dist_thresh: return idx_old return None # --------------------------------------------------------------------------- # Orchestrateur SLAM incrémental # --------------------------------------------------------------------------- class IncrementalSLAM: """ Reçoit les enregistrements un par un via add_record(). Après chaque sweep complet → ICP + fermeture de boucle. Expose: pose_dr, pose_corrected, map, trajectory, loop_closures. """ def __init__(self): self.dr = DeadReckoning() self.sweep_acc = SweepAccumulator() self.imap = IncrementalMap() self.lcd = LoopClosureDetector() # Pose courante (x, y, heading_deg) self.pose_dr = (0.0, 0.0, 0.0) self.pose_corrected = (0.0, 0.0, 0.0) # Trajectoire : liste de (t, x, y, h) — corrigée self.trajectory: List[Tuple[float, float, float, float]] = [] # Stats self.records_count = 0 self.sweeps_done = 0 self.loop_closures = 0 self.last_t = 0.0 # Points ajoutés au dernier sweep (pour delta live) self._last_new_points: np.ndarray = np.zeros((0, 2)) def reset(self): self.dr.reset() self.sweep_acc.reset() self.imap.reset() self.lcd.reset() self.pose_dr = (0.0, 0.0, 0.0) self.pose_corrected = (0.0, 0.0, 0.0) self.trajectory = [] self.records_count = 0 self.sweeps_done = 0 self.loop_closures = 0 self.last_t = 0.0 self._last_new_points = np.zeros((0, 2)) def add_record(self, rec: dict) -> bool: """ Traite un enregistrement. Retourne True si un sweep vient d'être complété (le client /ws/live recevra alors un delta). """ t = rec["t"] self.records_count += 1 self.last_t = t # DR x, y, h = self.dr.update( t=t, heading_deg=rec["heading"], ax=rec["ax"], ay=rec["ay"], gz=rec["gz"], vf=rec.get("vf"), vl=rec.get("vl", 0.0), ) self.pose_dr = (x, y, h) # Sweep accumulation sweep = self.sweep_acc.add_beam( angle_deg=rec["ping360_angle"], distance=rec["ping360_distance"], pose=(x, y, h), ) if sweep is None: return False # --- Sweep complet --- self.sweeps_done = self.sweep_acc.sweeps_done new_pts = sweep_to_world_points(sweep) # ICP si la carte n'est pas vide map_arr = self.imap.get_array() dx_icp = dy_icp = dtheta_icp = 0.0 if len(map_arr) >= 5 and len(new_pts) >= 5: dx_icp, dy_icp, dtheta_icp, _ = icp_2d(new_pts, map_arr) # Appliquer correction ICP à la pose courante x_c = x + dx_icp y_c = y + dy_icp h_c = (h + np.rad2deg(dtheta_icp)) % 360 self.pose_corrected = (x_c, y_c, h_c) # Recalculer les points corrigés pour la carte corrected_sweep = [ { "angle_deg": b["angle_deg"], "distance": b["distance"], "pose": ( b["pose"][0] + dx_icp, b["pose"][1] + dy_icp, b["pose"][2] + np.rad2deg(dtheta_icp), ) } for b in sweep ] map_pts_new = sweep_to_world_points(corrected_sweep) else: self.pose_corrected = (x, y, h) map_pts_new = new_pts # Ajouter les points à la carte added = self.imap.add_points(map_pts_new) self._last_new_points = map_pts_new if added > 0 else np.zeros((0, 2)) # Trajectoire xc, yc, hc = self.pose_corrected self.trajectory.append((t, xc, yc, hc)) # Fermeture de boucle loop_old_idx = self.lcd.check(t, xc, yc) if loop_old_idx is not None: self.loop_closures += 1 # Redistribution linéaire correction sur dernières poses n_loop = len(self.trajectory) - loop_old_idx if n_loop > 1: corr = np.array([ self.trajectory[loop_old_idx][1] - xc, self.trajectory[loop_old_idx][2] - yc, ]) for i in range(loop_old_idx, len(self.trajectory)): alpha = (i - loop_old_idx) / n_loop tt, xx, yy, hh = self.trajectory[i] self.trajectory[i] = (tt, xx + alpha * corr[0], yy + alpha * corr[1], hh) self.lcd.add_pose(t, xc, yc, len(self.trajectory) - 1) return True def get_state_snapshot(self) -> dict: """Retourne l'état complet (pour connexion initiale /ws/live).""" map_arr = self.imap.get_array() return { "type": "snapshot", "records": self.records_count, "sweeps": self.sweeps_done, "loop_closures": self.loop_closures, "last_t": self.last_t, "pose_dr": list(self.pose_dr), "pose_corrected": list(self.pose_corrected), "map_points": map_arr.tolist(), "trajectory": [list(p) for p in self.trajectory], } def get_sweep_delta(self) -> dict: """Retourne le delta après un sweep (nouveaux points + pose actuelle).""" return { "type": "delta", "records": self.records_count, "sweeps": self.sweeps_done, "loop_closures": self.loop_closures, "last_t": self.last_t, "pose_dr": list(self.pose_dr), "pose_corrected": list(self.pose_corrected), "new_map_points": self._last_new_points.tolist(), "trajectory_tail": [list(p) for p in self.trajectory[-5:]], }