#!/usr/bin/env python3 """ Autonomous UAV Controller using pymavlink. Connects directly to ArduPilot SITL and controls the drone autonomously. No ROS/MAVROS required. Uses GUIDED_NOGPS mode for GPS-denied flight simulation while keeping GPS enabled in EKF for geofence safety. """ import time import math import argparse from pymavlink import mavutil class AutonomousController: """Autonomous UAV controller using pymavlink.""" # ArduCopter mode IDs COPTER_MODES = { 'STABILIZE': 0, 'ACRO': 1, 'ALT_HOLD': 2, 'AUTO': 3, 'GUIDED': 4, 'LOITER': 5, 'RTL': 6, 'CIRCLE': 7, 'LAND': 9, 'GUIDED_NOGPS': 20, } def __init__(self, connection_string='tcp:127.0.0.1:5760'): """Initialize connection to ArduPilot.""" print(f"[CTRL] Connecting to {connection_string}...") self.mav = mavutil.mavlink_connection(connection_string) # Wait for heartbeat print("[CTRL] Waiting for heartbeat...") self.mav.wait_heartbeat() print(f"[CTRL] Connected! System {self.mav.target_system} Component {self.mav.target_component}") self.home_position = None self.altitude = 0 self.position = {"x": 0, "y": 0, "z": 0} self.armed = False # Request data streams self.mav.mav.request_data_stream_send( self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_DATA_STREAM_ALL, 4, # 4 Hz 1 # Start ) def update_state(self): """Update drone state from MAVLink messages.""" while True: msg = self.mav.recv_match(blocking=False) if msg is None: break msg_type = msg.get_type() if msg_type == "HEARTBEAT": self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0 elif msg_type == "LOCAL_POSITION_NED": self.position = {"x": msg.x, "y": msg.y, "z": msg.z} self.altitude = -msg.z # NED: down is positive elif msg_type == "STATUSTEXT": text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore') text = text.strip() if text: print(f"[SITL] {text}") def set_param(self, param_name, value): """Set a parameter value.""" param_bytes = param_name.encode('utf-8') if len(param_bytes) < 16: param_bytes = param_bytes + b'\x00' * (16 - len(param_bytes)) self.mav.mav.param_set_send( self.mav.target_system, self.mav.target_component, param_bytes, float(value), mavutil.mavlink.MAV_PARAM_TYPE_REAL32 ) time.sleep(0.3) def setup_for_flight(self): """Configure ArduPilot for simulated GPS-denied flight. GPS stays enabled in EKF for geofence safety, but we use GUIDED_NOGPS mode which doesn't require GPS for control. """ print("[CTRL] Configuring for simulated GPS-denied flight...") # Disable arming checks self.set_param("ARMING_CHECK", 0) # Keep GPS enabled in EKF for geofence self.set_param("EK3_SRC1_POSXY", 3) # GPS self.set_param("EK3_SRC1_VELXY", 3) # GPS self.set_param("EK3_SRC1_POSZ", 1) # Baro # Setup geofence self.set_param("FENCE_ENABLE", 1) self.set_param("FENCE_TYPE", 3) # Alt + Circle self.set_param("FENCE_ACTION", 2) # Land on breach self.set_param("FENCE_ALT_MAX", 15) self.set_param("FENCE_RADIUS", 30) print("[CTRL] Setup complete (GPS enabled for geofence, using GUIDED_NOGPS for control)") time.sleep(1) def wait_for_ekf(self, timeout=60): """Wait for EKF to initialize.""" print("[CTRL] Waiting for EKF initialization...") start_time = time.time() while time.time() - start_time < timeout: msg = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=1) if msg: text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore') text = text.strip() if text: print(f"[SITL] {text}") if 'EKF' in text and 'active' in text.lower(): print("[CTRL] EKF is active!") time.sleep(2) return True if 'EKF3 IMU0 initialised' in text: print("[CTRL] EKF initialized") time.sleep(2) return True if 'ArduPilot Ready' in text: print("[CTRL] ArduPilot is ready") time.sleep(2) return True print("[CTRL] EKF timeout - continuing anyway...") return True def set_mode(self, mode): """Set flight mode using MAV_CMD_DO_SET_MODE.""" mode_upper = mode.upper() if mode_upper not in self.COPTER_MODES: print(f"[CTRL] Unknown mode: {mode}") return False mode_id = self.COPTER_MODES[mode_upper] print(f"[CTRL] Setting mode to {mode_upper} (id={mode_id})...") self.mav.mav.command_long_send( self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_CMD_DO_SET_MODE, 0, mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, mode_id, 0, 0, 0, 0, 0 ) # Wait and verify time.sleep(1) for _ in range(10): self.update_state() time.sleep(0.2) print(f"[CTRL] Mode set to {mode_upper}") return True def arm(self): """Arm the vehicle with force.""" print("[CTRL] Arming...") for attempt in range(5): # Clear pending messages while self.mav.recv_match(blocking=False): pass print(f"[CTRL] Arm attempt {attempt + 1}/5...") self.mav.mav.command_long_send( self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, 1, # Arm 21196, # Force arm magic number 0, 0, 0, 0, 0 ) # Wait for result start_time = time.time() while time.time() - start_time < 3.0: msg = self.mav.recv_match(type=['COMMAND_ACK', 'HEARTBEAT', 'STATUSTEXT'], blocking=True, timeout=0.5) if msg is None: continue msg_type = msg.get_type() if msg_type == 'STATUSTEXT': text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore') if text.strip(): print(f"[SITL] {text.strip()}") elif msg_type == 'HEARTBEAT': if msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED: print("[CTRL] Armed successfully!") self.armed = True return True elif msg_type == 'COMMAND_ACK': if msg.command == mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM: if msg.result == 0: print("[CTRL] Arm command accepted") time.sleep(0.5) self.update_state() if self.armed: return True else: print(f"[CTRL] Arm rejected: result={msg.result}") time.sleep(1) print("[CTRL] Arm failed after 5 attempts") return False def send_attitude_thrust(self, roll, pitch, yaw, thrust): """Send attitude and thrust command for GUIDED_NOGPS mode.""" # Convert euler to quaternion cy = math.cos(yaw * 0.5) sy = math.sin(yaw * 0.5) cp = math.cos(pitch * 0.5) sp = math.sin(pitch * 0.5) cr = math.cos(roll * 0.5) sr = math.sin(roll * 0.5) q = [ cr * cp * cy + sr * sp * sy, # w sr * cp * cy - cr * sp * sy, # x cr * sp * cy + sr * cp * sy, # y cr * cp * sy - sr * sp * cy # z ] # Use attitude + thrust, ignore body rates type_mask = 0b00000111 self.mav.mav.set_attitude_target_send( 0, self.mav.target_system, self.mav.target_component, type_mask, q, 0, 0, 0, # Body rates (ignored) thrust ) def send_velocity_ned(self, vx, vy, vz): """Send velocity command in NED frame.""" type_mask = ( (1 << 0) | (1 << 1) | (1 << 2) | # Ignore position (1 << 6) | (1 << 7) | (1 << 8) | # Ignore acceleration (1 << 10) | (1 << 11) # Ignore yaw ) self.mav.mav.set_position_target_local_ned_send( 0, self.mav.target_system, self.mav.target_component, mavutil.mavlink.MAV_FRAME_LOCAL_NED, type_mask, 0, 0, 0, # Position (ignored) vx, vy, vz, # Velocity NED 0, 0, 0, # Acceleration (ignored) 0, 0 # Yaw, yaw_rate ) def takeoff(self, altitude=5.0): """Take off using attitude+thrust commands (works in GUIDED_NOGPS).""" print(f"[CTRL] Taking off to {altitude}m using thrust commands...") hover_thrust = 0.6 max_thrust = 0.85 thrust = 0.5 start_time = time.time() timeout = 30 while time.time() - start_time < timeout: self.update_state() if self.altitude >= altitude * 0.9: self.send_attitude_thrust(0, 0, 0, hover_thrust) print(f"\n[CTRL] Reached {self.altitude:.1f}m") time.sleep(0.5) return True # Ramp up thrust if self.altitude < 0.5 and thrust < max_thrust: thrust = min(thrust + 0.01, max_thrust) elif self.altitude > 0.5: thrust = 0.75 self.send_attitude_thrust(0, 0, 0, thrust) print(f"\r[CTRL] Climbing... {self.altitude:.1f}m / {altitude:.1f}m (thrust={thrust:.2f})", end="") time.sleep(0.1) self.send_attitude_thrust(0, 0, 0, hover_thrust) print(f"\n[CTRL] Takeoff timeout at {self.altitude:.1f}m") return False def fly_to(self, target_x, target_y, altitude, tolerance=1.0, timeout=30): """Fly to position using velocity commands.""" print(f"[CTRL] Flying to ({target_x:.1f}, {target_y:.1f}, {altitude:.1f}m)") for i in range(int(timeout * 10)): self.update_state() dx = target_x - self.position["x"] dy = target_y - self.position["y"] dz = altitude - self.altitude dist = math.sqrt(dx*dx + dy*dy) if dist < tolerance and abs(dz) < tolerance: self.send_velocity_ned(0, 0, 0) print(f"\n[CTRL] Reached ({self.position['x']:.1f}, {self.position['y']:.1f})") return True speed = min(2.0, max(0.5, dist * 0.5)) if dist > 0.1: vx = (dx / dist) * speed vy = (dy / dist) * speed else: vx, vy = 0, 0 vz = -1.0 if dz > 0.5 else (0.5 if dz < -0.5 else -dz) self.send_velocity_ned(vx, vy, vz) if i % 10 == 0: print(f"\r[CTRL] Pos: ({self.position['x']:.1f}, {self.position['y']:.1f}) Alt: {self.altitude:.1f}m Dist: {dist:.1f}m", end="") time.sleep(0.1) self.send_velocity_ned(0, 0, 0) print(f"\n[CTRL] Timeout reaching waypoint") return False def land(self): """Land the vehicle.""" print("[CTRL] Landing...") self.set_mode("LAND") start_time = time.time() while time.time() - start_time < 60: self.update_state() print(f"\r[CTRL] Landing... Alt: {self.altitude:.1f}m", end="") if self.altitude < 0.3: print("\n[CTRL] Landed!") return True time.sleep(0.2) return False def run_hover_mission(self, altitude=5.0, duration=30): """Hover mission.""" print(f"\n[MISSION] Hover at {altitude}m for {duration}s") self.setup_for_flight() if not self.wait_for_ekf(): return False # Try GUIDED_NOGPS first, fallback to GUIDED if not self.set_mode('GUIDED_NOGPS'): self.set_mode('GUIDED') if not self.arm(): return False if not self.takeoff(altitude): return False print(f"[MISSION] Hovering for {duration} seconds...") start = time.time() while time.time() - start < duration: self.update_state() self.send_attitude_thrust(0, 0, 0, 0.6) # Maintain hover time.sleep(0.1) self.land() print("[MISSION] Hover mission complete!") return True def run_square_mission(self, altitude=5.0, side=5.0): """Fly a square pattern.""" print(f"\n[MISSION] Square pattern: {side}m sides at {altitude}m") self.setup_for_flight() if not self.wait_for_ekf(): return False if not self.set_mode('GUIDED_NOGPS'): self.set_mode('GUIDED') if not self.arm(): return False if not self.takeoff(altitude): return False self.update_state() start_x = self.position["x"] start_y = self.position["y"] waypoints = [ (start_x + side, start_y), (start_x + side, start_y + side), (start_x, start_y + side), (start_x, start_y), ] for i, (x, y) in enumerate(waypoints): print(f"\n[MISSION] Waypoint {i+1}/4") self.fly_to(x, y, altitude) time.sleep(1) self.land() print("[MISSION] Square mission complete!") return True def run_circle_mission(self, altitude=5.0, radius=5.0, points=8): """Fly a circular pattern.""" print(f"\n[MISSION] Circle pattern: {radius}m radius at {altitude}m") self.setup_for_flight() if not self.wait_for_ekf(): return False if not self.set_mode('GUIDED_NOGPS'): self.set_mode('GUIDED') if not self.arm(): return False if not self.takeoff(altitude): return False self.update_state() center_x = self.position["x"] center_y = self.position["y"] for i in range(points + 1): angle = 2 * math.pi * i / points x = center_x + radius * math.cos(angle) y = center_y + radius * math.sin(angle) print(f"\n[MISSION] Point {i+1}/{points+1}") self.fly_to(x, y, altitude) time.sleep(0.5) self.land() print("[MISSION] Circle mission complete!") return True def main(): parser = argparse.ArgumentParser(description='Autonomous UAV Controller') parser.add_argument('--connection', default='tcp:127.0.0.1:5760', help='MAVLink connection string') parser.add_argument('--mission', choices=['hover', 'square', 'circle'], default='hover', help='Mission type') parser.add_argument('--altitude', type=float, default=5.0, help='Flight altitude in meters') parser.add_argument('--size', type=float, default=5.0, help='Mission size (side length or radius)') parser.add_argument('--duration', type=float, default=30.0, help='Hover duration in seconds') args = parser.parse_args() print("=" * 50) print(" Autonomous UAV Controller") print("=" * 50) print(f" Connection: {args.connection}") print(f" Mission: {args.mission}") print(f" Altitude: {args.altitude}m") print("=" * 50) print() try: controller = AutonomousController(args.connection) if args.mission == 'hover': controller.run_hover_mission(args.altitude, args.duration) elif args.mission == 'square': controller.run_square_mission(args.altitude, args.size) elif args.mission == 'circle': controller.run_circle_mission(args.altitude, args.size) except KeyboardInterrupt: print("\n[CTRL] Interrupted by user") except Exception as e: print(f"[ERROR] {e}") import traceback traceback.print_exc() if __name__ == '__main__': main()