Controller Update 4
This commit is contained in:
@@ -1,73 +1,46 @@
|
|||||||
# ArduPilot Parameters for Simulation
|
# ArduPilot Parameters for GPS-Denied Navigation Simulation
|
||||||
# Uses GPS from Gazebo for position estimation
|
# =========================================================
|
||||||
|
# GPS is ENABLED for: Geofence safety, LOCAL_POSITION_NED telemetry
|
||||||
|
# GPS is SIMULATED DENIED for: Control (using GUIDED_NOGPS mode)
|
||||||
|
|
||||||
# ====================
|
# ====================
|
||||||
# GPS Configuration
|
# GPS Configuration
|
||||||
# ====================
|
# ====================
|
||||||
# Enable GPS (Gazebo provides simulated GPS)
|
# Enable GPS (needed for geofence and position telemetry)
|
||||||
GPS_TYPE 1 # Auto-detect
|
GPS_TYPE 1 # Auto-detect
|
||||||
GPS_TYPE2 0
|
|
||||||
|
|
||||||
# ====================
|
# ====================
|
||||||
# Geofence Settings
|
# Geofence Settings
|
||||||
# ====================
|
# ====================
|
||||||
# Disable geofence for simulation (avoids position requirement issues)
|
# Safety cage - uses GPS for boundaries
|
||||||
FENCE_ENABLE 0
|
FENCE_ENABLE 1
|
||||||
FENCE_TYPE 3 # Alt + Circle
|
FENCE_TYPE 3 # Alt + Circle
|
||||||
FENCE_ACTION 2 # Land on breach
|
FENCE_ACTION 2 # Land on breach
|
||||||
FENCE_ALT_MAX 50 # Maximum altitude (meters)
|
FENCE_ALT_MAX 10 # Maximum altitude (meters)
|
||||||
FENCE_RADIUS 100 # Circular fence radius (meters)
|
FENCE_RADIUS 20 # Circular fence radius (meters)
|
||||||
FENCE_MARGIN 2 # Margin inside fence for warning
|
FENCE_MARGIN 2 # Margin inside fence for warning
|
||||||
|
|
||||||
# ====================
|
# ====================
|
||||||
# EKF Configuration
|
# EKF Configuration
|
||||||
# ====================
|
# ====================
|
||||||
# Use EKF3 with GPS
|
# Use EKF3 with GPS (for telemetry)
|
||||||
AHRS_EKF_TYPE 3
|
AHRS_EKF_TYPE 3
|
||||||
EK3_ENABLE 1
|
EK3_ENABLE 1
|
||||||
EK2_ENABLE 0
|
EK2_ENABLE 0
|
||||||
|
|
||||||
# EKF3 Source Configuration - Use GPS
|
# EKF3 Source - GPS for position (enables LOCAL_POSITION_NED)
|
||||||
EK3_SRC1_POSXY 3 # GPS for XY position
|
EK3_SRC1_POSXY 3 # GPS for XY position
|
||||||
EK3_SRC1_POSZ 1 # Barometer for Z position
|
EK3_SRC1_POSZ 1 # Barometer for Z position
|
||||||
EK3_SRC1_VELXY 3 # GPS for XY velocity
|
EK3_SRC1_VELXY 3 # GPS for XY velocity
|
||||||
EK3_SRC1_VELZ 0 # None for Z velocity
|
EK3_SRC1_VELZ 0 # None for Z velocity
|
||||||
EK3_SRC1_YAW 1 # Compass for yaw
|
EK3_SRC1_YAW 1 # Compass for yaw
|
||||||
|
|
||||||
# EKF3 Position Innovation Gate
|
|
||||||
EK3_POS_I_GATE 300 # Larger gate
|
|
||||||
|
|
||||||
# ====================
|
|
||||||
# Vision Position Input
|
|
||||||
# ====================
|
|
||||||
# Disable vision odometry (not using for simulation)
|
|
||||||
VISO_TYPE 0 # Disabled
|
|
||||||
VISO_POS_X 0.0
|
|
||||||
VISO_POS_Y 0.0
|
|
||||||
VISO_POS_Z 0.0
|
|
||||||
VISO_ORIENT 0
|
|
||||||
VISO_DELAY_MS 50
|
|
||||||
|
|
||||||
|
|
||||||
# ====================
|
|
||||||
# Optical Flow
|
|
||||||
# ====================
|
|
||||||
# Disable optical flow (using GPS instead)
|
|
||||||
FLOW_TYPE 0 # Disabled
|
|
||||||
|
|
||||||
# ====================
|
|
||||||
# Rangefinder
|
|
||||||
# ====================
|
|
||||||
# Disable rangefinder (using barometer + GPS)
|
|
||||||
RNGFND1_TYPE 0 # Disabled
|
|
||||||
|
|
||||||
# ====================
|
# ====================
|
||||||
# Failsafe Settings
|
# Failsafe Settings
|
||||||
# ====================
|
# ====================
|
||||||
FS_EKF_ACTION 1 # Land on EKF failsafe
|
FS_EKF_ACTION 1 # Land on EKF failsafe
|
||||||
FS_EKF_THRESH 0.8 # EKF failsafe threshold
|
FS_EKF_THRESH 0.8 # EKF failsafe threshold
|
||||||
FS_VIBE_ENABLE 0 # Disable vibration failsafe
|
FS_GPS_ENABLE 0 # Disable GPS failsafe (we simulate GPS-denied)
|
||||||
FS_GPS_ENABLE 0 # Disable GPS failsafe for simulation
|
|
||||||
|
|
||||||
# ====================
|
# ====================
|
||||||
# Flight Modes
|
# Flight Modes
|
||||||
@@ -82,12 +55,10 @@ FLTMODE6 9 # Land
|
|||||||
# ====================
|
# ====================
|
||||||
# Arming Checks
|
# Arming Checks
|
||||||
# ====================
|
# ====================
|
||||||
# Disable arming checks for simulation testing
|
ARMING_CHECK 0 # Disable all checks (for simulation)
|
||||||
ARMING_CHECK 0 # Disable all checks (for sim only)
|
|
||||||
|
|
||||||
# ====================
|
# ====================
|
||||||
# Logging
|
# Logging
|
||||||
# ====================
|
# ====================
|
||||||
LOG_BITMASK 176126 # Log all relevant data
|
LOG_BITMASK 176126 # Log all relevant data
|
||||||
LOG_DISARMED 0 # Don't log when disarmed
|
LOG_DISARMED 0 # Don't log when disarmed
|
||||||
|
|
||||||
|
|||||||
@@ -1,16 +1,24 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Autonomous UAV Controller using pymavlink.
|
Autonomous UAV Controller
|
||||||
Connects directly to ArduPilot SITL and controls the drone autonomously.
|
=========================
|
||||||
No ROS/MAVROS required.
|
GPS-denied navigation simulation with GPS-based geofencing for safety.
|
||||||
|
|
||||||
Uses GUIDED_NOGPS mode for GPS-denied flight simulation while keeping
|
Uses GUIDED_NOGPS mode with velocity/attitude commands while keeping
|
||||||
GPS enabled in EKF for geofence safety.
|
GPS enabled in EKF for geofence and LOCAL_POSITION_NED telemetry.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python src/autonomous_controller.py --mission hover
|
||||||
|
python src/autonomous_controller.py --mission square --size 5
|
||||||
|
python src/autonomous_controller.py --mission circle --size 3
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
import math
|
import math
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
from pymavlink import mavutil
|
from pymavlink import mavutil
|
||||||
|
|
||||||
|
|
||||||
@@ -32,28 +40,48 @@ class AutonomousController:
|
|||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, connection_string='tcp:127.0.0.1:5760'):
|
def __init__(self, connection_string='tcp:127.0.0.1:5760'):
|
||||||
"""Initialize connection to ArduPilot."""
|
"""Initialize controller."""
|
||||||
print(f"[CTRL] Connecting to {connection_string}...")
|
self.connection_string = connection_string
|
||||||
self.mav = mavutil.mavlink_connection(connection_string)
|
self.mav = None
|
||||||
|
self.armed = False
|
||||||
# Wait for heartbeat
|
self.mode = "UNKNOWN"
|
||||||
print("[CTRL] Waiting for heartbeat...")
|
|
||||||
self.mav.wait_heartbeat()
|
|
||||||
print(f"[CTRL] Connected! System {self.mav.target_system} Component {self.mav.target_component}")
|
|
||||||
|
|
||||||
self.home_position = None
|
|
||||||
self.altitude = 0
|
self.altitude = 0
|
||||||
self.position = {"x": 0, "y": 0, "z": 0}
|
self.position = {"x": 0, "y": 0, "z": 0}
|
||||||
self.armed = False
|
|
||||||
|
|
||||||
# Request data streams
|
def connect(self, timeout=30):
|
||||||
self.mav.mav.request_data_stream_send(
|
"""Connect to ArduPilot SITL."""
|
||||||
self.mav.target_system,
|
print(f"[CTRL] Connecting to {self.connection_string}...")
|
||||||
self.mav.target_component,
|
|
||||||
mavutil.mavlink.MAV_DATA_STREAM_ALL,
|
try:
|
||||||
4, # 4 Hz
|
self.mav = mavutil.mavlink_connection(self.connection_string)
|
||||||
1 # Start
|
|
||||||
)
|
print("[CTRL] Waiting for heartbeat...")
|
||||||
|
msg = None
|
||||||
|
for attempt in range(3):
|
||||||
|
msg = self.mav.wait_heartbeat(timeout=timeout)
|
||||||
|
if msg and self.mav.target_system > 0:
|
||||||
|
break
|
||||||
|
print(f"[CTRL] Heartbeat attempt {attempt + 1}/3...")
|
||||||
|
|
||||||
|
if not msg or self.mav.target_system == 0:
|
||||||
|
print("[ERROR] Failed to receive heartbeat")
|
||||||
|
return False
|
||||||
|
|
||||||
|
print(f"[CTRL] Connected! System {self.mav.target_system}, Component {self.mav.target_component}")
|
||||||
|
|
||||||
|
# Request data streams for telemetry
|
||||||
|
self.mav.mav.request_data_stream_send(
|
||||||
|
self.mav.target_system,
|
||||||
|
self.mav.target_component,
|
||||||
|
mavutil.mavlink.MAV_DATA_STREAM_ALL,
|
||||||
|
10, # 10 Hz
|
||||||
|
1 # Start
|
||||||
|
)
|
||||||
|
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[ERROR] Connection failed: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
def update_state(self):
|
def update_state(self):
|
||||||
"""Update drone state from MAVLink messages."""
|
"""Update drone state from MAVLink messages."""
|
||||||
@@ -66,6 +94,10 @@ class AutonomousController:
|
|||||||
|
|
||||||
if msg_type == "HEARTBEAT":
|
if msg_type == "HEARTBEAT":
|
||||||
self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0
|
self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0
|
||||||
|
try:
|
||||||
|
self.mode = mavutil.mode_string_v10(msg)
|
||||||
|
except:
|
||||||
|
self.mode = str(msg.custom_mode)
|
||||||
|
|
||||||
elif msg_type == "LOCAL_POSITION_NED":
|
elif msg_type == "LOCAL_POSITION_NED":
|
||||||
self.position = {"x": msg.x, "y": msg.y, "z": msg.z}
|
self.position = {"x": msg.x, "y": msg.y, "z": msg.z}
|
||||||
@@ -90,105 +122,85 @@ class AutonomousController:
|
|||||||
float(value),
|
float(value),
|
||||||
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
|
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
|
||||||
)
|
)
|
||||||
time.sleep(0.3)
|
time.sleep(0.5)
|
||||||
|
|
||||||
def setup_for_flight(self):
|
def setup_gps_denied(self):
|
||||||
"""Configure ArduPilot for simulation flight.
|
"""Configure for simulated GPS-denied operation.
|
||||||
|
|
||||||
For Gazebo simulation, we use GPS (provided by Gazebo) for position.
|
GPS is enabled in EKF for:
|
||||||
We disable the fence to avoid position requirement conflicts.
|
1. Geofence safety
|
||||||
|
2. LOCAL_POSITION_NED telemetry
|
||||||
|
|
||||||
|
But we use GUIDED_NOGPS mode and send velocity/attitude commands,
|
||||||
|
simulating GPS-denied flight control.
|
||||||
"""
|
"""
|
||||||
print("[CTRL] Configuring for simulation...")
|
print("[CTRL] Configuring for GPS-denied simulation...")
|
||||||
|
|
||||||
# Disable ALL arming checks for simulation
|
# Disable arming checks
|
||||||
self.set_param("ARMING_CHECK", 0)
|
self.set_param("ARMING_CHECK", 0)
|
||||||
|
|
||||||
# Enable GPS (Gazebo provides simulated GPS)
|
# Enable GPS in EKF (for geofence and position telemetry)
|
||||||
self.set_param("GPS_TYPE", 1) # Auto
|
|
||||||
|
|
||||||
# Configure EKF to use GPS
|
|
||||||
self.set_param("EK3_SRC1_POSXY", 3) # GPS
|
self.set_param("EK3_SRC1_POSXY", 3) # GPS
|
||||||
self.set_param("EK3_SRC1_VELXY", 3) # GPS
|
self.set_param("EK3_SRC1_VELXY", 3) # GPS
|
||||||
self.set_param("EK3_SRC1_POSZ", 1) # Baro
|
self.set_param("EK3_SRC1_POSZ", 1) # Baro
|
||||||
|
|
||||||
# Disable fence to avoid "fence requires position" error
|
# Disable GPS failsafe (we're simulating GPS-denied)
|
||||||
self.set_param("FENCE_ENABLE", 0)
|
|
||||||
|
|
||||||
# Disable vision odometry requirement
|
|
||||||
self.set_param("VISO_TYPE", 0)
|
|
||||||
|
|
||||||
# Disable GPS failsafe
|
|
||||||
self.set_param("FS_GPS_ENABLE", 0)
|
self.set_param("FS_GPS_ENABLE", 0)
|
||||||
|
|
||||||
print("[CTRL] Setup complete")
|
# Setup geofence (safety cage)
|
||||||
|
print("[CTRL] Setting up geofence: Alt=10m, Radius=20m")
|
||||||
|
self.set_param("FENCE_ENABLE", 1)
|
||||||
|
self.set_param("FENCE_TYPE", 3) # Alt + Circle
|
||||||
|
self.set_param("FENCE_ACTION", 2) # Land on breach
|
||||||
|
self.set_param("FENCE_ALT_MAX", 10)
|
||||||
|
self.set_param("FENCE_RADIUS", 20)
|
||||||
|
self.set_param("FENCE_MARGIN", 2.0)
|
||||||
|
|
||||||
|
print("[CTRL] Setup complete (GPS for fence, GUIDED_NOGPS for control)")
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Wait for GPS lock and home position
|
def wait_for_ready(self, timeout=60):
|
||||||
print("[CTRL] Waiting for GPS lock and home position...")
|
"""Wait for EKF and GPS to be ready."""
|
||||||
for i in range(60): # 60 second timeout
|
print("[CTRL] Waiting for system ready...")
|
||||||
|
start = time.time()
|
||||||
|
|
||||||
|
while time.time() - start < timeout:
|
||||||
self.update_state()
|
self.update_state()
|
||||||
|
|
||||||
# Check for GPS_RAW_INT message
|
msg = self.mav.recv_match(type=['STATUSTEXT', 'GPS_RAW_INT'], blocking=True, timeout=1)
|
||||||
msg = self.mav.recv_match(type=['GPS_RAW_INT', 'HOME_POSITION', 'STATUSTEXT'], blocking=True, timeout=1)
|
|
||||||
if msg:
|
if msg:
|
||||||
msg_type = msg.get_type()
|
msg_type = msg.get_type()
|
||||||
if msg_type == 'GPS_RAW_INT':
|
if msg_type == 'STATUSTEXT':
|
||||||
fix = msg.fix_type
|
|
||||||
if fix >= 3: # 3D fix
|
|
||||||
print(f"[CTRL] GPS fix: {fix} (satellites: {msg.satellites_visible})")
|
|
||||||
return True
|
|
||||||
elif msg_type == 'HOME_POSITION':
|
|
||||||
print("[CTRL] Home position set")
|
|
||||||
return True
|
|
||||||
elif msg_type == 'STATUSTEXT':
|
|
||||||
text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore')
|
text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore')
|
||||||
if text.strip():
|
text = text.strip()
|
||||||
print(f"[SITL] {text.strip()}")
|
if text:
|
||||||
|
print(f"[SITL] {text}")
|
||||||
|
if 'EKF3' in text and 'active' in text.lower():
|
||||||
|
print("[CTRL] EKF ready!")
|
||||||
|
time.sleep(2)
|
||||||
|
return True
|
||||||
|
elif msg_type == 'GPS_RAW_INT':
|
||||||
|
if msg.fix_type >= 3:
|
||||||
|
print(f"[CTRL] GPS fix: {msg.fix_type} (satellites: {msg.satellites_visible})")
|
||||||
|
time.sleep(1)
|
||||||
|
return True
|
||||||
|
|
||||||
if i % 10 == 0:
|
if int(time.time() - start) % 10 == 0 and int(time.time() - start) > 0:
|
||||||
print(f"[CTRL] Waiting for GPS... ({i}s)")
|
print(f"[CTRL] Waiting... ({int(time.time() - start)}s)")
|
||||||
|
|
||||||
print("[CTRL] GPS timeout - continuing anyway")
|
print("[CTRL] Ready timeout - continuing anyway")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def wait_for_ekf(self, timeout=60):
|
def set_mode(self, mode_name):
|
||||||
"""Wait for EKF to initialize."""
|
"""Set flight mode."""
|
||||||
print("[CTRL] Waiting for EKF initialization...")
|
mode_upper = mode_name.upper()
|
||||||
start_time = time.time()
|
|
||||||
|
|
||||||
while time.time() - start_time < timeout:
|
|
||||||
msg = self.mav.recv_match(type='STATUSTEXT', blocking=True, timeout=1)
|
|
||||||
if msg:
|
|
||||||
text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore')
|
|
||||||
text = text.strip()
|
|
||||||
if text:
|
|
||||||
print(f"[SITL] {text}")
|
|
||||||
if 'EKF' in text and 'active' in text.lower():
|
|
||||||
print("[CTRL] EKF is active!")
|
|
||||||
time.sleep(2)
|
|
||||||
return True
|
|
||||||
if 'EKF3 IMU0 initialised' in text:
|
|
||||||
print("[CTRL] EKF initialized")
|
|
||||||
time.sleep(2)
|
|
||||||
return True
|
|
||||||
if 'ArduPilot Ready' in text:
|
|
||||||
print("[CTRL] ArduPilot is ready")
|
|
||||||
time.sleep(2)
|
|
||||||
return True
|
|
||||||
|
|
||||||
print("[CTRL] EKF timeout - continuing anyway...")
|
|
||||||
return True
|
|
||||||
|
|
||||||
def set_mode(self, mode):
|
|
||||||
"""Set flight mode using MAV_CMD_DO_SET_MODE."""
|
|
||||||
mode_upper = mode.upper()
|
|
||||||
|
|
||||||
if mode_upper not in self.COPTER_MODES:
|
if mode_upper not in self.COPTER_MODES:
|
||||||
print(f"[CTRL] Unknown mode: {mode}")
|
print(f"[CTRL] Unknown mode: {mode_name}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
mode_id = self.COPTER_MODES[mode_upper]
|
mode_id = self.COPTER_MODES[mode_upper]
|
||||||
print(f"[CTRL] Setting mode to {mode_upper} (id={mode_id})...")
|
print(f"[CTRL] Setting mode to {mode_upper}...")
|
||||||
|
|
||||||
self.mav.mav.command_long_send(
|
self.mav.mav.command_long_send(
|
||||||
self.mav.target_system,
|
self.mav.target_system,
|
||||||
@@ -200,39 +212,38 @@ class AutonomousController:
|
|||||||
0, 0, 0, 0, 0
|
0, 0, 0, 0, 0
|
||||||
)
|
)
|
||||||
|
|
||||||
# Wait and verify
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
self.update_state()
|
self.update_state()
|
||||||
|
if mode_upper in self.mode.upper():
|
||||||
|
print(f"[CTRL] Mode: {self.mode}")
|
||||||
|
return True
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
|
|
||||||
print(f"[CTRL] Mode set to {mode_upper}")
|
print(f"[CTRL] Mode set (current: {self.mode})")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def arm(self):
|
def arm(self):
|
||||||
"""Arm the vehicle with force."""
|
"""Force arm the drone."""
|
||||||
print("[CTRL] Arming...")
|
print("[CTRL] Arming...")
|
||||||
|
|
||||||
for attempt in range(5):
|
for attempt in range(5):
|
||||||
# Clear pending messages
|
|
||||||
while self.mav.recv_match(blocking=False):
|
while self.mav.recv_match(blocking=False):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
print(f"[CTRL] Arm attempt {attempt + 1}/5...")
|
print(f"[CTRL] Arm attempt {attempt + 1}/5...")
|
||||||
|
|
||||||
self.mav.mav.command_long_send(
|
self.mav.mav.command_long_send(
|
||||||
self.mav.target_system,
|
self.mav.target_system,
|
||||||
self.mav.target_component,
|
self.mav.target_component,
|
||||||
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
|
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
|
||||||
0,
|
0,
|
||||||
1, # Arm
|
1, # Arm
|
||||||
21196, # Force arm magic number
|
21196, # Force arm
|
||||||
0, 0, 0, 0, 0
|
0, 0, 0, 0, 0
|
||||||
)
|
)
|
||||||
|
|
||||||
# Wait for result
|
start = time.time()
|
||||||
start_time = time.time()
|
while time.time() - start < 3.0:
|
||||||
while time.time() - start_time < 3.0:
|
|
||||||
msg = self.mav.recv_match(type=['COMMAND_ACK', 'HEARTBEAT', 'STATUSTEXT'], blocking=True, timeout=0.5)
|
msg = self.mav.recv_match(type=['COMMAND_ACK', 'HEARTBEAT', 'STATUSTEXT'], blocking=True, timeout=0.5)
|
||||||
if msg is None:
|
if msg is None:
|
||||||
continue
|
continue
|
||||||
@@ -253,22 +264,19 @@ class AutonomousController:
|
|||||||
elif msg_type == 'COMMAND_ACK':
|
elif msg_type == 'COMMAND_ACK':
|
||||||
if msg.command == mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM:
|
if msg.command == mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM:
|
||||||
if msg.result == 0:
|
if msg.result == 0:
|
||||||
print("[CTRL] Arm command accepted")
|
print("[CTRL] Arm accepted")
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
self.update_state()
|
self.update_state()
|
||||||
if self.armed:
|
if self.armed:
|
||||||
return True
|
return True
|
||||||
else:
|
|
||||||
print(f"[CTRL] Arm rejected: result={msg.result}")
|
|
||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
print("[CTRL] Arm failed after 5 attempts")
|
print("[CTRL] Arm failed")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def send_attitude_thrust(self, roll, pitch, yaw, thrust):
|
def send_attitude_thrust(self, roll, pitch, yaw, thrust):
|
||||||
"""Send attitude and thrust command for GUIDED_NOGPS mode."""
|
"""Send attitude and thrust command for GUIDED_NOGPS mode."""
|
||||||
# Convert euler to quaternion
|
|
||||||
cy = math.cos(yaw * 0.5)
|
cy = math.cos(yaw * 0.5)
|
||||||
sy = math.sin(yaw * 0.5)
|
sy = math.sin(yaw * 0.5)
|
||||||
cp = math.cos(pitch * 0.5)
|
cp = math.cos(pitch * 0.5)
|
||||||
@@ -277,14 +285,13 @@ class AutonomousController:
|
|||||||
sr = math.sin(roll * 0.5)
|
sr = math.sin(roll * 0.5)
|
||||||
|
|
||||||
q = [
|
q = [
|
||||||
cr * cp * cy + sr * sp * sy, # w
|
cr * cp * cy + sr * sp * sy,
|
||||||
sr * cp * cy - cr * sp * sy, # x
|
sr * cp * cy - cr * sp * sy,
|
||||||
cr * sp * cy + sr * cp * sy, # y
|
cr * sp * cy + sr * cp * sy,
|
||||||
cr * cp * sy - sr * sp * cy # z
|
cr * cp * sy - sr * sp * cy
|
||||||
]
|
]
|
||||||
|
|
||||||
# Use attitude + thrust, ignore body rates
|
type_mask = 0b00000111 # Use attitude + thrust
|
||||||
type_mask = 0b00000111
|
|
||||||
|
|
||||||
self.mav.mav.set_attitude_target_send(
|
self.mav.mav.set_attitude_target_send(
|
||||||
0,
|
0,
|
||||||
@@ -292,7 +299,7 @@ class AutonomousController:
|
|||||||
self.mav.target_component,
|
self.mav.target_component,
|
||||||
type_mask,
|
type_mask,
|
||||||
q,
|
q,
|
||||||
0, 0, 0, # Body rates (ignored)
|
0, 0, 0,
|
||||||
thrust
|
thrust
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -310,46 +317,58 @@ class AutonomousController:
|
|||||||
self.mav.target_component,
|
self.mav.target_component,
|
||||||
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
|
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
|
||||||
type_mask,
|
type_mask,
|
||||||
0, 0, 0, # Position (ignored)
|
0, 0, 0,
|
||||||
vx, vy, vz, # Velocity NED
|
vx, vy, vz,
|
||||||
0, 0, 0, # Acceleration (ignored)
|
0, 0, 0,
|
||||||
0, 0 # Yaw, yaw_rate
|
0, 0
|
||||||
)
|
)
|
||||||
|
|
||||||
def takeoff(self, altitude=5.0):
|
def takeoff(self, altitude=5.0):
|
||||||
"""Take off using attitude+thrust commands (works in GUIDED_NOGPS)."""
|
"""Take off using thrust commands."""
|
||||||
print(f"[CTRL] Taking off to {altitude}m using thrust commands...")
|
print(f"[CTRL] Taking off to {altitude}m...")
|
||||||
|
|
||||||
hover_thrust = 0.6
|
hover_thrust = 0.6
|
||||||
max_thrust = 0.85
|
max_thrust = 0.85
|
||||||
thrust = 0.5
|
thrust = 0.5
|
||||||
|
|
||||||
start_time = time.time()
|
start = time.time()
|
||||||
timeout = 30
|
timeout = 30
|
||||||
|
last_alt = 0
|
||||||
|
stuck_count = 0
|
||||||
|
|
||||||
while time.time() - start_time < timeout:
|
while time.time() - start < timeout:
|
||||||
self.update_state()
|
self.update_state()
|
||||||
|
|
||||||
if self.altitude >= altitude * 0.9:
|
if self.altitude >= altitude * 0.90:
|
||||||
self.send_attitude_thrust(0, 0, 0, hover_thrust)
|
self.send_attitude_thrust(0, 0, 0, hover_thrust)
|
||||||
print(f"\n[CTRL] Reached {self.altitude:.1f}m")
|
print(f"\n[CTRL] Reached {self.altitude:.1f}m")
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Ramp up thrust
|
# Ramp thrust
|
||||||
if self.altitude < 0.5 and thrust < max_thrust:
|
if self.altitude < 0.5 and thrust < max_thrust:
|
||||||
thrust = min(thrust + 0.01, max_thrust)
|
thrust = min(thrust + 0.01, max_thrust)
|
||||||
elif self.altitude > 0.5:
|
elif self.altitude > 0.5:
|
||||||
thrust = 0.75
|
thrust = 0.75
|
||||||
|
|
||||||
self.send_attitude_thrust(0, 0, 0, thrust)
|
# Check if stuck
|
||||||
|
if thrust >= max_thrust - 0.05:
|
||||||
|
if abs(self.altitude - last_alt) < 0.01:
|
||||||
|
stuck_count += 1
|
||||||
|
if stuck_count > 30:
|
||||||
|
print(f"\n[WARN] Not climbing at thrust={thrust:.2f}")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
stuck_count = 0
|
||||||
|
last_alt = self.altitude
|
||||||
|
|
||||||
print(f"\r[CTRL] Climbing... {self.altitude:.1f}m / {altitude:.1f}m (thrust={thrust:.2f})", end="")
|
self.send_attitude_thrust(0, 0, 0, thrust)
|
||||||
|
print(f"\r[CTRL] Climbing: {self.altitude:.1f}m / {altitude:.1f}m (thrust={thrust:.2f})", end="")
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
self.send_attitude_thrust(0, 0, 0, hover_thrust)
|
self.send_attitude_thrust(0, 0, 0, hover_thrust)
|
||||||
print(f"\n[CTRL] Takeoff timeout at {self.altitude:.1f}m")
|
print(f"\n[CTRL] Takeoff ended at {self.altitude:.1f}m")
|
||||||
return False
|
return self.altitude > 1.0
|
||||||
|
|
||||||
def fly_to(self, target_x, target_y, altitude, tolerance=1.0, timeout=30):
|
def fly_to(self, target_x, target_y, altitude, tolerance=1.0, timeout=30):
|
||||||
"""Fly to position using velocity commands."""
|
"""Fly to position using velocity commands."""
|
||||||
@@ -377,7 +396,12 @@ class AutonomousController:
|
|||||||
else:
|
else:
|
||||||
vx, vy = 0, 0
|
vx, vy = 0, 0
|
||||||
|
|
||||||
vz = -1.0 if dz > 0.5 else (0.5 if dz < -0.5 else -dz)
|
if dz > 0.5:
|
||||||
|
vz = -1.0
|
||||||
|
elif dz < -0.5:
|
||||||
|
vz = 0.5
|
||||||
|
else:
|
||||||
|
vz = -dz
|
||||||
|
|
||||||
self.send_velocity_ned(vx, vy, vz)
|
self.send_velocity_ned(vx, vy, vz)
|
||||||
|
|
||||||
@@ -386,35 +410,34 @@ class AutonomousController:
|
|||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
self.send_velocity_ned(0, 0, 0)
|
self.send_velocity_ned(0, 0, 0)
|
||||||
print(f"\n[CTRL] Timeout reaching waypoint")
|
print(f"\n[CTRL] Waypoint timeout")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def land(self):
|
def land(self):
|
||||||
"""Land the vehicle."""
|
"""Land the drone."""
|
||||||
print("[CTRL] Landing...")
|
print("[CTRL] Landing...")
|
||||||
self.set_mode("LAND")
|
self.set_mode("LAND")
|
||||||
|
|
||||||
start_time = time.time()
|
for _ in range(100):
|
||||||
while time.time() - start_time < 60:
|
|
||||||
self.update_state()
|
self.update_state()
|
||||||
print(f"\r[CTRL] Landing... Alt: {self.altitude:.1f}m", end="")
|
print(f"\r[CTRL] Landing... Alt: {self.altitude:.1f}m", end="")
|
||||||
if self.altitude < 0.3:
|
if self.altitude < 0.3:
|
||||||
print("\n[CTRL] Landed!")
|
print("\n[CTRL] Landed!")
|
||||||
return True
|
return True
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# ==================== MISSIONS ====================
|
||||||
|
|
||||||
def run_hover_mission(self, altitude=5.0, duration=30):
|
def run_hover_mission(self, altitude=5.0, duration=30):
|
||||||
"""Hover mission."""
|
"""Hover in place."""
|
||||||
print(f"\n[MISSION] Hover at {altitude}m for {duration}s")
|
print(f"\n{'='*50}")
|
||||||
|
print(f" HOVER MISSION: {altitude}m for {duration}s")
|
||||||
|
print(f"{'='*50}\n")
|
||||||
|
|
||||||
self.setup_for_flight()
|
self.setup_gps_denied()
|
||||||
|
self.wait_for_ready()
|
||||||
|
|
||||||
if not self.wait_for_ekf():
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Try GUIDED_NOGPS first, fallback to GUIDED
|
|
||||||
if not self.set_mode('GUIDED_NOGPS'):
|
if not self.set_mode('GUIDED_NOGPS'):
|
||||||
self.set_mode('GUIDED')
|
self.set_mode('GUIDED')
|
||||||
|
|
||||||
@@ -422,27 +445,31 @@ class AutonomousController:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
if not self.takeoff(altitude):
|
if not self.takeoff(altitude):
|
||||||
|
self.land()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
print(f"[MISSION] Hovering for {duration} seconds...")
|
print(f"[CTRL] Hovering for {duration}s...")
|
||||||
start = time.time()
|
start = time.time()
|
||||||
while time.time() - start < duration:
|
while time.time() - start < duration:
|
||||||
self.update_state()
|
self.update_state()
|
||||||
self.send_attitude_thrust(0, 0, 0, 0.6) # Maintain hover
|
self.send_attitude_thrust(0, 0, 0, 0.6)
|
||||||
|
remaining = duration - (time.time() - start)
|
||||||
|
print(f"\r[CTRL] Hovering... {remaining:.0f}s remaining, Alt: {self.altitude:.1f}m", end="")
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
print()
|
||||||
self.land()
|
self.land()
|
||||||
print("[MISSION] Hover mission complete!")
|
print("\n[CTRL] Hover mission complete!")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def run_square_mission(self, altitude=5.0, side=5.0):
|
def run_square_mission(self, altitude=5.0, size=5.0):
|
||||||
"""Fly a square pattern."""
|
"""Fly a square pattern."""
|
||||||
print(f"\n[MISSION] Square pattern: {side}m sides at {altitude}m")
|
print(f"\n{'='*50}")
|
||||||
|
print(f" SQUARE MISSION: {size}m x {size}m at {altitude}m")
|
||||||
|
print(f"{'='*50}\n")
|
||||||
|
|
||||||
self.setup_for_flight()
|
self.setup_gps_denied()
|
||||||
|
self.wait_for_ready()
|
||||||
if not self.wait_for_ekf():
|
|
||||||
return False
|
|
||||||
|
|
||||||
if not self.set_mode('GUIDED_NOGPS'):
|
if not self.set_mode('GUIDED_NOGPS'):
|
||||||
self.set_mode('GUIDED')
|
self.set_mode('GUIDED')
|
||||||
@@ -451,6 +478,7 @@ class AutonomousController:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
if not self.takeoff(altitude):
|
if not self.takeoff(altitude):
|
||||||
|
self.land()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
self.update_state()
|
self.update_state()
|
||||||
@@ -458,29 +486,31 @@ class AutonomousController:
|
|||||||
start_y = self.position["y"]
|
start_y = self.position["y"]
|
||||||
|
|
||||||
waypoints = [
|
waypoints = [
|
||||||
(start_x + side, start_y),
|
(start_x + size, start_y),
|
||||||
(start_x + side, start_y + side),
|
(start_x + size, start_y + size),
|
||||||
(start_x, start_y + side),
|
(start_x, start_y + size),
|
||||||
(start_x, start_y),
|
(start_x, start_y),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
print(f"[CTRL] Flying square from ({start_x:.1f}, {start_y:.1f})")
|
||||||
|
|
||||||
for i, (x, y) in enumerate(waypoints):
|
for i, (x, y) in enumerate(waypoints):
|
||||||
print(f"\n[MISSION] Waypoint {i+1}/4")
|
print(f"\n--- Waypoint {i+1}/4 ---")
|
||||||
self.fly_to(x, y, altitude)
|
self.fly_to(x, y, altitude)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
self.land()
|
self.land()
|
||||||
print("[MISSION] Square mission complete!")
|
print("\n[CTRL] Square mission complete!")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def run_circle_mission(self, altitude=5.0, radius=5.0, points=8):
|
def run_circle_mission(self, altitude=5.0, radius=5.0, points=8):
|
||||||
"""Fly a circular pattern."""
|
"""Fly a circular pattern."""
|
||||||
print(f"\n[MISSION] Circle pattern: {radius}m radius at {altitude}m")
|
print(f"\n{'='*50}")
|
||||||
|
print(f" CIRCLE MISSION: radius={radius}m at {altitude}m")
|
||||||
|
print(f"{'='*50}\n")
|
||||||
|
|
||||||
self.setup_for_flight()
|
self.setup_gps_denied()
|
||||||
|
self.wait_for_ready()
|
||||||
if not self.wait_for_ekf():
|
|
||||||
return False
|
|
||||||
|
|
||||||
if not self.set_mode('GUIDED_NOGPS'):
|
if not self.set_mode('GUIDED_NOGPS'):
|
||||||
self.set_mode('GUIDED')
|
self.set_mode('GUIDED')
|
||||||
@@ -489,51 +519,58 @@ class AutonomousController:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
if not self.takeoff(altitude):
|
if not self.takeoff(altitude):
|
||||||
|
self.land()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
self.update_state()
|
self.update_state()
|
||||||
center_x = self.position["x"]
|
center_x = self.position["x"]
|
||||||
center_y = self.position["y"]
|
center_y = self.position["y"]
|
||||||
|
|
||||||
|
print(f"[CTRL] Flying circle around ({center_x:.1f}, {center_y:.1f})")
|
||||||
|
|
||||||
for i in range(points + 1):
|
for i in range(points + 1):
|
||||||
angle = 2 * math.pi * i / points
|
angle = 2 * math.pi * i / points
|
||||||
x = center_x + radius * math.cos(angle)
|
x = center_x + radius * math.cos(angle)
|
||||||
y = center_y + radius * math.sin(angle)
|
y = center_y + radius * math.sin(angle)
|
||||||
print(f"\n[MISSION] Point {i+1}/{points+1}")
|
print(f"\n--- Point {i+1}/{points+1} ---")
|
||||||
self.fly_to(x, y, altitude)
|
self.fly_to(x, y, altitude)
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
|
||||||
self.land()
|
self.land()
|
||||||
print("[MISSION] Circle mission complete!")
|
print("\n[CTRL] Circle mission complete!")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description='Autonomous UAV Controller')
|
parser = argparse.ArgumentParser(description='Autonomous UAV Controller')
|
||||||
parser.add_argument('--connection', default='tcp:127.0.0.1:5760',
|
parser.add_argument('--connection', '-c', default='tcp:127.0.0.1:5760',
|
||||||
help='MAVLink connection string')
|
help='MAVLink connection string')
|
||||||
parser.add_argument('--mission', choices=['hover', 'square', 'circle'],
|
parser.add_argument('--mission', '-m', choices=['hover', 'square', 'circle'],
|
||||||
default='hover', help='Mission type')
|
default='hover', help='Mission type')
|
||||||
parser.add_argument('--altitude', type=float, default=5.0,
|
parser.add_argument('--altitude', '-a', type=float, default=5.0,
|
||||||
help='Flight altitude in meters')
|
help='Flight altitude (meters)')
|
||||||
parser.add_argument('--size', type=float, default=5.0,
|
parser.add_argument('--size', '-s', type=float, default=5.0,
|
||||||
help='Mission size (side length or radius)')
|
help='Pattern size/radius (meters)')
|
||||||
parser.add_argument('--duration', type=float, default=30.0,
|
parser.add_argument('--duration', '-d', type=float, default=30.0,
|
||||||
help='Hover duration in seconds')
|
help='Hover duration (seconds)')
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
print("=" * 50)
|
print("=" * 50)
|
||||||
print(" Autonomous UAV Controller")
|
print(" Autonomous UAV Controller")
|
||||||
|
print(" GPS-Denied Navigation Simulation")
|
||||||
print("=" * 50)
|
print("=" * 50)
|
||||||
print(f" Connection: {args.connection}")
|
print(f" Connection: {args.connection}")
|
||||||
print(f" Mission: {args.mission}")
|
print(f" Mission: {args.mission}")
|
||||||
print(f" Altitude: {args.altitude}m")
|
print(f" Altitude: {args.altitude}m")
|
||||||
print("=" * 50)
|
print("=" * 50)
|
||||||
print()
|
|
||||||
|
controller = AutonomousController(args.connection)
|
||||||
|
|
||||||
|
if not controller.connect():
|
||||||
|
print("\n[ERROR] Could not connect to SITL")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
controller = AutonomousController(args.connection)
|
|
||||||
|
|
||||||
if args.mission == 'hover':
|
if args.mission == 'hover':
|
||||||
controller.run_hover_mission(args.altitude, args.duration)
|
controller.run_hover_mission(args.altitude, args.duration)
|
||||||
elif args.mission == 'square':
|
elif args.mission == 'square':
|
||||||
@@ -542,11 +579,13 @@ def main():
|
|||||||
controller.run_circle_mission(args.altitude, args.size)
|
controller.run_circle_mission(args.altitude, args.size)
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
print("\n[CTRL] Interrupted by user")
|
print("\n\n[CTRL] Interrupted - Landing...")
|
||||||
|
controller.land()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] {e}")
|
print(f"\n[ERROR] {e}")
|
||||||
import traceback
|
import traceback
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
controller.land()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
|||||||
Reference in New Issue
Block a user