diff --git a/src/calibration/stereo_calibrate.py b/src/calibration/stereo_calibrate.py new file mode 100644 index 0000000..3a88db6 --- /dev/null +++ b/src/calibration/stereo_calibrate.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python3 +""" +Calibration stéréo à partir de paires d'images de damier. +Damier: 9x6 cases intérieures, case 25mm. +Sortie: config/stereo_calib.yaml +""" + +import cv2 +import numpy as np +import yaml +import os +import sys +import argparse +import glob + +# Paramètres du damier +CHESSBOARD_SIZE = (9, 6) # coins intérieurs (colonnes, lignes) +SQUARE_SIZE_MM = 25.0 # taille d'une case en mm +SQUARE_SIZE_M = SQUARE_SIZE_MM / 1000.0 + +# Chemins +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +CALIB_IMAGES_DIR = os.path.join(SCRIPT_DIR, "calib_images") +CONFIG_DIR = os.path.join(SCRIPT_DIR, "..", "..", "config") +OUTPUT_YAML = os.path.join(CONFIG_DIR, "stereo_calib.yaml") + +# Critères de terminaison pour cornerSubPix +SUBPIX_CRITERIA = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001) +STEREO_CRITERIA = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 1e-5) + + +def prepare_object_points(): + """Génère les coordonnées 3D du damier (plan Z=0).""" + objp = np.zeros((CHESSBOARD_SIZE[0] * CHESSBOARD_SIZE[1], 3), np.float32) + objp[:, :2] = np.mgrid[ + 0:CHESSBOARD_SIZE[0], 0:CHESSBOARD_SIZE[1] + ].T.reshape(-1, 2) * SQUARE_SIZE_M + return objp + + +def load_pairs(): + """Charge les paires d'images gauche/droite.""" + lefts = sorted(glob.glob(os.path.join(CALIB_IMAGES_DIR, "left_*.png"))) + rights = sorted(glob.glob(os.path.join(CALIB_IMAGES_DIR, "right_*.png"))) + + if len(lefts) == 0 or len(rights) == 0: + print(f"[ERREUR] Aucune image dans {CALIB_IMAGES_DIR}", file=sys.stderr) + print("[ERREUR] Lancer d'abord stereo_capture.py", file=sys.stderr) + sys.exit(1) + + if len(lefts) != len(rights): + print(f"[WARN] {len(lefts)} gauche vs {len(rights)} droite — utilise le min") + + n = min(len(lefts), len(rights)) + print(f"[INFO] Chargement {n} paires de damier...") + return lefts[:n], rights[:n] + + +def find_corners(image_paths, label=""): + """Détecte les coins du damier dans une liste d'images.""" + objp = prepare_object_points() + obj_points = [] # 3D + img_points = [] # 2D + valid_indices = [] + img_size = None + + for i, path in enumerate(image_paths): + img = cv2.imread(path) + if img is None: + print(f"[WARN] Impossible de lire: {path}") + continue + + gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + img_size = gray.shape[::-1] # (width, height) + + ret, corners = cv2.findChessboardCorners( + gray, CHESSBOARD_SIZE, + cv2.CALIB_CB_ADAPTIVE_THRESH + cv2.CALIB_CB_NORMALIZE_IMAGE + ) + + if ret: + corners_refined = cv2.cornerSubPix( + gray, corners, (11, 11), (-1, -1), SUBPIX_CRITERIA + ) + obj_points.append(objp) + img_points.append(corners_refined) + valid_indices.append(i) + else: + print(f"[WARN] {label} image {i:04d}: damier non détecté") + + print(f"[INFO] {label}: {len(obj_points)}/{len(image_paths)} images valides") + return obj_points, img_points, img_size, valid_indices + + +def calibrate_stereo(lefts, rights): + """Calibre le système stéréo.""" + obj_l, pts_l, size_l, idx_l = find_corners(lefts, "Gauche") + obj_r, pts_r, size_r, idx_r = find_corners(rights, "Droite") + + if size_l != size_r: + print("[ERREUR] Taille images gauche/droite différente", file=sys.stderr) + sys.exit(1) + + img_size = size_l + + # Garder seulement les paires où les 2 caméras détectent le damier + valid_pairs = set(idx_l) & set(idx_r) + idx_map_l = {v: i for i, v in enumerate(idx_l)} + idx_map_r = {v: i for i, v in enumerate(idx_r)} + + obj_pts = [] + pts_left = [] + pts_right = [] + for idx in sorted(valid_pairs): + obj_pts.append(obj_l[idx_map_l[idx]]) + pts_left.append(pts_l[idx_map_l[idx]]) + pts_right.append(pts_r[idx_map_r[idx]]) + + print(f"[INFO] {len(obj_pts)} paires valides pour calibration stéréo") + + if len(obj_pts) < 10: + print("[ERREUR] Moins de 10 paires valides. Recapturer.", file=sys.stderr) + sys.exit(1) + + # Calibration individuelle d'abord + print("[INFO] Calibration caméra gauche...") + _, K1, D1, _, _ = cv2.calibrateCamera(obj_pts, pts_left, img_size, None, None) + + print("[INFO] Calibration caméra droite...") + _, K2, D2, _, _ = cv2.calibrateCamera(obj_pts, pts_right, img_size, None, None) + + # Calibration stéréo + print("[INFO] Calibration stéréo...") + flags = cv2.CALIB_FIX_INTRINSIC + rms, K1, D1, K2, D2, R, T, E, F = cv2.stereoCalibrate( + obj_pts, pts_left, pts_right, + K1, D1, K2, D2, + img_size, + criteria=STEREO_CRITERIA, + flags=flags + ) + + print(f"[INFO] RMS reprojection error: {rms:.4f} px") + if rms > 1.0: + print("[WARN] RMS > 1.0 px — reconsidérer la calibration") + elif rms < 0.5: + print("[OK] RMS excellent (< 0.5 px)") + + # Rectification + R1, R2, P1, P2, Q, roi_l, roi_r = cv2.stereoRectify( + K1, D1, K2, D2, img_size, R, T, alpha=0 + ) + + return { + "K1": K1, "D1": D1, "K2": K2, "D2": D2, + "R": R, "T": T, "E": E, "F": F, + "R1": R1, "R2": R2, "P1": P1, "P2": P2, "Q": Q, + "img_size": img_size, + "rms": rms + } + + +def save_yaml(calib, path): + """Sauvegarde les paramètres en YAML.""" + os.makedirs(os.path.dirname(path), exist_ok=True) + + def mat_to_list(m): + return m.tolist() + + data = { + "rms_reprojection_error": float(calib["rms"]), + "image_size": list(calib["img_size"]), + "K1": mat_to_list(calib["K1"]), + "D1": mat_to_list(calib["D1"]), + "K2": mat_to_list(calib["K2"]), + "D2": mat_to_list(calib["D2"]), + "R": mat_to_list(calib["R"]), + "T": mat_to_list(calib["T"]), + "E": mat_to_list(calib["E"]), + "F": mat_to_list(calib["F"]), + "R1": mat_to_list(calib["R1"]), + "R2": mat_to_list(calib["R2"]), + "P1": mat_to_list(calib["P1"]), + "P2": mat_to_list(calib["P2"]), + "Q": mat_to_list(calib["Q"]), + } + + with open(path, "w") as f: + yaml.dump(data, f, default_flow_style=False) + + print(f"[OK] Sauvegarde: {path}") + + +def show_rectified(calib, left_path, right_path): + """Affiche une paire rectifiée pour vérification visuelle.""" + img_l = cv2.imread(left_path) + img_r = cv2.imread(right_path) + sz = (calib["img_size"][0], calib["img_size"][1]) + + map1_l, map2_l = cv2.initUndistortRectifyMap( + np.array(calib["K1"]), np.array(calib["D1"]), + np.array(calib["R1"]), np.array(calib["P1"]), + sz, cv2.CV_16SC2 + ) + map1_r, map2_r = cv2.initUndistortRectifyMap( + np.array(calib["K2"]), np.array(calib["D2"]), + np.array(calib["R2"]), np.array(calib["P2"]), + sz, cv2.CV_16SC2 + ) + + rect_l = cv2.remap(img_l, map1_l, map2_l, cv2.INTER_LINEAR) + rect_r = cv2.remap(img_r, map1_r, map2_r, cv2.INTER_LINEAR) + + # Tracer lignes épipolaires horizontales + combined = cv2.hconcat([ + cv2.resize(rect_l, (640, 360)), + cv2.resize(rect_r, (640, 360)) + ]) + for y in range(0, 360, 30): + cv2.line(combined, (0, y), (1280, y), (0, 255, 0), 1) + + cv2.imshow("Rectified — lignes vertes doivent etre horizontales", combined) + cv2.waitKey(0) + cv2.destroyAllWindows() + + +def main(): + parser = argparse.ArgumentParser(description="Calibration stéréo OpenCV") + parser.add_argument("--show-rectified", action="store_true", + help="Afficher une paire rectifiée après calibration") + args = parser.parse_args() + + lefts, rights = load_pairs() + calib = calibrate_stereo(lefts, rights) + save_yaml(calib, OUTPUT_YAML) + + if args.show_rectified and len(lefts) > 0: + # Charger le yaml pour show + with open(OUTPUT_YAML) as f: + calib_loaded = yaml.safe_load(f) + show_rectified(calib_loaded, lefts[0], rights[0]) + + +if __name__ == "__main__": + main() diff --git a/src/calibration/stereo_capture.py b/src/calibration/stereo_capture.py new file mode 100644 index 0000000..7ba0003 --- /dev/null +++ b/src/calibration/stereo_capture.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +""" +Capture synchrone de paires d'images stéréo pour calibration. +Utilise deux webcams USB (index 0 et 1). +Appuyer ESPACE pour sauvegarder, Q pour quitter. +""" + +import cv2 +import os +import sys +import time + +# Dossier de sortie +OUTPUT_DIR = os.path.join(os.path.dirname(__file__), "calib_images") +os.makedirs(OUTPUT_DIR, exist_ok=True) + +# Index des caméras +CAM_LEFT = 0 +CAM_RIGHT = 1 + +# Résolution cible +WIDTH = 1280 +HEIGHT = 720 + + +def open_camera(index): + """Ouvre une caméra et configure la résolution.""" + cap = cv2.VideoCapture(index, cv2.CAP_DSHOW) # CAP_DSHOW sur Windows + if not cap.isOpened(): + # Essai sans backend explicite + cap = cv2.VideoCapture(index) + if not cap.isOpened(): + print(f"[ERREUR] Impossible d'ouvrir la caméra {index}", file=sys.stderr) + return None + cap.set(cv2.CAP_PROP_FRAME_WIDTH, WIDTH) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, HEIGHT) + cap.set(cv2.CAP_PROP_FPS, 30) + return cap + + +def main(): + cap_left = open_camera(CAM_LEFT) + cap_right = open_camera(CAM_RIGHT) + + if cap_left is None or cap_right is None: + print("[ERREUR] Vérifier connexion des 2 caméras USB.", file=sys.stderr) + sys.exit(1) + + paire = 0 + print(f"[INFO] Sauvegarde dans: {OUTPUT_DIR}") + print("[INFO] ESPACE = capturer paire | Q = quitter") + + while True: + ret_l, frame_l = cap_left.read() + ret_r, frame_r = cap_right.read() + + if not ret_l or not ret_r: + print("[WARN] Frame manquante, retry...") + time.sleep(0.05) + continue + + # Affichage côte à côte (redimensionné pour l'écran) + display_l = cv2.resize(frame_l, (640, 360)) + display_r = cv2.resize(frame_r, (640, 360)) + combined = cv2.hconcat([display_l, display_r]) + cv2.putText(combined, f"Paires: {paire}/30", (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) + cv2.imshow("Stereo Capture — ESPACE=capture Q=quit", combined) + + key = cv2.waitKey(1) & 0xFF + + if key == ord('q') or key == 27: + break + + if key == ord(' '): + # Sauvegarde la paire + fname_l = os.path.join(OUTPUT_DIR, f"left_{paire:04d}.png") + fname_r = os.path.join(OUTPUT_DIR, f"right_{paire:04d}.png") + cv2.imwrite(fname_l, frame_l) + cv2.imwrite(fname_r, frame_r) + paire += 1 + print(f"[OK] Paire {paire} sauvegardée") + if paire >= 30: + print("[INFO] 30 paires capturées. Calibration possible.") + + cap_left.release() + cap_right.release() + cv2.destroyAllWindows() + print(f"[INFO] {paire} paires sauvegardées dans {OUTPUT_DIR}") + + +if __name__ == "__main__": + main() diff --git a/src/interface/app.py b/src/interface/app.py new file mode 100644 index 0000000..085061b --- /dev/null +++ b/src/interface/app.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 +""" +Interface Flask — SLAM Stéréo BlueOS +Routes: / dashboard, /calibrate, /slam/start, /slam/stop, /stream +""" + +import os +import sys +import subprocess +import threading +import time +import cv2 +import numpy as np +from flask import Flask, render_template_string, Response, jsonify, redirect, url_for + +app = Flask(__name__) + +# Chemins +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +SRC_DIR = os.path.join(BASE_DIR, "..") +CALIB_SCRIPT = os.path.join(SRC_DIR, "calibration", "stereo_calibrate.py") +CONFIG_YAML = os.path.join(BASE_DIR, "..", "..", "config", "stereo_calib.yaml") + +# État global +slam_process = None +calib_process = None +last_frame_left = None +frame_lock = threading.Lock() + +# Thread capture preview +preview_active = True + + +def capture_preview(): + """Thread de capture frame gauche pour MJPEG.""" + global last_frame_left + cap = cv2.VideoCapture(0) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) + while preview_active: + ret, frame = cap.read() + if ret: + with frame_lock: + last_frame_left = frame.copy() + time.sleep(0.033) # ~30 fps + cap.release() + + +preview_thread = threading.Thread(target=capture_preview, daemon=True) +preview_thread.start() + +# Template HTML Bootstrap +HTML_TEMPLATE = """ + + + + + + SLAM Stéréo BlueOS + + + + +
+

