#!/usr/bin/env python3 """Classic Visual Odometry with OpenCV — lightweight CPU. Pipeline: 1. ORB features per frame + match consecutive (or KLT track keypoints) 2. Filter outliers via cv2.findEssentialMat RANSAC 3. cv2.recoverPose → R, t up-to-scale per pair 4. Concatenate to global pose chain 5. Output CSV (frame_idx, ts_s, x, y, z, qw, qx, qy, qz) Usage: python3 vo_classic_opencv.py --frames-dir /home/cosma/cosma-pipeline/data//frames// \ --start-iso 2026-05-05T08:33:41 --fps 1.0 --label GX039839 --out /tmp/vo_classic.csv \ --plot /tmp/vo_classic.png """ import argparse, csv, sys, math from pathlib import Path import numpy as np import cv2 from datetime import datetime def main(): ap = argparse.ArgumentParser() ap.add_argument('--frames-dir', required=True) ap.add_argument('--start-iso', default='2026-05-05T00:00:00') ap.add_argument('--fps', type=float, default=1.0) ap.add_argument('--label', default='segment') ap.add_argument('--out', required=True) ap.add_argument('--plot', default=None) ap.add_argument('--ref-csv', default=None, help='lingbot CSV to compare against (same segment)') ap.add_argument('--max-features', type=int, default=2000) ap.add_argument('--method', choices=['orb','klt'], default='klt') args = ap.parse_args() frames = sorted(Path(args.frames_dir).glob('frame_*.jpg')) if not frames: sys.exit(f'no frames in {args.frames_dir}') print(f'[vo] {len(frames)} frames in {args.frames_dir}', flush=True) # camera intrinsics for 518x294 GoPro wide @ ~122° hFOV W, H = 518, 294 # focal estimate from FOV fov_h_deg = 122.0 f = (W / 2.0) / math.tan(math.radians(fov_h_deg / 2.0)) cx, cy = W/2, H/2 K = np.array([[f, 0, cx], [0, f, cy], [0, 0, 1]], dtype=np.float64) print(f'[vo] K = focal={f:.1f}, cx={cx}, cy={cy}', flush=True) # Init pose R_world = np.eye(3) t_world = np.zeros((3, 1)) out_rows = [] out_rows.append({'label': args.label, 'frame_idx': 0, 'ts_s': datetime.fromisoformat(args.start_iso).timestamp(), 'x': 0.0, 'y': 0.0, 'z': 0.0, 'inliers': 0, 'tracked': 0}) prev_gray = cv2.imread(str(frames[0]), cv2.IMREAD_GRAYSCALE) if args.method == 'klt': # Initial corners via goodFeaturesToTrack prev_pts = cv2.goodFeaturesToTrack(prev_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7) else: orb = cv2.ORB_create(args.max_features) prev_kp, prev_desc = orb.detectAndCompute(prev_gray, None) bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) t0 = datetime.fromisoformat(args.start_iso).timestamp() fail_count = 0 for i in range(1, len(frames)): curr_gray = cv2.imread(str(frames[i]), cv2.IMREAD_GRAYSCALE) if curr_gray is None: continue # 1. Match/track features if args.method == 'klt': if prev_pts is None or len(prev_pts) < 50: prev_pts = cv2.goodFeaturesToTrack(prev_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7) if prev_pts is None or len(prev_pts) < 50: fail_count += 1 out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps, 'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0], 'inliers': 0, 'tracked': 0}) prev_gray = curr_gray continue curr_pts, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, prev_pts, None, winSize=(21,21), maxLevel=3) good_prev = prev_pts[status.flatten() == 1] good_curr = curr_pts[status.flatten() == 1] n_tracked = len(good_prev) else: # orb curr_kp, curr_desc = orb.detectAndCompute(curr_gray, None) if prev_desc is None or curr_desc is None or len(curr_kp) < 50: fail_count += 1 out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps, 'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0], 'inliers': 0, 'tracked': 0}) prev_gray = curr_gray; prev_kp = curr_kp; prev_desc = curr_desc continue matches = bf.match(prev_desc, curr_desc) matches = sorted(matches, key=lambda m: m.distance)[:500] good_prev = np.array([prev_kp[m.queryIdx].pt for m in matches], dtype=np.float32).reshape(-1, 1, 2) good_curr = np.array([curr_kp[m.trainIdx].pt for m in matches], dtype=np.float32).reshape(-1, 1, 2) n_tracked = len(matches) if n_tracked < 30: fail_count += 1 out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps, 'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0], 'inliers': 0, 'tracked': n_tracked}) prev_gray = curr_gray if args.method == 'klt': prev_pts = cv2.goodFeaturesToTrack(curr_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7) else: prev_kp, prev_desc = curr_kp, curr_desc continue # 2. Essential matrix + recoverPose try: E, mask = cv2.findEssentialMat(good_curr.reshape(-1,2), good_prev.reshape(-1,2), K, method=cv2.RANSAC, prob=0.999, threshold=1.0) if E is None or E.shape != (3,3): raise ValueError('bad E') _, R, t, mask_pose = cv2.recoverPose(E, good_curr.reshape(-1,2), good_prev.reshape(-1,2), K, mask=mask) n_inliers = int(mask_pose.sum()) if mask_pose is not None else 0 except Exception as e: fail_count += 1 out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps, 'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0], 'inliers': 0, 'tracked': n_tracked}) prev_gray = curr_gray if args.method == 'klt': prev_pts = cv2.goodFeaturesToTrack(curr_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7) else: prev_kp, prev_desc = curr_kp, curr_desc continue # 3. Update global pose : R_world = R_world @ R^T ; t_world = t_world - R_world @ t # (camera convention: R maps prev to curr in cam frame, t is unit baseline) # Pose update for VO: t_world = t_world + R_world @ t R_world = R_world @ R out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps, 'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0], 'inliers': n_inliers, 'tracked': n_tracked}) # carry forward prev_gray = curr_gray if args.method == 'klt': prev_pts = cv2.goodFeaturesToTrack(curr_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7) else: prev_kp, prev_desc = curr_kp, curr_desc if i % 100 == 0: print(f'[vo] frame {i}/{len(frames)} tracked={n_tracked} inliers={n_inliers} pos=({t_world[0,0]:.2f},{t_world[1,0]:.2f},{t_world[2,0]:.2f})', flush=True) print(f'[vo] done. Total frames: {len(out_rows)}. Failed pairs: {fail_count}', flush=True) with open(args.out, 'w', newline='') as f: w = csv.DictWriter(f, fieldnames=['label','frame_idx','ts_s','x','y','z','inliers','tracked']) w.writeheader(); w.writerows(out_rows) print(f'[out] {args.out}', flush=True) if args.plot: import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt x = [r['x'] for r in out_rows] y = [r['y'] for r in out_rows] z = [r['z'] for r in out_rows] fig, axes = plt.subplots(2, 2, figsize=(14, 12)) ax_xy, ax_xz, ax_yz, ax_qual = axes[0,0], axes[0,1], axes[1,0], axes[1,1] ax_xy.plot(x, y, '-b', linewidth=1, alpha=0.7, label='VO classic') ax_xy.plot(x[0], y[0], 'go', markersize=10, label='start') ax_xy.plot(x[-1], y[-1], 'r^', markersize=10, label='end') if args.ref_csv: try: with open(args.ref_csv) as ff: refrows = list(csv.DictReader(ff)) rx = [float(r['x']) for r in refrows if r.get('segment','') == args.label] ry = [float(r['y']) for r in refrows if r.get('segment','') == args.label] if rx: ax_xy.plot(rx, ry, '-r', linewidth=1, alpha=0.5, label='lingbot') except Exception as e: print(f'[plot] ref_csv load fail: {e}') ax_xy.set_xlabel('X (m, up-to-scale)'); ax_xy.set_ylabel('Y'); ax_xy.set_title(f'Top X-Y — VO classique {args.label}') ax_xy.set_aspect('equal'); ax_xy.legend(); ax_xy.grid(True, alpha=0.3) ax_xz.plot(x, z, '-b', linewidth=1) ax_xz.set_xlabel('X'); ax_xz.set_ylabel('Z'); ax_xz.set_title('Side X-Z'); ax_xz.set_aspect('equal'); ax_xz.grid(True, alpha=0.3) ax_yz.plot(y, z, '-b', linewidth=1) ax_yz.set_xlabel('Y'); ax_yz.set_ylabel('Z'); ax_yz.set_title('Side Y-Z'); ax_yz.set_aspect('equal'); ax_yz.grid(True, alpha=0.3) tracked = [r['tracked'] for r in out_rows] inliers = [r['inliers'] for r in out_rows] ax_qual.plot(tracked, label='tracked features', color='blue', alpha=0.6) ax_qual.plot(inliers, label='RANSAC inliers', color='red', alpha=0.6) ax_qual.set_xlabel('Frame'); ax_qual.set_ylabel('Count'); ax_qual.set_title('Tracking quality'); ax_qual.legend(); ax_qual.grid(True, alpha=0.3) plt.suptitle(f'Visual Odometry classique (OpenCV {args.method.upper()}) — {args.label}') plt.tight_layout() plt.savefig(args.plot, dpi=130, bbox_inches='tight') print(f'[plot] {args.plot}') if __name__ == '__main__': main()