feat: Simplify drone controller to basic flight pattern, overhaul installation guide, and update related scripts.
This commit is contained in:
@@ -1,500 +1,286 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
DroneController - GPS-denied landing with 3-phase state machine.
|
||||
Simple Drone Controller - Takeoff, Fly Pattern, Land
|
||||
=====================================================
|
||||
A minimal controller that demonstrates drone movement in simulation.
|
||||
|
||||
Phases:
|
||||
1. SEARCH - Find QR code on rover using camera
|
||||
2. COMMAND - Send commands to rover
|
||||
3. LAND - Land on the rover
|
||||
Usage:
|
||||
Terminal 1: ./scripts/run_ardupilot_sim.sh runway
|
||||
Terminal 2: sim_vehicle.py -v ArduCopter -f gazebo-iris --model JSON --console
|
||||
Terminal 3: python scripts/run_ardupilot.py
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import math
|
||||
import base64
|
||||
from enum import Enum, auto
|
||||
from typing import Dict, Any, Optional
|
||||
import argparse
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
from geometry_msgs.msg import Twist, PoseStamped, TwistStamped
|
||||
from sensor_msgs.msg import Imu, BatteryState, NavSatFix, Image
|
||||
from std_msgs.msg import String
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
try:
|
||||
from config import CONTROLLER, DRONE, LANDING
|
||||
CONFIG_LOADED = True
|
||||
from pymavlink import mavutil
|
||||
except ImportError:
|
||||
CONFIG_LOADED = False
|
||||
CONTROLLER = {"Kp_z": 0.5, "Kd_z": 0.3, "Kp_xy": 0.3, "Kd_xy": 0.2, "rate": 50}
|
||||
DRONE = {"max_thrust": 1.0, "max_pitch": 0.5, "max_roll": 0.5}
|
||||
LANDING = {"height_threshold": 0.1, "success_velocity": 0.1}
|
||||
print("ERROR: pymavlink not installed")
|
||||
print("Run: pip install pymavlink")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
import cv2
|
||||
import numpy as np
|
||||
CV2_AVAILABLE = True
|
||||
except ImportError:
|
||||
CV2_AVAILABLE = False
|
||||
from config import MAVLINK
|
||||
|
||||
|
||||
class Phase(Enum):
|
||||
SEARCH = auto()
|
||||
COMMAND = auto()
|
||||
LAND = auto()
|
||||
COMPLETE = auto()
|
||||
|
||||
|
||||
def quaternion_to_euler(x: float, y: float, z: float, w: float) -> tuple:
|
||||
sinr_cosp = 2.0 * (w * x + y * z)
|
||||
cosr_cosp = 1.0 - 2.0 * (x * x + y * y)
|
||||
roll = math.atan2(sinr_cosp, cosr_cosp)
|
||||
class SimpleDroneController:
|
||||
"""Simple drone controller with takeoff, fly pattern, and land."""
|
||||
|
||||
sinp = 2.0 * (w * y - z * x)
|
||||
if abs(sinp) >= 1:
|
||||
pitch = math.copysign(math.pi / 2, sinp)
|
||||
else:
|
||||
pitch = math.asin(sinp)
|
||||
def __init__(self, connection_string=None):
|
||||
self.connection_string = connection_string or MAVLINK["connection_string"]
|
||||
self.mav = None
|
||||
self.armed = False
|
||||
self.mode = "UNKNOWN"
|
||||
self.altitude = 0
|
||||
self.position = {"x": 0, "y": 0, "z": 0}
|
||||
|
||||
def connect(self, timeout=30):
|
||||
"""Connect to ArduPilot SITL."""
|
||||
print(f"[INFO] Connecting to {self.connection_string}...")
|
||||
|
||||
try:
|
||||
self.mav = mavutil.mavlink_connection(self.connection_string)
|
||||
self.mav.wait_heartbeat(timeout=timeout)
|
||||
print(f"[OK] Connected to system {self.mav.target_system}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Connection failed: {e}")
|
||||
return False
|
||||
|
||||
siny_cosp = 2.0 * (w * z + x * y)
|
||||
cosy_cosp = 1.0 - 2.0 * (y * y + z * z)
|
||||
yaw = math.atan2(siny_cosp, cosy_cosp)
|
||||
def update_state(self):
|
||||
"""Update drone state from MAVLink messages."""
|
||||
while True:
|
||||
msg = self.mav.recv_match(blocking=False)
|
||||
if msg is None:
|
||||
break
|
||||
|
||||
msg_type = msg.get_type()
|
||||
|
||||
if msg_type == "HEARTBEAT":
|
||||
self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0
|
||||
try:
|
||||
self.mode = mavutil.mode_string_v10(msg)
|
||||
except:
|
||||
self.mode = str(msg.custom_mode)
|
||||
|
||||
elif msg_type == "LOCAL_POSITION_NED":
|
||||
self.position = {"x": msg.x, "y": msg.y, "z": msg.z}
|
||||
self.altitude = -msg.z # NED: down is positive
|
||||
|
||||
return roll, pitch, yaw
|
||||
|
||||
|
||||
class DroneController(Node):
|
||||
def __init__(self, use_ardupilot_topics: bool = True):
|
||||
super().__init__('drone_controller')
|
||||
def set_mode(self, mode_name):
|
||||
"""Set flight mode."""
|
||||
mode_map = self.mav.mode_mapping()
|
||||
if mode_name.upper() not in mode_map:
|
||||
print(f"[ERROR] Unknown mode: {mode_name}")
|
||||
return False
|
||||
|
||||
self._control_rate = CONTROLLER.get("rate", 50.0)
|
||||
self._Kp_z = CONTROLLER.get("Kp_z", 0.5)
|
||||
self._Kd_z = CONTROLLER.get("Kd_z", 0.3)
|
||||
self._Kp_xy = CONTROLLER.get("Kp_xy", 0.3)
|
||||
self._Kd_xy = CONTROLLER.get("Kd_xy", 0.2)
|
||||
self._max_thrust = DRONE.get("max_thrust", 1.0)
|
||||
self._max_pitch = DRONE.get("max_pitch", 0.5)
|
||||
self._max_roll = DRONE.get("max_roll", 0.5)
|
||||
self._landing_height = LANDING.get("height_threshold", 0.1)
|
||||
self._landing_velocity = LANDING.get("success_velocity", 0.1)
|
||||
mode_id = mode_map[mode_name.upper()]
|
||||
self.mav.set_mode(mode_id)
|
||||
time.sleep(0.5)
|
||||
self.update_state()
|
||||
print(f"[OK] Mode: {self.mode}")
|
||||
return True
|
||||
|
||||
def arm(self):
|
||||
"""Force arm the drone."""
|
||||
print("[INFO] Arming...")
|
||||
|
||||
self._use_ardupilot = use_ardupilot_topics
|
||||
for attempt in range(3):
|
||||
self.mav.mav.command_long_send(
|
||||
self.mav.target_system,
|
||||
self.mav.target_component,
|
||||
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
|
||||
0, 1, 21196, 0, 0, 0, 0, 0 # 21196 = force arm
|
||||
)
|
||||
time.sleep(1)
|
||||
self.update_state()
|
||||
|
||||
if self.armed:
|
||||
print("[OK] Armed")
|
||||
return True
|
||||
print(f"[WARN] Arm attempt {attempt + 1}/3...")
|
||||
|
||||
self._phase = Phase.SEARCH
|
||||
self._phase_start_time = self.get_clock().now()
|
||||
print("[ERROR] Failed to arm")
|
||||
return False
|
||||
|
||||
def takeoff(self, altitude=5.0):
|
||||
"""Takeoff to altitude."""
|
||||
print(f"[INFO] Taking off to {altitude}m...")
|
||||
|
||||
self._qr_detected = False
|
||||
self._qr_data: Optional[str] = None
|
||||
self._qr_position: Optional[Dict[str, float]] = None
|
||||
self._search_pattern_angle = 0.0
|
||||
|
||||
self._commands_sent = False
|
||||
self._command_acknowledged = False
|
||||
|
||||
self._landing_complete = False
|
||||
|
||||
self._latest_telemetry: Optional[Dict[str, Any]] = None
|
||||
self._rover_telemetry: Optional[Dict[str, Any]] = None
|
||||
self._latest_camera_image: Optional[bytes] = None
|
||||
self._telemetry_received = False
|
||||
|
||||
self._ap_pose: Optional[PoseStamped] = None
|
||||
self._ap_twist: Optional[TwistStamped] = None
|
||||
self._ap_imu: Optional[Imu] = None
|
||||
self._ap_battery: Optional[BatteryState] = None
|
||||
|
||||
self.get_logger().info('=' * 50)
|
||||
self.get_logger().info('Drone Controller - 3 Phase GPS-Denied Landing')
|
||||
self.get_logger().info(f' Phase 1: SEARCH | Phase 2: COMMAND | Phase 3: LAND')
|
||||
self.get_logger().info(f' Mode: {"ArduPilot DDS" if use_ardupilot_topics else "Legacy JSON"}')
|
||||
self.get_logger().info('=' * 50)
|
||||
|
||||
sensor_qos = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=1
|
||||
self.mav.mav.command_long_send(
|
||||
self.mav.target_system,
|
||||
self.mav.target_component,
|
||||
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
|
||||
0, 0, 0, 0, 0, 0, 0, altitude
|
||||
)
|
||||
|
||||
if use_ardupilot_topics:
|
||||
self._setup_ardupilot_subscriptions(sensor_qos)
|
||||
else:
|
||||
self._setup_legacy_subscriptions()
|
||||
# Wait for altitude
|
||||
for i in range(100): # 10 seconds max
|
||||
self.update_state()
|
||||
if self.altitude > altitude * 0.9:
|
||||
print(f"[OK] Reached {self.altitude:.1f}m")
|
||||
return True
|
||||
print(f"\r Climbing... {self.altitude:.1f}m", end="")
|
||||
time.sleep(0.1)
|
||||
|
||||
self._rover_telemetry_sub = self.create_subscription(
|
||||
String, '/rover/telemetry', self._rover_telemetry_callback, 10)
|
||||
|
||||
self._camera_sub = self.create_subscription(
|
||||
Image, '/drone/camera', self._camera_callback, sensor_qos)
|
||||
|
||||
self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
|
||||
self._rover_cmd_pub = self.create_publisher(String, '/rover/command', 10)
|
||||
|
||||
self._control_timer = self.create_timer(1.0 / self._control_rate, self._control_loop)
|
||||
|
||||
self.get_logger().info(f'Current Phase: {self._phase.name}')
|
||||
print(f"\n[WARN] Takeoff timeout at {self.altitude:.1f}m")
|
||||
return False
|
||||
|
||||
def _setup_ardupilot_subscriptions(self, qos: QoSProfile):
|
||||
self._ap_pose_sub = self.create_subscription(
|
||||
PoseStamped, '/ap/pose/filtered', self._ap_pose_callback, qos)
|
||||
self._ap_twist_sub = self.create_subscription(
|
||||
TwistStamped, '/ap/twist/filtered', self._ap_twist_callback, qos)
|
||||
self._ap_imu_sub = self.create_subscription(
|
||||
Imu, '/ap/imu/filtered', self._ap_imu_callback, qos)
|
||||
self._ap_battery_sub = self.create_subscription(
|
||||
BatteryState, '/ap/battery', self._ap_battery_callback, qos)
|
||||
|
||||
def _setup_legacy_subscriptions(self):
|
||||
self._telemetry_sub = self.create_subscription(
|
||||
String, '/drone/telemetry', self._telemetry_callback, 10)
|
||||
|
||||
def _ap_pose_callback(self, msg: PoseStamped):
|
||||
self._ap_pose = msg
|
||||
if not self._telemetry_received:
|
||||
self._telemetry_received = True
|
||||
self._warmup_count = 0
|
||||
self.get_logger().info('First ArduPilot pose received!')
|
||||
self._build_telemetry_from_ardupilot()
|
||||
|
||||
def _ap_twist_callback(self, msg: TwistStamped):
|
||||
self._ap_twist = msg
|
||||
self._build_telemetry_from_ardupilot()
|
||||
|
||||
def _ap_imu_callback(self, msg: Imu):
|
||||
self._ap_imu = msg
|
||||
self._build_telemetry_from_ardupilot()
|
||||
|
||||
def _ap_battery_callback(self, msg: BatteryState):
|
||||
self._ap_battery = msg
|
||||
self._build_telemetry_from_ardupilot()
|
||||
|
||||
def _telemetry_callback(self, msg: String) -> None:
|
||||
try:
|
||||
self._latest_telemetry = json.loads(msg.data)
|
||||
if not self._telemetry_received:
|
||||
self._telemetry_received = True
|
||||
self._warmup_count = 0
|
||||
self.get_logger().info('First telemetry received!')
|
||||
except json.JSONDecodeError as e:
|
||||
self.get_logger().warning(f'Failed to parse telemetry: {e}')
|
||||
|
||||
def _rover_telemetry_callback(self, msg: String) -> None:
|
||||
try:
|
||||
self._rover_telemetry = json.loads(msg.data)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
def _camera_callback(self, msg: Image) -> None:
|
||||
try:
|
||||
self._latest_camera_image = bytes(msg.data)
|
||||
self._camera_width = msg.width
|
||||
self._camera_height = msg.height
|
||||
self._camera_encoding = msg.encoding
|
||||
except Exception as e:
|
||||
self.get_logger().warning(f'Camera callback error: {e}')
|
||||
|
||||
def _build_telemetry_from_ardupilot(self):
|
||||
telemetry = {}
|
||||
def goto(self, x, y, z):
|
||||
"""Go to position (NED frame, z is down so negative = up)."""
|
||||
print(f"[INFO] Going to ({x:.1f}, {y:.1f}, alt={-z:.1f}m)...")
|
||||
|
||||
if self._ap_pose:
|
||||
pos = self._ap_pose.pose.position
|
||||
ori = self._ap_pose.pose.orientation
|
||||
roll, pitch, yaw = quaternion_to_euler(ori.x, ori.y, ori.z, ori.w)
|
||||
|
||||
telemetry['position'] = {'x': pos.x, 'y': pos.y, 'z': pos.z}
|
||||
telemetry['altimeter'] = {
|
||||
'altitude': pos.z,
|
||||
'vertical_velocity': self._ap_twist.twist.linear.z if self._ap_twist else 0.0
|
||||
}
|
||||
telemetry['imu'] = {
|
||||
'orientation': {'roll': roll, 'pitch': pitch, 'yaw': yaw},
|
||||
'angular_velocity': {'x': 0, 'y': 0, 'z': 0}
|
||||
}
|
||||
|
||||
if self._ap_twist:
|
||||
twist = self._ap_twist.twist
|
||||
telemetry['velocity'] = {
|
||||
'x': twist.linear.x, 'y': twist.linear.y, 'z': twist.linear.z
|
||||
}
|
||||
if 'altimeter' in telemetry:
|
||||
telemetry['altimeter']['vertical_velocity'] = twist.linear.z
|
||||
|
||||
if self._ap_imu:
|
||||
if 'imu' not in telemetry:
|
||||
telemetry['imu'] = {}
|
||||
telemetry['imu']['angular_velocity'] = {
|
||||
'x': self._ap_imu.angular_velocity.x,
|
||||
'y': self._ap_imu.angular_velocity.y,
|
||||
'z': self._ap_imu.angular_velocity.z
|
||||
}
|
||||
|
||||
if self._ap_battery:
|
||||
telemetry['battery'] = {
|
||||
'voltage': self._ap_battery.voltage,
|
||||
'remaining': self._ap_battery.percentage * 100
|
||||
}
|
||||
|
||||
if self._qr_position:
|
||||
telemetry['landing_pad'] = self._qr_position
|
||||
elif self._rover_telemetry and self._ap_pose:
|
||||
rover_pos = self._rover_telemetry.get('position', {})
|
||||
rx, ry = rover_pos.get('x', 0), rover_pos.get('y', 0)
|
||||
dx = self._ap_pose.pose.position.x
|
||||
dy = self._ap_pose.pose.position.y
|
||||
dz = self._ap_pose.pose.position.z
|
||||
|
||||
rel_x, rel_y = rx - dx, ry - dy
|
||||
distance = math.sqrt(rel_x**2 + rel_y**2 + dz**2)
|
||||
|
||||
telemetry['landing_pad'] = {
|
||||
'relative_x': rel_x, 'relative_y': rel_y,
|
||||
'distance': distance, 'confidence': 1.0 if distance < 10.0 else 0.0
|
||||
}
|
||||
|
||||
self._latest_telemetry = telemetry
|
||||
|
||||
def _control_loop(self) -> None:
|
||||
if self._latest_telemetry is None:
|
||||
return
|
||||
|
||||
if self._phase == Phase.COMPLETE:
|
||||
return
|
||||
|
||||
if not hasattr(self, '_warmup_count'):
|
||||
self._warmup_count = 0
|
||||
self._warmup_count += 1
|
||||
if self._warmup_count < 50:
|
||||
return
|
||||
|
||||
if self._phase == Phase.SEARCH:
|
||||
thrust, pitch, roll, yaw = self._execute_search_phase()
|
||||
elif self._phase == Phase.COMMAND:
|
||||
thrust, pitch, roll, yaw = self._execute_command_phase()
|
||||
elif self._phase == Phase.LAND:
|
||||
thrust, pitch, roll, yaw = self._execute_land_phase()
|
||||
else:
|
||||
thrust, pitch, roll, yaw = 0.0, 0.0, 0.0, 0.0
|
||||
|
||||
thrust = max(min(thrust, self._max_thrust), -self._max_thrust)
|
||||
pitch = max(min(pitch, self._max_pitch), -self._max_pitch)
|
||||
roll = max(min(roll, self._max_roll), -self._max_roll)
|
||||
yaw = max(min(yaw, 0.5), -0.5)
|
||||
|
||||
self._publish_command(thrust, pitch, roll, yaw)
|
||||
|
||||
def _execute_search_phase(self) -> tuple:
|
||||
qr_result = self.detect_qr_code()
|
||||
|
||||
if qr_result is not None:
|
||||
self._qr_detected = True
|
||||
self._qr_data = qr_result.get('data')
|
||||
self._qr_position = qr_result.get('position')
|
||||
|
||||
self.get_logger().info(f'QR CODE DETECTED: {self._qr_data}')
|
||||
self._transition_to_phase(Phase.COMMAND)
|
||||
return (0.0, 0.0, 0.0, 0.0)
|
||||
|
||||
return self.calculate_search_maneuver(self._latest_telemetry)
|
||||
|
||||
def detect_qr_code(self) -> Optional[Dict[str, Any]]:
|
||||
if not CV2_AVAILABLE or self._latest_camera_image is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
if self._camera_encoding == 'rgb8':
|
||||
img = np.frombuffer(self._latest_camera_image, dtype=np.uint8)
|
||||
img = img.reshape((self._camera_height, self._camera_width, 3))
|
||||
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
|
||||
elif self._camera_encoding == 'bgr8':
|
||||
img = np.frombuffer(self._latest_camera_image, dtype=np.uint8)
|
||||
img = img.reshape((self._camera_height, self._camera_width, 3))
|
||||
else:
|
||||
return None
|
||||
|
||||
detector = cv2.QRCodeDetector()
|
||||
data, points, _ = detector.detectAndDecode(img)
|
||||
|
||||
if data and points is not None:
|
||||
cx = np.mean(points[0][:, 0])
|
||||
cy = np.mean(points[0][:, 1])
|
||||
|
||||
rel_x = (cx - self._camera_width / 2) / (self._camera_width / 2)
|
||||
rel_y = (cy - self._camera_height / 2) / (self._camera_height / 2)
|
||||
|
||||
qr_size = np.linalg.norm(points[0][0] - points[0][2])
|
||||
altitude = self._latest_telemetry.get('altimeter', {}).get('altitude', 5.0)
|
||||
|
||||
return {
|
||||
'data': data,
|
||||
'position': {
|
||||
'relative_x': rel_x * altitude * 0.5,
|
||||
'relative_y': rel_y * altitude * 0.5,
|
||||
'distance': altitude,
|
||||
'confidence': min(qr_size / 100, 1.0)
|
||||
}
|
||||
}
|
||||
except Exception as e:
|
||||
self.get_logger().debug(f'QR detection error: {e}')
|
||||
|
||||
return None
|
||||
|
||||
def calculate_search_maneuver(self, telemetry: Dict[str, Any]) -> tuple:
|
||||
altimeter = telemetry.get('altimeter', {})
|
||||
altitude = altimeter.get('altitude', 5.0)
|
||||
vertical_vel = altimeter.get('vertical_velocity', 0.0)
|
||||
|
||||
target_altitude = 5.0
|
||||
altitude_error = target_altitude - altitude
|
||||
thrust = self._Kp_z * altitude_error - self._Kd_z * vertical_vel
|
||||
|
||||
self._search_pattern_angle += 0.01
|
||||
yaw = 0.1
|
||||
|
||||
velocity = telemetry.get('velocity', {})
|
||||
pitch = -self._Kd_xy * velocity.get('x', 0)
|
||||
roll = -self._Kd_xy * velocity.get('y', 0)
|
||||
|
||||
return (thrust, pitch, roll, yaw)
|
||||
|
||||
def _execute_command_phase(self) -> tuple:
|
||||
if not self._commands_sent:
|
||||
command = self.generate_rover_command(self._qr_data)
|
||||
self._send_rover_command(command)
|
||||
self._commands_sent = True
|
||||
self._command_time = self.get_clock().now()
|
||||
|
||||
elapsed = (self.get_clock().now() - self._command_time).nanoseconds / 1e9
|
||||
|
||||
if self._command_acknowledged or elapsed > 5.0:
|
||||
self.get_logger().info('Command phase complete, transitioning to LAND')
|
||||
self._transition_to_phase(Phase.LAND)
|
||||
|
||||
return self.calculate_hover_maneuver(self._latest_telemetry)
|
||||
|
||||
def generate_rover_command(self, qr_data: Optional[str]) -> Dict[str, Any]:
|
||||
return {
|
||||
'type': 'prepare_landing',
|
||||
'qr_data': qr_data,
|
||||
'drone_position': self._latest_telemetry.get('position', {})
|
||||
}
|
||||
|
||||
def _send_rover_command(self, command: Dict[str, Any]) -> None:
|
||||
msg = String()
|
||||
msg.data = json.dumps(command)
|
||||
self._rover_cmd_pub.publish(msg)
|
||||
self.get_logger().info(f'Sent rover command: {command.get("type")}')
|
||||
|
||||
def calculate_hover_maneuver(self, telemetry: Dict[str, Any]) -> tuple:
|
||||
altimeter = telemetry.get('altimeter', {})
|
||||
altitude = altimeter.get('altitude', 5.0)
|
||||
vertical_vel = altimeter.get('vertical_velocity', 0.0)
|
||||
|
||||
altitude_error = 0
|
||||
thrust = self._Kp_z * altitude_error - self._Kd_z * vertical_vel
|
||||
|
||||
velocity = telemetry.get('velocity', {})
|
||||
pitch = -self._Kd_xy * velocity.get('x', 0)
|
||||
roll = -self._Kd_xy * velocity.get('y', 0)
|
||||
|
||||
return (thrust, pitch, roll, 0.0)
|
||||
|
||||
def _execute_land_phase(self) -> tuple:
|
||||
if self._check_landing_complete():
|
||||
self._landing_complete = True
|
||||
self.get_logger().info('LANDING COMPLETE!')
|
||||
self._transition_to_phase(Phase.COMPLETE)
|
||||
return (0.0, 0.0, 0.0, 0.0)
|
||||
|
||||
return self.calculate_landing_maneuver(
|
||||
self._latest_telemetry,
|
||||
self._rover_telemetry
|
||||
self.mav.mav.set_position_target_local_ned_send(
|
||||
0,
|
||||
self.mav.target_system,
|
||||
self.mav.target_component,
|
||||
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
|
||||
0b0000111111111000, # Position only
|
||||
x, y, z, # Position (NED)
|
||||
0, 0, 0, # Velocity
|
||||
0, 0, 0, # Acceleration
|
||||
0, 0 # Yaw, yaw_rate
|
||||
)
|
||||
|
||||
def calculate_landing_maneuver(
|
||||
self,
|
||||
telemetry: Dict[str, Any],
|
||||
rover_telemetry: Optional[Dict[str, Any]] = None
|
||||
) -> tuple:
|
||||
altimeter = telemetry.get('altimeter', {})
|
||||
altitude = altimeter.get('altitude', 5.0)
|
||||
vertical_vel = altimeter.get('vertical_velocity', 0.0)
|
||||
def fly_to_and_wait(self, x, y, altitude, tolerance=0.5, timeout=30):
|
||||
"""Fly to position and wait until reached."""
|
||||
z = -altitude # NED
|
||||
self.goto(x, y, z)
|
||||
|
||||
velocity = telemetry.get('velocity', {'x': 0, 'y': 0, 'z': 0})
|
||||
vel_x = velocity.get('x', 0.0)
|
||||
vel_y = velocity.get('y', 0.0)
|
||||
|
||||
landing_pad = telemetry.get('landing_pad', None)
|
||||
|
||||
if altitude > 1.0:
|
||||
target_descent_rate = -0.5
|
||||
else:
|
||||
target_descent_rate = -0.2
|
||||
|
||||
descent_error = target_descent_rate - vertical_vel
|
||||
thrust = self._Kp_z * descent_error
|
||||
|
||||
if landing_pad is not None:
|
||||
target_x = landing_pad.get('relative_x', 0.0)
|
||||
target_y = landing_pad.get('relative_y', 0.0)
|
||||
for i in range(int(timeout * 10)):
|
||||
self.update_state()
|
||||
dx = x - self.position["x"]
|
||||
dy = y - self.position["y"]
|
||||
dist = math.sqrt(dx*dx + dy*dy)
|
||||
|
||||
pitch = self._Kp_xy * target_x - self._Kd_xy * vel_x
|
||||
roll = self._Kp_xy * target_y - self._Kd_xy * vel_y
|
||||
else:
|
||||
pitch = -self._Kd_xy * vel_x
|
||||
roll = -self._Kd_xy * vel_y
|
||||
if dist < tolerance:
|
||||
print(f"[OK] Reached waypoint ({x:.1f}, {y:.1f})")
|
||||
return True
|
||||
|
||||
if i % 10 == 0:
|
||||
print(f"\r Distance: {dist:.1f}m", end="")
|
||||
time.sleep(0.1)
|
||||
|
||||
yaw = 0.0
|
||||
print(f"\n[WARN] Timeout reaching waypoint")
|
||||
return False
|
||||
|
||||
def land(self):
|
||||
"""Land the drone."""
|
||||
print("[INFO] Landing...")
|
||||
return self.set_mode("LAND")
|
||||
|
||||
def fly_square(self, size=5, altitude=5):
|
||||
"""Fly a square pattern."""
|
||||
waypoints = [
|
||||
(size, 0),
|
||||
(size, size),
|
||||
(0, size),
|
||||
(0, 0),
|
||||
]
|
||||
|
||||
return (thrust, pitch, roll, yaw)
|
||||
print(f"\n[INFO] Flying square pattern ({size}m x {size}m)")
|
||||
|
||||
for i, (x, y) in enumerate(waypoints):
|
||||
print(f"\n--- Waypoint {i+1}/4 ---")
|
||||
self.fly_to_and_wait(x, y, altitude)
|
||||
time.sleep(1)
|
||||
|
||||
print("\n[OK] Square pattern complete!")
|
||||
|
||||
def _transition_to_phase(self, new_phase: Phase) -> None:
|
||||
old_phase = self._phase
|
||||
self._phase = new_phase
|
||||
self._phase_start_time = self.get_clock().now()
|
||||
self.get_logger().info(f'Phase: {old_phase.name} -> {new_phase.name}')
|
||||
|
||||
def _check_landing_complete(self) -> bool:
|
||||
if self._latest_telemetry is None:
|
||||
return False
|
||||
try:
|
||||
altimeter = self._latest_telemetry.get('altimeter', {})
|
||||
altitude = altimeter.get('altitude', float('inf'))
|
||||
vertical_velocity = abs(altimeter.get('vertical_velocity', float('inf')))
|
||||
return altitude < self._landing_height and vertical_velocity < self._landing_velocity
|
||||
except (KeyError, TypeError):
|
||||
return False
|
||||
|
||||
def _publish_command(self, thrust: float, pitch: float, roll: float, yaw: float) -> None:
|
||||
msg = Twist()
|
||||
msg.linear.z = thrust
|
||||
msg.linear.x = pitch
|
||||
msg.linear.y = roll
|
||||
msg.angular.z = yaw
|
||||
self._cmd_vel_pub.publish(msg)
|
||||
|
||||
def get_current_phase(self) -> Phase:
|
||||
return self._phase
|
||||
def fly_circle(self, radius=5, altitude=5, points=8):
|
||||
"""Fly a circular pattern."""
|
||||
print(f"\n[INFO] Flying circle pattern (radius={radius}m)")
|
||||
|
||||
for i in range(points + 1):
|
||||
angle = 2 * math.pi * i / points
|
||||
x = radius * math.cos(angle)
|
||||
y = radius * math.sin(angle)
|
||||
print(f"\n--- Point {i+1}/{points+1} ---")
|
||||
self.fly_to_and_wait(x, y, altitude)
|
||||
time.sleep(0.5)
|
||||
|
||||
print("\n[OK] Circle pattern complete!")
|
||||
|
||||
|
||||
def main(args=None):
|
||||
import sys
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Simple Drone Controller")
|
||||
parser.add_argument("--pattern", "-p", choices=["square", "circle", "hover"],
|
||||
default="square", help="Flight pattern")
|
||||
parser.add_argument("--altitude", "-a", type=float, default=5.0,
|
||||
help="Flight altitude (meters)")
|
||||
parser.add_argument("--size", "-s", type=float, default=5.0,
|
||||
help="Pattern size/radius (meters)")
|
||||
parser.add_argument("--connection", "-c", default=None,
|
||||
help="MAVLink connection string")
|
||||
args = parser.parse_args()
|
||||
|
||||
use_ardupilot = '--ardupilot' in sys.argv or '-a' in sys.argv
|
||||
filtered_args = [a for a in (args or sys.argv) if a not in ['--ardupilot', '-a']]
|
||||
print("\n" + "=" * 50)
|
||||
print(" Simple Drone Controller")
|
||||
print("=" * 50)
|
||||
print(f" Pattern: {args.pattern}")
|
||||
print(f" Altitude: {args.altitude}m")
|
||||
print(f" Size: {args.size}m")
|
||||
print("=" * 50 + "\n")
|
||||
|
||||
rclpy.init(args=filtered_args)
|
||||
controller = None
|
||||
drone = SimpleDroneController(args.connection)
|
||||
|
||||
if not drone.connect():
|
||||
print("\n[ERROR] Could not connect to SITL")
|
||||
print("Make sure sim_vehicle.py is running")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
controller = DroneController(use_ardupilot_topics=use_ardupilot)
|
||||
rclpy.spin(controller)
|
||||
# Setup
|
||||
print("\n--- SETUP ---")
|
||||
drone.set_mode("GUIDED")
|
||||
time.sleep(1)
|
||||
|
||||
if not drone.arm():
|
||||
print("[ERROR] Could not arm")
|
||||
sys.exit(1)
|
||||
|
||||
# Takeoff
|
||||
print("\n--- TAKEOFF ---")
|
||||
if not drone.takeoff(args.altitude):
|
||||
print("[WARN] Takeoff may have failed")
|
||||
|
||||
time.sleep(2) # Stabilize
|
||||
|
||||
# Fly pattern
|
||||
print("\n--- FLY PATTERN ---")
|
||||
if args.pattern == "square":
|
||||
drone.fly_square(size=args.size, altitude=args.altitude)
|
||||
elif args.pattern == "circle":
|
||||
drone.fly_circle(radius=args.size, altitude=args.altitude)
|
||||
elif args.pattern == "hover":
|
||||
print("[INFO] Hovering for 10 seconds...")
|
||||
time.sleep(10)
|
||||
|
||||
# Land
|
||||
print("\n--- LAND ---")
|
||||
drone.land()
|
||||
|
||||
# Wait for landing
|
||||
print("[INFO] Waiting for landing...")
|
||||
for i in range(100):
|
||||
drone.update_state()
|
||||
if drone.altitude < 0.3:
|
||||
print("[OK] Landed!")
|
||||
break
|
||||
print(f"\r Altitude: {drone.altitude:.1f}m", end="")
|
||||
time.sleep(0.1)
|
||||
|
||||
print("\n\n[OK] Mission complete!")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print('\nShutting down...')
|
||||
finally:
|
||||
if controller:
|
||||
controller.destroy_node()
|
||||
if rclpy.ok():
|
||||
rclpy.shutdown()
|
||||
print("\n\n[INFO] Interrupted - Landing...")
|
||||
drone.land()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user