feat(moulin-mapper): simulateur capteurs + pipeline SLAM Python (JSONL stream → trajectoire + nuage .ply)

- simulator.py: flux JSONL réaliste (Ping360 angle/dist, IMU, heading, depth, altitude) + vérité terrain
- slam.py: dead-reckoning + scan-to-map ICP 2D (cKDTree) + fermeture de boucle
- process.py: ingestion streaming ligne-par-ligne → trajectory.csv + map_2d.csv + cloud.ply
- stream_replay.py: rejoue le flux (vision streaming remote)
- SCHEMA.md: contrat format données ROV réel↔sim
- RMS dead-reckoning 0.386m → scan-matching 0.188m (2x)
This commit is contained in:
Flag
2026-06-06 20:01:22 +00:00
parent 959783dfa7
commit 85e9a4d4b0
14 changed files with 12677 additions and 0 deletions

358
pipeline/slam.py Normal file
View File

@@ -0,0 +1,358 @@
"""
slam.py — Dead-reckoning + Scan-matching ICP 2D + fermeture de boucle.
Modules réutilisables importés par process.py.
Dépendances : numpy, scipy uniquement.
"""
import numpy as np
from scipy.spatial import cKDTree
from typing import List, Optional, Tuple, Dict
from collections import deque
# ---------------------------------------------------------------------------
# Dead-reckoning (intégration IMU + heading)
# ---------------------------------------------------------------------------
class DeadReckoning:
"""
Dead-reckoning ROV 2D : heading (magnéto + gyro) + vitesse odométrique.
Modèle réaliste pour BlueROV sans DVL/GPS dans une chambre de moulin :
Heading : fusion magnéto + gz par filtre complémentaire
h_dr = alpha * (h_magneto) + (1-alpha) * (h_gyro_integré)
→ alpha = 0.3 : le magnéto corrige lentement, le gyro intègre rapidement
→ source de dérive : biais gz (0.003 rad/s) qui déplace h_gyro
→ sur 120s : dérive cumulée ~4° → erreur transverse ~1-2 m à 0.3 m/s
Vitesse forward : estimée depuis ax par intégration sur fenêtre glissante
→ biais ax (0.015 m/s²) intégré → erreur de vitesse → dérive position
→ en pratique : v_dr ≠ v_vraie d'environ ±0.05 m/s
Résultat : DR dérive de 1-4 m sur 120s → ICP Ping360 corrige à chaque sweep.
"""
def __init__(self, alpha_magneto: float = 0.3, tau: float = 2.0):
"""
alpha_magneto : poids du magnéto dans la fusion heading [0,1].
0 = gyro pur (max dérive), 1 = magnéto pur (pas de dérive).
0.3 = compromis réaliste (magnéto perturbé par métaux du moulin).
tau : constante de temps amortissement vitesse (s).
"""
self.x = 0.0
self.y = 0.0
self.h_gyro = 0.0 # heading intégré depuis gz
self.heading_deg = 0.0 # heading fusionné
self.vf = 0.0 # vitesse forward estimée (corps, m/s)
self.vl = 0.0 # vitesse latérale estimée (corps, m/s)
self.t_prev = None
self.alpha = alpha_magneto
self.tau = tau
def reset(self, x: float = 0.0, y: float = 0.0, heading_deg: float = 0.0):
self.x = x
self.y = y
self.h_gyro = heading_deg
self.heading_deg = heading_deg
self.vf = 0.0
self.vl = 0.0
self.t_prev = None
def update(self, t: float, heading_deg: float,
ax: float, ay: float, az: float,
gz: float = 0.0,
vf: float = None, vl: float = 0.0) -> Tuple[float, float, float]:
"""
heading_deg : cap magnéto (bruité + dérive lente).
gz : taux de lacet (rad/s), biaisé → dérive h_gyro.
ax, ay : accélérations corps (option : non utilisées si vf fourni).
vf, vl : vitesse corps forward/latérale mesurée (m/s).
Si vf=None, on intègre ax (dérive plus rapide).
Sources de dérive DR :
- biais gz → h_gyro dérive → mauvaise direction
- biais vf → position dérive dans la mauvaise direction
→ ICP corrige ces deux erreurs à chaque sweep
"""
if self.t_prev is None:
self.t_prev = t
self.h_gyro = heading_deg
self.heading_deg = heading_deg
return self.x, self.y, self.heading_deg
dt = t - self.t_prev
if dt <= 0:
return self.x, self.y, self.heading_deg
self.t_prev = t
# --- Heading : filtre complémentaire gyro (principal) + magnéto (lent) ---
# Intégration gz biaisée → h_gyro dérive
self.h_gyro = (self.h_gyro + np.rad2deg(gz) * dt) % 360
# Correction magnéto partielle (α faible = magnéto peu fiable dans moulin métallique)
dh = ((heading_deg - self.h_gyro + 180) % 360) - 180
self.h_gyro = (self.h_gyro + self.alpha * dh) % 360
self.heading_deg = self.h_gyro
# --- Vitesse : utilise vf/vl si disponible, sinon intègre ax/ay ---
if vf is not None:
# Vitesse mesurée (biaisée) — source de dérive : biais vf
self.vf = vf
self.vl = vl
else:
# Intégration accéléro (dérive plus rapide)
decay = np.exp(-dt / self.tau)
self.vf = self.vf * decay + ax * dt
self.vl = self.vl * decay + ay * dt
# --- Projection vitesse corps → monde ---
h_rad = np.deg2rad(self.heading_deg)
# avant ROV = (sin h, cos h) en ENU ; gauche = (-cos h, sin h)
vx = self.vf * np.sin(h_rad) - self.vl * np.cos(h_rad)
vy = self.vf * np.cos(h_rad) + self.vl * np.sin(h_rad)
self.x += vx * dt
self.y += vy * dt
return self.x, self.y, self.heading_deg
# ---------------------------------------------------------------------------
# Gestion des sweeps Ping360
# ---------------------------------------------------------------------------
class SweepAccumulator:
"""
Regroupe les faisceaux Ping360 en sweeps complets (0→360°).
Un nouveau sweep démarre quand l'angle fait un tour complet.
"""
def __init__(self, steps: int = 400):
self.steps = steps
self.current_sweep: List[Dict] = []
self.last_angle = None
self.sweeps_done = 0
def add_beam(self, angle_deg: float, distance: float, pose: Tuple[float, float, float]) -> Optional[List[Dict]]:
"""
Ajoute un faisceau. Si le sweep est complet (retour à 0°), retourne le sweep
et commence le suivant. Sinon retourne None.
pose = (x, y, heading_deg)
Fonctionne quel que soit le pas angulaire (1 pas ou 4.5° par record).
"""
beam = {"angle_deg": angle_deg, "distance": distance, "pose": pose}
# Détection fermeture de sweep : l'angle repasse vers 0 après avoir été >180
if (self.last_angle is not None
and self.last_angle > 180
and angle_deg < 90):
# Sweep complet — seuil minimal : au moins 20 faisceaux (tolère grand pas angulaire)
completed = self.current_sweep.copy()
self.current_sweep = [beam]
self.last_angle = angle_deg
self.sweeps_done += 1
if len(completed) >= 20:
return completed
return None
self.current_sweep.append(beam)
self.last_angle = angle_deg
return None
def sweep_to_world_points(sweep: List[Dict]) -> np.ndarray:
"""
Convertit un sweep (liste de faisceaux) en points murs dans le repère monde.
Retourne un array (M, 2) de points (x, y), dropouts exclus.
"""
points = []
for beam in sweep:
d = beam["distance"]
if d <= 0.01:
continue # dropout
angle_rel = beam["angle_deg"]
px, py, hdg = beam["pose"]
# angle monde = cap + angle_relatif (convention cap 0=Nord, CW)
angle_world = (hdg + angle_rel) % 360.0
a_rad = np.deg2rad(angle_world)
# Direction en ENU : cap 0°=+Y, 90°=+X
wx = px + d * np.sin(a_rad)
wy = py + d * np.cos(a_rad)
points.append([wx, wy])
if not points:
return np.zeros((0, 2))
return np.array(points)
# ---------------------------------------------------------------------------
# ICP 2D scan-to-map
# ---------------------------------------------------------------------------
def icp_2d(
source_pts: np.ndarray,
map_pts: np.ndarray,
max_iter: int = 20,
tol: float = 1e-4,
max_dist: float = 0.5,
) -> Tuple[float, float, float, bool]:
"""
ICP 2D : recale source_pts sur map_pts.
Retourne (dx, dy, d_heading_rad, converged).
Algorithme :
1. Pour chaque point source, chercher le plus proche dans map (cKDTree)
2. Estimer la transformation rigide 2D (rotation + translation) via SVD
3. Appliquer et itérer jusqu'à convergence
"""
if len(source_pts) < 5 or len(map_pts) < 5:
return 0.0, 0.0, 0.0, False
src = source_pts.copy()
tree = cKDTree(map_pts)
dx_total = 0.0
dy_total = 0.0
dtheta_total = 0.0
for iteration in range(max_iter):
# Nearest-neighbor
dists, idxs = tree.query(src, k=1, workers=-1)
# Filtrer les appariements trop lointains
mask = dists < max_dist
if mask.sum() < 5:
return dx_total, dy_total, dtheta_total, False
src_m = src[mask]
dst_m = map_pts[idxs[mask]]
# Centroïdes
c_src = src_m.mean(axis=0)
c_dst = dst_m.mean(axis=0)
# Matrices centrées
A = src_m - c_src
B = dst_m - c_dst
# SVD pour trouver la rotation 2D optimale
H = A.T @ B
U, S, Vt = np.linalg.svd(H)
R = Vt.T @ U.T
# Assure det(R) = +1 (rotation pure, pas de réflexion)
if np.linalg.det(R) < 0:
Vt[-1, :] *= -1
R = Vt.T @ U.T
t = c_dst - R @ c_src
# Appliquer la transformation au nuage source
src = (R @ src.T).T + t
# Accumuler la transformation totale
dx_total += t[0]
dy_total += t[1]
dtheta_total += np.arctan2(R[1, 0], R[0, 0])
# Convergence
if np.abs(t).max() < tol and abs(np.arctan2(R[1, 0], R[0, 0])) < tol:
return dx_total, dy_total, dtheta_total, True
return dx_total, dy_total, dtheta_total, True
# ---------------------------------------------------------------------------
# Carte globale incrémentale
# ---------------------------------------------------------------------------
class IncrementalMap:
"""
Carte 2D des murs, stockée comme nuage de points.
Décimation par voxel pour éviter la croissance infinie.
"""
def __init__(self, voxel_size: float = 0.05):
self.points: List[np.ndarray] = []
self.voxel_size = voxel_size
self._voxel_set = set()
def add_points(self, pts: np.ndarray) -> None:
"""Ajoute des points, en filtrant les doublons par voxel."""
for pt in pts:
key = (int(pt[0] / self.voxel_size), int(pt[1] / self.voxel_size))
if key not in self._voxel_set:
self._voxel_set.add(key)
self.points.append(pt)
def get_array(self) -> np.ndarray:
if not self.points:
return np.zeros((0, 2))
return np.array(self.points)
def __len__(self):
return len(self.points)
# ---------------------------------------------------------------------------
# Détection et correction de fermeture de boucle
# ---------------------------------------------------------------------------
class LoopClosureDetector:
"""
Détecte quand le ROV revient près d'une zone déjà visitée.
Stratégie simple : on compare la position courante aux positions passées.
Si distance < seuil et qu'un temps minimum s'est écoulé → boucle détectée.
"""
def __init__(self, min_time: float = 15.0, dist_thresh: float = 0.8):
self.min_time = min_time # on ne cherche pas dans les N dernières secondes
self.dist_thresh = dist_thresh
self.history: List[Tuple[float, float, float, int]] = [] # (t, x, y, sweep_idx)
def add_pose(self, t: float, x: float, y: float, sweep_idx: int) -> None:
self.history.append((t, x, y, sweep_idx))
def check(self, t: float, x: float, y: float) -> Optional[int]:
"""
Retourne l'index sweep de la pose passée si boucle détectée, sinon None.
"""
for (t_old, x_old, y_old, idx_old) in self.history:
if t - t_old < self.min_time:
continue
dist = np.sqrt((x - x_old)**2 + (y - y_old)**2)
if dist < self.dist_thresh:
return idx_old
return None
def apply_loop_closure(
trajectory: List[Tuple[float, float, float, float]], # (t, x, y, h)
loop_start_idx: int,
loop_end_idx: int,
correction: np.ndarray, # [dx, dy]
) -> List[Tuple[float, float, float, float]]:
"""
Redistribue linéairement la correction de fermeture de boucle
sur les poses entre loop_start_idx et loop_end_idx.
C'est une simplification de la correction graph-SLAM (pas de g2o).
"""
n_loop = loop_end_idx - loop_start_idx
if n_loop <= 0:
return trajectory
corrected = list(trajectory)
for i in range(loop_start_idx, loop_end_idx + 1):
alpha = (i - loop_start_idx) / n_loop
t, x, y, h = corrected[i]
corrected[i] = (t,
x + alpha * correction[0],
y + alpha * correction[1],
h)
return corrected