""" slam.py — Dead-reckoning + Scan-matching ICP 2D + fermeture de boucle. Modules réutilisables importés par process.py. Dépendances : numpy, scipy uniquement. """ import numpy as np from scipy.spatial import cKDTree from typing import List, Optional, Tuple, Dict from collections import deque # --------------------------------------------------------------------------- # Dead-reckoning (intégration IMU + heading) # --------------------------------------------------------------------------- class DeadReckoning: """ Dead-reckoning ROV 2D : heading (magnéto + gyro) + vitesse odométrique. Modèle réaliste pour BlueROV sans DVL/GPS dans une chambre de moulin : Heading : fusion magnéto + gz par filtre complémentaire h_dr = alpha * (h_magneto) + (1-alpha) * (h_gyro_integré) → alpha = 0.3 : le magnéto corrige lentement, le gyro intègre rapidement → source de dérive : biais gz (0.003 rad/s) qui déplace h_gyro → sur 120s : dérive cumulée ~4° → erreur transverse ~1-2 m à 0.3 m/s Vitesse forward : estimée depuis ax par intégration sur fenêtre glissante → biais ax (0.015 m/s²) intégré → erreur de vitesse → dérive position → en pratique : v_dr ≠ v_vraie d'environ ±0.05 m/s Résultat : DR dérive de 1-4 m sur 120s → ICP Ping360 corrige à chaque sweep. """ def __init__(self, alpha_magneto: float = 0.3, tau: float = 2.0): """ alpha_magneto : poids du magnéto dans la fusion heading [0,1]. 0 = gyro pur (max dérive), 1 = magnéto pur (pas de dérive). 0.3 = compromis réaliste (magnéto perturbé par métaux du moulin). tau : constante de temps amortissement vitesse (s). """ self.x = 0.0 self.y = 0.0 self.h_gyro = 0.0 # heading intégré depuis gz self.heading_deg = 0.0 # heading fusionné self.vf = 0.0 # vitesse forward estimée (corps, m/s) self.vl = 0.0 # vitesse latérale estimée (corps, m/s) self.t_prev = None self.alpha = alpha_magneto self.tau = tau def reset(self, x: float = 0.0, y: float = 0.0, heading_deg: float = 0.0): self.x = x self.y = y self.h_gyro = heading_deg self.heading_deg = heading_deg self.vf = 0.0 self.vl = 0.0 self.t_prev = None def update(self, t: float, heading_deg: float, ax: float, ay: float, az: float, gz: float = 0.0, vf: float = None, vl: float = 0.0) -> Tuple[float, float, float]: """ heading_deg : cap magnéto (bruité + dérive lente). gz : taux de lacet (rad/s), biaisé → dérive h_gyro. ax, ay : accélérations corps (option : non utilisées si vf fourni). vf, vl : vitesse corps forward/latérale mesurée (m/s). Si vf=None, on intègre ax (dérive plus rapide). Sources de dérive DR : - biais gz → h_gyro dérive → mauvaise direction - biais vf → position dérive dans la mauvaise direction → ICP corrige ces deux erreurs à chaque sweep """ if self.t_prev is None: self.t_prev = t self.h_gyro = heading_deg self.heading_deg = heading_deg return self.x, self.y, self.heading_deg dt = t - self.t_prev if dt <= 0: return self.x, self.y, self.heading_deg self.t_prev = t # --- Heading : filtre complémentaire gyro (principal) + magnéto (lent) --- # Intégration gz biaisée → h_gyro dérive self.h_gyro = (self.h_gyro + np.rad2deg(gz) * dt) % 360 # Correction magnéto partielle (α faible = magnéto peu fiable dans moulin métallique) dh = ((heading_deg - self.h_gyro + 180) % 360) - 180 self.h_gyro = (self.h_gyro + self.alpha * dh) % 360 self.heading_deg = self.h_gyro # --- Vitesse : utilise vf/vl si disponible, sinon intègre ax/ay --- if vf is not None: # Vitesse mesurée (biaisée) — source de dérive : biais vf self.vf = vf self.vl = vl else: # Intégration accéléro (dérive plus rapide) decay = np.exp(-dt / self.tau) self.vf = self.vf * decay + ax * dt self.vl = self.vl * decay + ay * dt # --- Projection vitesse corps → monde --- h_rad = np.deg2rad(self.heading_deg) # avant ROV = (sin h, cos h) en ENU ; gauche = (-cos h, sin h) vx = self.vf * np.sin(h_rad) - self.vl * np.cos(h_rad) vy = self.vf * np.cos(h_rad) + self.vl * np.sin(h_rad) self.x += vx * dt self.y += vy * dt return self.x, self.y, self.heading_deg # --------------------------------------------------------------------------- # Gestion des sweeps Ping360 # --------------------------------------------------------------------------- class SweepAccumulator: """ Regroupe les faisceaux Ping360 en sweeps complets (0→360°). Un nouveau sweep démarre quand l'angle fait un tour complet. """ def __init__(self, steps: int = 400): self.steps = steps self.current_sweep: List[Dict] = [] self.last_angle = None self.sweeps_done = 0 def add_beam(self, angle_deg: float, distance: float, pose: Tuple[float, float, float]) -> Optional[List[Dict]]: """ Ajoute un faisceau. Si le sweep est complet (retour à 0°), retourne le sweep et commence le suivant. Sinon retourne None. pose = (x, y, heading_deg) Fonctionne quel que soit le pas angulaire (1 pas ou 4.5° par record). """ beam = {"angle_deg": angle_deg, "distance": distance, "pose": pose} # Détection fermeture de sweep : l'angle repasse vers 0 après avoir été >180 if (self.last_angle is not None and self.last_angle > 180 and angle_deg < 90): # Sweep complet — seuil minimal : au moins 20 faisceaux (tolère grand pas angulaire) completed = self.current_sweep.copy() self.current_sweep = [beam] self.last_angle = angle_deg self.sweeps_done += 1 if len(completed) >= 20: return completed return None self.current_sweep.append(beam) self.last_angle = angle_deg return None def sweep_to_world_points(sweep: List[Dict]) -> np.ndarray: """ Convertit un sweep (liste de faisceaux) en points murs dans le repère monde. Retourne un array (M, 2) de points (x, y), dropouts exclus. """ points = [] for beam in sweep: d = beam["distance"] if d <= 0.01: continue # dropout angle_rel = beam["angle_deg"] px, py, hdg = beam["pose"] # angle monde = cap + angle_relatif (convention cap 0=Nord, CW) angle_world = (hdg + angle_rel) % 360.0 a_rad = np.deg2rad(angle_world) # Direction en ENU : cap 0°=+Y, 90°=+X wx = px + d * np.sin(a_rad) wy = py + d * np.cos(a_rad) points.append([wx, wy]) if not points: return np.zeros((0, 2)) return np.array(points) # --------------------------------------------------------------------------- # ICP 2D scan-to-map # --------------------------------------------------------------------------- def icp_2d( source_pts: np.ndarray, map_pts: np.ndarray, max_iter: int = 20, tol: float = 1e-4, max_dist: float = 0.5, ) -> Tuple[float, float, float, bool]: """ ICP 2D : recale source_pts sur map_pts. Retourne (dx, dy, d_heading_rad, converged). Algorithme : 1. Pour chaque point source, chercher le plus proche dans map (cKDTree) 2. Estimer la transformation rigide 2D (rotation + translation) via SVD 3. Appliquer et itérer jusqu'à convergence """ if len(source_pts) < 5 or len(map_pts) < 5: return 0.0, 0.0, 0.0, False src = source_pts.copy() tree = cKDTree(map_pts) dx_total = 0.0 dy_total = 0.0 dtheta_total = 0.0 for iteration in range(max_iter): # Nearest-neighbor dists, idxs = tree.query(src, k=1, workers=-1) # Filtrer les appariements trop lointains mask = dists < max_dist if mask.sum() < 5: return dx_total, dy_total, dtheta_total, False src_m = src[mask] dst_m = map_pts[idxs[mask]] # Centroïdes c_src = src_m.mean(axis=0) c_dst = dst_m.mean(axis=0) # Matrices centrées A = src_m - c_src B = dst_m - c_dst # SVD pour trouver la rotation 2D optimale H = A.T @ B U, S, Vt = np.linalg.svd(H) R = Vt.T @ U.T # Assure det(R) = +1 (rotation pure, pas de réflexion) if np.linalg.det(R) < 0: Vt[-1, :] *= -1 R = Vt.T @ U.T t = c_dst - R @ c_src # Appliquer la transformation au nuage source src = (R @ src.T).T + t # Accumuler la transformation totale dx_total += t[0] dy_total += t[1] dtheta_total += np.arctan2(R[1, 0], R[0, 0]) # Convergence if np.abs(t).max() < tol and abs(np.arctan2(R[1, 0], R[0, 0])) < tol: return dx_total, dy_total, dtheta_total, True return dx_total, dy_total, dtheta_total, True # --------------------------------------------------------------------------- # Carte globale incrémentale # --------------------------------------------------------------------------- class IncrementalMap: """ Carte 2D des murs, stockée comme nuage de points. Décimation par voxel pour éviter la croissance infinie. """ def __init__(self, voxel_size: float = 0.05): self.points: List[np.ndarray] = [] self.voxel_size = voxel_size self._voxel_set = set() def add_points(self, pts: np.ndarray) -> None: """Ajoute des points, en filtrant les doublons par voxel.""" for pt in pts: key = (int(pt[0] / self.voxel_size), int(pt[1] / self.voxel_size)) if key not in self._voxel_set: self._voxel_set.add(key) self.points.append(pt) def get_array(self) -> np.ndarray: if not self.points: return np.zeros((0, 2)) return np.array(self.points) def __len__(self): return len(self.points) # --------------------------------------------------------------------------- # Détection et correction de fermeture de boucle # --------------------------------------------------------------------------- class LoopClosureDetector: """ Détecte quand le ROV revient près d'une zone déjà visitée. Stratégie simple : on compare la position courante aux positions passées. Si distance < seuil et qu'un temps minimum s'est écoulé → boucle détectée. """ def __init__(self, min_time: float = 15.0, dist_thresh: float = 0.8): self.min_time = min_time # on ne cherche pas dans les N dernières secondes self.dist_thresh = dist_thresh self.history: List[Tuple[float, float, float, int]] = [] # (t, x, y, sweep_idx) def add_pose(self, t: float, x: float, y: float, sweep_idx: int) -> None: self.history.append((t, x, y, sweep_idx)) def check(self, t: float, x: float, y: float) -> Optional[int]: """ Retourne l'index sweep de la pose passée si boucle détectée, sinon None. """ for (t_old, x_old, y_old, idx_old) in self.history: if t - t_old < self.min_time: continue dist = np.sqrt((x - x_old)**2 + (y - y_old)**2) if dist < self.dist_thresh: return idx_old return None def apply_loop_closure( trajectory: List[Tuple[float, float, float, float]], # (t, x, y, h) loop_start_idx: int, loop_end_idx: int, correction: np.ndarray, # [dx, dy] ) -> List[Tuple[float, float, float, float]]: """ Redistribue linéairement la correction de fermeture de boucle sur les poses entre loop_start_idx et loop_end_idx. C'est une simplification de la correction graph-SLAM (pas de g2o). """ n_loop = loop_end_idx - loop_start_idx if n_loop <= 0: return trajectory corrected = list(trajectory) for i in range(loop_start_idx, loop_end_idx + 1): alpha = (i - loop_start_idx) / n_loop t, x, y, h = corrected[i] corrected[i] = (t, x + alpha * correction[0], y + alpha * correction[1], h) return corrected