Camera Sim Update

2026-02-12 14:29:32 -05:00
parent 0e427f3597
commit 92da41138b
27 changed files with 1353 additions and 1317 deletions


@@ -1,173 +1,119 @@
#!/usr/bin/env python3
"""
Visual Odometry - Camera-based position estimation without GPS.
ORB/SIFT feature matching with essential matrix decomposition.
"""
import time

import cv2
import numpy as np
from scipy.spatial.transform import Rotation
class VisualOdometry:
    def __init__(self, camera_matrix=None, detector_type="ORB", min_features=100):
        self.detector_type = detector_type
        self.min_features = min_features
        if camera_matrix is not None:
            self.camera_matrix = np.array(camera_matrix).reshape(3, 3)
        else:
            # Fallback intrinsics for a nominal 640x480 camera.
            self.camera_matrix = np.array(
                [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]]
            )
        self.dist_coeffs = np.zeros(5)
        if detector_type == "ORB":
            self.detector = cv2.ORB_create(nfeatures=500)
            self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
        elif detector_type == "SIFT":
            self.detector = cv2.SIFT_create(nfeatures=500)
            self.matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
        else:
            # Unknown detector type: fall back to ORB.
            self.detector = cv2.ORB_create(nfeatures=500)
            self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
        self.prev_frame = None
        self.prev_keypoints = None
        self.prev_descriptors = None
        self.prev_time = None
        self.position = np.zeros(3)
        self.orientation = np.eye(3)
        self.velocity = np.zeros(3)
        self.on_pose = None  # optional callback: on_pose(position, quaternion)
        print(f"[VO] Visual Odometry initialized ({detector_type})")

    def process_frame(self, camera_name, frame):
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        keypoints, descriptors = self.detector.detectAndCompute(gray, None)
        current_time = time.time()
        if (
            self.prev_frame is not None
            and self.prev_descriptors is not None
            and len(keypoints) >= self.min_features
        ):
            matches = self._match_features(self.prev_descriptors, descriptors)
            if len(matches) >= self.min_features:
                self._estimate_motion(
                    self.prev_keypoints, keypoints, matches, current_time
                )
        self.prev_frame = gray
        self.prev_keypoints = keypoints
        self.prev_descriptors = descriptors
        self.prev_time = current_time
    def _match_features(self, desc1, desc2):
        if desc1 is None or desc2 is None:
            return []
        matches = self.matcher.knnMatch(desc1, desc2, k=2)
        # Lowe's ratio test: keep a match only if it is clearly better
        # than the second-best candidate.
        good = []
        for pair in matches:
            if len(pair) == 2:
                m, n = pair
                if m.distance < 0.7 * n.distance:
                    good.append(m)
        return good
    def _estimate_motion(self, prev_kp, curr_kp, matches, current_time):
        # The essential matrix needs at least 5 point correspondences.
        if len(matches) < 5:
            return
        pts1 = np.float32([prev_kp[m.queryIdx].pt for m in matches])
        pts2 = np.float32([curr_kp[m.trainIdx].pt for m in matches])
        E, mask = cv2.findEssentialMat(
            pts1, pts2, self.camera_matrix, method=cv2.RANSAC, prob=0.999, threshold=1.0
        )
        if E is None:
            return
        _, R, t, mask = cv2.recoverPose(E, pts1, pts2, self.camera_matrix, mask=mask)
        # Monocular VO only recovers the direction of translation; the
        # magnitude is ambiguous, so a unit scale is assumed here.
        scale = 1.0
        translation = scale * t.flatten()
        if self.prev_time is not None:
            dt = current_time - self.prev_time
            if dt > 0:
                self.velocity = translation / dt
        # Rotate the frame-to-frame translation into the odometry frame
        # before integrating it into the accumulated pose.
        self.position += self.orientation @ translation
        self.orientation = R @ self.orientation
        if self.on_pose:
            r = Rotation.from_matrix(self.orientation)
            self.on_pose(self.position.copy(), r.as_quat())
    def get_pose(self):
        r = Rotation.from_matrix(self.orientation)
        return self.position.copy(), r.as_quat()

    def get_velocity(self):
        return self.velocity.copy()
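
For context, a minimal driver for the refactored class might look like the sketch below. It is not part of this commit: the module path, the video file name, and the callback wiring are all assumptions for illustration.

# Usage sketch (illustrative only; module and file names are hypothetical).
import cv2
from visual_odometry import VisualOdometry  # hypothetical module path

vo = VisualOdometry(detector_type="ORB")
# Subscribe to pose updates via the on_pose callback.
vo.on_pose = lambda pos, quat: print(f"[VO] pos={pos} quat={quat}")

cap = cv2.VideoCapture("forward_camera.mp4")  # hypothetical test clip
while True:
    ok, frame = cap.read()
    if not ok:
        break
    vo.process_frame("forward", frame)

position, quaternion = vo.get_pose()
print("final pose:", position, quaternion)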