#!/usr/bin/env python3 """ DroneController - Template for GPS-denied landing logic. Supports multiple telemetry sources: 1. Custom JSON telemetry (/drone/telemetry) - PyBullet/Gazebo bridge 2. ArduPilot ROS 2 DDS topics (/ap/*) - Official ArduPilot integration Implement your algorithm in calculate_landing_maneuver(). Uses sensors: IMU, altimeter, camera, and landing pad detection. """ import json import math from typing import Dict, Any, Optional import rclpy from rclpy.node import Node from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy from geometry_msgs.msg import Twist, PoseStamped, TwistStamped from sensor_msgs.msg import Imu, BatteryState, NavSatFix from std_msgs.msg import String # Load configuration try: from config import CONTROLLER, DRONE, LANDING CONFIG_LOADED = True except ImportError: CONFIG_LOADED = False CONTROLLER = {"Kp_z": 0.5, "Kd_z": 0.3, "Kp_xy": 0.3, "Kd_xy": 0.2, "rate": 50} DRONE = {"max_thrust": 1.0, "max_pitch": 0.5, "max_roll": 0.5} LANDING = {"height_threshold": 0.1, "success_velocity": 0.1} def quaternion_to_euler(x: float, y: float, z: float, w: float) -> tuple: """Convert quaternion to Euler angles (roll, pitch, yaw).""" # Roll (x-axis rotation) sinr_cosp = 2.0 * (w * x + y * z) cosr_cosp = 1.0 - 2.0 * (x * x + y * y) roll = math.atan2(sinr_cosp, cosr_cosp) # Pitch (y-axis rotation) sinp = 2.0 * (w * y - z * x) if abs(sinp) >= 1: pitch = math.copysign(math.pi / 2, sinp) else: pitch = math.asin(sinp) # Yaw (z-axis rotation) siny_cosp = 2.0 * (w * z + x * y) cosy_cosp = 1.0 - 2.0 * (y * y + z * z) yaw = math.atan2(siny_cosp, cosy_cosp) return roll, pitch, yaw class DroneController(Node): """ Drone controller for GPS-denied landing. Supports multiple telemetry sources: - Legacy: /drone/telemetry (JSON String) - ArduPilot DDS: /ap/pose/filtered, /ap/imu/filtered, etc. """ def __init__(self, use_ardupilot_topics: bool = True): super().__init__('drone_controller') # Load from config self._control_rate = CONTROLLER.get("rate", 50.0) self._Kp_z = CONTROLLER.get("Kp_z", 0.5) self._Kd_z = CONTROLLER.get("Kd_z", 0.3) self._Kp_xy = CONTROLLER.get("Kp_xy", 0.3) self._Kd_xy = CONTROLLER.get("Kd_xy", 0.2) self._max_thrust = DRONE.get("max_thrust", 1.0) self._max_pitch = DRONE.get("max_pitch", 0.5) self._max_roll = DRONE.get("max_roll", 0.5) self._landing_height = LANDING.get("height_threshold", 0.1) self._landing_velocity = LANDING.get("success_velocity", 0.1) # Mode selection self._use_ardupilot = use_ardupilot_topics self.get_logger().info('=' * 50) self.get_logger().info('Drone Controller Starting (GPS-Denied)...') if CONFIG_LOADED: self.get_logger().info(' Configuration loaded from config.py') self.get_logger().info(f' Mode: {"ArduPilot DDS" if use_ardupilot_topics else "Legacy JSON"}') self.get_logger().info('=' * 50) # State variables self._latest_telemetry: Optional[Dict[str, Any]] = None self._rover_telemetry: Optional[Dict[str, Any]] = None self._telemetry_received = False self._landing_complete = False # ArduPilot state (built from DDS topics) self._ap_pose: Optional[PoseStamped] = None self._ap_twist: Optional[TwistStamped] = None self._ap_imu: Optional[Imu] = None self._ap_battery: Optional[BatteryState] = None # QoS for sensor topics sensor_qos = QoSProfile( reliability=ReliabilityPolicy.BEST_EFFORT, history=HistoryPolicy.KEEP_LAST, depth=1 ) if use_ardupilot_topics: self._setup_ardupilot_subscriptions(sensor_qos) else: self._setup_legacy_subscriptions() # Always subscribe to rover telemetry (for moving landing pad) self._rover_telemetry_sub = self.create_subscription( String, '/rover/telemetry', self._rover_telemetry_callback, 10 ) self.get_logger().info(' Subscribed to: /rover/telemetry') # Command publisher self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10) self.get_logger().info(' Publishing to: /cmd_vel') # Control loop control_period = 1.0 / self._control_rate self._control_timer = self.create_timer(control_period, self._control_loop) self.get_logger().info(f' Control loop: {self._control_rate} Hz') self.get_logger().info('Drone Controller Ready!') self.get_logger().info('Sensors: IMU, Altimeter, Velocity, Landing Pad Detection') def _setup_ardupilot_subscriptions(self, qos: QoSProfile): """Set up subscriptions to official ArduPilot ROS 2 topics.""" # Pose (position + orientation) self._ap_pose_sub = self.create_subscription( PoseStamped, '/ap/pose/filtered', self._ap_pose_callback, qos ) self.get_logger().info(' Subscribed to: /ap/pose/filtered') # Twist (velocity) self._ap_twist_sub = self.create_subscription( TwistStamped, '/ap/twist/filtered', self._ap_twist_callback, qos ) self.get_logger().info(' Subscribed to: /ap/twist/filtered') # IMU self._ap_imu_sub = self.create_subscription( Imu, '/ap/imu/filtered', self._ap_imu_callback, qos ) self.get_logger().info(' Subscribed to: /ap/imu/filtered') # Battery self._ap_battery_sub = self.create_subscription( BatteryState, '/ap/battery', self._ap_battery_callback, qos ) self.get_logger().info(' Subscribed to: /ap/battery') def _setup_legacy_subscriptions(self): """Set up subscriptions to legacy JSON telemetry.""" self._telemetry_sub = self.create_subscription( String, '/drone/telemetry', self._telemetry_callback, 10 ) self.get_logger().info(' Subscribed to: /drone/telemetry') # ArduPilot topic callbacks def _ap_pose_callback(self, msg: PoseStamped): self._ap_pose = msg if not self._telemetry_received: self._telemetry_received = True self._warmup_count = 0 self.get_logger().info('First ArduPilot pose received!') self._build_telemetry_from_ardupilot() def _ap_twist_callback(self, msg: TwistStamped): self._ap_twist = msg self._build_telemetry_from_ardupilot() def _ap_imu_callback(self, msg: Imu): self._ap_imu = msg self._build_telemetry_from_ardupilot() def _ap_battery_callback(self, msg: BatteryState): self._ap_battery = msg self._build_telemetry_from_ardupilot() def _build_telemetry_from_ardupilot(self): """Build telemetry dict from ArduPilot topics.""" telemetry = {} # Position and orientation from pose if self._ap_pose: pos = self._ap_pose.pose.position ori = self._ap_pose.pose.orientation roll, pitch, yaw = quaternion_to_euler(ori.x, ori.y, ori.z, ori.w) telemetry['position'] = { 'x': pos.x, 'y': pos.y, 'z': pos.z } telemetry['altimeter'] = { 'altitude': pos.z, 'vertical_velocity': self._ap_twist.twist.linear.z if self._ap_twist else 0.0 } telemetry['imu'] = { 'orientation': {'roll': roll, 'pitch': pitch, 'yaw': yaw}, 'angular_velocity': {'x': 0, 'y': 0, 'z': 0} } # Velocity from twist if self._ap_twist: twist = self._ap_twist.twist telemetry['velocity'] = { 'x': twist.linear.x, 'y': twist.linear.y, 'z': twist.linear.z } if 'altimeter' in telemetry: telemetry['altimeter']['vertical_velocity'] = twist.linear.z # Angular velocity from IMU if self._ap_imu: if 'imu' not in telemetry: telemetry['imu'] = {} telemetry['imu']['angular_velocity'] = { 'x': self._ap_imu.angular_velocity.x, 'y': self._ap_imu.angular_velocity.y, 'z': self._ap_imu.angular_velocity.z } # Battery if self._ap_battery: telemetry['battery'] = { 'voltage': self._ap_battery.voltage, 'remaining': self._ap_battery.percentage * 100 } # Landing pad detection (placeholder - would need vision processing) # For now, calculate from rover telemetry if available if self._rover_telemetry and self._ap_pose: rover_pos = self._rover_telemetry.get('position', {}) rx = rover_pos.get('x', 0) ry = rover_pos.get('y', 0) dx = self._ap_pose.pose.position.x dy = self._ap_pose.pose.position.y dz = self._ap_pose.pose.position.z rel_x = rx - dx rel_y = ry - dy distance = math.sqrt(rel_x**2 + rel_y**2 + dz**2) telemetry['landing_pad'] = { 'relative_x': rel_x, 'relative_y': rel_y, 'distance': distance, 'confidence': 1.0 if distance < 10.0 else 0.0 } self._latest_telemetry = telemetry # Legacy callbacks def _telemetry_callback(self, msg: String) -> None: try: self._latest_telemetry = json.loads(msg.data) if not self._telemetry_received: self._telemetry_received = True self._warmup_count = 0 self.get_logger().info('First telemetry received!') except json.JSONDecodeError as e: self.get_logger().warning(f'Failed to parse telemetry: {e}') def _rover_telemetry_callback(self, msg: String) -> None: try: self._rover_telemetry = json.loads(msg.data) except json.JSONDecodeError: pass def _control_loop(self) -> None: if self._latest_telemetry is None: return if self._landing_complete: return # Warmup period - wait for stable telemetry before checking landing if not hasattr(self, '_warmup_count'): self._warmup_count = 0 self._warmup_count += 1 if self._warmup_count < 100: # Wait ~2 seconds at 50Hz # Still fly with controller, just don't check for landing yet pass elif self._check_landing_complete(): self._landing_complete = True self.get_logger().info('=' * 50) self.get_logger().info('LANDING COMPLETE!') self.get_logger().info('=' * 50) self._publish_command(0.0, 0.0, 0.0, 0.0) return thrust, pitch, roll, yaw = self.calculate_landing_maneuver( self._latest_telemetry, self._rover_telemetry ) thrust = max(min(thrust, self._max_thrust), -self._max_thrust) pitch = max(min(pitch, self._max_pitch), -self._max_pitch) roll = max(min(roll, self._max_roll), -self._max_roll) yaw = max(min(yaw, 0.5), -0.5) self._publish_command(thrust, pitch, roll, yaw) def _check_landing_complete(self) -> bool: if self._latest_telemetry is None: return False try: altimeter = self._latest_telemetry.get('altimeter', {}) altitude = altimeter.get('altitude', float('inf')) vertical_velocity = abs(altimeter.get('vertical_velocity', float('inf'))) return altitude < self._landing_height and vertical_velocity < self._landing_velocity except (KeyError, TypeError): return False def calculate_landing_maneuver( self, telemetry: Dict[str, Any], rover_telemetry: Optional[Dict[str, Any]] = None ) -> tuple[float, float, float, float]: """ Calculate control commands for GPS-denied landing. Available sensors in telemetry: - imu.orientation: {roll, pitch, yaw} in radians - imu.angular_velocity: {x, y, z} in rad/s - altimeter.altitude: height in meters - altimeter.vertical_velocity: vertical speed in m/s - velocity: {x, y, z} estimated velocity in m/s - landing_pad: relative position to pad (may be None) - relative_x, relative_y: offset in body frame - distance: vertical distance to pad - confidence: detection confidence (0-1) - camera: {width, height, fov, image} - image: base64 encoded JPEG (or None) - landed: bool Args: telemetry: Sensor data from drone rover_telemetry: Rover state (for moving target) - may be None Returns: Tuple of (thrust, pitch, roll, yaw) control values (-1.0 to 1.0) """ # Extract sensor data altimeter = telemetry.get('altimeter', {}) altitude = altimeter.get('altitude', 5.0) vertical_vel = altimeter.get('vertical_velocity', 0.0) velocity = telemetry.get('velocity', {'x': 0, 'y': 0, 'z': 0}) vel_x = velocity.get('x', 0.0) vel_y = velocity.get('y', 0.0) landing_pad = telemetry.get('landing_pad', None) # camera = telemetry.get('camera', None) # Descent control if altitude > 1.0: target_descent_rate = -0.5 else: target_descent_rate = -0.2 descent_error = target_descent_rate - vertical_vel thrust = self._Kp_z * descent_error # Horizontal control if landing_pad is not None: target_x = landing_pad.get('relative_x', 0.0) target_y = landing_pad.get('relative_y', 0.0) pitch = self._Kp_xy * target_x - self._Kd_xy * vel_x roll = self._Kp_xy * target_y - self._Kd_xy * vel_y else: pitch = -self._Kd_xy * vel_x roll = -self._Kd_xy * vel_y yaw = 0.0 return (thrust, pitch, roll, yaw) def _publish_command(self, thrust: float, pitch: float, roll: float, yaw: float) -> None: msg = Twist() msg.linear.z = thrust msg.linear.x = pitch msg.linear.y = roll msg.angular.z = yaw self._cmd_vel_pub.publish(msg) def main(args=None): import sys # Check for --ardupilot flag use_ardupilot = '--ardupilot' in sys.argv or '-a' in sys.argv # Remove our custom args before passing to rclpy filtered_args = [a for a in (args or sys.argv) if a not in ['--ardupilot', '-a']] rclpy.init(args=filtered_args) controller = None try: controller = DroneController(use_ardupilot_topics=use_ardupilot) rclpy.spin(controller) except KeyboardInterrupt: print('\nShutting down...') finally: if controller: controller.destroy_node() if rclpy.ok(): rclpy.shutdown() if __name__ == '__main__': main()