231 lines
8.3 KiB
Python
231 lines
8.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
DroneController - Template for GPS-denied landing logic.
|
|
Implement your algorithm in calculate_landing_maneuver().
|
|
Uses sensors: IMU, altimeter, camera, and landing pad detection.
|
|
"""
|
|
|
|
import json
|
|
from typing import Dict, Any, Optional
|
|
|
|
import rclpy
|
|
from rclpy.node import Node
|
|
from geometry_msgs.msg import Twist
|
|
from std_msgs.msg import String
|
|
|
|
# Load configuration
|
|
try:
|
|
from config import CONTROLLER, DRONE, LANDING
|
|
CONFIG_LOADED = True
|
|
except ImportError:
|
|
CONFIG_LOADED = False
|
|
CONTROLLER = {"Kp_z": 0.5, "Kd_z": 0.3, "Kp_xy": 0.3, "Kd_xy": 0.2, "rate": 50}
|
|
DRONE = {"max_thrust": 1.0, "max_pitch": 0.5, "max_roll": 0.5}
|
|
LANDING = {"height_threshold": 0.1, "success_velocity": 0.1}
|
|
|
|
|
|
class DroneController(Node):
|
|
"""Drone controller for GPS-denied landing."""
|
|
|
|
def __init__(self):
|
|
super().__init__('drone_controller')
|
|
|
|
# Load from config
|
|
self._control_rate = CONTROLLER.get("rate", 50.0)
|
|
self._Kp_z = CONTROLLER.get("Kp_z", 0.5)
|
|
self._Kd_z = CONTROLLER.get("Kd_z", 0.3)
|
|
self._Kp_xy = CONTROLLER.get("Kp_xy", 0.3)
|
|
self._Kd_xy = CONTROLLER.get("Kd_xy", 0.2)
|
|
self._max_thrust = DRONE.get("max_thrust", 1.0)
|
|
self._max_pitch = DRONE.get("max_pitch", 0.5)
|
|
self._max_roll = DRONE.get("max_roll", 0.5)
|
|
self._landing_height = LANDING.get("height_threshold", 0.1)
|
|
self._landing_velocity = LANDING.get("success_velocity", 0.1)
|
|
|
|
self.get_logger().info('=' * 50)
|
|
self.get_logger().info('Drone Controller Starting (GPS-Denied)...')
|
|
if CONFIG_LOADED:
|
|
self.get_logger().info(' Configuration loaded from config.py')
|
|
self.get_logger().info('=' * 50)
|
|
|
|
self._latest_telemetry: Optional[Dict[str, Any]] = None
|
|
self._rover_telemetry: Optional[Dict[str, Any]] = None
|
|
self._telemetry_received = False
|
|
self._landing_complete = False
|
|
|
|
self._telemetry_sub = self.create_subscription(
|
|
String, '/drone/telemetry', self._telemetry_callback, 10
|
|
)
|
|
self.get_logger().info(' Subscribed to: /drone/telemetry')
|
|
|
|
self._rover_telemetry_sub = self.create_subscription(
|
|
String, '/rover/telemetry', self._rover_telemetry_callback, 10
|
|
)
|
|
self.get_logger().info(' Subscribed to: /rover/telemetry')
|
|
|
|
self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
|
|
self.get_logger().info(' Publishing to: /cmd_vel')
|
|
|
|
control_period = 1.0 / self._control_rate
|
|
self._control_timer = self.create_timer(control_period, self._control_loop)
|
|
self.get_logger().info(f' Control loop: {self._control_rate} Hz')
|
|
|
|
self.get_logger().info('Drone Controller Ready!')
|
|
self.get_logger().info('Sensors: IMU, Altimeter, Camera, Landing Pad Detection')
|
|
|
|
def _telemetry_callback(self, msg: String) -> None:
|
|
try:
|
|
self._latest_telemetry = json.loads(msg.data)
|
|
if not self._telemetry_received:
|
|
self._telemetry_received = True
|
|
self._warmup_count = 0
|
|
self.get_logger().info('First telemetry received!')
|
|
except json.JSONDecodeError as e:
|
|
self.get_logger().warning(f'Failed to parse telemetry: {e}')
|
|
|
|
def _rover_telemetry_callback(self, msg: String) -> None:
|
|
try:
|
|
self._rover_telemetry = json.loads(msg.data)
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
def _control_loop(self) -> None:
|
|
if self._latest_telemetry is None:
|
|
return
|
|
|
|
if self._landing_complete:
|
|
return
|
|
|
|
# Warmup period - wait for stable telemetry before checking landing
|
|
if not hasattr(self, '_warmup_count'):
|
|
self._warmup_count = 0
|
|
self._warmup_count += 1
|
|
if self._warmup_count < 100: # Wait ~2 seconds at 50Hz
|
|
# Still fly with controller, just don't check for landing yet
|
|
pass
|
|
elif self._check_landing_complete():
|
|
self._landing_complete = True
|
|
self.get_logger().info('=' * 50)
|
|
self.get_logger().info('LANDING COMPLETE!')
|
|
self.get_logger().info('=' * 50)
|
|
self._publish_command(0.0, 0.0, 0.0, 0.0)
|
|
return
|
|
|
|
thrust, pitch, roll, yaw = self.calculate_landing_maneuver(
|
|
self._latest_telemetry,
|
|
self._rover_telemetry
|
|
)
|
|
|
|
thrust = max(min(thrust, self._max_thrust), -self._max_thrust)
|
|
pitch = max(min(pitch, self._max_pitch), -self._max_pitch)
|
|
roll = max(min(roll, self._max_roll), -self._max_roll)
|
|
yaw = max(min(yaw, 0.5), -0.5)
|
|
|
|
self._publish_command(thrust, pitch, roll, yaw)
|
|
|
|
def _check_landing_complete(self) -> bool:
|
|
if self._latest_telemetry is None:
|
|
return False
|
|
try:
|
|
altimeter = self._latest_telemetry.get('altimeter', {})
|
|
altitude = altimeter.get('altitude', float('inf'))
|
|
vertical_velocity = abs(altimeter.get('vertical_velocity', float('inf')))
|
|
return altitude < self._landing_height and vertical_velocity < self._landing_velocity
|
|
except (KeyError, TypeError):
|
|
return False
|
|
|
|
def calculate_landing_maneuver(
|
|
self,
|
|
telemetry: Dict[str, Any],
|
|
rover_telemetry: Optional[Dict[str, Any]] = None
|
|
) -> tuple[float, float, float, float]:
|
|
"""
|
|
Calculate control commands for GPS-denied landing.
|
|
|
|
Available sensors in telemetry:
|
|
- imu.orientation: {roll, pitch, yaw} in radians
|
|
- imu.angular_velocity: {x, y, z} in rad/s
|
|
- altimeter.altitude: height in meters
|
|
- altimeter.vertical_velocity: vertical speed in m/s
|
|
- velocity: {x, y, z} estimated velocity in m/s
|
|
- landing_pad: relative position to pad (may be None)
|
|
- relative_x, relative_y: offset in body frame
|
|
- distance: vertical distance to pad
|
|
- confidence: detection confidence (0-1)
|
|
- camera: {width, height, fov, image}
|
|
- image: base64 encoded JPEG (or None)
|
|
- landed: bool
|
|
|
|
Args:
|
|
telemetry: Sensor data from drone
|
|
rover_telemetry: Rover state (for moving target) - may be None
|
|
|
|
Returns:
|
|
Tuple of (thrust, pitch, roll, yaw) control values (-1.0 to 1.0)
|
|
"""
|
|
# Extract sensor data
|
|
altimeter = telemetry.get('altimeter', {})
|
|
altitude = altimeter.get('altitude', 5.0)
|
|
vertical_vel = altimeter.get('vertical_velocity', 0.0)
|
|
|
|
velocity = telemetry.get('velocity', {'x': 0, 'y': 0, 'z': 0})
|
|
vel_x = velocity.get('x', 0.0)
|
|
vel_y = velocity.get('y', 0.0)
|
|
|
|
landing_pad = telemetry.get('landing_pad', None)
|
|
|
|
# camera = telemetry.get('camera', None)
|
|
|
|
|
|
# Descent control
|
|
if altitude > 1.0:
|
|
target_descent_rate = -0.5
|
|
else:
|
|
target_descent_rate = -0.2
|
|
|
|
descent_error = target_descent_rate - vertical_vel
|
|
thrust = self._Kp_z * descent_error
|
|
|
|
# Horizontal control
|
|
if landing_pad is not None:
|
|
target_x = landing_pad.get('relative_x', 0.0)
|
|
target_y = landing_pad.get('relative_y', 0.0)
|
|
|
|
pitch = self._Kp_xy * target_x - self._Kd_xy * vel_x
|
|
roll = self._Kp_xy * target_y - self._Kd_xy * vel_y
|
|
else:
|
|
pitch = -self._Kd_xy * vel_x
|
|
roll = -self._Kd_xy * vel_y
|
|
|
|
yaw = 0.0
|
|
|
|
return (thrust, pitch, roll, yaw)
|
|
|
|
def _publish_command(self, thrust: float, pitch: float, roll: float, yaw: float) -> None:
|
|
msg = Twist()
|
|
msg.linear.z = thrust
|
|
msg.linear.x = pitch
|
|
msg.linear.y = roll
|
|
msg.angular.z = yaw
|
|
self._cmd_vel_pub.publish(msg)
|
|
|
|
|
|
def main(args=None):
|
|
rclpy.init(args=args)
|
|
controller = None
|
|
|
|
try:
|
|
controller = DroneController()
|
|
rclpy.spin(controller)
|
|
except KeyboardInterrupt:
|
|
print('\nShutting down...')
|
|
finally:
|
|
if controller:
|
|
controller.destroy_node()
|
|
if rclpy.ok():
|
|
rclpy.shutdown()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|