#!/usr/bin/env python3 """ ArduPilot Drone Controller ========================== Connects your drone_controller.py logic to ArduPilot SITL via MAVLink. Usage: Terminal 1: ./scripts/run_ardupilot_sim.sh runway Terminal 2: sim_vehicle.py -v ArduCopter -f gazebo-iris --model JSON --console Terminal 3: python run_ardupilot.py This script: 1. Connects to ArduPilot SITL via MAVLink 2. Gets telemetry (position, attitude, velocity) 3. Runs your drone_controller.calculate_landing_maneuver() 4. Sends velocity commands to ArduPilot """ import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import time import math import argparse try: from pymavlink import mavutil except ImportError: print("ERROR: pymavlink not installed") print("Run: pip install pymavlink") sys.exit(1) from config import ARDUPILOT, CONTROLLER, MAVLINK from src.drone_controller import DroneController class ArduPilotInterface: """Interface to ArduPilot SITL via MAVLink.""" def __init__(self, connection_string=None): self.connection_string = connection_string or MAVLINK["connection_string"] self.mav = None self.armed = False self.mode = "UNKNOWN" # Telemetry state self.position = {"x": 0, "y": 0, "z": 0} self.velocity = {"x": 0, "y": 0, "z": 0} self.attitude = {"roll": 0, "pitch": 0, "yaw": 0} self.altitude = 0 self.vertical_velocity = 0 def connect(self, timeout=30): """Connect to ArduPilot SITL.""" print(f"[INFO] Connecting to {self.connection_string}...") try: self.mav = mavutil.mavlink_connection(self.connection_string) self.mav.wait_heartbeat(timeout=timeout) print(f"[OK] Connected to system {self.mav.target_system}") return True except Exception as e: print(f"[ERROR] Connection failed: {e}") return False def update_telemetry(self): """Read and update telemetry from ArduPilot.""" # Read all available messages while True: msg = self.mav.recv_match(blocking=False) if msg is None: break msg_type = msg.get_type() if msg_type == "HEARTBEAT": self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0 # Get mode name if hasattr(mavutil.mavlink, 'enums'): mode_map = mavutil.mavlink.enums.get('COPTER_MODE', {}) self.mode = mode_map.get(msg.custom_mode, {}).get('name', str(msg.custom_mode)) elif msg_type == "LOCAL_POSITION_NED": self.position = {"x": msg.x, "y": msg.y, "z": msg.z} self.velocity = {"x": msg.vx, "y": msg.vy, "z": msg.vz} self.altitude = -msg.z # NED: down is positive self.vertical_velocity = -msg.vz elif msg_type == "ATTITUDE": self.attitude = { "roll": msg.roll, "pitch": msg.pitch, "yaw": msg.yaw } def get_telemetry(self): """Get telemetry in the same format as standalone simulation.""" self.update_telemetry() return { "imu": { "orientation": self.attitude, "angular_velocity": {"x": 0, "y": 0, "z": 0} # Not available via MAVLink easily }, "altimeter": { "altitude": self.altitude, "vertical_velocity": self.vertical_velocity }, "velocity": self.velocity, "landing_pad": None, # Not available in ArduPilot mode - use vision "position": self.position, # Extra: actual position (GPS-like) "armed": self.armed, "mode": self.mode } def set_mode(self, mode_name): """Set flight mode.""" mode_map = self.mav.mode_mapping() if mode_name.upper() not in mode_map: print(f"[WARN] Unknown mode: {mode_name}") return False mode_id = mode_map[mode_name.upper()] self.mav.set_mode(mode_id) print(f"[OK] Mode set to {mode_name}") return True def arm(self, force=True): """Arm motors.""" if force: # Force arm (bypass checks) self.mav.mav.command_long_send( self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, 1, 21196, 0, 0, 0, 0, 0 # 21196 = force arm ) else: self.mav.arducopter_arm() time.sleep(1) self.update_telemetry() if self.armed: print("[OK] Armed") return self.armed def disarm(self): """Disarm motors.""" self.mav.arducopter_disarm() print("[OK] Disarmed") def takeoff(self, altitude=5.0): """Takeoff to specified altitude.""" self.mav.mav.command_long_send( self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, 0, 0, 0, 0, 0, 0, 0, altitude ) print(f"[OK] Takeoff to {altitude}m") def send_velocity(self, vx, vy, vz, yaw_rate=0): """Send velocity command in body frame. Args: vx: Forward velocity (m/s) vy: Right velocity (m/s) vz: Down velocity (m/s, positive = descend) yaw_rate: Yaw rate (rad/s) """ # Type mask: use velocity only type_mask = ( mavutil.mavlink.POSITION_TARGET_TYPEMASK_X_IGNORE | mavutil.mavlink.POSITION_TARGET_TYPEMASK_Y_IGNORE | mavutil.mavlink.POSITION_TARGET_TYPEMASK_Z_IGNORE | mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE | mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE | mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE | mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE ) self.mav.mav.set_position_target_local_ned_send( 0, # time_boot_ms self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_FRAME_BODY_NED, type_mask, 0, 0, 0, # position (ignored) vx, vy, vz, # velocity 0, 0, 0, # acceleration (ignored) 0, yaw_rate # yaw, yaw_rate ) def main(): parser = argparse.ArgumentParser(description="ArduPilot Drone Controller") parser.add_argument("--connection", "-c", default=None, help="MAVLink connection string (default: from config.py)") parser.add_argument("--takeoff-alt", "-a", type=float, default=5.0, help="Takeoff altitude (meters)") parser.add_argument("--no-takeoff", action="store_true", help="Don't auto takeoff, just control") args = parser.parse_args() # Create ArduPilot interface ardupilot = ArduPilotInterface(args.connection) # Connect if not ardupilot.connect(): print("[ERROR] Could not connect to ArduPilot") print("Make sure sim_vehicle.py is running") sys.exit(1) # Create drone controller controller = DroneController() print("\n" + "=" * 50) print(" ArduPilot GPS-Denied Controller") print("=" * 50) print(f"Mode: {ardupilot.mode}") print(f"Armed: {ardupilot.armed}") print("") if not args.no_takeoff: # Set GUIDED mode and arm print("[INFO] Setting up for takeoff...") ardupilot.set_mode("GUIDED") time.sleep(1) ardupilot.arm(force=True) time.sleep(1) if ardupilot.armed: ardupilot.takeoff(args.takeoff_alt) print(f"[INFO] Taking off to {args.takeoff_alt}m...") # Wait for takeoff for _ in range(50): # 5 seconds telemetry = ardupilot.get_telemetry() if telemetry["altimeter"]["altitude"] > args.takeoff_alt * 0.9: print("[OK] Reached target altitude") break time.sleep(0.1) print("\n[INFO] Starting controller loop...") print("[INFO] Press Ctrl+C to stop\n") rate = 20 # Hz try: while True: # Get telemetry telemetry = ardupilot.get_telemetry() # Run your controller thrust, pitch, roll, yaw = controller.calculate_landing_maneuver( telemetry, rover_telemetry=None # No rover in ArduPilot mode yet ) # Convert control outputs to velocities # thrust -> vertical velocity (negative = up) # pitch -> forward velocity # roll -> right velocity vz = -thrust * 2.0 # Scale factor vx = pitch * 2.0 vy = roll * 2.0 yaw_rate = yaw * 1.0 # Send velocity command ardupilot.send_velocity(vx, vy, vz, yaw_rate) # Print status alt = telemetry["altimeter"]["altitude"] mode = telemetry.get("mode", "?") print(f"\rAlt: {alt:5.1f}m | Mode: {mode:10} | Cmd: vx={vx:+.2f} vy={vy:+.2f} vz={vz:+.2f}", end="") time.sleep(1.0 / rate) except KeyboardInterrupt: print("\n\n[INFO] Stopping...") ardupilot.set_mode("LAND") print("[OK] Landing mode set") if __name__ == "__main__": main()