SLAM Stéréo — BlueOS

+ +
+ + +
+
+
Statut
+

SLAM: + {{ 'Actif' if slam_running else 'Arrêté' }}

+

Calibration: + {{ 'OK' if calib_ok else 'Manquante' }}

+
+
+ + +
+
+
Contrôles SLAM
+ Démarrer + Arrêter +
+
+ + +
+
+
Calibration
+ + Lancer calibration + + {% if calib_log %} +
+{{ calib_log }}
+ {% endif %} +
+
+ + +
+
+
Preview — Caméra Gauche
+ stream +
+
+ +
+
+ + +""" + + +def calib_ok(): + """Vérifie si stereo_calib.yaml existe.""" + return os.path.isfile(CONFIG_YAML) + + +def slam_running(): + """Vérifie si le process ORB-SLAM3 tourne.""" + global slam_process + return slam_process is not None and slam_process.poll() is None + + +calib_log_cache = "" + + +@app.route("/") +def index(): + """Dashboard principal.""" + return render_template_string( + HTML_TEMPLATE, + slam_running=slam_running(), + calib_ok=calib_ok(), + calib_log=calib_log_cache, + ) + + +@app.route("/calibrate") +def calibrate(): + """Lance le script de calibration stéréo en arrière-plan.""" + global calib_process, calib_log_cache + if calib_process and calib_process.poll() is None: + calib_log_cache = "[INFO] Calibration déjà en cours..." + return redirect(url_for("index")) + + calib_process = subprocess.Popen( + [sys.executable, CALIB_SCRIPT], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + text=True + ) + + def read_log(proc): + global calib_log_cache + lines = [] + for line in proc.stdout: + lines.append(line.strip()) + calib_log_cache = "\n".join(lines[-10:]) # 10 dernières lignes + proc.wait() + + threading.Thread(target=read_log, args=(calib_process,), daemon=True).start() + calib_log_cache = "[INFO] Calibration lancée..." + return redirect(url_for("index")) + + +@app.route("/slam/start") +def slam_start(): + """Démarre ORB-SLAM3 (wrapper à implémenter dans src/slam/).""" + global slam_process + slam_wrapper = os.path.join(BASE_DIR, "..", "slam", "slam_node.py") + if not os.path.isfile(slam_wrapper): + return jsonify({"status": "error", "msg": "slam_node.py introuvable"}), 404 + + if slam_running(): + return jsonify({"status": "already_running"}) + + slam_process = subprocess.Popen( + [sys.executable, slam_wrapper], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + return redirect(url_for("index")) + + +@app.route("/slam/stop") +def slam_stop(): + """Arrête ORB-SLAM3.""" + global slam_process + if slam_process and slam_process.poll() is None: + slam_process.terminate() + slam_process.wait(timeout=5) + slam_process = None + return redirect(url_for("index")) + + +def generate_mjpeg(): + """Génère un flux MJPEG à partir de la caméra gauche.""" + while True: + with frame_lock: + frame = last_frame_left + + if frame is None: + # Frame vide si caméra pas disponible + blank = np.zeros((240, 320, 3), dtype=np.uint8) + cv2.putText(blank, "No Camera", (60, 120), + cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) + frame = blank + + ret, buf = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 70]) + if ret: + yield (b"--frame\r\n" + b"Content-Type: image/jpeg\r\n\r\n" + buf.tobytes() + b"\r\n") + time.sleep(0.033) + + +@app.route("/stream") +def stream(): + """Flux MJPEG — caméra gauche.""" + return Response( + generate_mjpeg(), + mimetype="multipart/x-mixed-replace; boundary=frame" + ) + + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=5000, debug=False, threaded=True) diff --git a/src/slam/slam_node.py b/src/slam/slam_node.py new file mode 100644 index 0000000..fe2584c --- /dev/null +++ b/src/slam/slam_node.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +""" +Wrapper ORB-SLAM3 — nœud Python minimal. +Lit les frames stéréo, appelle ORB-SLAM3 via bindings C++ (à compiler). +Ce fichier est un placeholder fonctionnel jusqu'à compilation ORB-SLAM3. +""" + +import cv2 +import time +import os +import sys +import yaml +import numpy as np + +CONFIG_YAML = os.path.join( + os.path.dirname(__file__), "..", "..", "config", "stereo_calib.yaml" +) + +def load_calib(path): + """Charge les paramètres de calibration stéréo.""" + if not os.path.isfile(path): + print(f"[ERREUR] {path} introuvable. Lancer la calibration d'abord.") + sys.exit(1) + with open(path) as f: + return yaml.safe_load(f) + + +def rectify_pair(frame_l, frame_r, calib): + """Rectifie une paire stéréo.""" + sz = tuple(calib["image_size"]) + K1 = np.array(calib["K1"]) + D1 = np.array(calib["D1"]) + K2 = np.array(calib["K2"]) + D2 = np.array(calib["D2"]) + R1 = np.array(calib["R1"]) + R2 = np.array(calib["R2"]) + P1 = np.array(calib["P1"]) + P2 = np.array(calib["P2"]) + + map1_l, map2_l = cv2.initUndistortRectifyMap(K1, D1, R1, P1, sz, cv2.CV_16SC2) + map1_r, map2_r = cv2.initUndistortRectifyMap(K2, D2, R2, P2, sz, cv2.CV_16SC2) + + rect_l = cv2.remap(frame_l, map1_l, map2_l, cv2.INTER_LINEAR) + rect_r = cv2.remap(frame_r, map1_r, map2_r, cv2.INTER_LINEAR) + return rect_l, rect_r + + +def compute_disparity(rect_l, rect_r): + """Calcule la carte de disparité (SGBM).""" + gray_l = cv2.cvtColor(rect_l, cv2.COLOR_BGR2GRAY) + gray_r = cv2.cvtColor(rect_r, cv2.COLOR_BGR2GRAY) + + stereo = cv2.StereoSGBM_create( + minDisparity=0, + numDisparities=64, # multiple de 16 + blockSize=9, + P1=8 * 3 * 9 ** 2, + P2=32 * 3 * 9 ** 2, + disp12MaxDiff=1, + uniquenessRatio=10, + speckleWindowSize=100, + speckleRange=32, + mode=cv2.STEREO_SGBM_MODE_SGBM_3WAY + ) + disparity = stereo.compute(gray_l, gray_r).astype(np.float32) / 16.0 + return disparity + + +def main(): + print("[SLAM] Démarrage du nœud SLAM stéréo...") + calib = load_calib(CONFIG_YAML) + + cap_l = cv2.VideoCapture(0) + cap_r = cv2.VideoCapture(1) + + if not cap_l.isOpened() or not cap_r.isOpened(): + print("[ERREUR] Caméras non disponibles") + sys.exit(1) + + print("[SLAM] Caméras ouvertes. Ctrl+C pour arrêter.") + + try: + while True: + ret_l, frame_l = cap_l.read() + ret_r, frame_r = cap_r.read() + + if not ret_l or not ret_r: + time.sleep(0.05) + continue + + t = time.time() + + # Rectification + rect_l, rect_r = rectify_pair(frame_l, frame_r, calib) + + # Disparité (fallback si ORB-SLAM3 pas compilé) + disparity = compute_disparity(rect_l, rect_r) + + # TODO: appeler ORB-SLAM3 C++ bindings quand disponible + # pose = orbslam3.track_stereo(rect_l, rect_r, t) + + fps = 1.0 / (time.time() - t + 1e-9) + print(f"\r[SLAM] fps={fps:.1f} disp_mean={disparity[disparity>0].mean():.1f}px", + end="", flush=True) + + except KeyboardInterrupt: + print("\n[SLAM] Arrêt.") + finally: + cap_l.release() + cap_r.release() + + +if __name__ == "__main__": + main()