#!/usr/bin/env python3 """ Drone Controller - Takeoff, Fly Pattern, Land ===================================================== A minimal controller that demonstrates drone movement in simulation. Usage: Terminal 1: ./scripts/run_ardupilot_sim.sh runway Terminal 2: sim_vehicle.py -v ArduCopter -f gazebo-iris --model JSON --console Terminal 3: python scripts/run_ardupilot.py """ import sys import os import time import math import argparse sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) try: from pymavlink import mavutil except ImportError: print("ERROR: pymavlink not installed") print("Run: pip install pymavlink") sys.exit(1) from config import MAVLINK class DroneController: """Drone controller with takeoff, fly pattern, and land.""" def __init__(self, connection_string=None): self.connection_string = connection_string or MAVLINK["connection_string"] self.mav = None self.armed = False self.mode = "UNKNOWN" self.altitude = 0 self.position = {"x": 0, "y": 0, "z": 0} self.vehicle_type = "copter" # Default to copter def connect(self, timeout=30): """Connect to ArduPilot SITL.""" print(f"[INFO] Connecting to {self.connection_string}...") try: self.mav = mavutil.mavlink_connection(self.connection_string) msg = self.mav.wait_heartbeat(timeout=timeout) print(f"[OK] Connected to system {self.mav.target_system}") # Detect vehicle type from heartbeat if msg: mav_type = msg.type if mav_type == mavutil.mavlink.MAV_TYPE_QUADROTOR: self.vehicle_type = "copter" print(f"[INFO] Vehicle: Quadrotor (ArduCopter)") elif mav_type == mavutil.mavlink.MAV_TYPE_HELICOPTER: self.vehicle_type = "copter" print(f"[INFO] Vehicle: Helicopter") elif mav_type == mavutil.mavlink.MAV_TYPE_FIXED_WING: self.vehicle_type = "plane" print(f"[INFO] Vehicle: Fixed Wing (ArduPlane)") else: self.vehicle_type = "copter" # Default to copter print(f"[INFO] Vehicle type: {mav_type}") return True except Exception as e: print(f"[ERROR] Connection failed: {e}") return False def update_state(self): """Update drone state from MAVLink messages.""" while True: msg = self.mav.recv_match(blocking=False) if msg is None: break msg_type = msg.get_type() if msg_type == "HEARTBEAT": self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0 try: self.mode = mavutil.mode_string_v10(msg) except: self.mode = str(msg.custom_mode) elif msg_type == "LOCAL_POSITION_NED": self.position = {"x": msg.x, "y": msg.y, "z": msg.z} self.altitude = -msg.z # NED: down is positive def set_mode(self, mode_name): """Set flight mode.""" mode_upper = mode_name.upper() # ArduCopter mode IDs (hardcoded as fallback since pymavlink can be buggy) copter_modes = { 'STABILIZE': 0, 'ACRO': 1, 'ALT_HOLD': 2, 'AUTO': 3, 'GUIDED': 4, 'LOITER': 5, 'RTL': 6, 'CIRCLE': 7, 'LAND': 9, 'DRIFT': 11, 'SPORT': 13, 'FLIP': 14, 'AUTOTUNE': 15, 'POSHOLD': 16, 'BRAKE': 17, 'THROW': 18, 'AVOID_ADSB': 19, 'GUIDED_NOGPS': 20, 'SMART_RTL': 21, } # Try copter modes first if we detected a copter if hasattr(self, 'vehicle_type') and self.vehicle_type == 'copter': if mode_upper in copter_modes: mode_id = copter_modes[mode_upper] else: print(f"[ERROR] Unknown copter mode: {mode_name}") print(f"[INFO] Available modes: {list(copter_modes.keys())}") return False else: # Fall back to pymavlink mode mapping mode_map = self.mav.mode_mapping() if mode_upper not in mode_map: # Try copter modes anyway if mode_upper in copter_modes: mode_id = copter_modes[mode_upper] else: print(f"[ERROR] Unknown mode: {mode_name}") return False else: mode_id = mode_map[mode_upper] print(f"[INFO] Setting mode to {mode_upper} (id={mode_id})...") # Send mode change using MAV_CMD_DO_SET_MODE self.mav.mav.command_long_send( self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_CMD_DO_SET_MODE, 0, mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, mode_id, 0, 0, 0, 0, 0 ) # Wait and verify time.sleep(1) for _ in range(10): self.update_state() if mode_upper in self.mode.upper(): print(f"[OK] Mode: {self.mode}") return True time.sleep(0.2) print(f"[OK] Mode set (current: {self.mode})") return True def set_param(self, param_name, value): """Set a parameter value.""" # Pad param name to 16 bytes param_bytes = param_name.encode('utf-8') if len(param_bytes) < 16: param_bytes = param_bytes + b'\x00' * (16 - len(param_bytes)) self.mav.mav.param_set_send( self.mav.target_system, self.mav.target_component, param_bytes, float(value), mavutil.mavlink.MAV_PARAM_TYPE_REAL32 ) # Wait for parameter to be acknowledged time.sleep(0.5) def setup_gps_denied(self): """Configure for GPS-denied operation.""" print("[INFO] Configuring for GPS-denied operation...") # Disable all arming checks for testing self.set_param("ARMING_CHECK", 0) # Configure EKF3 for non-GPS operation # These parameters tell the EKF to not require GPS self.set_param("EK3_SRC1_POSXY", 0) # 0 = None (no GPS for horizontal position) self.set_param("EK3_SRC1_VELXY", 0) # 0 = None (no GPS for horizontal velocity) self.set_param("EK3_SRC1_POSZ", 1) # 1 = Baro (use barometer for altitude) print("[OK] GPS-denied parameters set") # Wait longer for parameters to take effect time.sleep(1.0) def arm(self): """Force arm the drone (bypasses pre-arm checks).""" print("[INFO] Arming...") # Result code meanings for COMMAND_ACK result_names = { 0: "ACCEPTED", 1: "TEMPORARILY_REJECTED", 2: "DENIED", 3: "UNSUPPORTED", 4: "FAILED", 5: "IN_PROGRESS", 6: "CANCELLED", } for attempt in range(5): # Clear any pending messages while self.mav.recv_match(blocking=False): pass # Send force arm command with magic number 21196 print(f"[INFO] Sending arm command (attempt {attempt + 1}/5)...") self.mav.mav.command_long_send( self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, # confirmation 1, # param1: 1 = arm 21196, # param2: force arm magic number 0, 0, 0, 0, 0 # params 3-7 unused ) # Wait for COMMAND_ACK with timeout ack_received = False start_time = time.time() while time.time() - start_time < 3.0: # 3 second timeout msg = self.mav.recv_match(type=['COMMAND_ACK', 'HEARTBEAT'], blocking=True, timeout=0.5) if msg is None: continue if msg.get_type() == 'HEARTBEAT': self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0 if self.armed: print("[OK] Armed successfully!") return True elif msg.get_type() == 'COMMAND_ACK': if msg.command == mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM: ack_received = True result_name = result_names.get(msg.result, f"UNKNOWN({msg.result})") if msg.result == 0: # ACCEPTED print(f"[OK] Arm command accepted") # Give it a moment to actually arm time.sleep(0.5) self.update_state() if self.armed: print("[OK] Armed successfully!") return True else: print(f"[WARN] Arm command result: {result_name}") # Try to get more info from statustext break if not ack_received: print("[WARN] No ACK received for arm command") # Check for any STATUSTEXT messages that might explain the failure for _ in range(10): msg = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=0.1) if msg: print(f"[SITL] {msg.text}") # Update state and check if we're armed self.update_state() if self.armed: print("[OK] Armed!") return True time.sleep(0.5) print("[ERROR] Failed to arm after 5 attempts") print("[TIP] In MAVProxy, try: arm throttle force") print("[TIP] Check MAVProxy console for pre-arm failure messages") return False def takeoff(self, altitude=5.0): """Takeoff to altitude.""" print(f"[INFO] Taking off to {altitude}m...") self.mav.mav.command_long_send( self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, 0, 0, 0, 0, 0, 0, 0, altitude ) # Wait for altitude for i in range(100): # 10 seconds max self.update_state() if self.altitude > altitude * 0.9: print(f"[OK] Reached {self.altitude:.1f}m") return True print(f"\r Climbing... {self.altitude:.1f}m", end="") time.sleep(0.1) print(f"\n[WARN] Takeoff timeout at {self.altitude:.1f}m") return False def goto(self, x, y, z): """Go to position (NED frame, z is down so negative = up).""" print(f"[INFO] Going to ({x:.1f}, {y:.1f}, alt={-z:.1f}m)...") self.mav.mav.set_position_target_local_ned_send( 0, self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_FRAME_LOCAL_NED, 0b0000111111111000, # Position only x, y, z, # Position (NED) 0, 0, 0, # Velocity 0, 0, 0, # Acceleration 0, 0 # Yaw, yaw_rate ) def fly_to_and_wait(self, x, y, altitude, tolerance=0.5, timeout=30): """Fly to position and wait until reached.""" z = -altitude # NED self.goto(x, y, z) for i in range(int(timeout * 10)): self.update_state() dx = x - self.position["x"] dy = y - self.position["y"] dist = math.sqrt(dx*dx + dy*dy) if dist < tolerance: print(f"[OK] Reached waypoint ({x:.1f}, {y:.1f})") return True if i % 10 == 0: print(f"\r Distance: {dist:.1f}m", end="") time.sleep(0.1) print(f"\n[WARN] Timeout reaching waypoint") return False def land(self): """Land the drone.""" print("[INFO] Landing...") return self.set_mode("LAND") def fly_square(self, size=5, altitude=5): """Fly a square pattern.""" waypoints = [ (size, 0), (size, size), (0, size), (0, 0), ] print(f"\n[INFO] Flying square pattern ({size}m x {size}m)") for i, (x, y) in enumerate(waypoints): print(f"\n--- Waypoint {i+1}/4 ---") self.fly_to_and_wait(x, y, altitude) time.sleep(1) print("\n[OK] Square pattern complete!") def fly_circle(self, radius=5, altitude=5, points=8): """Fly a circular pattern.""" print(f"\n[INFO] Flying circle pattern (radius={radius}m)") for i in range(points + 1): angle = 2 * math.pi * i / points x = radius * math.cos(angle) y = radius * math.sin(angle) print(f"\n--- Point {i+1}/{points+1} ---") self.fly_to_and_wait(x, y, altitude) time.sleep(0.5) print("\n[OK] Circle pattern complete!") def main(): parser = argparse.ArgumentParser(description="Simple Drone Controller - GPS Denied") parser.add_argument("--pattern", "-p", choices=["square", "circle", "hover"], default="square", help="Flight pattern") parser.add_argument("--altitude", "-a", type=float, default=5.0, help="Flight altitude (meters)") parser.add_argument("--size", "-s", type=float, default=5.0, help="Pattern size/radius (meters)") parser.add_argument("--connection", "-c", default=None, help="MAVLink connection string") parser.add_argument("--gps", action="store_true", help="Use GPS mode (default is GPS-denied)") args = parser.parse_args() gps_mode = "with GPS" if args.gps else "GPS-DENIED" print("\n" + "=" * 50) print(" Drone Controller") print("=" * 50) print(f" Mode: {gps_mode}") print(f" Pattern: {args.pattern}") print(f" Altitude: {args.altitude}m") print(f" Size: {args.size}m") print("=" * 50 + "\n") drone = DroneController(args.connection) if not drone.connect(): print("\n[ERROR] Could not connect to SITL") print("Make sure sim_vehicle.py is running") sys.exit(1) try: # Setup print("\n--- SETUP ---") # Configure for GPS-denied if needed if not args.gps: drone.setup_gps_denied() # Try GUIDED_NOGPS first (ArduCopter), fallback to GUIDED if not drone.set_mode("GUIDED_NOGPS"): print("[INFO] GUIDED_NOGPS not available, using GUIDED") drone.set_mode("GUIDED") else: drone.set_mode("GUIDED") time.sleep(1) if not drone.arm(): print("[ERROR] Could not arm") print("[TIP] In MAVProxy console, try: arm throttle force") sys.exit(1) # Takeoff print("\n--- TAKEOFF ---") if not drone.takeoff(args.altitude): print("[WARN] Takeoff may have failed") time.sleep(2) # Stabilize # Fly pattern print("\n--- FLY PATTERN ---") if args.pattern == "square": drone.fly_square(size=args.size, altitude=args.altitude) elif args.pattern == "circle": drone.fly_circle(radius=args.size, altitude=args.altitude) elif args.pattern == "hover": print("[INFO] Hovering for 10 seconds...") time.sleep(10) # Land print("\n--- LAND ---") drone.land() # Wait for landing print("[INFO] Waiting for landing...") for i in range(100): drone.update_state() if drone.altitude < 0.3: print("[OK] Landed!") break print(f"\r Altitude: {drone.altitude:.1f}m", end="") time.sleep(0.1) print("\n\n[OK] Mission complete!") except KeyboardInterrupt: print("\n\n[INFO] Interrupted - Landing...") drone.land() if __name__ == "__main__": main()