#!/usr/bin/env python3 """ MAVLink Command Interface for ArduPilot. This module provides a Python interface for sending MAVLink commands to ArduPilot when using the official ROS 2 DDS integration. Telemetry is received via native ROS 2 topics (/ap/*), but commands are still sent via MAVLink. Usage: from mavlink_bridge import MAVLinkCommander cmd = MAVLinkCommander() cmd.connect() cmd.arm() cmd.takeoff(5.0) cmd.land() """ import time import threading from typing import Optional, Callable from enum import Enum try: from pymavlink import mavutil PYMAVLINK_AVAILABLE = True except ImportError: PYMAVLINK_AVAILABLE = False print("Warning: pymavlink not installed. Install with: pip install pymavlink") class FlightMode(Enum): """ArduCopter flight modes.""" STABILIZE = 0 ACRO = 1 ALT_HOLD = 2 AUTO = 3 GUIDED = 4 LOITER = 5 RTL = 6 CIRCLE = 7 LAND = 9 DRIFT = 11 SPORT = 13 FLIP = 14 AUTOTUNE = 15 POSHOLD = 16 BRAKE = 17 THROW = 18 AVOID_ADSB = 19 GUIDED_NOGPS = 20 SMART_RTL = 21 FLOWHOLD = 22 FOLLOW = 23 ZIGZAG = 24 SYSTEMID = 25 AUTOROTATE = 26 class MAVLinkCommander: """ MAVLink command interface for ArduPilot. Provides methods for arming, disarming, mode changes, and flight commands. """ def __init__(self, connection_string: str = "udpin:127.0.0.1:14550"): """ Initialize MAVLink commander. Args: connection_string: MAVLink connection string (default for MAVProxy output) """ if not PYMAVLINK_AVAILABLE: raise ImportError("pymavlink is required: pip install pymavlink") self._connection_string = connection_string self._mav: Optional[mavutil.mavlink_connection] = None self._connected = False self._armed = False self._flight_mode = "UNKNOWN" self._heartbeat_thread: Optional[threading.Thread] = None self._running = False # Callbacks self._on_heartbeat: Optional[Callable] = None self._on_state_change: Optional[Callable] = None def connect(self, timeout: float = 30.0) -> bool: """ Connect to ArduPilot via MAVLink. Args: timeout: Connection timeout in seconds Returns: True if connected successfully """ print(f"Connecting to {self._connection_string}...") try: self._mav = mavutil.mavlink_connection(self._connection_string) # Wait for heartbeat heartbeat = self._mav.wait_heartbeat(timeout=timeout) if heartbeat is None: print("Connection failed: No heartbeat received") return False self._connected = True self._running = True # Start heartbeat thread self._heartbeat_thread = threading.Thread( target=self._heartbeat_loop, daemon=True ) self._heartbeat_thread.start() print(f"Connected! System: {self._mav.target_system}, Component: {self._mav.target_component}") return True except Exception as e: print(f"Connection failed: {e}") return False def disconnect(self): """Disconnect from ArduPilot.""" self._running = False if self._heartbeat_thread: self._heartbeat_thread.join(timeout=2.0) if self._mav: self._mav.close() self._connected = False print("Disconnected") def _heartbeat_loop(self): """Background thread for receiving heartbeats and updating state.""" while self._running and self._mav: try: msg = self._mav.recv_match(type='HEARTBEAT', blocking=True, timeout=1.0) if msg: self._armed = msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED self._flight_mode = mavutil.mode_string_v10(msg) if self._on_heartbeat: self._on_heartbeat(self._armed, self._flight_mode) except Exception: pass @property def connected(self) -> bool: return self._connected @property def armed(self) -> bool: return self._armed @property def flight_mode(self) -> str: return self._flight_mode def arm(self, force: bool = False) -> bool: """ Arm the vehicle. Args: force: Force arm (bypass preflight checks) Returns: True if arm command was sent """ if not self._connected: print("Not connected") return False print("Arming...") param = 21196.0 if force else 0.0 # Force arm magic number self._mav.mav.command_long_send( self._mav.target_system, self._mav.target_component, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, # Confirmation 1, # Arm param, # Force 0, 0, 0, 0, 0 ) # Wait for confirmation time.sleep(0.5) return True def disarm(self, force: bool = False) -> bool: """ Disarm the vehicle. Args: force: Force disarm Returns: True if disarm command was sent """ if not self._connected: print("Not connected") return False print("Disarming...") param = 21196.0 if force else 0.0 self._mav.mav.command_long_send( self._mav.target_system, self._mav.target_component, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, 0, # Disarm param, 0, 0, 0, 0, 0 ) time.sleep(0.5) return True def set_mode(self, mode: str) -> bool: """ Set flight mode. Args: mode: Mode name (GUIDED, LAND, LOITER, etc.) Returns: True if mode change command was sent """ if not self._connected: print("Not connected") return False mode_upper = mode.upper() mode_id = self._mav.mode_mapping().get(mode_upper) if mode_id is None: print(f"Unknown mode: {mode}") return False print(f"Setting mode to {mode_upper}...") self._mav.mav.set_mode_send( self._mav.target_system, mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, mode_id ) time.sleep(0.5) return True def takeoff(self, altitude: float = 5.0) -> bool: """ Command takeoff to specified altitude. Args: altitude: Target altitude in meters Returns: True if takeoff command was sent """ if not self._connected: print("Not connected") return False print(f"Taking off to {altitude}m...") self._mav.mav.command_long_send( self._mav.target_system, self._mav.target_component, mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, 0, 0, 0, 0, 0, 0, 0, altitude ) return True def land(self) -> bool: """ Command landing. Returns: True if land command was sent """ if not self._connected: print("Not connected") return False print("Landing...") self._mav.mav.command_long_send( self._mav.target_system, self._mav.target_component, mavutil.mavlink.MAV_CMD_NAV_LAND, 0, 0, 0, 0, 0, 0, 0, 0 ) return True def goto_position_ned(self, north: float, east: float, down: float) -> bool: """ Go to position in NED frame relative to home. Args: north: North position (m) east: East position (m) down: Down position (m, negative is up) Returns: True if command was sent """ if not self._connected: print("Not connected") return False print(f"Going to NED: ({north}, {east}, {down})") self._mav.mav.set_position_target_local_ned_send( 0, # time_boot_ms self._mav.target_system, self._mav.target_component, mavutil.mavlink.MAV_FRAME_LOCAL_NED, 0b0000111111111000, # Position only north, east, down, # Position 0, 0, 0, # Velocity 0, 0, 0, # Acceleration 0, 0 # Yaw, yaw_rate ) return True def set_velocity_ned(self, vn: float, ve: float, vd: float, yaw_rate: float = 0.0) -> bool: """ Set velocity in NED frame. Args: vn: North velocity (m/s) ve: East velocity (m/s) vd: Down velocity (m/s, positive is down) yaw_rate: Yaw rate (rad/s) Returns: True if command was sent """ if not self._connected: return False self._mav.mav.set_position_target_local_ned_send( 0, # time_boot_ms self._mav.target_system, self._mav.target_component, mavutil.mavlink.MAV_FRAME_LOCAL_NED, 0b0000011111000111, # Velocity only 0, 0, 0, # Position (ignored) vn, ve, vd, # Velocity 0, 0, 0, # Acceleration 0, yaw_rate ) return True def set_param(self, param_id: str, value: float) -> bool: """ Set a parameter value. Args: param_id: Parameter name value: Parameter value Returns: True if command was sent """ if not self._connected: print("Not connected") return False print(f"Setting {param_id} = {value}") self._mav.mav.param_set_send( self._mav.target_system, self._mav.target_component, param_id.encode('utf-8'), value, mavutil.mavlink.MAV_PARAM_TYPE_REAL32 ) return True def main(): """Example usage of MAVLinkCommander.""" if not PYMAVLINK_AVAILABLE: print("pymavlink not installed") return cmd = MAVLinkCommander() if not cmd.connect(timeout=10.0): print("Failed to connect") return print(f"Armed: {cmd.armed}") print(f"Mode: {cmd.flight_mode}") # Example commands (uncomment to use): # cmd.set_mode("GUIDED") # cmd.arm() # cmd.takeoff(5.0) # time.sleep(10) # cmd.land() input("Press Enter to disconnect...") cmd.disconnect() if __name__ == '__main__': main()