Ardupilot Controller Script
This commit is contained in:
455
run_ardupilot.py
455
run_ardupilot.py
@@ -1,235 +1,284 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ArduPilot ROS 2 Launcher - Official DDS Integration.
|
||||
|
||||
This script provides a convenient way to launch the ArduPilot SITL simulation
|
||||
using the official ardupilot_gz packages with ROS 2 DDS support.
|
||||
ArduPilot Drone Controller
|
||||
==========================
|
||||
Connects your drone_controller.py logic to ArduPilot SITL via MAVLink.
|
||||
|
||||
Usage:
|
||||
python run_ardupilot.py # Launch Iris on runway
|
||||
python run_ardupilot.py --world maze # Launch Iris in maze
|
||||
python run_ardupilot.py --vehicle rover # Launch WildThumper rover
|
||||
python run_ardupilot.py --mavproxy # Also start MAVProxy
|
||||
Terminal 1: ./scripts/run_ardupilot_sim.sh runway
|
||||
Terminal 2: sim_vehicle.py -v ArduCopter -f gazebo-iris --model JSON --console
|
||||
Terminal 3: python run_ardupilot.py
|
||||
|
||||
Prerequisites:
|
||||
- ArduPilot ROS 2 packages installed (./setup/install_ardupilot.sh)
|
||||
- ROS 2 Humble/Jazzy sourced
|
||||
- ~/ardu_ws workspace built and sourced
|
||||
This script:
|
||||
1. Connects to ArduPilot SITL via MAVLink
|
||||
2. Gets telemetry (position, attitude, velocity)
|
||||
3. Runs your drone_controller.calculate_landing_maneuver()
|
||||
4. Sends velocity commands to ArduPilot
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
import math
|
||||
import argparse
|
||||
|
||||
try:
|
||||
from pymavlink import mavutil
|
||||
except ImportError:
|
||||
print("ERROR: pymavlink not installed")
|
||||
print("Run: pip install pymavlink")
|
||||
sys.exit(1)
|
||||
|
||||
from config import ARDUPILOT, CONTROLLER, MAVLINK
|
||||
from drone_controller import DroneController
|
||||
|
||||
|
||||
def check_ros2():
|
||||
"""Check if ROS 2 is available."""
|
||||
try:
|
||||
subprocess.run(['ros2', '--help'], capture_output=True, check=True)
|
||||
return True
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
return False
|
||||
|
||||
|
||||
def check_ardupilot_packages():
|
||||
"""Check if ArduPilot ROS 2 packages are installed."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['ros2', 'pkg', 'list'],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True
|
||||
)
|
||||
packages = result.stdout
|
||||
return 'ardupilot_gz_bringup' in packages or 'ardupilot_sitl' in packages
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
return False
|
||||
|
||||
|
||||
def source_workspace():
|
||||
"""Source the ArduPilot workspace."""
|
||||
ardu_ws = os.path.expanduser("~/ardu_ws")
|
||||
setup_bash = os.path.join(ardu_ws, "install", "setup.bash")
|
||||
class ArduPilotInterface:
|
||||
"""Interface to ArduPilot SITL via MAVLink."""
|
||||
|
||||
if os.path.exists(setup_bash):
|
||||
# Update environment by sourcing the workspace
|
||||
# This is done by running commands in a sourced shell
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_launch_command(world: str, vehicle: str) -> list:
|
||||
"""Get the appropriate launch command."""
|
||||
|
||||
launch_files = {
|
||||
# Copter configurations
|
||||
'runway': ('ardupilot_gz_bringup', 'iris_runway.launch.py'),
|
||||
'maze': ('ardupilot_gz_bringup', 'iris_maze.launch.py'),
|
||||
'iris': ('ardupilot_gz_bringup', 'iris_runway.launch.py'),
|
||||
def __init__(self, connection_string=None):
|
||||
self.connection_string = connection_string or MAVLINK["connection_string"]
|
||||
self.mav = None
|
||||
self.armed = False
|
||||
self.mode = "UNKNOWN"
|
||||
|
||||
# Rover configurations
|
||||
'rover': ('ardupilot_gz_bringup', 'wildthumper_playpen.launch.py'),
|
||||
'wildthumper': ('ardupilot_gz_bringup', 'wildthumper_playpen.launch.py'),
|
||||
# Telemetry state
|
||||
self.position = {"x": 0, "y": 0, "z": 0}
|
||||
self.velocity = {"x": 0, "y": 0, "z": 0}
|
||||
self.attitude = {"roll": 0, "pitch": 0, "yaw": 0}
|
||||
self.altitude = 0
|
||||
self.vertical_velocity = 0
|
||||
|
||||
def connect(self, timeout=30):
|
||||
"""Connect to ArduPilot SITL."""
|
||||
print(f"[INFO] Connecting to {self.connection_string}...")
|
||||
|
||||
# SITL only (no Gazebo)
|
||||
'sitl': ('ardupilot_sitl', 'sitl_dds_udp.launch.py'),
|
||||
}
|
||||
|
||||
if vehicle == 'rover':
|
||||
key = 'rover'
|
||||
else:
|
||||
key = world if world in launch_files else 'runway'
|
||||
|
||||
package, launch_file = launch_files.get(key, launch_files['runway'])
|
||||
|
||||
cmd = ['ros2', 'launch', package, launch_file]
|
||||
|
||||
# Add SITL-specific parameters if using sitl_dds_udp
|
||||
if launch_file == 'sitl_dds_udp.launch.py':
|
||||
cmd.extend([
|
||||
'transport:=udp4',
|
||||
'synthetic_clock:=True',
|
||||
'model:=quad' if vehicle == 'copter' else 'rover',
|
||||
'speedup:=1',
|
||||
])
|
||||
|
||||
return cmd
|
||||
|
||||
|
||||
def launch_mavproxy(master_port: int = 14550):
|
||||
"""Launch MAVProxy in a new terminal."""
|
||||
mavproxy_cmd = f"mavproxy.py --console --map --master=:{master_port}"
|
||||
|
||||
# Try different terminal emulators
|
||||
terminals = [
|
||||
['gnome-terminal', '--', 'bash', '-c', mavproxy_cmd],
|
||||
['xterm', '-e', mavproxy_cmd],
|
||||
['konsole', '-e', mavproxy_cmd],
|
||||
]
|
||||
|
||||
for term_cmd in terminals:
|
||||
try:
|
||||
subprocess.Popen(term_cmd)
|
||||
self.mav = mavutil.mavlink_connection(self.connection_string)
|
||||
self.mav.wait_heartbeat(timeout=timeout)
|
||||
print(f"[OK] Connected to system {self.mav.target_system}")
|
||||
return True
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Connection failed: {e}")
|
||||
return False
|
||||
|
||||
print(f"[WARN] Could not open terminal for MAVProxy")
|
||||
print(f"[INFO] Run manually: {mavproxy_cmd}")
|
||||
return False
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Launch ArduPilot SITL with ROS 2 and Gazebo',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Examples:
|
||||
python run_ardupilot.py # Iris on runway
|
||||
python run_ardupilot.py --world maze # Iris in maze
|
||||
python run_ardupilot.py --vehicle rover # WildThumper rover
|
||||
python run_ardupilot.py --mavproxy # With MAVProxy
|
||||
|
||||
Available worlds: runway, maze, sitl (no Gazebo)
|
||||
Available vehicles: copter, rover
|
||||
def update_telemetry(self):
|
||||
"""Read and update telemetry from ArduPilot."""
|
||||
# Read all available messages
|
||||
while True:
|
||||
msg = self.mav.recv_match(blocking=False)
|
||||
if msg is None:
|
||||
break
|
||||
|
||||
msg_type = msg.get_type()
|
||||
|
||||
if msg_type == "HEARTBEAT":
|
||||
self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0
|
||||
# Get mode name
|
||||
if hasattr(mavutil.mavlink, 'enums'):
|
||||
mode_map = mavutil.mavlink.enums.get('COPTER_MODE', {})
|
||||
self.mode = mode_map.get(msg.custom_mode, {}).get('name', str(msg.custom_mode))
|
||||
|
||||
elif msg_type == "LOCAL_POSITION_NED":
|
||||
self.position = {"x": msg.x, "y": msg.y, "z": msg.z}
|
||||
self.velocity = {"x": msg.vx, "y": msg.vy, "z": msg.vz}
|
||||
self.altitude = -msg.z # NED: down is positive
|
||||
self.vertical_velocity = -msg.vz
|
||||
|
||||
elif msg_type == "ATTITUDE":
|
||||
self.attitude = {
|
||||
"roll": msg.roll,
|
||||
"pitch": msg.pitch,
|
||||
"yaw": msg.yaw
|
||||
}
|
||||
|
||||
def get_telemetry(self):
|
||||
"""Get telemetry in the same format as standalone simulation."""
|
||||
self.update_telemetry()
|
||||
|
||||
return {
|
||||
"imu": {
|
||||
"orientation": self.attitude,
|
||||
"angular_velocity": {"x": 0, "y": 0, "z": 0} # Not available via MAVLink easily
|
||||
},
|
||||
"altimeter": {
|
||||
"altitude": self.altitude,
|
||||
"vertical_velocity": self.vertical_velocity
|
||||
},
|
||||
"velocity": self.velocity,
|
||||
"landing_pad": None, # Not available in ArduPilot mode - use vision
|
||||
"position": self.position, # Extra: actual position (GPS-like)
|
||||
"armed": self.armed,
|
||||
"mode": self.mode
|
||||
}
|
||||
|
||||
def set_mode(self, mode_name):
|
||||
"""Set flight mode."""
|
||||
mode_map = self.mav.mode_mapping()
|
||||
if mode_name.upper() not in mode_map:
|
||||
print(f"[WARN] Unknown mode: {mode_name}")
|
||||
return False
|
||||
|
||||
mode_id = mode_map[mode_name.upper()]
|
||||
self.mav.set_mode(mode_id)
|
||||
print(f"[OK] Mode set to {mode_name}")
|
||||
return True
|
||||
|
||||
def arm(self, force=True):
|
||||
"""Arm motors."""
|
||||
if force:
|
||||
# Force arm (bypass checks)
|
||||
self.mav.mav.command_long_send(
|
||||
self.mav.target_system,
|
||||
self.mav.target_component,
|
||||
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
|
||||
0, 1, 21196, 0, 0, 0, 0, 0 # 21196 = force arm
|
||||
)
|
||||
else:
|
||||
self.mav.arducopter_arm()
|
||||
|
||||
time.sleep(1)
|
||||
self.update_telemetry()
|
||||
if self.armed:
|
||||
print("[OK] Armed")
|
||||
return self.armed
|
||||
|
||||
def disarm(self):
|
||||
"""Disarm motors."""
|
||||
self.mav.arducopter_disarm()
|
||||
print("[OK] Disarmed")
|
||||
|
||||
def takeoff(self, altitude=5.0):
|
||||
"""Takeoff to specified altitude."""
|
||||
self.mav.mav.command_long_send(
|
||||
self.mav.target_system,
|
||||
self.mav.target_component,
|
||||
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
|
||||
0, 0, 0, 0, 0, 0, 0, altitude
|
||||
)
|
||||
print(f"[OK] Takeoff to {altitude}m")
|
||||
|
||||
def send_velocity(self, vx, vy, vz, yaw_rate=0):
|
||||
"""Send velocity command in body frame.
|
||||
|
||||
Args:
|
||||
vx: Forward velocity (m/s)
|
||||
vy: Right velocity (m/s)
|
||||
vz: Down velocity (m/s, positive = descend)
|
||||
yaw_rate: Yaw rate (rad/s)
|
||||
"""
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--world', '-w', type=str, default='runway',
|
||||
choices=['runway', 'maze', 'sitl'],
|
||||
help='Simulation world (default: runway)'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--vehicle', '-v', type=str, default='copter',
|
||||
choices=['copter', 'rover'],
|
||||
help='Vehicle type (default: copter)'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--mavproxy', '-m', action='store_true',
|
||||
help='Also launch MAVProxy in a new terminal'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--mavproxy-port', type=int, default=14550,
|
||||
help='MAVProxy master port (default: 14550)'
|
||||
)
|
||||
|
||||
return parser.parse_args()
|
||||
# Type mask: use velocity only
|
||||
type_mask = (
|
||||
mavutil.mavlink.POSITION_TARGET_TYPEMASK_X_IGNORE |
|
||||
mavutil.mavlink.POSITION_TARGET_TYPEMASK_Y_IGNORE |
|
||||
mavutil.mavlink.POSITION_TARGET_TYPEMASK_Z_IGNORE |
|
||||
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE |
|
||||
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE |
|
||||
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE |
|
||||
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE
|
||||
)
|
||||
|
||||
self.mav.mav.set_position_target_local_ned_send(
|
||||
0, # time_boot_ms
|
||||
self.mav.target_system,
|
||||
self.mav.target_component,
|
||||
mavutil.mavlink.MAV_FRAME_BODY_NED,
|
||||
type_mask,
|
||||
0, 0, 0, # position (ignored)
|
||||
vx, vy, vz, # velocity
|
||||
0, 0, 0, # acceleration (ignored)
|
||||
0, yaw_rate # yaw, yaw_rate
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
parser = argparse.ArgumentParser(description="ArduPilot Drone Controller")
|
||||
parser.add_argument("--connection", "-c", default=None,
|
||||
help="MAVLink connection string (default: from config.py)")
|
||||
parser.add_argument("--takeoff-alt", "-a", type=float, default=5.0,
|
||||
help="Takeoff altitude (meters)")
|
||||
parser.add_argument("--no-takeoff", action="store_true",
|
||||
help="Don't auto takeoff, just control")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("=" * 60)
|
||||
print(" ArduPilot SITL + Gazebo (Official ROS 2 DDS)")
|
||||
print("=" * 60)
|
||||
print()
|
||||
# Create ArduPilot interface
|
||||
ardupilot = ArduPilotInterface(args.connection)
|
||||
|
||||
# Check ROS 2
|
||||
if not check_ros2():
|
||||
print("[ERROR] ROS 2 not found!")
|
||||
print("Please source ROS 2:")
|
||||
print(" source /opt/ros/humble/setup.bash")
|
||||
return 1
|
||||
# Connect
|
||||
if not ardupilot.connect():
|
||||
print("[ERROR] Could not connect to ArduPilot")
|
||||
print("Make sure sim_vehicle.py is running")
|
||||
sys.exit(1)
|
||||
|
||||
print("[OK] ROS 2 available")
|
||||
# Create drone controller
|
||||
controller = DroneController()
|
||||
|
||||
# Check ArduPilot packages
|
||||
if not check_ardupilot_packages():
|
||||
print("[ERROR] ArduPilot ROS 2 packages not found!")
|
||||
print()
|
||||
print("Please install ArduPilot ROS 2:")
|
||||
print(" ./setup/install_ardupilot.sh")
|
||||
print()
|
||||
print("Then source the workspace:")
|
||||
print(" source ~/ardu_ws/install/setup.bash")
|
||||
return 1
|
||||
print("\n" + "=" * 50)
|
||||
print(" ArduPilot GPS-Denied Controller")
|
||||
print("=" * 50)
|
||||
print(f"Mode: {ardupilot.mode}")
|
||||
print(f"Armed: {ardupilot.armed}")
|
||||
print("")
|
||||
|
||||
print("[OK] ArduPilot ROS 2 packages found")
|
||||
if not args.no_takeoff:
|
||||
# Set GUIDED mode and arm
|
||||
print("[INFO] Setting up for takeoff...")
|
||||
ardupilot.set_mode("GUIDED")
|
||||
time.sleep(1)
|
||||
|
||||
ardupilot.arm(force=True)
|
||||
time.sleep(1)
|
||||
|
||||
if ardupilot.armed:
|
||||
ardupilot.takeoff(args.takeoff_alt)
|
||||
print(f"[INFO] Taking off to {args.takeoff_alt}m...")
|
||||
|
||||
# Wait for takeoff
|
||||
for _ in range(50): # 5 seconds
|
||||
telemetry = ardupilot.get_telemetry()
|
||||
if telemetry["altimeter"]["altitude"] > args.takeoff_alt * 0.9:
|
||||
print("[OK] Reached target altitude")
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
# Get launch command
|
||||
launch_cmd = get_launch_command(args.world, args.vehicle)
|
||||
print("\n[INFO] Starting controller loop...")
|
||||
print("[INFO] Press Ctrl+C to stop\n")
|
||||
|
||||
print()
|
||||
print(f"World: {args.world}")
|
||||
print(f"Vehicle: {args.vehicle}")
|
||||
print(f"Launch: {' '.join(launch_cmd)}")
|
||||
print()
|
||||
|
||||
# Launch MAVProxy if requested
|
||||
if args.mavproxy:
|
||||
print("[INFO] Starting MAVProxy...")
|
||||
# Delay to let SITL start first
|
||||
time.sleep(2)
|
||||
launch_mavproxy(args.mavproxy_port)
|
||||
|
||||
print("Starting simulation...")
|
||||
print("Press Ctrl+C to stop.")
|
||||
print()
|
||||
print("-" * 60)
|
||||
|
||||
# Handle Ctrl+C gracefully
|
||||
def signal_handler(sig, frame):
|
||||
print("\nShutting down...")
|
||||
sys.exit(0)
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
# Run the launch command
|
||||
rate = 20 # Hz
|
||||
try:
|
||||
subprocess.run(launch_cmd, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
print(f"[ERROR] Launch failed: {e}")
|
||||
return 1
|
||||
while True:
|
||||
# Get telemetry
|
||||
telemetry = ardupilot.get_telemetry()
|
||||
|
||||
# Run your controller
|
||||
thrust, pitch, roll, yaw = controller.calculate_landing_maneuver(
|
||||
telemetry,
|
||||
rover_telemetry=None # No rover in ArduPilot mode yet
|
||||
)
|
||||
|
||||
# Convert control outputs to velocities
|
||||
# thrust -> vertical velocity (negative = up)
|
||||
# pitch -> forward velocity
|
||||
# roll -> right velocity
|
||||
vz = -thrust * 2.0 # Scale factor
|
||||
vx = pitch * 2.0
|
||||
vy = roll * 2.0
|
||||
yaw_rate = yaw * 1.0
|
||||
|
||||
# Send velocity command
|
||||
ardupilot.send_velocity(vx, vy, vz, yaw_rate)
|
||||
|
||||
# Print status
|
||||
alt = telemetry["altimeter"]["altitude"]
|
||||
mode = telemetry.get("mode", "?")
|
||||
print(f"\rAlt: {alt:5.1f}m | Mode: {mode:10} | Cmd: vx={vx:+.2f} vy={vy:+.2f} vz={vz:+.2f}", end="")
|
||||
|
||||
time.sleep(1.0 / rate)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nShutdown complete.")
|
||||
|
||||
return 0
|
||||
print("\n\n[INFO] Stopping...")
|
||||
ardupilot.set_mode("LAND")
|
||||
print("[OK] Landing mode set")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user