Files
moulin-mapper/pipeline/slam.py
Flag 85e9a4d4b0 feat(moulin-mapper): simulateur capteurs + pipeline SLAM Python (JSONL stream → trajectoire + nuage .ply)
- simulator.py: flux JSONL réaliste (Ping360 angle/dist, IMU, heading, depth, altitude) + vérité terrain
- slam.py: dead-reckoning + scan-to-map ICP 2D (cKDTree) + fermeture de boucle
- process.py: ingestion streaming ligne-par-ligne → trajectory.csv + map_2d.csv + cloud.ply
- stream_replay.py: rejoue le flux (vision streaming remote)
- SCHEMA.md: contrat format données ROV réel↔sim
- RMS dead-reckoning 0.386m → scan-matching 0.188m (2x)
2026-06-06 20:01:22 +00:00

359 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
slam.py — Dead-reckoning + Scan-matching ICP 2D + fermeture de boucle.
Modules réutilisables importés par process.py.
Dépendances : numpy, scipy uniquement.
"""
import numpy as np
from scipy.spatial import cKDTree
from typing import List, Optional, Tuple, Dict
from collections import deque
# ---------------------------------------------------------------------------
# Dead-reckoning (intégration IMU + heading)
# ---------------------------------------------------------------------------
class DeadReckoning:
"""
Dead-reckoning ROV 2D : heading (magnéto + gyro) + vitesse odométrique.
Modèle réaliste pour BlueROV sans DVL/GPS dans une chambre de moulin :
Heading : fusion magnéto + gz par filtre complémentaire
h_dr = alpha * (h_magneto) + (1-alpha) * (h_gyro_integré)
→ alpha = 0.3 : le magnéto corrige lentement, le gyro intègre rapidement
→ source de dérive : biais gz (0.003 rad/s) qui déplace h_gyro
→ sur 120s : dérive cumulée ~4° → erreur transverse ~1-2 m à 0.3 m/s
Vitesse forward : estimée depuis ax par intégration sur fenêtre glissante
→ biais ax (0.015 m/s²) intégré → erreur de vitesse → dérive position
→ en pratique : v_dr ≠ v_vraie d'environ ±0.05 m/s
Résultat : DR dérive de 1-4 m sur 120s → ICP Ping360 corrige à chaque sweep.
"""
def __init__(self, alpha_magneto: float = 0.3, tau: float = 2.0):
"""
alpha_magneto : poids du magnéto dans la fusion heading [0,1].
0 = gyro pur (max dérive), 1 = magnéto pur (pas de dérive).
0.3 = compromis réaliste (magnéto perturbé par métaux du moulin).
tau : constante de temps amortissement vitesse (s).
"""
self.x = 0.0
self.y = 0.0
self.h_gyro = 0.0 # heading intégré depuis gz
self.heading_deg = 0.0 # heading fusionné
self.vf = 0.0 # vitesse forward estimée (corps, m/s)
self.vl = 0.0 # vitesse latérale estimée (corps, m/s)
self.t_prev = None
self.alpha = alpha_magneto
self.tau = tau
def reset(self, x: float = 0.0, y: float = 0.0, heading_deg: float = 0.0):
self.x = x
self.y = y
self.h_gyro = heading_deg
self.heading_deg = heading_deg
self.vf = 0.0
self.vl = 0.0
self.t_prev = None
def update(self, t: float, heading_deg: float,
ax: float, ay: float, az: float,
gz: float = 0.0,
vf: float = None, vl: float = 0.0) -> Tuple[float, float, float]:
"""
heading_deg : cap magnéto (bruité + dérive lente).
gz : taux de lacet (rad/s), biaisé → dérive h_gyro.
ax, ay : accélérations corps (option : non utilisées si vf fourni).
vf, vl : vitesse corps forward/latérale mesurée (m/s).
Si vf=None, on intègre ax (dérive plus rapide).
Sources de dérive DR :
- biais gz → h_gyro dérive → mauvaise direction
- biais vf → position dérive dans la mauvaise direction
→ ICP corrige ces deux erreurs à chaque sweep
"""
if self.t_prev is None:
self.t_prev = t
self.h_gyro = heading_deg
self.heading_deg = heading_deg
return self.x, self.y, self.heading_deg
dt = t - self.t_prev
if dt <= 0:
return self.x, self.y, self.heading_deg
self.t_prev = t
# --- Heading : filtre complémentaire gyro (principal) + magnéto (lent) ---
# Intégration gz biaisée → h_gyro dérive
self.h_gyro = (self.h_gyro + np.rad2deg(gz) * dt) % 360
# Correction magnéto partielle (α faible = magnéto peu fiable dans moulin métallique)
dh = ((heading_deg - self.h_gyro + 180) % 360) - 180
self.h_gyro = (self.h_gyro + self.alpha * dh) % 360
self.heading_deg = self.h_gyro
# --- Vitesse : utilise vf/vl si disponible, sinon intègre ax/ay ---
if vf is not None:
# Vitesse mesurée (biaisée) — source de dérive : biais vf
self.vf = vf
self.vl = vl
else:
# Intégration accéléro (dérive plus rapide)
decay = np.exp(-dt / self.tau)
self.vf = self.vf * decay + ax * dt
self.vl = self.vl * decay + ay * dt
# --- Projection vitesse corps → monde ---
h_rad = np.deg2rad(self.heading_deg)
# avant ROV = (sin h, cos h) en ENU ; gauche = (-cos h, sin h)
vx = self.vf * np.sin(h_rad) - self.vl * np.cos(h_rad)
vy = self.vf * np.cos(h_rad) + self.vl * np.sin(h_rad)
self.x += vx * dt
self.y += vy * dt
return self.x, self.y, self.heading_deg
# ---------------------------------------------------------------------------
# Gestion des sweeps Ping360
# ---------------------------------------------------------------------------
class SweepAccumulator:
"""
Regroupe les faisceaux Ping360 en sweeps complets (0→360°).
Un nouveau sweep démarre quand l'angle fait un tour complet.
"""
def __init__(self, steps: int = 400):
self.steps = steps
self.current_sweep: List[Dict] = []
self.last_angle = None
self.sweeps_done = 0
def add_beam(self, angle_deg: float, distance: float, pose: Tuple[float, float, float]) -> Optional[List[Dict]]:
"""
Ajoute un faisceau. Si le sweep est complet (retour à 0°), retourne le sweep
et commence le suivant. Sinon retourne None.
pose = (x, y, heading_deg)
Fonctionne quel que soit le pas angulaire (1 pas ou 4.5° par record).
"""
beam = {"angle_deg": angle_deg, "distance": distance, "pose": pose}
# Détection fermeture de sweep : l'angle repasse vers 0 après avoir été >180
if (self.last_angle is not None
and self.last_angle > 180
and angle_deg < 90):
# Sweep complet — seuil minimal : au moins 20 faisceaux (tolère grand pas angulaire)
completed = self.current_sweep.copy()
self.current_sweep = [beam]
self.last_angle = angle_deg
self.sweeps_done += 1
if len(completed) >= 20:
return completed
return None
self.current_sweep.append(beam)
self.last_angle = angle_deg
return None
def sweep_to_world_points(sweep: List[Dict]) -> np.ndarray:
"""
Convertit un sweep (liste de faisceaux) en points murs dans le repère monde.
Retourne un array (M, 2) de points (x, y), dropouts exclus.
"""
points = []
for beam in sweep:
d = beam["distance"]
if d <= 0.01:
continue # dropout
angle_rel = beam["angle_deg"]
px, py, hdg = beam["pose"]
# angle monde = cap + angle_relatif (convention cap 0=Nord, CW)
angle_world = (hdg + angle_rel) % 360.0
a_rad = np.deg2rad(angle_world)
# Direction en ENU : cap 0°=+Y, 90°=+X
wx = px + d * np.sin(a_rad)
wy = py + d * np.cos(a_rad)
points.append([wx, wy])
if not points:
return np.zeros((0, 2))
return np.array(points)
# ---------------------------------------------------------------------------
# ICP 2D scan-to-map
# ---------------------------------------------------------------------------
def icp_2d(
source_pts: np.ndarray,
map_pts: np.ndarray,
max_iter: int = 20,
tol: float = 1e-4,
max_dist: float = 0.5,
) -> Tuple[float, float, float, bool]:
"""
ICP 2D : recale source_pts sur map_pts.
Retourne (dx, dy, d_heading_rad, converged).
Algorithme :
1. Pour chaque point source, chercher le plus proche dans map (cKDTree)
2. Estimer la transformation rigide 2D (rotation + translation) via SVD
3. Appliquer et itérer jusqu'à convergence
"""
if len(source_pts) < 5 or len(map_pts) < 5:
return 0.0, 0.0, 0.0, False
src = source_pts.copy()
tree = cKDTree(map_pts)
dx_total = 0.0
dy_total = 0.0
dtheta_total = 0.0
for iteration in range(max_iter):
# Nearest-neighbor
dists, idxs = tree.query(src, k=1, workers=-1)
# Filtrer les appariements trop lointains
mask = dists < max_dist
if mask.sum() < 5:
return dx_total, dy_total, dtheta_total, False
src_m = src[mask]
dst_m = map_pts[idxs[mask]]
# Centroïdes
c_src = src_m.mean(axis=0)
c_dst = dst_m.mean(axis=0)
# Matrices centrées
A = src_m - c_src
B = dst_m - c_dst
# SVD pour trouver la rotation 2D optimale
H = A.T @ B
U, S, Vt = np.linalg.svd(H)
R = Vt.T @ U.T
# Assure det(R) = +1 (rotation pure, pas de réflexion)
if np.linalg.det(R) < 0:
Vt[-1, :] *= -1
R = Vt.T @ U.T
t = c_dst - R @ c_src
# Appliquer la transformation au nuage source
src = (R @ src.T).T + t
# Accumuler la transformation totale
dx_total += t[0]
dy_total += t[1]
dtheta_total += np.arctan2(R[1, 0], R[0, 0])
# Convergence
if np.abs(t).max() < tol and abs(np.arctan2(R[1, 0], R[0, 0])) < tol:
return dx_total, dy_total, dtheta_total, True
return dx_total, dy_total, dtheta_total, True
# ---------------------------------------------------------------------------
# Carte globale incrémentale
# ---------------------------------------------------------------------------
class IncrementalMap:
"""
Carte 2D des murs, stockée comme nuage de points.
Décimation par voxel pour éviter la croissance infinie.
"""
def __init__(self, voxel_size: float = 0.05):
self.points: List[np.ndarray] = []
self.voxel_size = voxel_size
self._voxel_set = set()
def add_points(self, pts: np.ndarray) -> None:
"""Ajoute des points, en filtrant les doublons par voxel."""
for pt in pts:
key = (int(pt[0] / self.voxel_size), int(pt[1] / self.voxel_size))
if key not in self._voxel_set:
self._voxel_set.add(key)
self.points.append(pt)
def get_array(self) -> np.ndarray:
if not self.points:
return np.zeros((0, 2))
return np.array(self.points)
def __len__(self):
return len(self.points)
# ---------------------------------------------------------------------------
# Détection et correction de fermeture de boucle
# ---------------------------------------------------------------------------
class LoopClosureDetector:
"""
Détecte quand le ROV revient près d'une zone déjà visitée.
Stratégie simple : on compare la position courante aux positions passées.
Si distance < seuil et qu'un temps minimum s'est écoulé → boucle détectée.
"""
def __init__(self, min_time: float = 15.0, dist_thresh: float = 0.8):
self.min_time = min_time # on ne cherche pas dans les N dernières secondes
self.dist_thresh = dist_thresh
self.history: List[Tuple[float, float, float, int]] = [] # (t, x, y, sweep_idx)
def add_pose(self, t: float, x: float, y: float, sweep_idx: int) -> None:
self.history.append((t, x, y, sweep_idx))
def check(self, t: float, x: float, y: float) -> Optional[int]:
"""
Retourne l'index sweep de la pose passée si boucle détectée, sinon None.
"""
for (t_old, x_old, y_old, idx_old) in self.history:
if t - t_old < self.min_time:
continue
dist = np.sqrt((x - x_old)**2 + (y - y_old)**2)
if dist < self.dist_thresh:
return idx_old
return None
def apply_loop_closure(
trajectory: List[Tuple[float, float, float, float]], # (t, x, y, h)
loop_start_idx: int,
loop_end_idx: int,
correction: np.ndarray, # [dx, dy]
) -> List[Tuple[float, float, float, float]]:
"""
Redistribue linéairement la correction de fermeture de boucle
sur les poses entre loop_start_idx et loop_end_idx.
C'est une simplification de la correction graph-SLAM (pas de g2o).
"""
n_loop = loop_end_idx - loop_start_idx
if n_loop <= 0:
return trajectory
corrected = list(trajectory)
for i in range(loop_start_idx, loop_end_idx + 1):
alpha = (i - loop_start_idx) / n_loop
t, x, y, h = corrected[i]
corrected[i] = (t,
x + alpha * correction[0],
y + alpha * correction[1],
h)
return corrected