Files
RDC_Simulation/drone_controller.py
2026-01-02 07:07:51 +00:00

231 lines
8.3 KiB
Python

#!/usr/bin/env python3
"""
DroneController - Template for GPS-denied landing logic.
Implement your algorithm in calculate_landing_maneuver().
Uses sensors: IMU, altimeter, camera, and landing pad detection.
"""
import json
from typing import Dict, Any, Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import String
# Load configuration
try:
from config import CONTROLLER, DRONE, LANDING
CONFIG_LOADED = True
except ImportError:
CONFIG_LOADED = False
CONTROLLER = {"Kp_z": 0.5, "Kd_z": 0.3, "Kp_xy": 0.3, "Kd_xy": 0.2, "rate": 50}
DRONE = {"max_thrust": 1.0, "max_pitch": 0.5, "max_roll": 0.5}
LANDING = {"height_threshold": 0.1, "success_velocity": 0.1}
class DroneController(Node):
"""Drone controller for GPS-denied landing."""
def __init__(self):
super().__init__('drone_controller')
# Load from config
self._control_rate = CONTROLLER.get("rate", 50.0)
self._Kp_z = CONTROLLER.get("Kp_z", 0.5)
self._Kd_z = CONTROLLER.get("Kd_z", 0.3)
self._Kp_xy = CONTROLLER.get("Kp_xy", 0.3)
self._Kd_xy = CONTROLLER.get("Kd_xy", 0.2)
self._max_thrust = DRONE.get("max_thrust", 1.0)
self._max_pitch = DRONE.get("max_pitch", 0.5)
self._max_roll = DRONE.get("max_roll", 0.5)
self._landing_height = LANDING.get("height_threshold", 0.1)
self._landing_velocity = LANDING.get("success_velocity", 0.1)
self.get_logger().info('=' * 50)
self.get_logger().info('Drone Controller Starting (GPS-Denied)...')
if CONFIG_LOADED:
self.get_logger().info(' Configuration loaded from config.py')
self.get_logger().info('=' * 50)
self._latest_telemetry: Optional[Dict[str, Any]] = None
self._rover_telemetry: Optional[Dict[str, Any]] = None
self._telemetry_received = False
self._landing_complete = False
self._telemetry_sub = self.create_subscription(
String, '/drone/telemetry', self._telemetry_callback, 10
)
self.get_logger().info(' Subscribed to: /drone/telemetry')
self._rover_telemetry_sub = self.create_subscription(
String, '/rover/telemetry', self._rover_telemetry_callback, 10
)
self.get_logger().info(' Subscribed to: /rover/telemetry')
self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.get_logger().info(' Publishing to: /cmd_vel')
control_period = 1.0 / self._control_rate
self._control_timer = self.create_timer(control_period, self._control_loop)
self.get_logger().info(f' Control loop: {self._control_rate} Hz')
self.get_logger().info('Drone Controller Ready!')
self.get_logger().info('Sensors: IMU, Altimeter, Camera, Landing Pad Detection')
def _telemetry_callback(self, msg: String) -> None:
try:
self._latest_telemetry = json.loads(msg.data)
if not self._telemetry_received:
self._telemetry_received = True
self.get_logger().info('First telemetry received!')
except json.JSONDecodeError as e:
self.get_logger().warning(f'Failed to parse telemetry: {e}')
def _rover_telemetry_callback(self, msg: String) -> None:
try:
self._rover_telemetry = json.loads(msg.data)
except json.JSONDecodeError:
pass
def _control_loop(self) -> None:
if self._latest_telemetry is None:
return
if self._landing_complete:
return
if self._check_landing_complete():
self._landing_complete = True
self.get_logger().info('=' * 50)
self.get_logger().info('LANDING COMPLETE!')
self.get_logger().info('=' * 50)
self._publish_command(0.0, 0.0, 0.0, 0.0)
return
thrust, pitch, roll, yaw = self.calculate_landing_maneuver(
self._latest_telemetry,
self._rover_telemetry
)
thrust = max(min(thrust, self._max_thrust), -self._max_thrust)
pitch = max(min(pitch, self._max_pitch), -self._max_pitch)
roll = max(min(roll, self._max_roll), -self._max_roll)
yaw = max(min(yaw, 0.5), -0.5)
self._publish_command(thrust, pitch, roll, yaw)
def _check_landing_complete(self) -> bool:
if self._latest_telemetry is None:
return False
try:
altimeter = self._latest_telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', float('inf'))
vertical_velocity = abs(altimeter.get('vertical_velocity', float('inf')))
return altitude < self._landing_height and vertical_velocity < self._landing_velocity
except (KeyError, TypeError):
return False
def calculate_landing_maneuver(
self,
telemetry: Dict[str, Any],
rover_telemetry: Optional[Dict[str, Any]] = None
) -> tuple[float, float, float, float]:
"""
Calculate control commands for GPS-denied landing.
Available sensors in telemetry:
- imu.orientation: {roll, pitch, yaw} in radians
- imu.angular_velocity: {x, y, z} in rad/s
- altimeter.altitude: height in meters
- altimeter.vertical_velocity: vertical speed in m/s
- velocity: {x, y, z} estimated velocity in m/s
- landing_pad: relative position to pad (may be None)
- relative_x, relative_y: offset in body frame
- distance: vertical distance to pad
- confidence: detection confidence (0-1)
- camera: {width, height, fov, image}
- image: base64 encoded JPEG (or None)
- landed: bool
Args:
telemetry: Sensor data from drone
rover_telemetry: Rover state (for moving target) - may be None
Returns:
Tuple of (thrust, pitch, roll, yaw) control values (-1.0 to 1.0)
"""
# Extract sensor data
altimeter = telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', 5.0)
vertical_vel = altimeter.get('vertical_velocity', 0.0)
velocity = telemetry.get('velocity', {'x': 0, 'y': 0, 'z': 0})
vel_x = velocity.get('x', 0.0)
vel_y = velocity.get('y', 0.0)
landing_pad = telemetry.get('landing_pad', None)
# Camera image is available in telemetry['camera']['image']
# Decode with: base64.b64decode(telemetry['camera']['image'])
# Default target
target_x = 0.0
target_y = 0.0
# Use landing pad detection for positioning
if landing_pad is not None:
target_x = landing_pad.get('relative_x', 0.0)
target_y = landing_pad.get('relative_y', 0.0)
# TODO: Implement your GPS-denied landing algorithm
# You can use the camera image for custom vision processing
# Simple PD controller for altitude
target_altitude = 0.0
altitude_error = target_altitude - altitude
Kp_z, Kd_z = 0.5, 0.3
thrust = Kp_z * altitude_error - Kd_z * vertical_vel
# Horizontal control based on landing pad detection
Kp_xy, Kd_xy = 0.3, 0.2
if landing_pad is not None:
pitch = Kp_xy * target_x - Kd_xy * vel_x
roll = Kp_xy * target_y - Kd_xy * vel_y
else:
pitch = -Kd_xy * vel_x
roll = -Kd_xy * vel_y
yaw = 0.0
return (thrust, pitch, roll, yaw)
def _publish_command(self, thrust: float, pitch: float, roll: float, yaw: float) -> None:
msg = Twist()
msg.linear.z = thrust
msg.linear.x = pitch
msg.linear.y = roll
msg.angular.z = yaw
self._cmd_vel_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
controller = None
try:
controller = DroneController()
rclpy.spin(controller)
except KeyboardInterrupt:
print('\nShutting down...')
finally:
if controller:
controller.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()