#!/usr/bin/env python3 """ DroneController - Template for GPS-denied landing logic. Implement your algorithm in calculate_landing_maneuver(). Uses sensors: IMU, altimeter, camera, and landing pad detection. """ import json from typing import Dict, Any, Optional import rclpy from rclpy.node import Node from geometry_msgs.msg import Twist from std_msgs.msg import String class DroneController(Node): """Drone controller for GPS-denied landing.""" CONTROL_RATE = 50.0 LANDING_HEIGHT_THRESHOLD = 0.1 LANDING_VELOCITY_THRESHOLD = 0.1 MAX_THRUST = 1.0 MAX_PITCH = 0.5 MAX_ROLL = 0.5 MAX_YAW = 0.5 def __init__(self): super().__init__('drone_controller') self.get_logger().info('=' * 50) self.get_logger().info('Drone Controller Starting (GPS-Denied)...') self.get_logger().info('=' * 50) self._latest_telemetry: Optional[Dict[str, Any]] = None self._rover_telemetry: Optional[Dict[str, Any]] = None self._telemetry_received = False self._landing_complete = False self._telemetry_sub = self.create_subscription( String, '/drone/telemetry', self._telemetry_callback, 10 ) self.get_logger().info(' Subscribed to: /drone/telemetry') self._rover_telemetry_sub = self.create_subscription( String, '/rover/telemetry', self._rover_telemetry_callback, 10 ) self.get_logger().info(' Subscribed to: /rover/telemetry') self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10) self.get_logger().info(' Publishing to: /cmd_vel') control_period = 1.0 / self.CONTROL_RATE self._control_timer = self.create_timer(control_period, self._control_loop) self.get_logger().info(f' Control loop: {self.CONTROL_RATE} Hz') self.get_logger().info('Drone Controller Ready!') self.get_logger().info('Sensors: IMU, Altimeter, Camera, Landing Pad Detection') def _telemetry_callback(self, msg: String) -> None: try: self._latest_telemetry = json.loads(msg.data) if not self._telemetry_received: self._telemetry_received = True self.get_logger().info('First telemetry received!') except json.JSONDecodeError as e: self.get_logger().warning(f'Failed to parse telemetry: {e}') def _rover_telemetry_callback(self, msg: String) -> None: try: self._rover_telemetry = json.loads(msg.data) except json.JSONDecodeError: pass def _control_loop(self) -> None: if self._latest_telemetry is None: return if self._landing_complete: return if self._check_landing_complete(): self._landing_complete = True self.get_logger().info('=' * 50) self.get_logger().info('LANDING COMPLETE!') self.get_logger().info('=' * 50) self._publish_command(0.0, 0.0, 0.0, 0.0) return thrust, pitch, roll, yaw = self.calculate_landing_maneuver( self._latest_telemetry, self._rover_telemetry ) thrust = max(min(thrust, self.MAX_THRUST), -self.MAX_THRUST) pitch = max(min(pitch, self.MAX_PITCH), -self.MAX_PITCH) roll = max(min(roll, self.MAX_ROLL), -self.MAX_ROLL) yaw = max(min(yaw, self.MAX_YAW), -self.MAX_YAW) self._publish_command(thrust, pitch, roll, yaw) def _check_landing_complete(self) -> bool: if self._latest_telemetry is None: return False try: altimeter = self._latest_telemetry.get('altimeter', {}) altitude = altimeter.get('altitude', float('inf')) vertical_velocity = abs(altimeter.get('vertical_velocity', float('inf'))) return altitude < self.LANDING_HEIGHT_THRESHOLD and vertical_velocity < self.LANDING_VELOCITY_THRESHOLD except (KeyError, TypeError): return False def calculate_landing_maneuver( self, telemetry: Dict[str, Any], rover_telemetry: Optional[Dict[str, Any]] = None ) -> tuple[float, float, float, float]: """ Calculate control commands for GPS-denied landing. Available sensors in telemetry: - imu.orientation: {roll, pitch, yaw} in radians - imu.angular_velocity: {x, y, z} in rad/s - altimeter.altitude: height in meters - altimeter.vertical_velocity: vertical speed in m/s - velocity: {x, y, z} estimated velocity in m/s - landing_pad: relative position to pad (may be None) - relative_x, relative_y: offset in body frame - distance: vertical distance to pad - confidence: detection confidence (0-1) - camera: {width, height, fov, image} - image: base64 encoded JPEG (or None) - landed: bool Args: telemetry: Sensor data from drone rover_telemetry: Rover state (for moving target) - may be None Returns: Tuple of (thrust, pitch, roll, yaw) control values (-1.0 to 1.0) """ # Extract sensor data altimeter = telemetry.get('altimeter', {}) altitude = altimeter.get('altitude', 5.0) vertical_vel = altimeter.get('vertical_velocity', 0.0) velocity = telemetry.get('velocity', {'x': 0, 'y': 0, 'z': 0}) vel_x = velocity.get('x', 0.0) vel_y = velocity.get('y', 0.0) landing_pad = telemetry.get('landing_pad', None) # Camera image is available in telemetry['camera']['image'] # Decode with: base64.b64decode(telemetry['camera']['image']) # Default target target_x = 0.0 target_y = 0.0 # Use landing pad detection for positioning if landing_pad is not None: target_x = landing_pad.get('relative_x', 0.0) target_y = landing_pad.get('relative_y', 0.0) # TODO: Implement your GPS-denied landing algorithm # You can use the camera image for custom vision processing # Simple PD controller for altitude target_altitude = 0.0 altitude_error = target_altitude - altitude Kp_z, Kd_z = 0.5, 0.3 thrust = Kp_z * altitude_error - Kd_z * vertical_vel # Horizontal control based on landing pad detection Kp_xy, Kd_xy = 0.3, 0.2 if landing_pad is not None: pitch = Kp_xy * target_x - Kd_xy * vel_x roll = Kp_xy * target_y - Kd_xy * vel_y else: pitch = -Kd_xy * vel_x roll = -Kd_xy * vel_y yaw = 0.0 return (thrust, pitch, roll, yaw) def _publish_command(self, thrust: float, pitch: float, roll: float, yaw: float) -> None: msg = Twist() msg.linear.z = thrust msg.linear.x = pitch msg.linear.y = roll msg.angular.z = yaw self._cmd_vel_pub.publish(msg) def main(args=None): rclpy.init(args=args) controller = None try: controller = DroneController() rclpy.spin(controller) except KeyboardInterrupt: print('\nShutting down...') finally: if controller: controller.destroy_node() if rclpy.ok(): rclpy.shutdown() if __name__ == '__main__': main()