Files
RDC_Simulation/drone_controller.py
2025-12-31 23:50:26 +00:00

215 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""
DroneController - Template for GPS-denied landing logic.
Implement your algorithm in calculate_landing_maneuver().
Uses sensors: IMU, altimeter, camera, and landing pad detection.
"""
import json
from typing import Dict, Any, Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
class DroneController(Node):
"""Drone controller for GPS-denied landing."""
CONTROL_RATE = 50.0
LANDING_HEIGHT_THRESHOLD = 0.1
LANDING_VELOCITY_THRESHOLD = 0.1
MAX_THRUST = 1.0
MAX_PITCH = 0.5
MAX_ROLL = 0.5
MAX_YAW = 0.5
def __init__(self):
super().__init__('drone_controller')
self.get_logger().info('=' * 50)
self.get_logger().info('Drone Controller Starting (GPS-Denied)...')
self.get_logger().info('=' * 50)
self._latest_telemetry: Optional[Dict[str, Any]] = None
self._rover_telemetry: Optional[Dict[str, Any]] = None
self._telemetry_received = False
self._landing_complete = False
self._telemetry_sub = self.create_subscription(
String, '/drone/telemetry', self._telemetry_callback, 10
)
self.get_logger().info(' Subscribed to: /drone/telemetry')
self._rover_telemetry_sub = self.create_subscription(
String, '/rover/telemetry', self._rover_telemetry_callback, 10
)
self.get_logger().info(' Subscribed to: /rover/telemetry')
self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.get_logger().info(' Publishing to: /cmd_vel')
control_period = 1.0 / self.CONTROL_RATE
self._control_timer = self.create_timer(control_period, self._control_loop)
self.get_logger().info(f' Control loop: {self.CONTROL_RATE} Hz')
self.get_logger().info('Drone Controller Ready!')
self.get_logger().info('Sensors: IMU, Altimeter, Camera, Landing Pad Detection')
def _telemetry_callback(self, msg: String) -> None:
try:
self._latest_telemetry = json.loads(msg.data)
if not self._telemetry_received:
self._telemetry_received = True
self.get_logger().info('First telemetry received!')
except json.JSONDecodeError as e:
self.get_logger().warning(f'Failed to parse telemetry: {e}')
def _rover_telemetry_callback(self, msg: String) -> None:
try:
self._rover_telemetry = json.loads(msg.data)
except json.JSONDecodeError:
pass
def _control_loop(self) -> None:
if self._latest_telemetry is None:
return
if self._landing_complete:
return
if self._check_landing_complete():
self._landing_complete = True
self.get_logger().info('=' * 50)
self.get_logger().info('LANDING COMPLETE!')
self.get_logger().info('=' * 50)
self._publish_command(0.0, 0.0, 0.0, 0.0)
return
thrust, pitch, roll, yaw = self.calculate_landing_maneuver(
self._latest_telemetry,
self._rover_telemetry
)
thrust = max(min(thrust, self.MAX_THRUST), -self.MAX_THRUST)
pitch = max(min(pitch, self.MAX_PITCH), -self.MAX_PITCH)
roll = max(min(roll, self.MAX_ROLL), -self.MAX_ROLL)
yaw = max(min(yaw, self.MAX_YAW), -self.MAX_YAW)
self._publish_command(thrust, pitch, roll, yaw)
def _check_landing_complete(self) -> bool:
if self._latest_telemetry is None:
return False
try:
altimeter = self._latest_telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', float('inf'))
vertical_velocity = abs(altimeter.get('vertical_velocity', float('inf')))
return altitude < self.LANDING_HEIGHT_THRESHOLD and vertical_velocity < self.LANDING_VELOCITY_THRESHOLD
except (KeyError, TypeError):
return False
def calculate_landing_maneuver(
self,
telemetry: Dict[str, Any],
rover_telemetry: Optional[Dict[str, Any]] = None
) -> tuple[float, float, float, float]:
"""
Calculate control commands for GPS-denied landing.
Available sensors in telemetry:
- imu.orientation: {roll, pitch, yaw} in radians
- imu.angular_velocity: {x, y, z} in rad/s
- altimeter.altitude: height in meters
- altimeter.vertical_velocity: vertical speed in m/s
- velocity: {x, y, z} estimated velocity in m/s
- landing_pad: relative position to pad (may be None)
- relative_x, relative_y: offset in body frame
- distance: vertical distance to pad
- confidence: detection confidence (0-1)
- camera: {width, height, fov, image}
- image: base64 encoded JPEG (or None)
- landed: bool
Args:
telemetry: Sensor data from drone
rover_telemetry: Rover state (for moving target) - may be None
Returns:
Tuple of (thrust, pitch, roll, yaw) control values (-1.0 to 1.0)
"""
# Extract sensor data
altimeter = telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', 5.0)
vertical_vel = altimeter.get('vertical_velocity', 0.0)
velocity = telemetry.get('velocity', {'x': 0, 'y': 0, 'z': 0})
vel_x = velocity.get('x', 0.0)
vel_y = velocity.get('y', 0.0)
landing_pad = telemetry.get('landing_pad', None)
# Camera image is available in telemetry['camera']['image']
# Decode with: base64.b64decode(telemetry['camera']['image'])
# Default target
target_x = 0.0
target_y = 0.0
# Use landing pad detection for positioning
if landing_pad is not None:
target_x = landing_pad.get('relative_x', 0.0)
target_y = landing_pad.get('relative_y', 0.0)
# TODO: Implement your GPS-denied landing algorithm
# You can use the camera image for custom vision processing
# Simple PD controller for altitude
target_altitude = 0.0
altitude_error = target_altitude - altitude
Kp_z, Kd_z = 0.5, 0.3
thrust = Kp_z * altitude_error - Kd_z * vertical_vel
# Horizontal control based on landing pad detection
Kp_xy, Kd_xy = 0.3, 0.2
if landing_pad is not None:
pitch = Kp_xy * target_x - Kd_xy * vel_x
roll = Kp_xy * target_y - Kd_xy * vel_y
else:
pitch = -Kd_xy * vel_x
roll = -Kd_xy * vel_y
yaw = 0.0
return (thrust, pitch, roll, yaw)
def _publish_command(self, thrust: float, pitch: float, roll: float, yaw: float) -> None:
msg = Twist()
msg.linear.z = thrust
msg.linear.x = pitch
msg.linear.y = roll
msg.angular.z = yaw
self._cmd_vel_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
controller = None
try:
controller = DroneController()
rclpy.spin(controller)
except KeyboardInterrupt:
print('\nShutting down...')
finally:
if controller:
controller.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()