Files
RDC_Simulation/gazebo_bridge.py
2025-12-31 23:50:26 +00:00

272 lines
8.8 KiB
Python

#!/usr/bin/env python3
"""
Gazebo Bridge - GPS-denied interface with camera.
Provides sensor data: IMU, altimeter, camera image.
"""
import base64
import json
import math
from typing import Optional, Dict, Any
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Image
from std_msgs.msg import String
class GazeboBridge(Node):
"""Bridges Gazebo topics to GPS-denied sensor interface with camera."""
CAMERA_FOV = 60.0
CAMERA_RANGE = 10.0
CAMERA_WIDTH = 320
CAMERA_HEIGHT = 240
def __init__(self):
super().__init__('gazebo_bridge')
self.get_logger().info('=' * 60)
self.get_logger().info('Gazebo Bridge Starting (GPS-Denied + Camera)...')
self.get_logger().info('=' * 60)
self._landed = False
self._step_count = 0
self._rover_pos = [0.0, 0.0, 0.15]
self._last_camera_image = None
self._drone_pos = [0.0, 0.0, 5.0]
self._drone_orn = [0.0, 0.0, 0.0, 1.0]
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
depth=10
)
self._cmd_vel_sub = self.create_subscription(
Twist, '/cmd_vel', self._cmd_vel_callback, 10
)
self.get_logger().info(' Subscribed to: /cmd_vel')
self._odom_sub = self.create_subscription(
Odometry, '/model/drone/odometry', self._odom_callback, sensor_qos
)
self.get_logger().info(' Subscribed to: /model/drone/odometry')
self._camera_sub = self.create_subscription(
Image, '/drone/camera', self._camera_callback, sensor_qos
)
self.get_logger().info(' Subscribed to: /drone/camera')
self._rover_sub = self.create_subscription(
String, '/rover/telemetry', self._rover_callback, 10
)
self.get_logger().info(' Subscribed to: /rover/telemetry')
self._gz_cmd_vel_pub = self.create_publisher(
Twist, '/drone/cmd_vel', 10
)
self.get_logger().info(' Publishing to: /drone/cmd_vel (Gazebo)')
self._telemetry_pub = self.create_publisher(
String, '/drone/telemetry', 10
)
self.get_logger().info(' Publishing to: /drone/telemetry')
self.get_logger().info('Gazebo Bridge Ready!')
def _cmd_vel_callback(self, msg: Twist) -> None:
gz_msg = Twist()
gz_msg.linear.x = msg.linear.x
gz_msg.linear.y = msg.linear.y
gz_msg.linear.z = msg.linear.z
gz_msg.angular.z = msg.angular.z
self._gz_cmd_vel_pub.publish(gz_msg)
def _rover_callback(self, msg: String) -> None:
try:
data = json.loads(msg.data)
pos = data.get('position', {})
self._rover_pos = [
pos.get('x', 0.0),
pos.get('y', 0.0),
pos.get('z', 0.15)
]
except json.JSONDecodeError:
pass
def _camera_callback(self, msg: Image) -> None:
"""Process camera images and encode as base64."""
try:
if msg.encoding == 'rgb8':
image = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)
elif msg.encoding == 'bgr8':
image = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)[:, :, ::-1]
else:
return
# Encode as base64 JPEG
try:
from PIL import Image as PILImage
import io
pil_img = PILImage.fromarray(image)
buffer = io.BytesIO()
pil_img.save(buffer, format='JPEG', quality=70)
self._last_camera_image = base64.b64encode(buffer.getvalue()).decode('utf-8')
except ImportError:
self._last_camera_image = base64.b64encode(image.tobytes()).decode('utf-8')
except Exception as e:
self.get_logger().warning(f'Camera processing error: {e}')
def _get_landing_pad_detection(self) -> Optional[Dict[str, float]]:
dx = self._rover_pos[0] - self._drone_pos[0]
dy = self._rover_pos[1] - self._drone_pos[1]
dz = self._rover_pos[2] - self._drone_pos[2]
horizontal_dist = math.sqrt(dx**2 + dy**2)
vertical_dist = -dz
if vertical_dist <= 0 or vertical_dist > self.CAMERA_RANGE:
return None
fov_rad = math.radians(self.CAMERA_FOV / 2)
max_horizontal = vertical_dist * math.tan(fov_rad)
if horizontal_dist > max_horizontal:
return None
roll, pitch, yaw = self._quaternion_to_euler(
self._drone_orn[0], self._drone_orn[1],
self._drone_orn[2], self._drone_orn[3]
)
cos_yaw = math.cos(-yaw)
sin_yaw = math.sin(-yaw)
relative_x = dx * cos_yaw - dy * sin_yaw
relative_y = dx * sin_yaw + dy * cos_yaw
angle = math.atan2(horizontal_dist, vertical_dist)
confidence = max(0.0, 1.0 - (angle / fov_rad))
confidence *= max(0.0, 1.0 - (vertical_dist / self.CAMERA_RANGE))
return {
"relative_x": round(relative_x, 4),
"relative_y": round(relative_y, 4),
"distance": round(vertical_dist, 4),
"confidence": round(confidence, 4)
}
def _odom_callback(self, msg: Odometry) -> None:
self._step_count += 1
pos = msg.pose.pose.position
vel = msg.twist.twist.linear
ang_vel = msg.twist.twist.angular
quat = msg.pose.pose.orientation
self._drone_pos = [pos.x, pos.y, pos.z]
self._drone_orn = [quat.x, quat.y, quat.z, quat.w]
roll, pitch, yaw = self._quaternion_to_euler(quat.x, quat.y, quat.z, quat.w)
landing_height = 0.5
self._landed = (
abs(pos.x - self._rover_pos[0]) < 0.5 and
abs(pos.y - self._rover_pos[1]) < 0.5 and
pos.z < landing_height and
abs(vel.z) < 0.2
)
pad_detection = self._get_landing_pad_detection()
telemetry = {
"imu": {
"orientation": {
"roll": round(roll, 4),
"pitch": round(pitch, 4),
"yaw": round(yaw, 4)
},
"angular_velocity": {
"x": round(ang_vel.x, 4),
"y": round(ang_vel.y, 4),
"z": round(ang_vel.z, 4)
},
"linear_acceleration": {
"x": 0.0,
"y": 0.0,
"z": 9.81
}
},
"altimeter": {
"altitude": round(pos.z, 4),
"vertical_velocity": round(vel.z, 4)
},
"velocity": {
"x": round(vel.x, 4),
"y": round(vel.y, 4),
"z": round(vel.z, 4)
},
"landing_pad": pad_detection,
"camera": {
"width": self.CAMERA_WIDTH,
"height": self.CAMERA_HEIGHT,
"fov": self.CAMERA_FOV,
"image": self._last_camera_image
},
"landed": self._landed,
"timestamp": round(self._step_count * 0.02, 4)
}
telemetry_msg = String()
telemetry_msg.data = json.dumps(telemetry)
self._telemetry_pub.publish(telemetry_msg)
def _quaternion_to_euler(self, x: float, y: float, z: float, w: float) -> tuple:
sinr_cosp = 2 * (w * x + y * z)
cosr_cosp = 1 - 2 * (x * x + y * y)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2 * (w * y - z * x)
if abs(sinp) >= 1:
pitch = math.copysign(math.pi / 2, sinp)
else:
pitch = math.asin(sinp)
siny_cosp = 2 * (w * z + x * y)
cosy_cosp = 1 - 2 * (y * y + z * z)
yaw = math.atan2(siny_cosp, cosy_cosp)
return roll, pitch, yaw
def main(args=None):
print("\n" + "=" * 60)
print(" GAZEBO BRIDGE (GPS-DENIED + CAMERA)")
print("=" * 60 + "\n")
rclpy.init(args=args)
bridge_node = None
try:
bridge_node = GazeboBridge()
rclpy.spin(bridge_node)
except KeyboardInterrupt:
print('\nShutting down...')
finally:
if bridge_node is not None:
bridge_node.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()