198 lines
9.9 KiB
Python
198 lines
9.9 KiB
Python
#!/usr/bin/env python3
|
|
"""Classic Visual Odometry with OpenCV — lightweight CPU.
|
|
Pipeline:
|
|
1. ORB features per frame + match consecutive (or KLT track keypoints)
|
|
2. Filter outliers via cv2.findEssentialMat RANSAC
|
|
3. cv2.recoverPose → R, t up-to-scale per pair
|
|
4. Concatenate to global pose chain
|
|
5. Output CSV (frame_idx, ts_s, x, y, z, qw, qx, qy, qz)
|
|
|
|
Usage:
|
|
python3 vo_classic_opencv.py --frames-dir /home/cosma/cosma-pipeline/data/<mission>/frames/<AUV>/<SEGMENT> \
|
|
--start-iso 2026-05-05T08:33:41 --fps 1.0 --label GX039839 --out /tmp/vo_classic.csv \
|
|
--plot /tmp/vo_classic.png
|
|
"""
|
|
import argparse, csv, sys, math
|
|
from pathlib import Path
|
|
import numpy as np
|
|
import cv2
|
|
from datetime import datetime
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument('--frames-dir', required=True)
|
|
ap.add_argument('--start-iso', default='2026-05-05T00:00:00')
|
|
ap.add_argument('--fps', type=float, default=1.0)
|
|
ap.add_argument('--label', default='segment')
|
|
ap.add_argument('--out', required=True)
|
|
ap.add_argument('--plot', default=None)
|
|
ap.add_argument('--ref-csv', default=None, help='lingbot CSV to compare against (same segment)')
|
|
ap.add_argument('--max-features', type=int, default=2000)
|
|
ap.add_argument('--method', choices=['orb','klt'], default='klt')
|
|
args = ap.parse_args()
|
|
|
|
frames = sorted(Path(args.frames_dir).glob('frame_*.jpg'))
|
|
if not frames:
|
|
sys.exit(f'no frames in {args.frames_dir}')
|
|
print(f'[vo] {len(frames)} frames in {args.frames_dir}', flush=True)
|
|
|
|
# camera intrinsics for 518x294 GoPro wide @ ~122° hFOV
|
|
W, H = 518, 294
|
|
# focal estimate from FOV
|
|
fov_h_deg = 122.0
|
|
f = (W / 2.0) / math.tan(math.radians(fov_h_deg / 2.0))
|
|
cx, cy = W/2, H/2
|
|
K = np.array([[f, 0, cx], [0, f, cy], [0, 0, 1]], dtype=np.float64)
|
|
print(f'[vo] K = focal={f:.1f}, cx={cx}, cy={cy}', flush=True)
|
|
|
|
# Init pose
|
|
R_world = np.eye(3)
|
|
t_world = np.zeros((3, 1))
|
|
|
|
out_rows = []
|
|
out_rows.append({'label': args.label, 'frame_idx': 0, 'ts_s': datetime.fromisoformat(args.start_iso).timestamp(),
|
|
'x': 0.0, 'y': 0.0, 'z': 0.0, 'inliers': 0, 'tracked': 0})
|
|
|
|
prev_gray = cv2.imread(str(frames[0]), cv2.IMREAD_GRAYSCALE)
|
|
if args.method == 'klt':
|
|
# Initial corners via goodFeaturesToTrack
|
|
prev_pts = cv2.goodFeaturesToTrack(prev_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7)
|
|
else:
|
|
orb = cv2.ORB_create(args.max_features)
|
|
prev_kp, prev_desc = orb.detectAndCompute(prev_gray, None)
|
|
bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
|
|
|
|
t0 = datetime.fromisoformat(args.start_iso).timestamp()
|
|
fail_count = 0
|
|
|
|
for i in range(1, len(frames)):
|
|
curr_gray = cv2.imread(str(frames[i]), cv2.IMREAD_GRAYSCALE)
|
|
if curr_gray is None: continue
|
|
|
|
# 1. Match/track features
|
|
if args.method == 'klt':
|
|
if prev_pts is None or len(prev_pts) < 50:
|
|
prev_pts = cv2.goodFeaturesToTrack(prev_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7)
|
|
if prev_pts is None or len(prev_pts) < 50:
|
|
fail_count += 1
|
|
out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps,
|
|
'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0], 'inliers': 0, 'tracked': 0})
|
|
prev_gray = curr_gray
|
|
continue
|
|
curr_pts, status, err = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, prev_pts, None, winSize=(21,21), maxLevel=3)
|
|
good_prev = prev_pts[status.flatten() == 1]
|
|
good_curr = curr_pts[status.flatten() == 1]
|
|
n_tracked = len(good_prev)
|
|
else: # orb
|
|
curr_kp, curr_desc = orb.detectAndCompute(curr_gray, None)
|
|
if prev_desc is None or curr_desc is None or len(curr_kp) < 50:
|
|
fail_count += 1
|
|
out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps,
|
|
'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0], 'inliers': 0, 'tracked': 0})
|
|
prev_gray = curr_gray; prev_kp = curr_kp; prev_desc = curr_desc
|
|
continue
|
|
matches = bf.match(prev_desc, curr_desc)
|
|
matches = sorted(matches, key=lambda m: m.distance)[:500]
|
|
good_prev = np.array([prev_kp[m.queryIdx].pt for m in matches], dtype=np.float32).reshape(-1, 1, 2)
|
|
good_curr = np.array([curr_kp[m.trainIdx].pt for m in matches], dtype=np.float32).reshape(-1, 1, 2)
|
|
n_tracked = len(matches)
|
|
|
|
if n_tracked < 30:
|
|
fail_count += 1
|
|
out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps,
|
|
'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0], 'inliers': 0, 'tracked': n_tracked})
|
|
prev_gray = curr_gray
|
|
if args.method == 'klt':
|
|
prev_pts = cv2.goodFeaturesToTrack(curr_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7)
|
|
else:
|
|
prev_kp, prev_desc = curr_kp, curr_desc
|
|
continue
|
|
|
|
# 2. Essential matrix + recoverPose
|
|
try:
|
|
E, mask = cv2.findEssentialMat(good_curr.reshape(-1,2), good_prev.reshape(-1,2), K, method=cv2.RANSAC, prob=0.999, threshold=1.0)
|
|
if E is None or E.shape != (3,3):
|
|
raise ValueError('bad E')
|
|
_, R, t, mask_pose = cv2.recoverPose(E, good_curr.reshape(-1,2), good_prev.reshape(-1,2), K, mask=mask)
|
|
n_inliers = int(mask_pose.sum()) if mask_pose is not None else 0
|
|
except Exception as e:
|
|
fail_count += 1
|
|
out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps,
|
|
'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0], 'inliers': 0, 'tracked': n_tracked})
|
|
prev_gray = curr_gray
|
|
if args.method == 'klt':
|
|
prev_pts = cv2.goodFeaturesToTrack(curr_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7)
|
|
else:
|
|
prev_kp, prev_desc = curr_kp, curr_desc
|
|
continue
|
|
|
|
# 3. Update global pose : R_world = R_world @ R^T ; t_world = t_world - R_world @ t
|
|
# (camera convention: R maps prev to curr in cam frame, t is unit baseline)
|
|
# Pose update for VO:
|
|
t_world = t_world + R_world @ t
|
|
R_world = R_world @ R
|
|
|
|
out_rows.append({'label': args.label, 'frame_idx': i, 'ts_s': t0 + i/args.fps,
|
|
'x': t_world[0,0], 'y': t_world[1,0], 'z': t_world[2,0],
|
|
'inliers': n_inliers, 'tracked': n_tracked})
|
|
|
|
# carry forward
|
|
prev_gray = curr_gray
|
|
if args.method == 'klt':
|
|
prev_pts = cv2.goodFeaturesToTrack(curr_gray, maxCorners=args.max_features, qualityLevel=0.01, minDistance=7, blockSize=7)
|
|
else:
|
|
prev_kp, prev_desc = curr_kp, curr_desc
|
|
|
|
if i % 100 == 0:
|
|
print(f'[vo] frame {i}/{len(frames)} tracked={n_tracked} inliers={n_inliers} pos=({t_world[0,0]:.2f},{t_world[1,0]:.2f},{t_world[2,0]:.2f})', flush=True)
|
|
|
|
print(f'[vo] done. Total frames: {len(out_rows)}. Failed pairs: {fail_count}', flush=True)
|
|
|
|
with open(args.out, 'w', newline='') as f:
|
|
w = csv.DictWriter(f, fieldnames=['label','frame_idx','ts_s','x','y','z','inliers','tracked'])
|
|
w.writeheader(); w.writerows(out_rows)
|
|
print(f'[out] {args.out}', flush=True)
|
|
|
|
if args.plot:
|
|
import matplotlib
|
|
matplotlib.use('Agg')
|
|
import matplotlib.pyplot as plt
|
|
x = [r['x'] for r in out_rows]
|
|
y = [r['y'] for r in out_rows]
|
|
z = [r['z'] for r in out_rows]
|
|
fig, axes = plt.subplots(2, 2, figsize=(14, 12))
|
|
ax_xy, ax_xz, ax_yz, ax_qual = axes[0,0], axes[0,1], axes[1,0], axes[1,1]
|
|
ax_xy.plot(x, y, '-b', linewidth=1, alpha=0.7, label='VO classic')
|
|
ax_xy.plot(x[0], y[0], 'go', markersize=10, label='start')
|
|
ax_xy.plot(x[-1], y[-1], 'r^', markersize=10, label='end')
|
|
if args.ref_csv:
|
|
try:
|
|
with open(args.ref_csv) as ff:
|
|
refrows = list(csv.DictReader(ff))
|
|
rx = [float(r['x']) for r in refrows if r.get('segment','') == args.label]
|
|
ry = [float(r['y']) for r in refrows if r.get('segment','') == args.label]
|
|
if rx:
|
|
ax_xy.plot(rx, ry, '-r', linewidth=1, alpha=0.5, label='lingbot')
|
|
except Exception as e:
|
|
print(f'[plot] ref_csv load fail: {e}')
|
|
ax_xy.set_xlabel('X (m, up-to-scale)'); ax_xy.set_ylabel('Y'); ax_xy.set_title(f'Top X-Y — VO classique {args.label}')
|
|
ax_xy.set_aspect('equal'); ax_xy.legend(); ax_xy.grid(True, alpha=0.3)
|
|
|
|
ax_xz.plot(x, z, '-b', linewidth=1)
|
|
ax_xz.set_xlabel('X'); ax_xz.set_ylabel('Z'); ax_xz.set_title('Side X-Z'); ax_xz.set_aspect('equal'); ax_xz.grid(True, alpha=0.3)
|
|
ax_yz.plot(y, z, '-b', linewidth=1)
|
|
ax_yz.set_xlabel('Y'); ax_yz.set_ylabel('Z'); ax_yz.set_title('Side Y-Z'); ax_yz.set_aspect('equal'); ax_yz.grid(True, alpha=0.3)
|
|
|
|
tracked = [r['tracked'] for r in out_rows]
|
|
inliers = [r['inliers'] for r in out_rows]
|
|
ax_qual.plot(tracked, label='tracked features', color='blue', alpha=0.6)
|
|
ax_qual.plot(inliers, label='RANSAC inliers', color='red', alpha=0.6)
|
|
ax_qual.set_xlabel('Frame'); ax_qual.set_ylabel('Count'); ax_qual.set_title('Tracking quality'); ax_qual.legend(); ax_qual.grid(True, alpha=0.3)
|
|
|
|
plt.suptitle(f'Visual Odometry classique (OpenCV {args.method.upper()}) — {args.label}')
|
|
plt.tight_layout()
|
|
plt.savefig(args.plot, dpi=130, bbox_inches='tight')
|
|
print(f'[plot] {args.plot}')
|
|
|
|
if __name__ == '__main__': main()
|