Files
RDC_Simulation/drone_controller.py
2026-01-04 00:24:46 +00:00

427 lines
15 KiB
Python

#!/usr/bin/env python3
"""
DroneController - Template for GPS-denied landing logic.
Supports multiple telemetry sources:
1. Custom JSON telemetry (/drone/telemetry) - PyBullet/Gazebo bridge
2. ArduPilot ROS 2 DDS topics (/ap/*) - Official ArduPilot integration
Implement your algorithm in calculate_landing_maneuver().
Uses sensors: IMU, altimeter, camera, and landing pad detection.
"""
import json
import math
from typing import Dict, Any, Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from geometry_msgs.msg import Twist, PoseStamped, TwistStamped
from sensor_msgs.msg import Imu, BatteryState, NavSatFix
from std_msgs.msg import String
# Load configuration
try:
from config import CONTROLLER, DRONE, LANDING
CONFIG_LOADED = True
except ImportError:
CONFIG_LOADED = False
CONTROLLER = {"Kp_z": 0.5, "Kd_z": 0.3, "Kp_xy": 0.3, "Kd_xy": 0.2, "rate": 50}
DRONE = {"max_thrust": 1.0, "max_pitch": 0.5, "max_roll": 0.5}
LANDING = {"height_threshold": 0.1, "success_velocity": 0.1}
def quaternion_to_euler(x: float, y: float, z: float, w: float) -> tuple:
"""Convert quaternion to Euler angles (roll, pitch, yaw)."""
# Roll (x-axis rotation)
sinr_cosp = 2.0 * (w * x + y * z)
cosr_cosp = 1.0 - 2.0 * (x * x + y * y)
roll = math.atan2(sinr_cosp, cosr_cosp)
# Pitch (y-axis rotation)
sinp = 2.0 * (w * y - z * x)
if abs(sinp) >= 1:
pitch = math.copysign(math.pi / 2, sinp)
else:
pitch = math.asin(sinp)
# Yaw (z-axis rotation)
siny_cosp = 2.0 * (w * z + x * y)
cosy_cosp = 1.0 - 2.0 * (y * y + z * z)
yaw = math.atan2(siny_cosp, cosy_cosp)
return roll, pitch, yaw
class DroneController(Node):
"""
Drone controller for GPS-denied landing.
Supports multiple telemetry sources:
- Legacy: /drone/telemetry (JSON String)
- ArduPilot DDS: /ap/pose/filtered, /ap/imu/filtered, etc.
"""
def __init__(self, use_ardupilot_topics: bool = True):
super().__init__('drone_controller')
# Load from config
self._control_rate = CONTROLLER.get("rate", 50.0)
self._Kp_z = CONTROLLER.get("Kp_z", 0.5)
self._Kd_z = CONTROLLER.get("Kd_z", 0.3)
self._Kp_xy = CONTROLLER.get("Kp_xy", 0.3)
self._Kd_xy = CONTROLLER.get("Kd_xy", 0.2)
self._max_thrust = DRONE.get("max_thrust", 1.0)
self._max_pitch = DRONE.get("max_pitch", 0.5)
self._max_roll = DRONE.get("max_roll", 0.5)
self._landing_height = LANDING.get("height_threshold", 0.1)
self._landing_velocity = LANDING.get("success_velocity", 0.1)
# Mode selection
self._use_ardupilot = use_ardupilot_topics
self.get_logger().info('=' * 50)
self.get_logger().info('Drone Controller Starting (GPS-Denied)...')
if CONFIG_LOADED:
self.get_logger().info(' Configuration loaded from config.py')
self.get_logger().info(f' Mode: {"ArduPilot DDS" if use_ardupilot_topics else "Legacy JSON"}')
self.get_logger().info('=' * 50)
# State variables
self._latest_telemetry: Optional[Dict[str, Any]] = None
self._rover_telemetry: Optional[Dict[str, Any]] = None
self._telemetry_received = False
self._landing_complete = False
# ArduPilot state (built from DDS topics)
self._ap_pose: Optional[PoseStamped] = None
self._ap_twist: Optional[TwistStamped] = None
self._ap_imu: Optional[Imu] = None
self._ap_battery: Optional[BatteryState] = None
# QoS for sensor topics
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1
)
if use_ardupilot_topics:
self._setup_ardupilot_subscriptions(sensor_qos)
else:
self._setup_legacy_subscriptions()
# Always subscribe to rover telemetry (for moving landing pad)
self._rover_telemetry_sub = self.create_subscription(
String, '/rover/telemetry', self._rover_telemetry_callback, 10
)
self.get_logger().info(' Subscribed to: /rover/telemetry')
# Command publisher
self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self.get_logger().info(' Publishing to: /cmd_vel')
# Control loop
control_period = 1.0 / self._control_rate
self._control_timer = self.create_timer(control_period, self._control_loop)
self.get_logger().info(f' Control loop: {self._control_rate} Hz')
self.get_logger().info('Drone Controller Ready!')
self.get_logger().info('Sensors: IMU, Altimeter, Velocity, Landing Pad Detection')
def _setup_ardupilot_subscriptions(self, qos: QoSProfile):
"""Set up subscriptions to official ArduPilot ROS 2 topics."""
# Pose (position + orientation)
self._ap_pose_sub = self.create_subscription(
PoseStamped, '/ap/pose/filtered', self._ap_pose_callback, qos
)
self.get_logger().info(' Subscribed to: /ap/pose/filtered')
# Twist (velocity)
self._ap_twist_sub = self.create_subscription(
TwistStamped, '/ap/twist/filtered', self._ap_twist_callback, qos
)
self.get_logger().info(' Subscribed to: /ap/twist/filtered')
# IMU
self._ap_imu_sub = self.create_subscription(
Imu, '/ap/imu/filtered', self._ap_imu_callback, qos
)
self.get_logger().info(' Subscribed to: /ap/imu/filtered')
# Battery
self._ap_battery_sub = self.create_subscription(
BatteryState, '/ap/battery', self._ap_battery_callback, qos
)
self.get_logger().info(' Subscribed to: /ap/battery')
def _setup_legacy_subscriptions(self):
"""Set up subscriptions to legacy JSON telemetry."""
self._telemetry_sub = self.create_subscription(
String, '/drone/telemetry', self._telemetry_callback, 10
)
self.get_logger().info(' Subscribed to: /drone/telemetry')
# ArduPilot topic callbacks
def _ap_pose_callback(self, msg: PoseStamped):
self._ap_pose = msg
if not self._telemetry_received:
self._telemetry_received = True
self._warmup_count = 0
self.get_logger().info('First ArduPilot pose received!')
self._build_telemetry_from_ardupilot()
def _ap_twist_callback(self, msg: TwistStamped):
self._ap_twist = msg
self._build_telemetry_from_ardupilot()
def _ap_imu_callback(self, msg: Imu):
self._ap_imu = msg
self._build_telemetry_from_ardupilot()
def _ap_battery_callback(self, msg: BatteryState):
self._ap_battery = msg
self._build_telemetry_from_ardupilot()
def _build_telemetry_from_ardupilot(self):
"""Build telemetry dict from ArduPilot topics."""
telemetry = {}
# Position and orientation from pose
if self._ap_pose:
pos = self._ap_pose.pose.position
ori = self._ap_pose.pose.orientation
roll, pitch, yaw = quaternion_to_euler(ori.x, ori.y, ori.z, ori.w)
telemetry['position'] = {
'x': pos.x,
'y': pos.y,
'z': pos.z
}
telemetry['altimeter'] = {
'altitude': pos.z,
'vertical_velocity': self._ap_twist.twist.linear.z if self._ap_twist else 0.0
}
telemetry['imu'] = {
'orientation': {'roll': roll, 'pitch': pitch, 'yaw': yaw},
'angular_velocity': {'x': 0, 'y': 0, 'z': 0}
}
# Velocity from twist
if self._ap_twist:
twist = self._ap_twist.twist
telemetry['velocity'] = {
'x': twist.linear.x,
'y': twist.linear.y,
'z': twist.linear.z
}
if 'altimeter' in telemetry:
telemetry['altimeter']['vertical_velocity'] = twist.linear.z
# Angular velocity from IMU
if self._ap_imu:
if 'imu' not in telemetry:
telemetry['imu'] = {}
telemetry['imu']['angular_velocity'] = {
'x': self._ap_imu.angular_velocity.x,
'y': self._ap_imu.angular_velocity.y,
'z': self._ap_imu.angular_velocity.z
}
# Battery
if self._ap_battery:
telemetry['battery'] = {
'voltage': self._ap_battery.voltage,
'remaining': self._ap_battery.percentage * 100
}
# Landing pad detection (placeholder - would need vision processing)
# For now, calculate from rover telemetry if available
if self._rover_telemetry and self._ap_pose:
rover_pos = self._rover_telemetry.get('position', {})
rx = rover_pos.get('x', 0)
ry = rover_pos.get('y', 0)
dx = self._ap_pose.pose.position.x
dy = self._ap_pose.pose.position.y
dz = self._ap_pose.pose.position.z
rel_x = rx - dx
rel_y = ry - dy
distance = math.sqrt(rel_x**2 + rel_y**2 + dz**2)
telemetry['landing_pad'] = {
'relative_x': rel_x,
'relative_y': rel_y,
'distance': distance,
'confidence': 1.0 if distance < 10.0 else 0.0
}
self._latest_telemetry = telemetry
# Legacy callbacks
def _telemetry_callback(self, msg: String) -> None:
try:
self._latest_telemetry = json.loads(msg.data)
if not self._telemetry_received:
self._telemetry_received = True
self._warmup_count = 0
self.get_logger().info('First telemetry received!')
except json.JSONDecodeError as e:
self.get_logger().warning(f'Failed to parse telemetry: {e}')
def _rover_telemetry_callback(self, msg: String) -> None:
try:
self._rover_telemetry = json.loads(msg.data)
except json.JSONDecodeError:
pass
def _control_loop(self) -> None:
if self._latest_telemetry is None:
return
if self._landing_complete:
return
# Warmup period - wait for stable telemetry before checking landing
if not hasattr(self, '_warmup_count'):
self._warmup_count = 0
self._warmup_count += 1
if self._warmup_count < 100: # Wait ~2 seconds at 50Hz
# Still fly with controller, just don't check for landing yet
pass
elif self._check_landing_complete():
self._landing_complete = True
self.get_logger().info('=' * 50)
self.get_logger().info('LANDING COMPLETE!')
self.get_logger().info('=' * 50)
self._publish_command(0.0, 0.0, 0.0, 0.0)
return
thrust, pitch, roll, yaw = self.calculate_landing_maneuver(
self._latest_telemetry,
self._rover_telemetry
)
thrust = max(min(thrust, self._max_thrust), -self._max_thrust)
pitch = max(min(pitch, self._max_pitch), -self._max_pitch)
roll = max(min(roll, self._max_roll), -self._max_roll)
yaw = max(min(yaw, 0.5), -0.5)
self._publish_command(thrust, pitch, roll, yaw)
def _check_landing_complete(self) -> bool:
if self._latest_telemetry is None:
return False
try:
altimeter = self._latest_telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', float('inf'))
vertical_velocity = abs(altimeter.get('vertical_velocity', float('inf')))
return altitude < self._landing_height and vertical_velocity < self._landing_velocity
except (KeyError, TypeError):
return False
def calculate_landing_maneuver(
self,
telemetry: Dict[str, Any],
rover_telemetry: Optional[Dict[str, Any]] = None
) -> tuple[float, float, float, float]:
"""
Calculate control commands for GPS-denied landing.
Available sensors in telemetry:
- imu.orientation: {roll, pitch, yaw} in radians
- imu.angular_velocity: {x, y, z} in rad/s
- altimeter.altitude: height in meters
- altimeter.vertical_velocity: vertical speed in m/s
- velocity: {x, y, z} estimated velocity in m/s
- landing_pad: relative position to pad (may be None)
- relative_x, relative_y: offset in body frame
- distance: vertical distance to pad
- confidence: detection confidence (0-1)
- camera: {width, height, fov, image}
- image: base64 encoded JPEG (or None)
- landed: bool
Args:
telemetry: Sensor data from drone
rover_telemetry: Rover state (for moving target) - may be None
Returns:
Tuple of (thrust, pitch, roll, yaw) control values (-1.0 to 1.0)
"""
# Extract sensor data
altimeter = telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', 5.0)
vertical_vel = altimeter.get('vertical_velocity', 0.0)
velocity = telemetry.get('velocity', {'x': 0, 'y': 0, 'z': 0})
vel_x = velocity.get('x', 0.0)
vel_y = velocity.get('y', 0.0)
landing_pad = telemetry.get('landing_pad', None)
# camera = telemetry.get('camera', None)
# Descent control
if altitude > 1.0:
target_descent_rate = -0.5
else:
target_descent_rate = -0.2
descent_error = target_descent_rate - vertical_vel
thrust = self._Kp_z * descent_error
# Horizontal control
if landing_pad is not None:
target_x = landing_pad.get('relative_x', 0.0)
target_y = landing_pad.get('relative_y', 0.0)
pitch = self._Kp_xy * target_x - self._Kd_xy * vel_x
roll = self._Kp_xy * target_y - self._Kd_xy * vel_y
else:
pitch = -self._Kd_xy * vel_x
roll = -self._Kd_xy * vel_y
yaw = 0.0
return (thrust, pitch, roll, yaw)
def _publish_command(self, thrust: float, pitch: float, roll: float, yaw: float) -> None:
msg = Twist()
msg.linear.z = thrust
msg.linear.x = pitch
msg.linear.y = roll
msg.angular.z = yaw
self._cmd_vel_pub.publish(msg)
def main(args=None):
import sys
# Check for --ardupilot flag
use_ardupilot = '--ardupilot' in sys.argv or '-a' in sys.argv
# Remove our custom args before passing to rclpy
filtered_args = [a for a in (args or sys.argv) if a not in ['--ardupilot', '-a']]
rclpy.init(args=filtered_args)
controller = None
try:
controller = DroneController(use_ardupilot_topics=use_ardupilot)
rclpy.spin(controller)
except KeyboardInterrupt:
print('\nShutting down...')
finally:
if controller:
controller.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()