Files
moulin-mapper/pipeline/process.py
Flag 85e9a4d4b0 feat(moulin-mapper): simulateur capteurs + pipeline SLAM Python (JSONL stream → trajectoire + nuage .ply)
- simulator.py: flux JSONL réaliste (Ping360 angle/dist, IMU, heading, depth, altitude) + vérité terrain
- slam.py: dead-reckoning + scan-to-map ICP 2D (cKDTree) + fermeture de boucle
- process.py: ingestion streaming ligne-par-ligne → trajectory.csv + map_2d.csv + cloud.ply
- stream_replay.py: rejoue le flux (vision streaming remote)
- SCHEMA.md: contrat format données ROV réel↔sim
- RMS dead-reckoning 0.386m → scan-matching 0.188m (2x)
2026-06-06 20:01:22 +00:00

308 lines
12 KiB
Python

"""
process.py — Pipeline complet streaming moulin-mapper.
Lit le flux JSONL ligne par ligne (streaming), applique dead-reckoning
+ scan-matching ICP, exporte les résultats.
Usage :
python3 process.py [--input ../data/sim/run_L.jsonl] [--truth ../data/sim/run_L_truth.csv]
python3 stream_replay.py | python3 process.py --stdin
"""
import argparse
import csv
import json
import os
import sys
import numpy as np
from typing import List, Tuple, Optional
# Assure imports depuis pipeline/
sys.path.insert(0, os.path.dirname(__file__))
from slam import (
DeadReckoning,
SweepAccumulator,
sweep_to_world_points,
icp_2d,
IncrementalMap,
LoopClosureDetector,
apply_loop_closure,
)
ROV_Z = 1.5 # profondeur fixe ROV (m) — sera remplacé par depth réel
ROV_ALT = 0.5 # hauteur fond (m) — sera remplacé par altitude réelle
# ---------------------------------------------------------------------------
# Lecture JSONL streaming
# ---------------------------------------------------------------------------
def stream_jsonl(source):
"""
Générateur : lit le flux JSONL ligne par ligne depuis un fichier ou stdin.
Chaque appel yield un dict Python.
"""
for line in source:
line = line.strip()
if not line:
continue
try:
yield json.loads(line)
except json.JSONDecodeError:
continue
# ---------------------------------------------------------------------------
# Chargement vérité terrain (optionnel)
# ---------------------------------------------------------------------------
def load_truth(truth_path: str) -> np.ndarray:
"""Retourne array (N, 5) : t, x, y, heading_deg, z"""
rows = []
with open(truth_path) as f:
reader = csv.DictReader(f)
for row in reader:
rows.append([float(row["t"]), float(row["x"]), float(row["y"]),
float(row["heading_deg"]), float(row["z"])])
return np.array(rows)
def interpolate_truth(truth: np.ndarray, t: float) -> np.ndarray:
"""Interpole la vérité terrain à l'instant t."""
idx = np.searchsorted(truth[:, 0], t)
idx = min(idx, len(truth) - 1)
return truth[idx]
# ---------------------------------------------------------------------------
# Export PLY ASCII (nuage 3D)
# ---------------------------------------------------------------------------
def write_ply(path: str, map2d: np.ndarray, depth: float, altitude: float) -> int:
"""
Extrude le contour 2D entre le fond et la surface → nuage 3D ASCII PLY.
Retourne le nombre de points.
"""
pts = []
z_surface = -depth # z=0 à la surface, profondeur = négatif si on va vers le bas
z_bottom = -(depth + altitude)
for x, y in map2d:
pts.append((x, y, z_surface))
pts.append((x, y, z_bottom))
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write("ply\nformat ascii 1.0\n")
f.write(f"element vertex {len(pts)}\n")
f.write("property float x\nproperty float y\nproperty float z\n")
f.write("end_header\n")
for px, py, pz in pts:
f.write(f"{px:.4f} {py:.4f} {pz:.4f}\n")
return len(pts)
# ---------------------------------------------------------------------------
# Pipeline principal
# ---------------------------------------------------------------------------
def run_pipeline(source, truth_path: Optional[str] = None, out_dir: str = "../data/out"):
os.makedirs(out_dir, exist_ok=True)
# Composants SLAM
dr = DeadReckoning()
sweep_acc = SweepAccumulator(steps=400)
global_map = IncrementalMap(voxel_size=0.05)
loop_detector = LoopClosureDetector(min_time=15.0, dist_thresh=0.8)
# Trajectoire estimée : dead-reckoning + corrigée
traj_dr: List[Tuple] = [] # (t, x, y)
traj_corr: List[Tuple] = [] # (t, x, y)
# Offset ICP cumulatif : déplacement total appliqué au-dessus du DR
# Maintenu entre les sweeps pour que la pose corrigée reste cohérente
icp_offset = np.array([0.0, 0.0])
# Accumulation erreurs pour RMS
dr_errors = []
sm_errors = []
truth = None
if truth_path and os.path.exists(truth_path):
truth = load_truth(truth_path)
print(f"Vérité terrain chargée : {len(truth)} points")
n_sweeps = 0
n_loop_closures = 0
sweep_poses = [] # (sweep_idx, t, x_corr, y_corr)
x_dr, y_dr, h_dr = 0.0, 0.0, 0.0
last_depth = ROV_Z
last_altitude = ROV_ALT
for record in stream_jsonl(source):
t = record["t"]
hdg = record["heading"]
alt = record["altitude"]
depth_r = record["depth"]
ax = record["ax"]
ay = record["ay"]
az = record["az"]
gz = record.get("gz", 0.0)
vf = record.get("vf", None) # vitesse forward (optionnel)
vl = record.get("vl", 0.0)
ping_angle = record["ping360_angle"]
ping_dist = record["ping360_distance"]
if alt > 0.01:
last_altitude = alt
if depth_r > 0.01:
last_depth = depth_r
# --- Dead-reckoning ---
x_dr, y_dr, h_dr = dr.update(t, hdg, ax, ay, az, gz=gz, vf=vf, vl=vl)
# Pose corrigée = pose DR + offset ICP cumulatif
# L'offset est mis à jour à chaque sweep ICP réussi
pose_dr = np.array([x_dr, y_dr])
pose_corr_cur = pose_dr + icp_offset
# --- Sweep Ping360 ---
# On passe la pose CORRIGÉE au sweep pour générer des points
# cohérents avec la carte globale déjà corrigée
pose_for_sweep = (pose_corr_cur[0], pose_corr_cur[1], h_dr)
completed = sweep_acc.add_beam(ping_angle, ping_dist, pose_for_sweep)
if completed is not None:
n_sweeps += 1
# Points murs depuis le sweep (en coordonnées pose_corr)
sweep_pts = sweep_to_world_points(completed)
if len(sweep_pts) >= 5:
map_arr = global_map.get_array()
if len(map_arr) >= 10:
# ICP : recaler sweep sur carte → donne correction résiduelle
# La correction est petite car la pose déjà corrigée
dx, dy, dtheta, converged = icp_2d(
sweep_pts, map_arr,
max_iter=30, tol=1e-4, max_dist=0.5
)
if converged:
# Mise à jour de l'offset cumulatif
icp_offset = icp_offset + np.array([dx, dy])
pose_corr_cur = pose_dr + icp_offset
# Ajouter les points du sweep à la carte globale
# (avec la pose corrigée)
global_map.add_points(sweep_pts + icp_offset)
# Enregistrer la pose pour la trajectoire
traj_dr.append((t, x_dr, y_dr))
traj_corr.append((t, pose_corr_cur[0], pose_corr_cur[1]))
sweep_poses.append((n_sweeps - 1, t, pose_corr_cur[0], pose_corr_cur[1]))
# Vérification erreurs vs vérité
if truth is not None:
gt = interpolate_truth(truth, t)
gt_xy = gt[1:3]
dr_errors.append(float(np.linalg.norm(pose_dr - gt_xy)))
sm_errors.append(float(np.linalg.norm(pose_corr_cur - gt_xy)))
# Détection fermeture de boucle
loop_detector.add_pose(t, pose_corr_cur[0], pose_corr_cur[1], n_sweeps - 1)
match_idx = loop_detector.check(t, pose_corr_cur[0], pose_corr_cur[1])
if match_idx is not None and match_idx < len(sweep_poses) - 3:
n_loop_closures += 1
_, ox, oy = traj_corr[match_idx]
correction = np.array([ox - pose_corr_cur[0], oy - pose_corr_cur[1]]) * 0.5
# Redistribuer la correction linéairement
for i in range(match_idx, len(traj_corr)):
alpha = (i - match_idx) / max(1, len(traj_corr) - 1 - match_idx)
t2, x2, y2 = traj_corr[i]
traj_corr[i] = (t2, x2 + alpha * correction[0], y2 + alpha * correction[1])
icp_offset = icp_offset + correction
loop_detector.history.clear()
# ---------------------------------------------------------------------------
# Export résultats
# ---------------------------------------------------------------------------
# trajectory.csv
traj_path = os.path.join(out_dir, "trajectory.csv")
with open(traj_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["t", "x_dr", "y_dr", "x_corr", "y_corr"])
n = min(len(traj_dr), len(traj_corr))
for i in range(n):
t2, xd, yd = traj_dr[i]
_, xc, yc = traj_corr[i]
writer.writerow([f"{t2:.4f}", f"{xd:.4f}", f"{yd:.4f}",
f"{xc:.4f}", f"{yc:.4f}"])
# map_2d.csv
map_arr = global_map.get_array()
map_path = os.path.join(out_dir, "map_2d.csv")
with open(map_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["x", "y"])
for x2, y2 in map_arr:
writer.writerow([f"{x2:.4f}", f"{y2:.4f}"])
# cloud.ply
ply_path = os.path.join(out_dir, "cloud.ply")
n_ply_pts = write_ply(ply_path, map_arr, last_depth, last_altitude)
# ---------------------------------------------------------------------------
# Stats
# ---------------------------------------------------------------------------
print("\n=== Résultats pipeline ===")
print(f"Sweeps traités : {n_sweeps}")
print(f"Fermetures de boucle : {n_loop_closures}")
print(f"Points carte 2D : {len(global_map)}")
print(f"Points nuage 3D (PLY) : {n_ply_pts}")
print(f"Taille cloud.ply : {os.path.getsize(ply_path)/1e3:.1f} Ko")
if dr_errors and sm_errors:
rms_dr = float(np.sqrt(np.mean(np.array(dr_errors)**2)))
rms_sm = float(np.sqrt(np.mean(np.array(sm_errors)**2)))
ratio = rms_sm / rms_dr if rms_dr > 0 else float("inf")
print(f"\nRMS erreur dead-reckoning : {rms_dr:.3f} m")
print(f"RMS erreur scan-matching : {rms_sm:.3f} m")
print(f"Amélioration : {ratio:.2f}x (DR/SM={rms_dr/rms_sm:.1f}x)")
if rms_sm < rms_dr:
print("OK — scan-matching améliore la position vs dead-reckoning")
else:
print("AVERTISSEMENT — scan-matching n'améliore pas (données insuffisantes?)")
else:
print("\nPas de vérité terrain → pas de calcul RMS")
print(f"\nFichiers exportés dans {out_dir}/")
print(f" {traj_path}")
print(f" {map_path}")
print(f" {ply_path}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Pipeline SLAM moulin-mapper")
parser.add_argument("--input", default="../data/sim/run_L.jsonl",
help="Fichier JSONL en entrée")
parser.add_argument("--truth", default="../data/sim/run_L_truth.csv",
help="Fichier vérité terrain (optionnel)")
parser.add_argument("--out-dir", default="../data/out")
parser.add_argument("--stdin", action="store_true",
help="Lire depuis stdin au lieu d'un fichier")
args = parser.parse_args()
if args.stdin:
print("Lecture depuis stdin (streaming)...")
run_pipeline(sys.stdin, truth_path=None, out_dir=args.out_dir)
else:
truth_p = args.truth if os.path.exists(args.truth) else None
with open(args.input) as f:
run_pipeline(f, truth_path=truth_p, out_dir=args.out_dir)