Files
simulation/src/autonomous_controller.py
2026-02-09 06:54:00 +00:00

709 lines
26 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Autonomous UAV Controller
=========================
GPS-denied navigation simulation with GPS-based geofencing for safety.
Uses GUIDED_NOGPS mode with velocity/attitude commands while keeping
GPS enabled in EKF for geofence and LOCAL_POSITION_NED telemetry.
Usage:
python src/autonomous_controller.py --mission hover
python src/autonomous_controller.py --mission square --size 5
python src/autonomous_controller.py --mission circle --size 3
"""
import sys
import os
import time
import math
import argparse
from pymavlink import mavutil
class AutonomousController:
"""Autonomous UAV controller using pymavlink."""
# ArduCopter mode IDs
COPTER_MODES = {
'STABILIZE': 0,
'ACRO': 1,
'ALT_HOLD': 2,
'AUTO': 3,
'GUIDED': 4,
'LOITER': 5,
'RTL': 6,
'CIRCLE': 7,
'LAND': 9,
'GUIDED_NOGPS': 20,
}
def __init__(self, connection_string='tcp:127.0.0.1:5760'):
"""Initialize controller."""
self.connection_string = connection_string
self.mav = None
self.armed = False
self.mode = "UNKNOWN"
self.altitude = 0
self.position = {"x": 0, "y": 0, "z": 0}
def connect(self, timeout=30):
"""Connect to ArduPilot SITL."""
print(f"[CTRL] Connecting to {self.connection_string}...")
try:
self.mav = mavutil.mavlink_connection(self.connection_string)
print("[CTRL] Waiting for heartbeat...")
msg = None
for attempt in range(3):
msg = self.mav.wait_heartbeat(timeout=timeout)
if msg and self.mav.target_system > 0:
break
print(f"[CTRL] Heartbeat attempt {attempt + 1}/3...")
if not msg or self.mav.target_system == 0:
print("[ERROR] Failed to receive heartbeat")
return False
print(f"[CTRL] Connected! System {self.mav.target_system}, Component {self.mav.target_component}")
# Request ALL data streams at higher rate
self.mav.mav.request_data_stream_send(
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_DATA_STREAM_ALL,
50, # 50 Hz for everything
1 # Start
)
# Specifically request position stream at high rate
self.mav.mav.request_data_stream_send(
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_DATA_STREAM_POSITION,
50, # 50 Hz
1
)
# Request extended status for altitude
self.mav.mav.request_data_stream_send(
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_DATA_STREAM_EXTENDED_STATUS,
10,
1
)
return True
except Exception as e:
print(f"[ERROR] Connection failed: {e}")
return False
def update_state(self):
"""Update drone state from MAVLink messages."""
while True:
msg = self.mav.recv_match(blocking=False)
if msg is None:
break
msg_type = msg.get_type()
if msg_type == "HEARTBEAT":
self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0
try:
self.mode = mavutil.mode_string_v10(msg)
except:
self.mode = str(msg.custom_mode)
elif msg_type == "LOCAL_POSITION_NED":
self.position = {"x": msg.x, "y": msg.y, "z": msg.z}
self.altitude = -msg.z # NED: down is positive
elif msg_type == "GLOBAL_POSITION_INT":
# Backup altitude source (relative altitude in mm)
if self.altitude == 0:
self.altitude = msg.relative_alt / 1000.0
elif msg_type == "VFR_HUD":
# Another backup altitude source
if self.altitude == 0:
self.altitude = msg.alt
elif msg_type == "STATUSTEXT":
text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore')
text = text.strip()
if text:
print(f"[SITL] {text}")
def set_param(self, param_name, value):
"""Set a parameter value."""
param_bytes = param_name.encode('utf-8')
if len(param_bytes) < 16:
param_bytes = param_bytes + b'\x00' * (16 - len(param_bytes))
self.mav.mav.param_set_send(
self.mav.target_system,
self.mav.target_component,
param_bytes,
float(value),
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
time.sleep(0.5)
def setup_gps_denied(self):
"""Configure for simulated GPS-denied operation.
GPS is enabled in EKF for:
1. Geofence safety (enabled after GPS lock)
2. LOCAL_POSITION_NED telemetry
But we use GUIDED_NOGPS mode and send velocity/attitude commands,
simulating GPS-denied flight control.
"""
print("[CTRL] Configuring for GPS-denied simulation...")
# Disable arming checks
self.set_param("ARMING_CHECK", 0)
# Enable GPS in EKF (for geofence and position telemetry)
self.set_param("EK3_SRC1_POSXY", 3) # GPS
self.set_param("EK3_SRC1_VELXY", 3) # GPS
self.set_param("EK3_SRC1_POSZ", 1) # Baro
# Disable GPS failsafe (we're simulating GPS-denied)
self.set_param("FS_GPS_ENABLE", 0)
# DISABLE fence initially - will enable after GPS lock
self.set_param("FENCE_ENABLE", 0)
print("[CTRL] Setup complete (fence will enable after GPS lock)")
time.sleep(1)
def wait_for_ready(self, timeout=90):
"""Wait for GPS lock and home position."""
print("[CTRL] Waiting for GPS lock and home position...")
start = time.time()
gps_ok = False
home_ok = False
while time.time() - start < timeout:
self.update_state()
# Check multiple message types
msg = self.mav.recv_match(type=['STATUSTEXT', 'GPS_RAW_INT', 'HOME_POSITION'],
blocking=True, timeout=1)
if msg:
msg_type = msg.get_type()
if msg_type == 'STATUSTEXT':
text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore')
text = text.strip()
if text:
print(f"[SITL] {text}")
# Check for origin set (means home is set)
if 'origin set' in text.lower() or 'Field Elevation' in text:
home_ok = True
print("[CTRL] Home position set!")
elif msg_type == 'GPS_RAW_INT':
if msg.fix_type >= 3: # 3D fix
if not gps_ok:
print(f"[CTRL] GPS fix: {msg.fix_type} (satellites: {msg.satellites_visible})")
gps_ok = True
elif msg_type == 'HOME_POSITION':
home_ok = True
print("[CTRL] Home position received!")
# Check if ready
if gps_ok and home_ok:
print("[CTRL] GPS and home ready!")
# Wait for EKF position estimate to be valid
print("[CTRL] Waiting for EKF position estimate...")
time.sleep(5)
# Skip fence for now - causes "need position estimate" errors
# TODO: Re-enable fence after basic flight works
# print("[CTRL] Enabling geofence: Alt=10m, Radius=20m")
# self.set_param("FENCE_ENABLE", 1)
# self.set_param("FENCE_TYPE", 3)
# self.set_param("FENCE_ACTION", 2)
# self.set_param("FENCE_ALT_MAX", 10)
# self.set_param("FENCE_RADIUS", 20)
return True
elapsed = int(time.time() - start)
if elapsed % 10 == 0 and elapsed > 0:
status = f"GPS={'OK' if gps_ok else 'waiting'}, Home={'OK' if home_ok else 'waiting'}"
print(f"[CTRL] Waiting... ({elapsed}s) {status}")
print("[CTRL] Ready timeout - continuing anyway (fence disabled)")
return True
def set_mode(self, mode_name):
"""Set flight mode."""
mode_upper = mode_name.upper()
if mode_upper not in self.COPTER_MODES:
print(f"[CTRL] Unknown mode: {mode_name}")
return False
mode_id = self.COPTER_MODES[mode_upper]
print(f"[CTRL] Setting mode to {mode_upper}...")
self.mav.mav.command_long_send(
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_CMD_DO_SET_MODE,
0,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode_id,
0, 0, 0, 0, 0
)
time.sleep(1)
for _ in range(10):
self.update_state()
if mode_upper in self.mode.upper():
print(f"[CTRL] Mode: {self.mode}")
return True
time.sleep(0.2)
print(f"[CTRL] Mode set (current: {self.mode})")
return True
def arm(self):
"""Force arm the drone."""
print("[CTRL] Arming...")
for attempt in range(5):
while self.mav.recv_match(blocking=False):
pass
print(f"[CTRL] Arm attempt {attempt + 1}/5...")
self.mav.mav.command_long_send(
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0,
1, # Arm
21196, # Force arm
0, 0, 0, 0, 0
)
start = time.time()
while time.time() - start < 3.0:
msg = self.mav.recv_match(type=['COMMAND_ACK', 'HEARTBEAT', 'STATUSTEXT'], blocking=True, timeout=0.5)
if msg is None:
continue
msg_type = msg.get_type()
if msg_type == 'STATUSTEXT':
text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore')
if text.strip():
print(f"[SITL] {text.strip()}")
elif msg_type == 'HEARTBEAT':
if msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED:
print("[CTRL] Armed successfully!")
self.armed = True
return True
elif msg_type == 'COMMAND_ACK':
if msg.command == mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM:
if msg.result == 0:
print("[CTRL] Arm accepted")
time.sleep(0.5)
self.update_state()
if self.armed:
return True
time.sleep(1)
print("[CTRL] Arm failed")
return False
def send_attitude_thrust(self, roll, pitch, yaw, thrust):
"""Send attitude and thrust command for GUIDED_NOGPS mode."""
cy = math.cos(yaw * 0.5)
sy = math.sin(yaw * 0.5)
cp = math.cos(pitch * 0.5)
sp = math.sin(pitch * 0.5)
cr = math.cos(roll * 0.5)
sr = math.sin(roll * 0.5)
q = [
cr * cp * cy + sr * sp * sy,
sr * cp * cy - cr * sp * sy,
cr * sp * cy + sr * cp * sy,
cr * cp * sy - sr * sp * cy
]
type_mask = 0b00000111 # Use attitude + thrust
self.mav.mav.set_attitude_target_send(
0,
self.mav.target_system,
self.mav.target_component,
type_mask,
q,
0, 0, 0,
thrust
)
def send_velocity_ned(self, vx, vy, vz):
"""Send velocity command in NED frame."""
type_mask = (
(1 << 0) | (1 << 1) | (1 << 2) | # Ignore position
(1 << 6) | (1 << 7) | (1 << 8) | # Ignore acceleration
(1 << 10) | (1 << 11) # Ignore yaw
)
self.mav.mav.set_position_target_local_ned_send(
0,
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
type_mask,
0, 0, 0,
vx, vy, vz,
0, 0, 0,
0, 0
)
def takeoff(self, altitude=5.0):
"""Take off using thrust commands."""
print(f"[CTRL] Taking off to {altitude}m...")
hover_thrust = 0.6
max_thrust = 0.85
thrust = 0.5
start = time.time()
timeout = 30
last_alt = 0
stuck_count = 0
got_alt_reading = False
while time.time() - start < timeout:
# Actively poll for position messages
for _ in range(5):
msg = self.mav.recv_match(type=['LOCAL_POSITION_NED', 'GLOBAL_POSITION_INT', 'VFR_HUD'],
blocking=True, timeout=0.02)
if msg:
msg_type = msg.get_type()
if msg_type == 'LOCAL_POSITION_NED':
self.altitude = -msg.z
self.position = {"x": msg.x, "y": msg.y, "z": msg.z}
got_alt_reading = True
elif msg_type == 'GLOBAL_POSITION_INT':
self.altitude = msg.relative_alt / 1000.0
got_alt_reading = True
elif msg_type == 'VFR_HUD':
# VFR_HUD.alt is MSL, but climb rate is useful
if msg.climb > 0.1:
got_alt_reading = True
self.update_state()
if self.altitude >= altitude * 0.90:
self.send_attitude_thrust(0, 0, 0, hover_thrust)
print(f"\n[CTRL] Reached {self.altitude:.1f}m")
time.sleep(0.5)
return True
# Ramp thrust more aggressively
elapsed = time.time() - start
if elapsed < 2.0:
# Initial ramp
thrust = 0.5 + (elapsed / 2.0) * 0.35 # 0.5 -> 0.85 in 2 seconds
elif self.altitude > 0.5:
# Flying - reduce to climb thrust
thrust = 0.75
else:
# Max thrust if not climbing after 2s
thrust = max_thrust
# Only check for stuck if we have altitude readings
if got_alt_reading and elapsed > 5.0:
if abs(self.altitude - last_alt) < 0.01:
stuck_count += 1
if stuck_count > 50: # 5 seconds stuck
print(f"\n[WARN] Not climbing at thrust={thrust:.2f}, alt={self.altitude:.2f}m")
break
else:
stuck_count = 0
last_alt = self.altitude
self.send_attitude_thrust(0, 0, 0, thrust)
alt_str = f"{self.altitude:.1f}" if got_alt_reading else "?"
print(f"\r[CTRL] Climbing: {alt_str}m / {altitude:.1f}m (thrust={thrust:.2f}, t={elapsed:.1f}s)", end="")
time.sleep(0.1)
self.send_attitude_thrust(0, 0, 0, hover_thrust)
print(f"\n[CTRL] Takeoff ended at {self.altitude:.1f}m")
return self.altitude > 1.0
def fly_to(self, target_x, target_y, altitude, tolerance=1.0, timeout=60):
"""Fly to position using attitude+thrust commands (works in GUIDED_NOGPS).
Uses pitch for forward/backward movement, roll for left/right.
"""
print(f"[CTRL] Flying to ({target_x:.1f}, {target_y:.1f}, {altitude:.1f}m)")
hover_thrust = 0.6
max_tilt = 0.15 # ~8.5 degrees max tilt angle in radians
start_time = time.time()
while time.time() - start_time < timeout:
# Actively poll for position
for _ in range(3):
msg = self.mav.recv_match(type=['LOCAL_POSITION_NED', 'GLOBAL_POSITION_INT'],
blocking=True, timeout=0.03)
if msg:
if msg.get_type() == 'LOCAL_POSITION_NED':
self.altitude = -msg.z
self.position = {"x": msg.x, "y": msg.y, "z": msg.z}
elif msg.get_type() == 'GLOBAL_POSITION_INT':
self.altitude = msg.relative_alt / 1000.0
self.update_state()
dx = target_x - self.position["x"]
dy = target_y - self.position["y"]
dz = altitude - self.altitude
dist = math.sqrt(dx*dx + dy*dy)
# Check if reached target
if dist < tolerance and abs(dz) < tolerance:
self.send_attitude_thrust(0, 0, 0, hover_thrust)
print(f"\n[CTRL] Reached ({self.position['x']:.1f}, {self.position['y']:.1f})")
return True
# Calculate tilt angles for movement
# In NED frame: +X is forward (North), +Y is right (East)
# Pitch: negative = nose down = fly forward (+X)
# Roll: positive = right wing down = fly right (+Y)
if dist > 0.1:
# Calculate direction to target
# Scale tilt based on distance (more tilt = faster movement)
tilt_scale = min(1.0, dist / 5.0) # Full tilt at 5+ meters away
# Pitch for forward/back movement (negative pitch = forward)
pitch = -max_tilt * tilt_scale * (dx / dist)
# Roll for left/right movement (positive roll = right)
roll = max_tilt * tilt_scale * (dy / dist)
else:
pitch = 0
roll = 0
# Thrust for altitude
if dz > 0.5:
thrust = hover_thrust + 0.1 # Climb
elif dz < -0.5:
thrust = hover_thrust - 0.1 # Descend
else:
thrust = hover_thrust + dz * 0.1 # Small adjustment
# Clamp thrust
thrust = max(0.4, min(0.8, thrust))
self.send_attitude_thrust(roll, pitch, 0, thrust)
elapsed = time.time() - start_time
if int(elapsed * 10) % 10 == 0: # Print every second
print(f"\r[CTRL] Pos: ({self.position['x']:.1f}, {self.position['y']:.1f}) -> ({target_x:.1f}, {target_y:.1f}) Dist: {dist:.1f}m Alt: {self.altitude:.1f}m", end="", flush=True)
time.sleep(0.05) # 20Hz control rate
self.send_attitude_thrust(0, 0, 0, hover_thrust)
print(f"\n[CTRL] Waypoint timeout")
return False
def land(self):
"""Land the drone."""
print("[CTRL] Landing...")
self.set_mode("LAND")
for _ in range(100):
self.update_state()
print(f"\r[CTRL] Landing... Alt: {self.altitude:.1f}m", end="")
if self.altitude < 0.3:
print("\n[CTRL] Landed!")
return True
time.sleep(0.2)
return False
# ==================== MISSIONS ====================
def run_hover_mission(self, altitude=5.0, duration=30):
"""Hover in place."""
print(f"\n{'='*50}")
print(f" HOVER MISSION: {altitude}m for {duration}s")
print(f"{'='*50}\n")
self.setup_gps_denied()
self.wait_for_ready()
if not self.set_mode('GUIDED_NOGPS'):
self.set_mode('GUIDED')
if not self.arm():
return False
if not self.takeoff(altitude):
self.land()
return False
print(f"[CTRL] Hovering for {duration}s...")
start = time.time()
while time.time() - start < duration:
self.update_state()
self.send_attitude_thrust(0, 0, 0, 0.6)
remaining = duration - (time.time() - start)
print(f"\r[CTRL] Hovering... {remaining:.0f}s remaining, Alt: {self.altitude:.1f}m ", end="", flush=True)
time.sleep(0.1)
print()
self.land()
print("\n[CTRL] Hover mission complete!")
return True
def run_square_mission(self, altitude=5.0, size=5.0):
"""Fly a square pattern."""
print(f"\n{'='*50}")
print(f" SQUARE MISSION: {size}m x {size}m at {altitude}m")
print(f"{'='*50}\n")
self.setup_gps_denied()
self.wait_for_ready()
if not self.set_mode('GUIDED_NOGPS'):
self.set_mode('GUIDED')
if not self.arm():
return False
if not self.takeoff(altitude):
self.land()
return False
self.update_state()
start_x = self.position["x"]
start_y = self.position["y"]
waypoints = [
(start_x + size, start_y),
(start_x + size, start_y + size),
(start_x, start_y + size),
(start_x, start_y),
]
print(f"[CTRL] Flying square from ({start_x:.1f}, {start_y:.1f})")
for i, (x, y) in enumerate(waypoints):
print(f"\n--- Waypoint {i+1}/4 ---")
self.fly_to(x, y, altitude)
time.sleep(1)
self.land()
print("\n[CTRL] Square mission complete!")
return True
def run_circle_mission(self, altitude=5.0, radius=5.0, points=8):
"""Fly a circular pattern."""
print(f"\n{'='*50}")
print(f" CIRCLE MISSION: radius={radius}m at {altitude}m")
print(f"{'='*50}\n")
self.setup_gps_denied()
self.wait_for_ready()
if not self.set_mode('GUIDED_NOGPS'):
self.set_mode('GUIDED')
if not self.arm():
return False
if not self.takeoff(altitude):
self.land()
return False
self.update_state()
center_x = self.position["x"]
center_y = self.position["y"]
print(f"[CTRL] Flying circle around ({center_x:.1f}, {center_y:.1f})")
for i in range(points + 1):
angle = 2 * math.pi * i / points
x = center_x + radius * math.cos(angle)
y = center_y + radius * math.sin(angle)
print(f"\n--- Point {i+1}/{points+1} ---")
self.fly_to(x, y, altitude)
time.sleep(0.5)
self.land()
print("\n[CTRL] Circle mission complete!")
return True
def main():
parser = argparse.ArgumentParser(description='Autonomous UAV Controller')
parser.add_argument('--connection', '-c', default='tcp:127.0.0.1:5760',
help='MAVLink connection string')
parser.add_argument('--mission', '-m', choices=['hover', 'square', 'circle'],
default='hover', help='Mission type')
parser.add_argument('--altitude', '-a', type=float, default=5.0,
help='Flight altitude (meters)')
parser.add_argument('--size', '-s', type=float, default=5.0,
help='Pattern size/radius (meters)')
parser.add_argument('--duration', '-d', type=float, default=30.0,
help='Hover duration (seconds)')
args = parser.parse_args()
print("=" * 50)
print(" Autonomous UAV Controller")
print(" GPS-Denied Navigation Simulation")
print("=" * 50)
print(f" Connection: {args.connection}")
print(f" Mission: {args.mission}")
print(f" Altitude: {args.altitude}m")
print("=" * 50)
controller = AutonomousController(args.connection)
if not controller.connect():
print("\n[ERROR] Could not connect to SITL")
sys.exit(1)
try:
if args.mission == 'hover':
controller.run_hover_mission(args.altitude, args.duration)
elif args.mission == 'square':
controller.run_square_mission(args.altitude, args.size)
elif args.mission == 'circle':
controller.run_circle_mission(args.altitude, args.size)
except KeyboardInterrupt:
print("\n\n[CTRL] Interrupted - Landing...")
controller.land()
except Exception as e:
print(f"\n[ERROR] {e}")
import traceback
traceback.print_exc()
controller.land()
if __name__ == '__main__':
main()