feat: calibration stéréo OpenCV, capture synchrone, Flask interface MJPEG

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Floppyrj45
2026-04-19 18:07:20 +02:00
parent 9a46824199
commit f9436df2a0
4 changed files with 688 additions and 0 deletions

View File

@@ -0,0 +1,245 @@
#!/usr/bin/env python3
"""
Calibration stéréo à partir de paires d'images de damier.
Damier: 9x6 cases intérieures, case 25mm.
Sortie: config/stereo_calib.yaml
"""
import cv2
import numpy as np
import yaml
import os
import sys
import argparse
import glob
# Paramètres du damier
CHESSBOARD_SIZE = (9, 6) # coins intérieurs (colonnes, lignes)
SQUARE_SIZE_MM = 25.0 # taille d'une case en mm
SQUARE_SIZE_M = SQUARE_SIZE_MM / 1000.0
# Chemins
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CALIB_IMAGES_DIR = os.path.join(SCRIPT_DIR, "calib_images")
CONFIG_DIR = os.path.join(SCRIPT_DIR, "..", "..", "config")
OUTPUT_YAML = os.path.join(CONFIG_DIR, "stereo_calib.yaml")
# Critères de terminaison pour cornerSubPix
SUBPIX_CRITERIA = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
STEREO_CRITERIA = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 1e-5)
def prepare_object_points():
"""Génère les coordonnées 3D du damier (plan Z=0)."""
objp = np.zeros((CHESSBOARD_SIZE[0] * CHESSBOARD_SIZE[1], 3), np.float32)
objp[:, :2] = np.mgrid[
0:CHESSBOARD_SIZE[0], 0:CHESSBOARD_SIZE[1]
].T.reshape(-1, 2) * SQUARE_SIZE_M
return objp
def load_pairs():
"""Charge les paires d'images gauche/droite."""
lefts = sorted(glob.glob(os.path.join(CALIB_IMAGES_DIR, "left_*.png")))
rights = sorted(glob.glob(os.path.join(CALIB_IMAGES_DIR, "right_*.png")))
if len(lefts) == 0 or len(rights) == 0:
print(f"[ERREUR] Aucune image dans {CALIB_IMAGES_DIR}", file=sys.stderr)
print("[ERREUR] Lancer d'abord stereo_capture.py", file=sys.stderr)
sys.exit(1)
if len(lefts) != len(rights):
print(f"[WARN] {len(lefts)} gauche vs {len(rights)} droite — utilise le min")
n = min(len(lefts), len(rights))
print(f"[INFO] Chargement {n} paires de damier...")
return lefts[:n], rights[:n]
def find_corners(image_paths, label=""):
"""Détecte les coins du damier dans une liste d'images."""
objp = prepare_object_points()
obj_points = [] # 3D
img_points = [] # 2D
valid_indices = []
img_size = None
for i, path in enumerate(image_paths):
img = cv2.imread(path)
if img is None:
print(f"[WARN] Impossible de lire: {path}")
continue
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img_size = gray.shape[::-1] # (width, height)
ret, corners = cv2.findChessboardCorners(
gray, CHESSBOARD_SIZE,
cv2.CALIB_CB_ADAPTIVE_THRESH + cv2.CALIB_CB_NORMALIZE_IMAGE
)
if ret:
corners_refined = cv2.cornerSubPix(
gray, corners, (11, 11), (-1, -1), SUBPIX_CRITERIA
)
obj_points.append(objp)
img_points.append(corners_refined)
valid_indices.append(i)
else:
print(f"[WARN] {label} image {i:04d}: damier non détecté")
print(f"[INFO] {label}: {len(obj_points)}/{len(image_paths)} images valides")
return obj_points, img_points, img_size, valid_indices
def calibrate_stereo(lefts, rights):
"""Calibre le système stéréo."""
obj_l, pts_l, size_l, idx_l = find_corners(lefts, "Gauche")
obj_r, pts_r, size_r, idx_r = find_corners(rights, "Droite")
if size_l != size_r:
print("[ERREUR] Taille images gauche/droite différente", file=sys.stderr)
sys.exit(1)
img_size = size_l
# Garder seulement les paires où les 2 caméras détectent le damier
valid_pairs = set(idx_l) & set(idx_r)
idx_map_l = {v: i for i, v in enumerate(idx_l)}
idx_map_r = {v: i for i, v in enumerate(idx_r)}
obj_pts = []
pts_left = []
pts_right = []
for idx in sorted(valid_pairs):
obj_pts.append(obj_l[idx_map_l[idx]])
pts_left.append(pts_l[idx_map_l[idx]])
pts_right.append(pts_r[idx_map_r[idx]])
print(f"[INFO] {len(obj_pts)} paires valides pour calibration stéréo")
if len(obj_pts) < 10:
print("[ERREUR] Moins de 10 paires valides. Recapturer.", file=sys.stderr)
sys.exit(1)
# Calibration individuelle d'abord
print("[INFO] Calibration caméra gauche...")
_, K1, D1, _, _ = cv2.calibrateCamera(obj_pts, pts_left, img_size, None, None)
print("[INFO] Calibration caméra droite...")
_, K2, D2, _, _ = cv2.calibrateCamera(obj_pts, pts_right, img_size, None, None)
# Calibration stéréo
print("[INFO] Calibration stéréo...")
flags = cv2.CALIB_FIX_INTRINSIC
rms, K1, D1, K2, D2, R, T, E, F = cv2.stereoCalibrate(
obj_pts, pts_left, pts_right,
K1, D1, K2, D2,
img_size,
criteria=STEREO_CRITERIA,
flags=flags
)
print(f"[INFO] RMS reprojection error: {rms:.4f} px")
if rms > 1.0:
print("[WARN] RMS > 1.0 px — reconsidérer la calibration")
elif rms < 0.5:
print("[OK] RMS excellent (< 0.5 px)")
# Rectification
R1, R2, P1, P2, Q, roi_l, roi_r = cv2.stereoRectify(
K1, D1, K2, D2, img_size, R, T, alpha=0
)
return {
"K1": K1, "D1": D1, "K2": K2, "D2": D2,
"R": R, "T": T, "E": E, "F": F,
"R1": R1, "R2": R2, "P1": P1, "P2": P2, "Q": Q,
"img_size": img_size,
"rms": rms
}
def save_yaml(calib, path):
"""Sauvegarde les paramètres en YAML."""
os.makedirs(os.path.dirname(path), exist_ok=True)
def mat_to_list(m):
return m.tolist()
data = {
"rms_reprojection_error": float(calib["rms"]),
"image_size": list(calib["img_size"]),
"K1": mat_to_list(calib["K1"]),
"D1": mat_to_list(calib["D1"]),
"K2": mat_to_list(calib["K2"]),
"D2": mat_to_list(calib["D2"]),
"R": mat_to_list(calib["R"]),
"T": mat_to_list(calib["T"]),
"E": mat_to_list(calib["E"]),
"F": mat_to_list(calib["F"]),
"R1": mat_to_list(calib["R1"]),
"R2": mat_to_list(calib["R2"]),
"P1": mat_to_list(calib["P1"]),
"P2": mat_to_list(calib["P2"]),
"Q": mat_to_list(calib["Q"]),
}
with open(path, "w") as f:
yaml.dump(data, f, default_flow_style=False)
print(f"[OK] Sauvegarde: {path}")
def show_rectified(calib, left_path, right_path):
"""Affiche une paire rectifiée pour vérification visuelle."""
img_l = cv2.imread(left_path)
img_r = cv2.imread(right_path)
sz = (calib["img_size"][0], calib["img_size"][1])
map1_l, map2_l = cv2.initUndistortRectifyMap(
np.array(calib["K1"]), np.array(calib["D1"]),
np.array(calib["R1"]), np.array(calib["P1"]),
sz, cv2.CV_16SC2
)
map1_r, map2_r = cv2.initUndistortRectifyMap(
np.array(calib["K2"]), np.array(calib["D2"]),
np.array(calib["R2"]), np.array(calib["P2"]),
sz, cv2.CV_16SC2
)
rect_l = cv2.remap(img_l, map1_l, map2_l, cv2.INTER_LINEAR)
rect_r = cv2.remap(img_r, map1_r, map2_r, cv2.INTER_LINEAR)
# Tracer lignes épipolaires horizontales
combined = cv2.hconcat([
cv2.resize(rect_l, (640, 360)),
cv2.resize(rect_r, (640, 360))
])
for y in range(0, 360, 30):
cv2.line(combined, (0, y), (1280, y), (0, 255, 0), 1)
cv2.imshow("Rectified — lignes vertes doivent etre horizontales", combined)
cv2.waitKey(0)
cv2.destroyAllWindows()
def main():
parser = argparse.ArgumentParser(description="Calibration stéréo OpenCV")
parser.add_argument("--show-rectified", action="store_true",
help="Afficher une paire rectifiée après calibration")
args = parser.parse_args()
lefts, rights = load_pairs()
calib = calibrate_stereo(lefts, rights)
save_yaml(calib, OUTPUT_YAML)
if args.show_rectified and len(lefts) > 0:
# Charger le yaml pour show
with open(OUTPUT_YAML) as f:
calib_loaded = yaml.safe_load(f)
show_rectified(calib_loaded, lefts[0], rights[0])
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
"""
Capture synchrone de paires d'images stéréo pour calibration.
Utilise deux webcams USB (index 0 et 1).
Appuyer ESPACE pour sauvegarder, Q pour quitter.
"""
import cv2
import os
import sys
import time
# Dossier de sortie
OUTPUT_DIR = os.path.join(os.path.dirname(__file__), "calib_images")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Index des caméras
CAM_LEFT = 0
CAM_RIGHT = 1
# Résolution cible
WIDTH = 1280
HEIGHT = 720
def open_camera(index):
"""Ouvre une caméra et configure la résolution."""
cap = cv2.VideoCapture(index, cv2.CAP_DSHOW) # CAP_DSHOW sur Windows
if not cap.isOpened():
# Essai sans backend explicite
cap = cv2.VideoCapture(index)
if not cap.isOpened():
print(f"[ERREUR] Impossible d'ouvrir la caméra {index}", file=sys.stderr)
return None
cap.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT)
cap.set(cv2.CAP_PROP_FPS, 30)
return cap
def main():
cap_left = open_camera(CAM_LEFT)
cap_right = open_camera(CAM_RIGHT)
if cap_left is None or cap_right is None:
print("[ERREUR] Vérifier connexion des 2 caméras USB.", file=sys.stderr)
sys.exit(1)
paire = 0
print(f"[INFO] Sauvegarde dans: {OUTPUT_DIR}")
print("[INFO] ESPACE = capturer paire | Q = quitter")
while True:
ret_l, frame_l = cap_left.read()
ret_r, frame_r = cap_right.read()
if not ret_l or not ret_r:
print("[WARN] Frame manquante, retry...")
time.sleep(0.05)
continue
# Affichage côte à côte (redimensionné pour l'écran)
display_l = cv2.resize(frame_l, (640, 360))
display_r = cv2.resize(frame_r, (640, 360))
combined = cv2.hconcat([display_l, display_r])
cv2.putText(combined, f"Paires: {paire}/30", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
cv2.imshow("Stereo Capture — ESPACE=capture Q=quit", combined)
key = cv2.waitKey(1) & 0xFF
if key == ord('q') or key == 27:
break
if key == ord(' '):
# Sauvegarde la paire
fname_l = os.path.join(OUTPUT_DIR, f"left_{paire:04d}.png")
fname_r = os.path.join(OUTPUT_DIR, f"right_{paire:04d}.png")
cv2.imwrite(fname_l, frame_l)
cv2.imwrite(fname_r, frame_r)
paire += 1
print(f"[OK] Paire {paire} sauvegardée")
if paire >= 30:
print("[INFO] 30 paires capturées. Calibration possible.")
cap_left.release()
cap_right.release()
cv2.destroyAllWindows()
print(f"[INFO] {paire} paires sauvegardées dans {OUTPUT_DIR}")
if __name__ == "__main__":
main()

236
src/interface/app.py Normal file
View File

@@ -0,0 +1,236 @@
#!/usr/bin/env python3
"""
Interface Flask — SLAM Stéréo BlueOS
Routes: / dashboard, /calibrate, /slam/start, /slam/stop, /stream
"""
import os
import sys
import subprocess
import threading
import time
import cv2
import numpy as np
from flask import Flask, render_template_string, Response, jsonify, redirect, url_for
app = Flask(__name__)
# Chemins
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SRC_DIR = os.path.join(BASE_DIR, "..")
CALIB_SCRIPT = os.path.join(SRC_DIR, "calibration", "stereo_calibrate.py")
CONFIG_YAML = os.path.join(BASE_DIR, "..", "..", "config", "stereo_calib.yaml")
# État global
slam_process = None
calib_process = None
last_frame_left = None
frame_lock = threading.Lock()
# Thread capture preview
preview_active = True
def capture_preview():
"""Thread de capture frame gauche pour MJPEG."""
global last_frame_left
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
while preview_active:
ret, frame = cap.read()
if ret:
with frame_lock:
last_frame_left = frame.copy()
time.sleep(0.033) # ~30 fps
cap.release()
preview_thread = threading.Thread(target=capture_preview, daemon=True)
preview_thread.start()
# Template HTML Bootstrap
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>SLAM Stéréo BlueOS</title>
<link rel="stylesheet"
href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css">
<style>
body { background: #1a1a2e; color: #e0e0e0; }
.card { background: #16213e; border: 1px solid #0f3460; }
.badge-ok { background: #00b894; }
.badge-err { background: #d63031; }
.stream-img { width: 100%; border-radius: 8px; }
</style>
</head>
<body>
<div class="container mt-4">
<h1 class="mb-4">SLAM Stéréo — BlueOS</h1>
<div class="row g-3">
<!-- Status -->
<div class="col-md-4">
<div class="card p-3">
<h5>Statut</h5>
<p>SLAM: <span class="badge {{ 'badge-ok' if slam_running else 'badge-err' }}">
{{ 'Actif' if slam_running else 'Arrêté' }}</span></p>
<p>Calibration: <span class="badge {{ 'badge-ok' if calib_ok else 'badge-err' }}">
{{ 'OK' if calib_ok else 'Manquante' }}</span></p>
</div>
</div>
<!-- Contrôles SLAM -->
<div class="col-md-4">
<div class="card p-3">
<h5>Contrôles SLAM</h5>
<a href="/slam/start" class="btn btn-success btn-sm me-2">Démarrer</a>
<a href="/slam/stop" class="btn btn-danger btn-sm">Arrêter</a>
</div>
</div>
<!-- Calibration -->
<div class="col-md-4">
<div class="card p-3">
<h5>Calibration</h5>
<a href="/calibrate" class="btn btn-warning btn-sm">
Lancer calibration
</a>
{% if calib_log %}
<pre class="mt-2" style="font-size:0.75rem;max-height:80px;overflow:auto;">
{{ calib_log }}</pre>
{% endif %}
</div>
</div>
<!-- Stream caméra gauche -->
<div class="col-12">
<div class="card p-3">
<h5>Preview — Caméra Gauche</h5>
<img src="/stream" class="stream-img" alt="stream">
</div>
</div>
</div>
</div>
</body>
</html>
"""
def calib_ok():
"""Vérifie si stereo_calib.yaml existe."""
return os.path.isfile(CONFIG_YAML)
def slam_running():
"""Vérifie si le process ORB-SLAM3 tourne."""
global slam_process
return slam_process is not None and slam_process.poll() is None
calib_log_cache = ""
@app.route("/")
def index():
"""Dashboard principal."""
return render_template_string(
HTML_TEMPLATE,
slam_running=slam_running(),
calib_ok=calib_ok(),
calib_log=calib_log_cache,
)
@app.route("/calibrate")
def calibrate():
"""Lance le script de calibration stéréo en arrière-plan."""
global calib_process, calib_log_cache
if calib_process and calib_process.poll() is None:
calib_log_cache = "[INFO] Calibration déjà en cours..."
return redirect(url_for("index"))
calib_process = subprocess.Popen(
[sys.executable, CALIB_SCRIPT],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
text=True
)
def read_log(proc):
global calib_log_cache
lines = []
for line in proc.stdout:
lines.append(line.strip())
calib_log_cache = "\n".join(lines[-10:]) # 10 dernières lignes
proc.wait()
threading.Thread(target=read_log, args=(calib_process,), daemon=True).start()
calib_log_cache = "[INFO] Calibration lancée..."
return redirect(url_for("index"))
@app.route("/slam/start")
def slam_start():
"""Démarre ORB-SLAM3 (wrapper à implémenter dans src/slam/)."""
global slam_process
slam_wrapper = os.path.join(BASE_DIR, "..", "slam", "slam_node.py")
if not os.path.isfile(slam_wrapper):
return jsonify({"status": "error", "msg": "slam_node.py introuvable"}), 404
if slam_running():
return jsonify({"status": "already_running"})
slam_process = subprocess.Popen(
[sys.executable, slam_wrapper],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
return redirect(url_for("index"))
@app.route("/slam/stop")
def slam_stop():
"""Arrête ORB-SLAM3."""
global slam_process
if slam_process and slam_process.poll() is None:
slam_process.terminate()
slam_process.wait(timeout=5)
slam_process = None
return redirect(url_for("index"))
def generate_mjpeg():
"""Génère un flux MJPEG à partir de la caméra gauche."""
while True:
with frame_lock:
frame = last_frame_left
if frame is None:
# Frame vide si caméra pas disponible
blank = np.zeros((240, 320, 3), dtype=np.uint8)
cv2.putText(blank, "No Camera", (60, 120),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
frame = blank
ret, buf = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 70])
if ret:
yield (b"--frame\r\n"
b"Content-Type: image/jpeg\r\n\r\n" + buf.tobytes() + b"\r\n")
time.sleep(0.033)
@app.route("/stream")
def stream():
"""Flux MJPEG — caméra gauche."""
return Response(
generate_mjpeg(),
mimetype="multipart/x-mixed-replace; boundary=frame"
)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)

114
src/slam/slam_node.py Normal file
View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
Wrapper ORB-SLAM3 — nœud Python minimal.
Lit les frames stéréo, appelle ORB-SLAM3 via bindings C++ (à compiler).
Ce fichier est un placeholder fonctionnel jusqu'à compilation ORB-SLAM3.
"""
import cv2
import time
import os
import sys
import yaml
import numpy as np
CONFIG_YAML = os.path.join(
os.path.dirname(__file__), "..", "..", "config", "stereo_calib.yaml"
)
def load_calib(path):
"""Charge les paramètres de calibration stéréo."""
if not os.path.isfile(path):
print(f"[ERREUR] {path} introuvable. Lancer la calibration d'abord.")
sys.exit(1)
with open(path) as f:
return yaml.safe_load(f)
def rectify_pair(frame_l, frame_r, calib):
"""Rectifie une paire stéréo."""
sz = tuple(calib["image_size"])
K1 = np.array(calib["K1"])
D1 = np.array(calib["D1"])
K2 = np.array(calib["K2"])
D2 = np.array(calib["D2"])
R1 = np.array(calib["R1"])
R2 = np.array(calib["R2"])
P1 = np.array(calib["P1"])
P2 = np.array(calib["P2"])
map1_l, map2_l = cv2.initUndistortRectifyMap(K1, D1, R1, P1, sz, cv2.CV_16SC2)
map1_r, map2_r = cv2.initUndistortRectifyMap(K2, D2, R2, P2, sz, cv2.CV_16SC2)
rect_l = cv2.remap(frame_l, map1_l, map2_l, cv2.INTER_LINEAR)
rect_r = cv2.remap(frame_r, map1_r, map2_r, cv2.INTER_LINEAR)
return rect_l, rect_r
def compute_disparity(rect_l, rect_r):
"""Calcule la carte de disparité (SGBM)."""
gray_l = cv2.cvtColor(rect_l, cv2.COLOR_BGR2GRAY)
gray_r = cv2.cvtColor(rect_r, cv2.COLOR_BGR2GRAY)
stereo = cv2.StereoSGBM_create(
minDisparity=0,
numDisparities=64, # multiple de 16
blockSize=9,
P1=8 * 3 * 9 ** 2,
P2=32 * 3 * 9 ** 2,
disp12MaxDiff=1,
uniquenessRatio=10,
speckleWindowSize=100,
speckleRange=32,
mode=cv2.STEREO_SGBM_MODE_SGBM_3WAY
)
disparity = stereo.compute(gray_l, gray_r).astype(np.float32) / 16.0
return disparity
def main():
print("[SLAM] Démarrage du nœud SLAM stéréo...")
calib = load_calib(CONFIG_YAML)
cap_l = cv2.VideoCapture(0)
cap_r = cv2.VideoCapture(1)
if not cap_l.isOpened() or not cap_r.isOpened():
print("[ERREUR] Caméras non disponibles")
sys.exit(1)
print("[SLAM] Caméras ouvertes. Ctrl+C pour arrêter.")
try:
while True:
ret_l, frame_l = cap_l.read()
ret_r, frame_r = cap_r.read()
if not ret_l or not ret_r:
time.sleep(0.05)
continue
t = time.time()
# Rectification
rect_l, rect_r = rectify_pair(frame_l, frame_r, calib)
# Disparité (fallback si ORB-SLAM3 pas compilé)
disparity = compute_disparity(rect_l, rect_r)
# TODO: appeler ORB-SLAM3 C++ bindings quand disponible
# pose = orbslam3.track_stereo(rect_l, rect_r, t)
fps = 1.0 / (time.time() - t + 1e-9)
print(f"\r[SLAM] fps={fps:.1f} disp_mean={disparity[disparity>0].mean():.1f}px",
end="", flush=True)
except KeyboardInterrupt:
print("\n[SLAM] Arrêt.")
finally:
cap_l.release()
cap_r.release()
if __name__ == "__main__":
main()