Files
simulation/src/vision/optical_flow.py
2026-02-12 14:29:32 -05:00

86 lines
2.8 KiB
Python

#!/usr/bin/env python3
"""
Optical Flow - Velocity estimation from downward camera.
Lucas-Kanade sparse optical flow for GPS-denied velocity estimation.
"""
import time
import cv2
import numpy as np
class OpticalFlowEstimator:
def __init__(self, focal_length=500.0, min_altitude=0.3, max_altitude=10.0):
self.focal_length = focal_length
self.min_altitude = min_altitude
self.max_altitude = max_altitude
self.prev_frame = None
self.prev_points = None
self.prev_time = None
self.current_altitude = 1.0
self.velocity = np.zeros(2)
self.lk_params = dict(
winSize=(15, 15),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03),
)
self.feature_params = dict(
maxCorners=100, qualityLevel=0.3, minDistance=7, blockSize=7
)
self.on_velocity = None
print("[OF] Optical Flow Estimator initialized")
def set_altitude(self, altitude):
if self.min_altitude < altitude < self.max_altitude:
self.current_altitude = altitude
def process_frame(self, camera_name, frame):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
current_time = time.time()
if self.prev_frame is not None and self.prev_points is not None:
new_points, status, error = cv2.calcOpticalFlowPyrLK(
self.prev_frame, gray, self.prev_points, None, **self.lk_params
)
if new_points is not None:
good_new = new_points[status.flatten() == 1]
good_old = self.prev_points[status.flatten() == 1]
if len(good_new) > 10:
flow = good_new - good_old
avg_flow = np.mean(flow, axis=0)
if self.prev_time is not None:
dt = current_time - self.prev_time
if dt > 0:
pixel_velocity = avg_flow / dt
self.velocity[0] = (
pixel_velocity[0]
* self.current_altitude
/ self.focal_length
)
self.velocity[1] = (
pixel_velocity[1]
* self.current_altitude
/ self.focal_length
)
if self.on_velocity:
self.on_velocity(self.velocity)
self.prev_points = cv2.goodFeaturesToTrack(gray, **self.feature_params)
self.prev_frame = gray
self.prev_time = current_time
return self.velocity
def get_velocity(self):
return self.velocity.copy()