#!/usr/bin/env python3 """ Gazebo Bridge - GPS-denied interface with camera and rover control. Provides sensor data: IMU, altimeter, camera image. Controls rover movement via Gazebo pose commands. """ import base64 import json import math import subprocess import threading from typing import Optional, Dict, Any import numpy as np import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile, ReliabilityPolicy from geometry_msgs.msg import Twist, Pose from nav_msgs.msg import Odometry from sensor_msgs.msg import Image from std_msgs.msg import String class GazeboBridge(Node): """Bridges Gazebo topics to GPS-denied sensor interface with camera.""" CAMERA_FOV = 60.0 CAMERA_RANGE = 10.0 CAMERA_WIDTH = 320 CAMERA_HEIGHT = 240 def __init__(self): super().__init__('gazebo_bridge') self.get_logger().info('=' * 60) self.get_logger().info('Gazebo Bridge Starting (GPS-Denied + Camera)...') self.get_logger().info('=' * 60) self._landed = False self._step_count = 0 self._rover_pos = [0.0, 0.0, 0.15] self._last_camera_image = None self._drone_pos = [0.0, 0.0, 5.0] self._drone_orn = [0.0, 0.0, 0.0, 1.0] # Detect gz vs ign command import shutil self._gz_cmd = 'gz' if shutil.which('gz') else 'ign' self.get_logger().info(f' Using Gazebo command: {self._gz_cmd}') sensor_qos = QoSProfile( reliability=ReliabilityPolicy.BEST_EFFORT, depth=10 ) self._cmd_vel_sub = self.create_subscription( Twist, '/cmd_vel', self._cmd_vel_callback, 10 ) self.get_logger().info(' Subscribed to: /cmd_vel') self._odom_sub = self.create_subscription( Odometry, '/model/drone/odometry', self._odom_callback, sensor_qos ) self.get_logger().info(' Subscribed to: /model/drone/odometry') self._camera_sub = self.create_subscription( Image, '/drone/camera', self._camera_callback, sensor_qos ) self.get_logger().info(' Subscribed to: /drone/camera') self._rover_sub = self.create_subscription( String, '/rover/telemetry', self._rover_callback, 10 ) self.get_logger().info(' Subscribed to: /rover/telemetry') self._gz_cmd_vel_pub = self.create_publisher( Twist, '/drone/cmd_vel', 10 ) self.get_logger().info(' Publishing to: /drone/cmd_vel (Gazebo)') self._telemetry_pub = self.create_publisher( String, '/drone/telemetry', 10 ) self.get_logger().info(' Publishing to: /drone/telemetry') self.get_logger().info('Gazebo Bridge Ready!') def _cmd_vel_callback(self, msg: Twist) -> None: gz_msg = Twist() gz_msg.linear.x = msg.linear.x gz_msg.linear.y = msg.linear.y gz_msg.linear.z = msg.linear.z gz_msg.angular.z = msg.angular.z self._gz_cmd_vel_pub.publish(gz_msg) def _rover_callback(self, msg: String) -> None: """Receive rover position and update in Gazebo.""" try: data = json.loads(msg.data) pos = data.get('position', {}) new_pos = [ pos.get('x', 0.0), pos.get('y', 0.0), pos.get('z', 0.15) ] # Only update if position changed significantly if (abs(new_pos[0] - self._rover_pos[0]) > 0.01 or abs(new_pos[1] - self._rover_pos[1]) > 0.01): self._rover_pos = new_pos # Move rover in Gazebo (async to avoid blocking) threading.Thread(target=self._move_rover_in_gazebo, daemon=True).start() except json.JSONDecodeError: pass def _move_rover_in_gazebo(self) -> None: """Send pose command to Gazebo to move the landing pad.""" try: x, y, z = self._rover_pos # Use Gazebo service to set model pose if self._gz_cmd == 'ign': # Ignition Gazebo (Fortress) cmd = [ 'ign', 'service', '-s', '/world/drone_landing_world/set_pose', '--reqtype', 'ignition.msgs.Pose', '--reptype', 'ignition.msgs.Boolean', '--timeout', '100', '--req', f'name: "landing_pad", position: {{x: {x}, y: {y}, z: {z}}}' ] else: # Newer Gazebo cmd = [ 'gz', 'service', '-s', '/world/drone_landing_world/set_pose', '--reqtype', 'gz.msgs.Pose', '--reptype', 'gz.msgs.Boolean', '--timeout', '100', '--req', f'name: "landing_pad", position: {{x: {x}, y: {y}, z: {z}}}' ] subprocess.run(cmd, capture_output=True, timeout=1.0) except subprocess.TimeoutExpired: pass except Exception as e: pass # Silently ignore errors to avoid spam def _camera_callback(self, msg: Image) -> None: """Process camera images and encode as base64.""" try: if msg.encoding == 'rgb8': image = np.frombuffer(msg.data, dtype=np.uint8).reshape( msg.height, msg.width, 3 ) elif msg.encoding == 'bgr8': image = np.frombuffer(msg.data, dtype=np.uint8).reshape( msg.height, msg.width, 3 )[:, :, ::-1] else: return # Encode as base64 JPEG try: from PIL import Image as PILImage import io pil_img = PILImage.fromarray(image) buffer = io.BytesIO() pil_img.save(buffer, format='JPEG', quality=70) self._last_camera_image = base64.b64encode(buffer.getvalue()).decode('utf-8') except ImportError: self._last_camera_image = base64.b64encode(image.tobytes()).decode('utf-8') except Exception as e: self.get_logger().warning(f'Camera processing error: {e}') def _get_landing_pad_detection(self) -> Optional[Dict[str, float]]: dx = self._rover_pos[0] - self._drone_pos[0] dy = self._rover_pos[1] - self._drone_pos[1] dz = self._rover_pos[2] - self._drone_pos[2] horizontal_dist = math.sqrt(dx**2 + dy**2) vertical_dist = -dz if vertical_dist <= 0 or vertical_dist > self.CAMERA_RANGE: return None fov_rad = math.radians(self.CAMERA_FOV / 2) max_horizontal = vertical_dist * math.tan(fov_rad) if horizontal_dist > max_horizontal: return None roll, pitch, yaw = self._quaternion_to_euler( self._drone_orn[0], self._drone_orn[1], self._drone_orn[2], self._drone_orn[3] ) cos_yaw = math.cos(-yaw) sin_yaw = math.sin(-yaw) relative_x = dx * cos_yaw - dy * sin_yaw relative_y = dx * sin_yaw + dy * cos_yaw angle = math.atan2(horizontal_dist, vertical_dist) confidence = max(0.0, 1.0 - (angle / fov_rad)) confidence *= max(0.0, 1.0 - (vertical_dist / self.CAMERA_RANGE)) return { "relative_x": round(relative_x, 4), "relative_y": round(relative_y, 4), "distance": round(vertical_dist, 4), "confidence": round(confidence, 4) } def _odom_callback(self, msg: Odometry) -> None: self._step_count += 1 pos = msg.pose.pose.position vel = msg.twist.twist.linear ang_vel = msg.twist.twist.angular quat = msg.pose.pose.orientation self._drone_pos = [pos.x, pos.y, pos.z] self._drone_orn = [quat.x, quat.y, quat.z, quat.w] roll, pitch, yaw = self._quaternion_to_euler(quat.x, quat.y, quat.z, quat.w) landing_height = 0.5 self._landed = ( abs(pos.x - self._rover_pos[0]) < 0.5 and abs(pos.y - self._rover_pos[1]) < 0.5 and pos.z < landing_height and abs(vel.z) < 0.2 ) pad_detection = self._get_landing_pad_detection() telemetry = { "imu": { "orientation": { "roll": round(roll, 4), "pitch": round(pitch, 4), "yaw": round(yaw, 4) }, "angular_velocity": { "x": round(ang_vel.x, 4), "y": round(ang_vel.y, 4), "z": round(ang_vel.z, 4) }, "linear_acceleration": { "x": 0.0, "y": 0.0, "z": 9.81 } }, "altimeter": { "altitude": round(pos.z, 4), "vertical_velocity": round(vel.z, 4) }, "velocity": { "x": round(vel.x, 4), "y": round(vel.y, 4), "z": round(vel.z, 4) }, "landing_pad": pad_detection, "camera": { "width": self.CAMERA_WIDTH, "height": self.CAMERA_HEIGHT, "fov": self.CAMERA_FOV, "image": self._last_camera_image }, "landed": self._landed, "timestamp": round(self._step_count * 0.02, 4) } telemetry_msg = String() telemetry_msg.data = json.dumps(telemetry) self._telemetry_pub.publish(telemetry_msg) def _quaternion_to_euler(self, x: float, y: float, z: float, w: float) -> tuple: sinr_cosp = 2 * (w * x + y * z) cosr_cosp = 1 - 2 * (x * x + y * y) roll = math.atan2(sinr_cosp, cosr_cosp) sinp = 2 * (w * y - z * x) if abs(sinp) >= 1: pitch = math.copysign(math.pi / 2, sinp) else: pitch = math.asin(sinp) siny_cosp = 2 * (w * z + x * y) cosy_cosp = 1 - 2 * (y * y + z * z) yaw = math.atan2(siny_cosp, cosy_cosp) return roll, pitch, yaw def main(args=None): print("\n" + "=" * 60) print(" GAZEBO BRIDGE (GPS-DENIED + CAMERA)") print("=" * 60 + "\n") rclpy.init(args=args) bridge_node = None try: bridge_node = GazeboBridge() rclpy.spin(bridge_node) except KeyboardInterrupt: print('\nShutting down...') finally: if bridge_node is not None: bridge_node.destroy_node() if rclpy.ok(): rclpy.shutdown() if __name__ == '__main__': main()