#!/usr/bin/env python3 """ ROS 2 Bridge for Drone Simulation - Connects PyBullet simulator to ROS 2. Usage: python ros_bridge.py --host --port """ import argparse import json import socket import threading from typing import Optional import rclpy from rclpy.node import Node from rclpy.executors import MultiThreadedExecutor from geometry_msgs.msg import Twist from std_msgs.msg import String class ROS2SimulatorBridge(Node): """Bridges ROS 2 topics to UDP-based simulator.""" DEFAULT_SIMULATOR_HOST = '127.0.0.1' DEFAULT_SIMULATOR_PORT = 5555 SOCKET_TIMEOUT = 0.1 RECEIVE_BUFFER_SIZE = 4096 CMD_VEL_TOPIC = '/cmd_vel' TELEMETRY_TOPIC = '/drone/telemetry' def __init__(self, simulator_host: str = None, simulator_port: int = None): super().__init__('ros2_simulator_bridge') self._simulator_host = simulator_host or self.DEFAULT_SIMULATOR_HOST self._simulator_port = simulator_port or self.DEFAULT_SIMULATOR_PORT self._telemetry_port = self._simulator_port + 1 self.get_logger().info('=' * 60) self.get_logger().info('ROS 2 Simulator Bridge Starting...') self.get_logger().info(f' Simulator Host: {self._simulator_host}') self.get_logger().info(f' Simulator Port: {self._simulator_port}') self.get_logger().info(f' Telemetry Port: {self._telemetry_port}') self.get_logger().info('=' * 60) self._socket: Optional[socket.socket] = None self._socket_lock = threading.Lock() self._init_socket() self._cmd_vel_sub = self.create_subscription( Twist, self.CMD_VEL_TOPIC, self._cmd_vel_callback, 10 ) self.get_logger().info(f' Subscribed to: {self.CMD_VEL_TOPIC}') self._telemetry_pub = self.create_publisher( String, self.TELEMETRY_TOPIC, 10 ) self.get_logger().info(f' Publishing to: {self.TELEMETRY_TOPIC}') self._running = True self._receiver_thread = threading.Thread( target=self._telemetry_receiver_loop, name='TelemetryReceiver', daemon=True ) self._receiver_thread.start() self.get_logger().info(' Telemetry receiver thread started') self.get_logger().info('ROS 2 Simulator Bridge Ready!') def _init_socket(self) -> None: try: self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._socket.settimeout(self.SOCKET_TIMEOUT) self._socket.bind(('0.0.0.0', self._telemetry_port)) self.get_logger().info( f' UDP socket initialized (listening on port {self._telemetry_port})' ) except socket.error as e: self.get_logger().error(f'Failed to initialize socket: {e}') raise def _cmd_vel_callback(self, msg: Twist) -> None: command = { 'thrust': msg.linear.z, 'yaw': msg.angular.z, 'pitch': msg.linear.x, 'roll': msg.linear.y } self._send_command_to_simulator(command) def _send_command_to_simulator(self, command: dict) -> bool: try: json_data = json.dumps(command).encode('utf-8') with self._socket_lock: if self._socket is None: return False self._socket.sendto( json_data, (self._simulator_host, self._simulator_port) ) return True except socket.error as e: self.get_logger().warning(f'Failed to send command: {e}') return False except json.JSONEncodeError as e: self.get_logger().error(f'JSON encoding error: {e}') return False def _telemetry_receiver_loop(self) -> None: self.get_logger().info('Telemetry receiver loop started') while self._running and rclpy.ok(): try: with self._socket_lock: if self._socket is None: continue sock = self._socket data, addr = sock.recvfrom(self.RECEIVE_BUFFER_SIZE) if data: json_str = data.decode('utf-8') try: json.loads(json_str) except json.JSONDecodeError as e: self.get_logger().warning( f'Received invalid JSON from simulator: {e}' ) continue msg = String() msg.data = json_str self._telemetry_pub.publish(msg) except socket.timeout: continue except socket.error as e: if self._running: self.get_logger().warning(f'Socket receive error: {e}') continue except Exception as e: self.get_logger().error(f'Unexpected error in receiver loop: {e}') continue def shutdown(self) -> None: self.get_logger().info('Shutting down ROS 2 Simulator Bridge...') self._running = False if self._receiver_thread.is_alive(): self._receiver_thread.join(timeout=1.0) with self._socket_lock: if self._socket is not None: try: self._socket.close() except socket.error: pass self._socket = None self.get_logger().info('Bridge shutdown complete') def parse_args(): parser = argparse.ArgumentParser( description='ROS 2 Bridge for Drone Simulation', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python ros_bridge.py python ros_bridge.py --host 192.168.1.100 python ros_bridge.py --host 192.168.1.100 --port 5555 """ ) parser.add_argument( '--host', '-H', type=str, default='127.0.0.1', help='IP address of the simulator (default: 127.0.0.1)' ) parser.add_argument( '--port', '-p', type=int, default=5555, help='UDP port of the simulator (default: 5555)' ) args, _ = parser.parse_known_args() return args def main(args=None): cli_args = parse_args() print(f"\n{'='*60}") print(f" Connecting to simulator at {cli_args.host}:{cli_args.port}") print(f"{'='*60}\n") rclpy.init(args=args) bridge_node = None try: bridge_node = ROS2SimulatorBridge( simulator_host=cli_args.host, simulator_port=cli_args.port ) executor = MultiThreadedExecutor(num_threads=2) executor.add_node(bridge_node) try: executor.spin() finally: executor.shutdown() except KeyboardInterrupt: print('\nKeyboard interrupt received, shutting down...') except Exception as e: print(f'Error: {e}') finally: if bridge_node is not None: bridge_node.shutdown() bridge_node.destroy_node() if rclpy.ok(): rclpy.shutdown() if __name__ == '__main__': main()