- simulator.py: flux JSONL réaliste (Ping360 angle/dist, IMU, heading, depth, altitude) + vérité terrain - slam.py: dead-reckoning + scan-to-map ICP 2D (cKDTree) + fermeture de boucle - process.py: ingestion streaming ligne-par-ligne → trajectory.csv + map_2d.csv + cloud.ply - stream_replay.py: rejoue le flux (vision streaming remote) - SCHEMA.md: contrat format données ROV réel↔sim - RMS dead-reckoning 0.386m → scan-matching 0.188m (2x)
359 lines
12 KiB
Python
359 lines
12 KiB
Python
"""
|
||
slam.py — Dead-reckoning + Scan-matching ICP 2D + fermeture de boucle.
|
||
|
||
Modules réutilisables importés par process.py.
|
||
Dépendances : numpy, scipy uniquement.
|
||
"""
|
||
|
||
import numpy as np
|
||
from scipy.spatial import cKDTree
|
||
from typing import List, Optional, Tuple, Dict
|
||
from collections import deque
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Dead-reckoning (intégration IMU + heading)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class DeadReckoning:
|
||
"""
|
||
Dead-reckoning ROV 2D : heading (magnéto + gyro) + vitesse odométrique.
|
||
|
||
Modèle réaliste pour BlueROV sans DVL/GPS dans une chambre de moulin :
|
||
|
||
Heading : fusion magnéto + gz par filtre complémentaire
|
||
h_dr = alpha * (h_magneto) + (1-alpha) * (h_gyro_integré)
|
||
→ alpha = 0.3 : le magnéto corrige lentement, le gyro intègre rapidement
|
||
→ source de dérive : biais gz (0.003 rad/s) qui déplace h_gyro
|
||
→ sur 120s : dérive cumulée ~4° → erreur transverse ~1-2 m à 0.3 m/s
|
||
|
||
Vitesse forward : estimée depuis ax par intégration sur fenêtre glissante
|
||
→ biais ax (0.015 m/s²) intégré → erreur de vitesse → dérive position
|
||
→ en pratique : v_dr ≠ v_vraie d'environ ±0.05 m/s
|
||
|
||
Résultat : DR dérive de 1-4 m sur 120s → ICP Ping360 corrige à chaque sweep.
|
||
"""
|
||
|
||
def __init__(self, alpha_magneto: float = 0.3, tau: float = 2.0):
|
||
"""
|
||
alpha_magneto : poids du magnéto dans la fusion heading [0,1].
|
||
0 = gyro pur (max dérive), 1 = magnéto pur (pas de dérive).
|
||
0.3 = compromis réaliste (magnéto perturbé par métaux du moulin).
|
||
tau : constante de temps amortissement vitesse (s).
|
||
"""
|
||
self.x = 0.0
|
||
self.y = 0.0
|
||
self.h_gyro = 0.0 # heading intégré depuis gz
|
||
self.heading_deg = 0.0 # heading fusionné
|
||
self.vf = 0.0 # vitesse forward estimée (corps, m/s)
|
||
self.vl = 0.0 # vitesse latérale estimée (corps, m/s)
|
||
self.t_prev = None
|
||
self.alpha = alpha_magneto
|
||
self.tau = tau
|
||
|
||
def reset(self, x: float = 0.0, y: float = 0.0, heading_deg: float = 0.0):
|
||
self.x = x
|
||
self.y = y
|
||
self.h_gyro = heading_deg
|
||
self.heading_deg = heading_deg
|
||
self.vf = 0.0
|
||
self.vl = 0.0
|
||
self.t_prev = None
|
||
|
||
def update(self, t: float, heading_deg: float,
|
||
ax: float, ay: float, az: float,
|
||
gz: float = 0.0,
|
||
vf: float = None, vl: float = 0.0) -> Tuple[float, float, float]:
|
||
"""
|
||
heading_deg : cap magnéto (bruité + dérive lente).
|
||
gz : taux de lacet (rad/s), biaisé → dérive h_gyro.
|
||
ax, ay : accélérations corps (option : non utilisées si vf fourni).
|
||
vf, vl : vitesse corps forward/latérale mesurée (m/s).
|
||
Si vf=None, on intègre ax (dérive plus rapide).
|
||
|
||
Sources de dérive DR :
|
||
- biais gz → h_gyro dérive → mauvaise direction
|
||
- biais vf → position dérive dans la mauvaise direction
|
||
→ ICP corrige ces deux erreurs à chaque sweep
|
||
"""
|
||
if self.t_prev is None:
|
||
self.t_prev = t
|
||
self.h_gyro = heading_deg
|
||
self.heading_deg = heading_deg
|
||
return self.x, self.y, self.heading_deg
|
||
|
||
dt = t - self.t_prev
|
||
if dt <= 0:
|
||
return self.x, self.y, self.heading_deg
|
||
self.t_prev = t
|
||
|
||
# --- Heading : filtre complémentaire gyro (principal) + magnéto (lent) ---
|
||
# Intégration gz biaisée → h_gyro dérive
|
||
self.h_gyro = (self.h_gyro + np.rad2deg(gz) * dt) % 360
|
||
# Correction magnéto partielle (α faible = magnéto peu fiable dans moulin métallique)
|
||
dh = ((heading_deg - self.h_gyro + 180) % 360) - 180
|
||
self.h_gyro = (self.h_gyro + self.alpha * dh) % 360
|
||
self.heading_deg = self.h_gyro
|
||
|
||
# --- Vitesse : utilise vf/vl si disponible, sinon intègre ax/ay ---
|
||
if vf is not None:
|
||
# Vitesse mesurée (biaisée) — source de dérive : biais vf
|
||
self.vf = vf
|
||
self.vl = vl
|
||
else:
|
||
# Intégration accéléro (dérive plus rapide)
|
||
decay = np.exp(-dt / self.tau)
|
||
self.vf = self.vf * decay + ax * dt
|
||
self.vl = self.vl * decay + ay * dt
|
||
|
||
# --- Projection vitesse corps → monde ---
|
||
h_rad = np.deg2rad(self.heading_deg)
|
||
# avant ROV = (sin h, cos h) en ENU ; gauche = (-cos h, sin h)
|
||
vx = self.vf * np.sin(h_rad) - self.vl * np.cos(h_rad)
|
||
vy = self.vf * np.cos(h_rad) + self.vl * np.sin(h_rad)
|
||
|
||
self.x += vx * dt
|
||
self.y += vy * dt
|
||
|
||
return self.x, self.y, self.heading_deg
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Gestion des sweeps Ping360
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class SweepAccumulator:
|
||
"""
|
||
Regroupe les faisceaux Ping360 en sweeps complets (0→360°).
|
||
Un nouveau sweep démarre quand l'angle fait un tour complet.
|
||
"""
|
||
|
||
def __init__(self, steps: int = 400):
|
||
self.steps = steps
|
||
self.current_sweep: List[Dict] = []
|
||
self.last_angle = None
|
||
self.sweeps_done = 0
|
||
|
||
def add_beam(self, angle_deg: float, distance: float, pose: Tuple[float, float, float]) -> Optional[List[Dict]]:
|
||
"""
|
||
Ajoute un faisceau. Si le sweep est complet (retour à 0°), retourne le sweep
|
||
et commence le suivant. Sinon retourne None.
|
||
|
||
pose = (x, y, heading_deg)
|
||
Fonctionne quel que soit le pas angulaire (1 pas ou 4.5° par record).
|
||
"""
|
||
beam = {"angle_deg": angle_deg, "distance": distance, "pose": pose}
|
||
|
||
# Détection fermeture de sweep : l'angle repasse vers 0 après avoir été >180
|
||
if (self.last_angle is not None
|
||
and self.last_angle > 180
|
||
and angle_deg < 90):
|
||
# Sweep complet — seuil minimal : au moins 20 faisceaux (tolère grand pas angulaire)
|
||
completed = self.current_sweep.copy()
|
||
self.current_sweep = [beam]
|
||
self.last_angle = angle_deg
|
||
self.sweeps_done += 1
|
||
if len(completed) >= 20:
|
||
return completed
|
||
return None
|
||
|
||
self.current_sweep.append(beam)
|
||
self.last_angle = angle_deg
|
||
return None
|
||
|
||
|
||
def sweep_to_world_points(sweep: List[Dict]) -> np.ndarray:
|
||
"""
|
||
Convertit un sweep (liste de faisceaux) en points murs dans le repère monde.
|
||
Retourne un array (M, 2) de points (x, y), dropouts exclus.
|
||
"""
|
||
points = []
|
||
for beam in sweep:
|
||
d = beam["distance"]
|
||
if d <= 0.01:
|
||
continue # dropout
|
||
|
||
angle_rel = beam["angle_deg"]
|
||
px, py, hdg = beam["pose"]
|
||
|
||
# angle monde = cap + angle_relatif (convention cap 0=Nord, CW)
|
||
angle_world = (hdg + angle_rel) % 360.0
|
||
a_rad = np.deg2rad(angle_world)
|
||
|
||
# Direction en ENU : cap 0°=+Y, 90°=+X
|
||
wx = px + d * np.sin(a_rad)
|
||
wy = py + d * np.cos(a_rad)
|
||
points.append([wx, wy])
|
||
|
||
if not points:
|
||
return np.zeros((0, 2))
|
||
return np.array(points)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ICP 2D scan-to-map
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def icp_2d(
|
||
source_pts: np.ndarray,
|
||
map_pts: np.ndarray,
|
||
max_iter: int = 20,
|
||
tol: float = 1e-4,
|
||
max_dist: float = 0.5,
|
||
) -> Tuple[float, float, float, bool]:
|
||
"""
|
||
ICP 2D : recale source_pts sur map_pts.
|
||
Retourne (dx, dy, d_heading_rad, converged).
|
||
|
||
Algorithme :
|
||
1. Pour chaque point source, chercher le plus proche dans map (cKDTree)
|
||
2. Estimer la transformation rigide 2D (rotation + translation) via SVD
|
||
3. Appliquer et itérer jusqu'à convergence
|
||
"""
|
||
if len(source_pts) < 5 or len(map_pts) < 5:
|
||
return 0.0, 0.0, 0.0, False
|
||
|
||
src = source_pts.copy()
|
||
tree = cKDTree(map_pts)
|
||
|
||
dx_total = 0.0
|
||
dy_total = 0.0
|
||
dtheta_total = 0.0
|
||
|
||
for iteration in range(max_iter):
|
||
# Nearest-neighbor
|
||
dists, idxs = tree.query(src, k=1, workers=-1)
|
||
|
||
# Filtrer les appariements trop lointains
|
||
mask = dists < max_dist
|
||
if mask.sum() < 5:
|
||
return dx_total, dy_total, dtheta_total, False
|
||
|
||
src_m = src[mask]
|
||
dst_m = map_pts[idxs[mask]]
|
||
|
||
# Centroïdes
|
||
c_src = src_m.mean(axis=0)
|
||
c_dst = dst_m.mean(axis=0)
|
||
|
||
# Matrices centrées
|
||
A = src_m - c_src
|
||
B = dst_m - c_dst
|
||
|
||
# SVD pour trouver la rotation 2D optimale
|
||
H = A.T @ B
|
||
U, S, Vt = np.linalg.svd(H)
|
||
R = Vt.T @ U.T
|
||
|
||
# Assure det(R) = +1 (rotation pure, pas de réflexion)
|
||
if np.linalg.det(R) < 0:
|
||
Vt[-1, :] *= -1
|
||
R = Vt.T @ U.T
|
||
|
||
t = c_dst - R @ c_src
|
||
|
||
# Appliquer la transformation au nuage source
|
||
src = (R @ src.T).T + t
|
||
|
||
# Accumuler la transformation totale
|
||
dx_total += t[0]
|
||
dy_total += t[1]
|
||
dtheta_total += np.arctan2(R[1, 0], R[0, 0])
|
||
|
||
# Convergence
|
||
if np.abs(t).max() < tol and abs(np.arctan2(R[1, 0], R[0, 0])) < tol:
|
||
return dx_total, dy_total, dtheta_total, True
|
||
|
||
return dx_total, dy_total, dtheta_total, True
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Carte globale incrémentale
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class IncrementalMap:
|
||
"""
|
||
Carte 2D des murs, stockée comme nuage de points.
|
||
Décimation par voxel pour éviter la croissance infinie.
|
||
"""
|
||
|
||
def __init__(self, voxel_size: float = 0.05):
|
||
self.points: List[np.ndarray] = []
|
||
self.voxel_size = voxel_size
|
||
self._voxel_set = set()
|
||
|
||
def add_points(self, pts: np.ndarray) -> None:
|
||
"""Ajoute des points, en filtrant les doublons par voxel."""
|
||
for pt in pts:
|
||
key = (int(pt[0] / self.voxel_size), int(pt[1] / self.voxel_size))
|
||
if key not in self._voxel_set:
|
||
self._voxel_set.add(key)
|
||
self.points.append(pt)
|
||
|
||
def get_array(self) -> np.ndarray:
|
||
if not self.points:
|
||
return np.zeros((0, 2))
|
||
return np.array(self.points)
|
||
|
||
def __len__(self):
|
||
return len(self.points)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Détection et correction de fermeture de boucle
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class LoopClosureDetector:
|
||
"""
|
||
Détecte quand le ROV revient près d'une zone déjà visitée.
|
||
Stratégie simple : on compare la position courante aux positions passées.
|
||
Si distance < seuil et qu'un temps minimum s'est écoulé → boucle détectée.
|
||
"""
|
||
|
||
def __init__(self, min_time: float = 15.0, dist_thresh: float = 0.8):
|
||
self.min_time = min_time # on ne cherche pas dans les N dernières secondes
|
||
self.dist_thresh = dist_thresh
|
||
self.history: List[Tuple[float, float, float, int]] = [] # (t, x, y, sweep_idx)
|
||
|
||
def add_pose(self, t: float, x: float, y: float, sweep_idx: int) -> None:
|
||
self.history.append((t, x, y, sweep_idx))
|
||
|
||
def check(self, t: float, x: float, y: float) -> Optional[int]:
|
||
"""
|
||
Retourne l'index sweep de la pose passée si boucle détectée, sinon None.
|
||
"""
|
||
for (t_old, x_old, y_old, idx_old) in self.history:
|
||
if t - t_old < self.min_time:
|
||
continue
|
||
dist = np.sqrt((x - x_old)**2 + (y - y_old)**2)
|
||
if dist < self.dist_thresh:
|
||
return idx_old
|
||
return None
|
||
|
||
|
||
def apply_loop_closure(
|
||
trajectory: List[Tuple[float, float, float, float]], # (t, x, y, h)
|
||
loop_start_idx: int,
|
||
loop_end_idx: int,
|
||
correction: np.ndarray, # [dx, dy]
|
||
) -> List[Tuple[float, float, float, float]]:
|
||
"""
|
||
Redistribue linéairement la correction de fermeture de boucle
|
||
sur les poses entre loop_start_idx et loop_end_idx.
|
||
|
||
C'est une simplification de la correction graph-SLAM (pas de g2o).
|
||
"""
|
||
n_loop = loop_end_idx - loop_start_idx
|
||
if n_loop <= 0:
|
||
return trajectory
|
||
|
||
corrected = list(trajectory)
|
||
for i in range(loop_start_idx, loop_end_idx + 1):
|
||
alpha = (i - loop_start_idx) / n_loop
|
||
t, x, y, h = corrected[i]
|
||
corrected[i] = (t,
|
||
x + alpha * correction[0],
|
||
y + alpha * correction[1],
|
||
h)
|
||
return corrected
|