352 lines
12 KiB
Python
352 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ArduPilot Drone Controller
|
|
==========================
|
|
Connects your drone_controller.py logic to ArduPilot SITL via MAVLink.
|
|
|
|
Usage:
|
|
Terminal 1: ./scripts/run_ardupilot_sim.sh runway
|
|
Terminal 2: sim_vehicle.py -v ArduCopter -f gazebo-iris --model JSON --console
|
|
Terminal 3: python run_ardupilot.py
|
|
|
|
This script:
|
|
1. Connects to ArduPilot SITL via MAVLink
|
|
2. Gets telemetry (position, attitude, velocity)
|
|
3. Runs your drone_controller.calculate_landing_maneuver()
|
|
4. Sends velocity commands to ArduPilot
|
|
"""
|
|
import sys
|
|
import os
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import time
|
|
import math
|
|
import argparse
|
|
|
|
try:
|
|
from pymavlink import mavutil
|
|
except ImportError:
|
|
print("ERROR: pymavlink not installed")
|
|
print("Run: pip install pymavlink")
|
|
sys.exit(1)
|
|
|
|
from config import ARDUPILOT, CONTROLLER, MAVLINK
|
|
|
|
|
|
class ArduPilotInterface:
|
|
"""Interface to ArduPilot SITL via MAVLink."""
|
|
|
|
def __init__(self, connection_string=None):
|
|
self.connection_string = connection_string or MAVLINK["connection_string"]
|
|
self.mav = None
|
|
self.armed = False
|
|
self.mode = "UNKNOWN"
|
|
|
|
# Telemetry state
|
|
self.position = {"x": 0, "y": 0, "z": 0}
|
|
self.velocity = {"x": 0, "y": 0, "z": 0}
|
|
self.attitude = {"roll": 0, "pitch": 0, "yaw": 0}
|
|
self.altitude = 0
|
|
self.vertical_velocity = 0
|
|
|
|
def connect(self, timeout=30):
|
|
"""Connect to ArduPilot SITL."""
|
|
print(f"[INFO] Connecting to {self.connection_string}...")
|
|
|
|
try:
|
|
self.mav = mavutil.mavlink_connection(self.connection_string)
|
|
self.mav.wait_heartbeat(timeout=timeout)
|
|
print(f"[OK] Connected to system {self.mav.target_system}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"[ERROR] Connection failed: {e}")
|
|
return False
|
|
|
|
def update_telemetry(self):
|
|
"""Read and update telemetry from ArduPilot."""
|
|
# Read all available messages
|
|
while True:
|
|
msg = self.mav.recv_match(blocking=False)
|
|
if msg is None:
|
|
break
|
|
|
|
msg_type = msg.get_type()
|
|
|
|
if msg_type == "HEARTBEAT":
|
|
self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0
|
|
# Get mode name using mavutil helper
|
|
try:
|
|
self.mode = mavutil.mode_string_v10(msg)
|
|
except Exception:
|
|
self.mode = str(msg.custom_mode)
|
|
|
|
elif msg_type == "LOCAL_POSITION_NED":
|
|
self.position = {"x": msg.x, "y": msg.y, "z": msg.z}
|
|
self.velocity = {"x": msg.vx, "y": msg.vy, "z": msg.vz}
|
|
self.altitude = -msg.z # NED: down is positive
|
|
self.vertical_velocity = -msg.vz
|
|
|
|
elif msg_type == "ATTITUDE":
|
|
self.attitude = {
|
|
"roll": msg.roll,
|
|
"pitch": msg.pitch,
|
|
"yaw": msg.yaw
|
|
}
|
|
|
|
def get_telemetry(self):
|
|
"""Get telemetry in the same format as standalone simulation."""
|
|
self.update_telemetry()
|
|
|
|
return {
|
|
"imu": {
|
|
"orientation": self.attitude,
|
|
"angular_velocity": {"x": 0, "y": 0, "z": 0} # Not available via MAVLink easily
|
|
},
|
|
"altimeter": {
|
|
"altitude": self.altitude,
|
|
"vertical_velocity": self.vertical_velocity
|
|
},
|
|
"velocity": self.velocity,
|
|
"landing_pad": None, # Not available in ArduPilot mode - use vision
|
|
"position": self.position, # Extra: actual position (GPS-like)
|
|
"armed": self.armed,
|
|
"mode": self.mode
|
|
}
|
|
|
|
def set_mode(self, mode_name):
|
|
"""Set flight mode."""
|
|
mode_map = self.mav.mode_mapping()
|
|
if mode_name.upper() not in mode_map:
|
|
print(f"[WARN] Unknown mode: {mode_name}")
|
|
return False
|
|
|
|
mode_id = mode_map[mode_name.upper()]
|
|
self.mav.set_mode(mode_id)
|
|
print(f"[OK] Mode set to {mode_name}")
|
|
return True
|
|
|
|
def arm(self, force=True):
|
|
"""Arm motors."""
|
|
print("[INFO] Arming...")
|
|
|
|
for attempt in range(3):
|
|
if force:
|
|
# Force arm (bypass checks)
|
|
self.mav.mav.command_long_send(
|
|
self.mav.target_system,
|
|
self.mav.target_component,
|
|
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
|
|
0, 1, 21196, 0, 0, 0, 0, 0 # 21196 = force arm
|
|
)
|
|
else:
|
|
self.mav.arducopter_arm()
|
|
|
|
# Wait and check if armed
|
|
time.sleep(1)
|
|
self.update_telemetry()
|
|
|
|
if self.armed:
|
|
print("[OK] Armed")
|
|
return True
|
|
else:
|
|
print(f"[WARN] Arm attempt {attempt + 1}/3 failed, retrying...")
|
|
|
|
print("[ERROR] Failed to arm after 3 attempts")
|
|
return False
|
|
|
|
def disarm(self):
|
|
"""Disarm motors."""
|
|
self.mav.arducopter_disarm()
|
|
print("[OK] Disarmed")
|
|
|
|
def takeoff(self, altitude=5.0):
|
|
"""Takeoff to specified altitude."""
|
|
self.mav.mav.command_long_send(
|
|
self.mav.target_system,
|
|
self.mav.target_component,
|
|
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
|
|
0, 0, 0, 0, 0, 0, 0, altitude
|
|
)
|
|
print(f"[OK] Takeoff to {altitude}m")
|
|
|
|
def send_velocity(self, vx, vy, vz, yaw_rate=0):
|
|
"""Send velocity command in body frame.
|
|
|
|
Args:
|
|
vx: Forward velocity (m/s)
|
|
vy: Right velocity (m/s)
|
|
vz: Down velocity (m/s, positive = descend)
|
|
yaw_rate: Yaw rate (rad/s)
|
|
"""
|
|
# Type mask: use velocity only
|
|
type_mask = (
|
|
mavutil.mavlink.POSITION_TARGET_TYPEMASK_X_IGNORE |
|
|
mavutil.mavlink.POSITION_TARGET_TYPEMASK_Y_IGNORE |
|
|
mavutil.mavlink.POSITION_TARGET_TYPEMASK_Z_IGNORE |
|
|
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE |
|
|
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE |
|
|
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE |
|
|
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE
|
|
)
|
|
|
|
self.mav.mav.set_position_target_local_ned_send(
|
|
0, # time_boot_ms
|
|
self.mav.target_system,
|
|
self.mav.target_component,
|
|
mavutil.mavlink.MAV_FRAME_BODY_NED,
|
|
type_mask,
|
|
0, 0, 0, # position (ignored)
|
|
vx, vy, vz, # velocity
|
|
0, 0, 0, # acceleration (ignored)
|
|
0, yaw_rate # yaw, yaw_rate
|
|
)
|
|
|
|
class SimpleController:
|
|
"""Simple landing controller that doesn't require ROS 2."""
|
|
|
|
def __init__(self):
|
|
self.Kp_z = CONTROLLER.get("Kp_z", 0.5)
|
|
self.Kd_z = CONTROLLER.get("Kd_z", 0.3)
|
|
self.Kp_xy = CONTROLLER.get("Kp_xy", 0.3)
|
|
self.Kd_xy = CONTROLLER.get("Kd_xy", 0.2)
|
|
|
|
def calculate_landing_maneuver(self, telemetry, rover_telemetry=None):
|
|
"""Calculate control outputs for landing."""
|
|
altimeter = telemetry.get('altimeter', {})
|
|
altitude = altimeter.get('altitude', 5.0)
|
|
vertical_vel = altimeter.get('vertical_velocity', 0.0)
|
|
|
|
velocity = telemetry.get('velocity', {'x': 0, 'y': 0, 'z': 0})
|
|
vel_x = velocity.get('x', 0.0)
|
|
vel_y = velocity.get('y', 0.0)
|
|
|
|
landing_pad = telemetry.get('landing_pad', None)
|
|
|
|
# Descent rate control
|
|
if altitude > 1.0:
|
|
target_descent_rate = -0.5
|
|
else:
|
|
target_descent_rate = -0.2
|
|
|
|
descent_error = target_descent_rate - vertical_vel
|
|
thrust = self.Kp_z * descent_error
|
|
|
|
# Horizontal control
|
|
if landing_pad is not None:
|
|
target_x = landing_pad.get('relative_x', 0.0)
|
|
target_y = landing_pad.get('relative_y', 0.0)
|
|
|
|
pitch = self.Kp_xy * target_x - self.Kd_xy * vel_x
|
|
roll = self.Kp_xy * target_y - self.Kd_xy * vel_y
|
|
else:
|
|
# Just dampen velocity
|
|
pitch = -self.Kd_xy * vel_x
|
|
roll = -self.Kd_xy * vel_y
|
|
|
|
yaw = 0.0
|
|
|
|
return (thrust, pitch, roll, yaw)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="ArduPilot Drone Controller")
|
|
parser.add_argument("--connection", "-c", default=None,
|
|
help="MAVLink connection string (default: from config.py)")
|
|
parser.add_argument("--takeoff-alt", "-a", type=float, default=5.0,
|
|
help="Takeoff altitude (meters)")
|
|
parser.add_argument("--no-takeoff", action="store_true",
|
|
help="Don't auto takeoff, just control")
|
|
args = parser.parse_args()
|
|
|
|
# Create ArduPilot interface
|
|
ardupilot = ArduPilotInterface(args.connection)
|
|
|
|
# Connect
|
|
if not ardupilot.connect():
|
|
print("[ERROR] Could not connect to ArduPilot")
|
|
print("Make sure sim_vehicle.py is running")
|
|
sys.exit(1)
|
|
|
|
# Create simple controller (no ROS 2 required)
|
|
controller = SimpleController()
|
|
|
|
print("\n" + "=" * 50)
|
|
print(" ArduPilot GPS-Denied Controller")
|
|
print("=" * 50)
|
|
print(f"Mode: {ardupilot.mode}")
|
|
print(f"Armed: {ardupilot.armed}")
|
|
print("")
|
|
|
|
if not args.no_takeoff:
|
|
# Set GUIDED mode and arm
|
|
print("[INFO] Setting up for takeoff...")
|
|
ardupilot.set_mode("GUIDED")
|
|
time.sleep(2) # Give mode time to change
|
|
|
|
# Update telemetry to get current mode
|
|
ardupilot.update_telemetry()
|
|
print(f"[INFO] Current mode: {ardupilot.mode}")
|
|
|
|
if not ardupilot.arm(force=True):
|
|
print("[ERROR] Could not arm - continuing in manual mode")
|
|
print("[INFO] Use MAVProxy to arm: arm throttle force")
|
|
else:
|
|
ardupilot.takeoff(args.takeoff_alt)
|
|
print(f"[INFO] Taking off to {args.takeoff_alt}m...")
|
|
|
|
# Wait for takeoff (up to 15 seconds)
|
|
for i in range(150):
|
|
telemetry = ardupilot.get_telemetry()
|
|
alt = telemetry["altimeter"]["altitude"]
|
|
if alt > args.takeoff_alt * 0.9:
|
|
print(f"\n[OK] Reached target altitude ({alt:.1f}m)")
|
|
break
|
|
if i % 10 == 0:
|
|
print(f"\r[INFO] Climbing... {alt:.1f}m / {args.takeoff_alt}m", end="")
|
|
time.sleep(0.1)
|
|
print("")
|
|
|
|
print("\n[INFO] Starting controller loop...")
|
|
print("[INFO] Press Ctrl+C to stop\n")
|
|
|
|
rate = 20 # Hz
|
|
try:
|
|
while True:
|
|
# Get telemetry
|
|
telemetry = ardupilot.get_telemetry()
|
|
|
|
# Run your controller
|
|
thrust, pitch, roll, yaw = controller.calculate_landing_maneuver(
|
|
telemetry,
|
|
rover_telemetry=None # No rover in ArduPilot mode yet
|
|
)
|
|
|
|
# Convert control outputs to velocities
|
|
# thrust -> vertical velocity (negative = up)
|
|
# pitch -> forward velocity
|
|
# roll -> right velocity
|
|
vz = -thrust * 2.0 # Scale factor
|
|
vx = pitch * 2.0
|
|
vy = roll * 2.0
|
|
yaw_rate = yaw * 1.0
|
|
|
|
# Send velocity command
|
|
ardupilot.send_velocity(vx, vy, vz, yaw_rate)
|
|
|
|
# Print status
|
|
alt = telemetry["altimeter"]["altitude"]
|
|
mode = telemetry.get("mode", "?")
|
|
print(f"\rAlt: {alt:5.1f}m | Mode: {mode:10} | Cmd: vx={vx:+.2f} vy={vy:+.2f} vz={vz:+.2f}", end="")
|
|
|
|
time.sleep(1.0 / rate)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\n[INFO] Stopping...")
|
|
ardupilot.set_mode("LAND")
|
|
print("[OK] Landing mode set")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|