Files
moulin-mapper/server/slam_incremental.py
Flag 6e83bbd73f feat(server): ingest temps réel WS + GUI live + client PC
Serveur FastAPI reçoit le flux JSONL (sim ou ROV réel) sur /ws/ingest,
SLAM incrémental, rediffuse carte+poses sur /ws/live, GUI live et export PLY.
Déployé Docker sur caddy-net, exposé /moulin-live/. Client PC stream_client.py.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 20:27:17 +00:00

401 lines
13 KiB
Python

"""
slam_incremental.py — SLAM incrémental alimenté enregistrement par enregistrement.
Adapté depuis pipeline/slam.py pour une utilisation serveur (pas de batch).
Appel: une seule méthode add_record(rec) par message reçu.
Quand un sweep est complété → ICP + fermeture de boucle auto.
"""
import numpy as np
from scipy.spatial import cKDTree
from typing import List, Optional, Tuple, Dict
# ---------------------------------------------------------------------------
# Dead-reckoning
# ---------------------------------------------------------------------------
class DeadReckoning:
def __init__(self, alpha_magneto: float = 0.3, tau: float = 2.0):
self.x = 0.0
self.y = 0.0
self.h_gyro = 0.0
self.heading_deg = 0.0
self.vf = 0.0
self.vl = 0.0
self.t_prev = None
self.alpha = alpha_magneto
self.tau = tau
def reset(self):
self.x = 0.0
self.y = 0.0
self.h_gyro = 0.0
self.heading_deg = 0.0
self.vf = 0.0
self.vl = 0.0
self.t_prev = None
def update(self, t: float, heading_deg: float,
ax: float, ay: float, gz: float,
vf: float = None, vl: float = 0.0) -> Tuple[float, float, float]:
if self.t_prev is None:
self.t_prev = t
self.h_gyro = heading_deg
self.heading_deg = heading_deg
return self.x, self.y, self.heading_deg
dt = t - self.t_prev
if dt <= 0:
return self.x, self.y, self.heading_deg
self.t_prev = t
self.h_gyro = (self.h_gyro + np.rad2deg(gz) * dt) % 360
dh = ((heading_deg - self.h_gyro + 180) % 360) - 180
self.h_gyro = (self.h_gyro + self.alpha * dh) % 360
self.heading_deg = self.h_gyro
if vf is not None:
self.vf = vf
self.vl = vl
else:
decay = np.exp(-dt / self.tau)
self.vf = self.vf * decay + ax * dt
self.vl = 0.0
h_rad = np.deg2rad(self.heading_deg)
vx = self.vf * np.sin(h_rad) - self.vl * np.cos(h_rad)
vy = self.vf * np.cos(h_rad) + self.vl * np.sin(h_rad)
self.x += vx * dt
self.y += vy * dt
return self.x, self.y, self.heading_deg
# ---------------------------------------------------------------------------
# Accumulation de sweep
# ---------------------------------------------------------------------------
class SweepAccumulator:
def __init__(self):
self.current_sweep: List[Dict] = []
self.last_angle = None
self.sweeps_done = 0
def reset(self):
self.current_sweep = []
self.last_angle = None
self.sweeps_done = 0
def add_beam(self, angle_deg: float, distance: float,
pose: Tuple[float, float, float]) -> Optional[List[Dict]]:
beam = {"angle_deg": angle_deg, "distance": distance, "pose": pose}
if (self.last_angle is not None
and self.last_angle > 180
and angle_deg < 90):
completed = self.current_sweep.copy()
self.current_sweep = [beam]
self.last_angle = angle_deg
self.sweeps_done += 1
if len(completed) >= 20:
return completed
return None
self.current_sweep.append(beam)
self.last_angle = angle_deg
return None
def sweep_to_world_points(sweep: List[Dict]) -> np.ndarray:
points = []
for beam in sweep:
d = beam["distance"]
if d <= 0.01:
continue
angle_rel = beam["angle_deg"]
px, py, hdg = beam["pose"]
angle_world = (hdg + angle_rel) % 360.0
a_rad = np.deg2rad(angle_world)
wx = px + d * np.sin(a_rad)
wy = py + d * np.cos(a_rad)
points.append([wx, wy])
if not points:
return np.zeros((0, 2))
return np.array(points)
# ---------------------------------------------------------------------------
# ICP 2D
# ---------------------------------------------------------------------------
def icp_2d(
source_pts: np.ndarray,
map_pts: np.ndarray,
max_iter: int = 20,
tol: float = 1e-4,
max_dist: float = 0.5,
) -> Tuple[float, float, float, bool]:
if len(source_pts) < 5 or len(map_pts) < 5:
return 0.0, 0.0, 0.0, False
src = source_pts.copy()
tree = cKDTree(map_pts)
dx_total = 0.0
dy_total = 0.0
dtheta_total = 0.0
for _ in range(max_iter):
dists, idxs = tree.query(src, k=1, workers=-1)
mask = dists < max_dist
if mask.sum() < 5:
return dx_total, dy_total, dtheta_total, False
src_m = src[mask]
dst_m = map_pts[idxs[mask]]
c_src = src_m.mean(axis=0)
c_dst = dst_m.mean(axis=0)
A = src_m - c_src
B = dst_m - c_dst
H = A.T @ B
U, S, Vt = np.linalg.svd(H)
R = Vt.T @ U.T
if np.linalg.det(R) < 0:
Vt[-1, :] *= -1
R = Vt.T @ U.T
t = c_dst - R @ c_src
src = (R @ src.T).T + t
dx_total += t[0]
dy_total += t[1]
dtheta_total += np.arctan2(R[1, 0], R[0, 0])
if np.abs(t).max() < tol and abs(np.arctan2(R[1, 0], R[0, 0])) < tol:
return dx_total, dy_total, dtheta_total, True
return dx_total, dy_total, dtheta_total, True
# ---------------------------------------------------------------------------
# Carte incrémentale
# ---------------------------------------------------------------------------
class IncrementalMap:
def __init__(self, voxel_size: float = 0.05):
self.points: List[np.ndarray] = []
self.voxel_size = voxel_size
self._voxel_set = set()
def reset(self):
self.points = []
self._voxel_set = set()
def add_points(self, pts: np.ndarray) -> int:
"""Retourne le nombre de nouveaux points ajoutés."""
added = 0
for pt in pts:
key = (int(pt[0] / self.voxel_size), int(pt[1] / self.voxel_size))
if key not in self._voxel_set:
self._voxel_set.add(key)
self.points.append(pt)
added += 1
return added
def get_array(self) -> np.ndarray:
if not self.points:
return np.zeros((0, 2))
return np.array(self.points)
def __len__(self):
return len(self.points)
# ---------------------------------------------------------------------------
# Fermeture de boucle
# ---------------------------------------------------------------------------
class LoopClosureDetector:
def __init__(self, min_time: float = 15.0, dist_thresh: float = 0.8):
self.min_time = min_time
self.dist_thresh = dist_thresh
self.history: List[Tuple[float, float, float, int]] = []
def reset(self):
self.history = []
def add_pose(self, t: float, x: float, y: float, sweep_idx: int) -> None:
self.history.append((t, x, y, sweep_idx))
def check(self, t: float, x: float, y: float) -> Optional[int]:
for (t_old, x_old, y_old, idx_old) in self.history:
if t - t_old < self.min_time:
continue
dist = np.sqrt((x - x_old)**2 + (y - y_old)**2)
if dist < self.dist_thresh:
return idx_old
return None
# ---------------------------------------------------------------------------
# Orchestrateur SLAM incrémental
# ---------------------------------------------------------------------------
class IncrementalSLAM:
"""
Reçoit les enregistrements un par un via add_record().
Après chaque sweep complet → ICP + fermeture de boucle.
Expose: pose_dr, pose_corrected, map, trajectory, loop_closures.
"""
def __init__(self):
self.dr = DeadReckoning()
self.sweep_acc = SweepAccumulator()
self.imap = IncrementalMap()
self.lcd = LoopClosureDetector()
# Pose courante (x, y, heading_deg)
self.pose_dr = (0.0, 0.0, 0.0)
self.pose_corrected = (0.0, 0.0, 0.0)
# Trajectoire : liste de (t, x, y, h) — corrigée
self.trajectory: List[Tuple[float, float, float, float]] = []
# Stats
self.records_count = 0
self.sweeps_done = 0
self.loop_closures = 0
self.last_t = 0.0
# Points ajoutés au dernier sweep (pour delta live)
self._last_new_points: np.ndarray = np.zeros((0, 2))
def reset(self):
self.dr.reset()
self.sweep_acc.reset()
self.imap.reset()
self.lcd.reset()
self.pose_dr = (0.0, 0.0, 0.0)
self.pose_corrected = (0.0, 0.0, 0.0)
self.trajectory = []
self.records_count = 0
self.sweeps_done = 0
self.loop_closures = 0
self.last_t = 0.0
self._last_new_points = np.zeros((0, 2))
def add_record(self, rec: dict) -> bool:
"""
Traite un enregistrement. Retourne True si un sweep vient d'être complété
(le client /ws/live recevra alors un delta).
"""
t = rec["t"]
self.records_count += 1
self.last_t = t
# DR
x, y, h = self.dr.update(
t=t,
heading_deg=rec["heading"],
ax=rec["ax"],
ay=rec["ay"],
gz=rec["gz"],
vf=rec.get("vf"),
vl=rec.get("vl", 0.0),
)
self.pose_dr = (x, y, h)
# Sweep accumulation
sweep = self.sweep_acc.add_beam(
angle_deg=rec["ping360_angle"],
distance=rec["ping360_distance"],
pose=(x, y, h),
)
if sweep is None:
return False
# --- Sweep complet ---
self.sweeps_done = self.sweep_acc.sweeps_done
new_pts = sweep_to_world_points(sweep)
# ICP si la carte n'est pas vide
map_arr = self.imap.get_array()
dx_icp = dy_icp = dtheta_icp = 0.0
if len(map_arr) >= 5 and len(new_pts) >= 5:
dx_icp, dy_icp, dtheta_icp, _ = icp_2d(new_pts, map_arr)
# Appliquer correction ICP à la pose courante
x_c = x + dx_icp
y_c = y + dy_icp
h_c = (h + np.rad2deg(dtheta_icp)) % 360
self.pose_corrected = (x_c, y_c, h_c)
# Recalculer les points corrigés pour la carte
corrected_sweep = [
{
"angle_deg": b["angle_deg"],
"distance": b["distance"],
"pose": (
b["pose"][0] + dx_icp,
b["pose"][1] + dy_icp,
b["pose"][2] + np.rad2deg(dtheta_icp),
)
}
for b in sweep
]
map_pts_new = sweep_to_world_points(corrected_sweep)
else:
self.pose_corrected = (x, y, h)
map_pts_new = new_pts
# Ajouter les points à la carte
added = self.imap.add_points(map_pts_new)
self._last_new_points = map_pts_new if added > 0 else np.zeros((0, 2))
# Trajectoire
xc, yc, hc = self.pose_corrected
self.trajectory.append((t, xc, yc, hc))
# Fermeture de boucle
loop_old_idx = self.lcd.check(t, xc, yc)
if loop_old_idx is not None:
self.loop_closures += 1
# Redistribution linéaire correction sur dernières poses
n_loop = len(self.trajectory) - loop_old_idx
if n_loop > 1:
corr = np.array([
self.trajectory[loop_old_idx][1] - xc,
self.trajectory[loop_old_idx][2] - yc,
])
for i in range(loop_old_idx, len(self.trajectory)):
alpha = (i - loop_old_idx) / n_loop
tt, xx, yy, hh = self.trajectory[i]
self.trajectory[i] = (tt, xx + alpha * corr[0], yy + alpha * corr[1], hh)
self.lcd.add_pose(t, xc, yc, len(self.trajectory) - 1)
return True
def get_state_snapshot(self) -> dict:
"""Retourne l'état complet (pour connexion initiale /ws/live)."""
map_arr = self.imap.get_array()
return {
"type": "snapshot",
"records": self.records_count,
"sweeps": self.sweeps_done,
"loop_closures": self.loop_closures,
"last_t": self.last_t,
"pose_dr": list(self.pose_dr),
"pose_corrected": list(self.pose_corrected),
"map_points": map_arr.tolist(),
"trajectory": [list(p) for p in self.trajectory],
}
def get_sweep_delta(self) -> dict:
"""Retourne le delta après un sweep (nouveaux points + pose actuelle)."""
return {
"type": "delta",
"records": self.records_count,
"sweeps": self.sweeps_done,
"loop_closures": self.loop_closures,
"last_t": self.last_t,
"pose_dr": list(self.pose_dr),
"pose_corrected": list(self.pose_corrected),
"new_map_points": self._last_new_points.tolist(),
"trajectory_tail": [list(p) for p in self.trajectory[-5:]],
}