Files
RDC_Simulation/mavlink_bridge.py
2026-01-04 00:24:46 +00:00

425 lines
11 KiB
Python

#!/usr/bin/env python3
"""
MAVLink Command Interface for ArduPilot.
This module provides a Python interface for sending MAVLink commands to ArduPilot
when using the official ROS 2 DDS integration. Telemetry is received via native
ROS 2 topics (/ap/*), but commands are still sent via MAVLink.
Usage:
from mavlink_bridge import MAVLinkCommander
cmd = MAVLinkCommander()
cmd.connect()
cmd.arm()
cmd.takeoff(5.0)
cmd.land()
"""
import time
import threading
from typing import Optional, Callable
from enum import Enum
try:
from pymavlink import mavutil
PYMAVLINK_AVAILABLE = True
except ImportError:
PYMAVLINK_AVAILABLE = False
print("Warning: pymavlink not installed. Install with: pip install pymavlink")
class FlightMode(Enum):
"""ArduCopter flight modes."""
STABILIZE = 0
ACRO = 1
ALT_HOLD = 2
AUTO = 3
GUIDED = 4
LOITER = 5
RTL = 6
CIRCLE = 7
LAND = 9
DRIFT = 11
SPORT = 13
FLIP = 14
AUTOTUNE = 15
POSHOLD = 16
BRAKE = 17
THROW = 18
AVOID_ADSB = 19
GUIDED_NOGPS = 20
SMART_RTL = 21
FLOWHOLD = 22
FOLLOW = 23
ZIGZAG = 24
SYSTEMID = 25
AUTOROTATE = 26
class MAVLinkCommander:
"""
MAVLink command interface for ArduPilot.
Provides methods for arming, disarming, mode changes, and flight commands.
"""
def __init__(self, connection_string: str = "udpin:127.0.0.1:14550"):
"""
Initialize MAVLink commander.
Args:
connection_string: MAVLink connection string (default for MAVProxy output)
"""
if not PYMAVLINK_AVAILABLE:
raise ImportError("pymavlink is required: pip install pymavlink")
self._connection_string = connection_string
self._mav: Optional[mavutil.mavlink_connection] = None
self._connected = False
self._armed = False
self._flight_mode = "UNKNOWN"
self._heartbeat_thread: Optional[threading.Thread] = None
self._running = False
# Callbacks
self._on_heartbeat: Optional[Callable] = None
self._on_state_change: Optional[Callable] = None
def connect(self, timeout: float = 30.0) -> bool:
"""
Connect to ArduPilot via MAVLink.
Args:
timeout: Connection timeout in seconds
Returns:
True if connected successfully
"""
print(f"Connecting to {self._connection_string}...")
try:
self._mav = mavutil.mavlink_connection(self._connection_string)
# Wait for heartbeat
heartbeat = self._mav.wait_heartbeat(timeout=timeout)
if heartbeat is None:
print("Connection failed: No heartbeat received")
return False
self._connected = True
self._running = True
# Start heartbeat thread
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_loop, daemon=True
)
self._heartbeat_thread.start()
print(f"Connected! System: {self._mav.target_system}, Component: {self._mav.target_component}")
return True
except Exception as e:
print(f"Connection failed: {e}")
return False
def disconnect(self):
"""Disconnect from ArduPilot."""
self._running = False
if self._heartbeat_thread:
self._heartbeat_thread.join(timeout=2.0)
if self._mav:
self._mav.close()
self._connected = False
print("Disconnected")
def _heartbeat_loop(self):
"""Background thread for receiving heartbeats and updating state."""
while self._running and self._mav:
try:
msg = self._mav.recv_match(type='HEARTBEAT', blocking=True, timeout=1.0)
if msg:
self._armed = msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED
self._flight_mode = mavutil.mode_string_v10(msg)
if self._on_heartbeat:
self._on_heartbeat(self._armed, self._flight_mode)
except Exception:
pass
@property
def connected(self) -> bool:
return self._connected
@property
def armed(self) -> bool:
return self._armed
@property
def flight_mode(self) -> str:
return self._flight_mode
def arm(self, force: bool = False) -> bool:
"""
Arm the vehicle.
Args:
force: Force arm (bypass preflight checks)
Returns:
True if arm command was sent
"""
if not self._connected:
print("Not connected")
return False
print("Arming...")
param = 21196.0 if force else 0.0 # Force arm magic number
self._mav.mav.command_long_send(
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0, # Confirmation
1, # Arm
param, # Force
0, 0, 0, 0, 0
)
# Wait for confirmation
time.sleep(0.5)
return True
def disarm(self, force: bool = False) -> bool:
"""
Disarm the vehicle.
Args:
force: Force disarm
Returns:
True if disarm command was sent
"""
if not self._connected:
print("Not connected")
return False
print("Disarming...")
param = 21196.0 if force else 0.0
self._mav.mav.command_long_send(
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0,
0, # Disarm
param,
0, 0, 0, 0, 0
)
time.sleep(0.5)
return True
def set_mode(self, mode: str) -> bool:
"""
Set flight mode.
Args:
mode: Mode name (GUIDED, LAND, LOITER, etc.)
Returns:
True if mode change command was sent
"""
if not self._connected:
print("Not connected")
return False
mode_upper = mode.upper()
mode_id = self._mav.mode_mapping().get(mode_upper)
if mode_id is None:
print(f"Unknown mode: {mode}")
return False
print(f"Setting mode to {mode_upper}...")
self._mav.mav.set_mode_send(
self._mav.target_system,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode_id
)
time.sleep(0.5)
return True
def takeoff(self, altitude: float = 5.0) -> bool:
"""
Command takeoff to specified altitude.
Args:
altitude: Target altitude in meters
Returns:
True if takeoff command was sent
"""
if not self._connected:
print("Not connected")
return False
print(f"Taking off to {altitude}m...")
self._mav.mav.command_long_send(
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
0,
0, 0, 0, 0, 0, 0,
altitude
)
return True
def land(self) -> bool:
"""
Command landing.
Returns:
True if land command was sent
"""
if not self._connected:
print("Not connected")
return False
print("Landing...")
self._mav.mav.command_long_send(
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_CMD_NAV_LAND,
0,
0, 0, 0, 0, 0, 0, 0
)
return True
def goto_position_ned(self, north: float, east: float, down: float) -> bool:
"""
Go to position in NED frame relative to home.
Args:
north: North position (m)
east: East position (m)
down: Down position (m, negative is up)
Returns:
True if command was sent
"""
if not self._connected:
print("Not connected")
return False
print(f"Going to NED: ({north}, {east}, {down})")
self._mav.mav.set_position_target_local_ned_send(
0, # time_boot_ms
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
0b0000111111111000, # Position only
north, east, down, # Position
0, 0, 0, # Velocity
0, 0, 0, # Acceleration
0, 0 # Yaw, yaw_rate
)
return True
def set_velocity_ned(self, vn: float, ve: float, vd: float, yaw_rate: float = 0.0) -> bool:
"""
Set velocity in NED frame.
Args:
vn: North velocity (m/s)
ve: East velocity (m/s)
vd: Down velocity (m/s, positive is down)
yaw_rate: Yaw rate (rad/s)
Returns:
True if command was sent
"""
if not self._connected:
return False
self._mav.mav.set_position_target_local_ned_send(
0, # time_boot_ms
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
0b0000011111000111, # Velocity only
0, 0, 0, # Position (ignored)
vn, ve, vd, # Velocity
0, 0, 0, # Acceleration
0, yaw_rate
)
return True
def set_param(self, param_id: str, value: float) -> bool:
"""
Set a parameter value.
Args:
param_id: Parameter name
value: Parameter value
Returns:
True if command was sent
"""
if not self._connected:
print("Not connected")
return False
print(f"Setting {param_id} = {value}")
self._mav.mav.param_set_send(
self._mav.target_system,
self._mav.target_component,
param_id.encode('utf-8'),
value,
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
return True
def main():
"""Example usage of MAVLinkCommander."""
if not PYMAVLINK_AVAILABLE:
print("pymavlink not installed")
return
cmd = MAVLinkCommander()
if not cmd.connect(timeout=10.0):
print("Failed to connect")
return
print(f"Armed: {cmd.armed}")
print(f"Mode: {cmd.flight_mode}")
# Example commands (uncomment to use):
# cmd.set_mode("GUIDED")
# cmd.arm()
# cmd.takeoff(5.0)
# time.sleep(10)
# cmd.land()
input("Press Enter to disconnect...")
cmd.disconnect()
if __name__ == '__main__':
main()