Controller Update

This commit is contained in:
2026-02-09 05:51:51 +00:00
parent cd9ae9a4f6
commit 1a616472f0
16 changed files with 1545 additions and 669 deletions

368
src/autonomous_controller.py Executable file
View File

@@ -0,0 +1,368 @@
#!/usr/bin/env python3
"""
Autonomous UAV Controller using pymavlink.
Connects directly to ArduPilot SITL and controls the drone autonomously.
No ROS/MAVROS required.
"""
import time
import math
import argparse
from pymavlink import mavutil
class AutonomousController:
"""Autonomous UAV controller using pymavlink."""
def __init__(self, connection_string='tcp:127.0.0.1:5760'):
"""Initialize connection to ArduPilot."""
print(f"[CTRL] Connecting to {connection_string}...")
self.mav = mavutil.mavlink_connection(connection_string)
# Wait for heartbeat
print("[CTRL] Waiting for heartbeat...")
self.mav.wait_heartbeat()
print(f"[CTRL] Connected! System {self.mav.target_system} Component {self.mav.target_component}")
self.home_position = None
def wait_for_ekf(self, timeout=60):
"""Wait for EKF to initialize."""
print("[CTRL] Waiting for EKF initialization...")
start_time = time.time()
while time.time() - start_time < timeout:
# Request EKF status
msg = self.mav.recv_match(type='EKF_STATUS_REPORT', blocking=True, timeout=1)
if msg:
# Check if EKF is healthy (flags indicate position and velocity estimates are good)
if msg.flags & 0x01: # EKF_ATTITUDE
print(f"[CTRL] EKF Status: flags={msg.flags:#x}")
if msg.flags >= 0x1FF: # All basic flags set
print("[CTRL] EKF initialized successfully!")
return True
# Also check for GPS status messages to see progress
msg = self.mav.recv_match(type='STATUSTEXT', blocking=False)
if msg:
text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore')
print(f"[SITL] {text}")
if 'EKF3 IMU' in text and 'initialised' in text:
time.sleep(2) # Give it a moment after EKF init
return True
print("[CTRL] EKF initialization timeout!")
return False
def set_mode(self, mode):
"""Set flight mode."""
mode_mapping = self.mav.mode_mapping()
if mode not in mode_mapping:
print(f"[CTRL] Unknown mode: {mode}")
return False
mode_id = mode_mapping[mode]
self.mav.mav.set_mode_send(
self.mav.target_system,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode_id
)
# Wait for ACK
for _ in range(10):
msg = self.mav.recv_match(type='HEARTBEAT', blocking=True, timeout=1)
if msg and msg.custom_mode == mode_id:
print(f"[CTRL] Mode set to {mode}")
return True
print(f"[CTRL] Failed to set mode {mode}")
return False
def arm(self, force=True):
"""Arm the vehicle."""
print("[CTRL] Arming...")
# Disable arming checks if force
if force:
self.mav.mav.param_set_send(
self.mav.target_system,
self.mav.target_component,
b'ARMING_CHECK',
0,
mavutil.mavlink.MAV_PARAM_TYPE_INT32
)
time.sleep(0.5)
self.mav.mav.command_long_send(
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0, # confirmation
1, # arm
21196 if force else 0, # force arm magic number
0, 0, 0, 0, 0
)
# Wait for arm
for _ in range(30):
msg = self.mav.recv_match(type='HEARTBEAT', blocking=True, timeout=1)
if msg and msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED:
print("[CTRL] Armed successfully!")
return True
# Check for failure messages
ack = self.mav.recv_match(type='COMMAND_ACK', blocking=False)
if ack and ack.command == mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM:
if ack.result != mavutil.mavlink.MAV_RESULT_ACCEPTED:
print(f"[CTRL] Arm rejected: {ack.result}")
print("[CTRL] Arm timeout")
return False
def takeoff(self, altitude=5.0):
"""Take off to specified altitude."""
print(f"[CTRL] Taking off to {altitude}m...")
self.mav.mav.command_long_send(
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
0,
0, 0, 0, 0, 0, 0,
altitude
)
# Wait for takeoff
start_time = time.time()
while time.time() - start_time < 30:
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if msg:
current_alt = msg.relative_alt / 1000.0 # mm to m
print(f"[CTRL] Altitude: {current_alt:.1f}m / {altitude}m")
if current_alt >= altitude * 0.9:
print("[CTRL] Takeoff complete!")
return True
print("[CTRL] Takeoff timeout")
return False
def goto_local(self, north, east, down, yaw=0):
"""Go to local position (NED frame)."""
print(f"[CTRL] Going to N:{north:.1f} E:{east:.1f} D:{down:.1f}")
# Use SET_POSITION_TARGET_LOCAL_NED
self.mav.mav.set_position_target_local_ned_send(
0, # timestamp
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
0b0000111111111000, # position only mask
north, east, down, # position
0, 0, 0, # velocity
0, 0, 0, # acceleration
yaw, 0 # yaw, yaw_rate
)
def wait_for_position(self, north, east, down, tolerance=1.0, timeout=30):
"""Wait until vehicle reaches position."""
start_time = time.time()
while time.time() - start_time < timeout:
msg = self.mav.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1)
if msg:
dist = math.sqrt(
(msg.x - north)**2 +
(msg.y - east)**2 +
(msg.z - down)**2
)
print(f"[CTRL] Position: N:{msg.x:.1f} E:{msg.y:.1f} D:{msg.z:.1f} (dist:{dist:.1f})")
if dist < tolerance:
return True
return False
def land(self):
"""Land the vehicle."""
print("[CTRL] Landing...")
self.mav.mav.command_long_send(
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_CMD_NAV_LAND,
0,
0, 0, 0, 0, 0, 0, 0
)
# Wait for land
start_time = time.time()
while time.time() - start_time < 60:
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if msg:
current_alt = msg.relative_alt / 1000.0
print(f"[CTRL] Landing... Alt: {current_alt:.1f}m")
if current_alt < 0.3:
print("[CTRL] Landed!")
return True
return False
def disarm(self):
"""Disarm the vehicle."""
print("[CTRL] Disarming...")
self.mav.mav.command_long_send(
self.mav.target_system,
self.mav.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0,
0, # disarm
21196, # force
0, 0, 0, 0, 0
)
time.sleep(1)
print("[CTRL] Disarmed")
def run_hover_mission(self, altitude=5.0, duration=30):
"""Simple hover mission."""
print(f"\n[MISSION] Hover at {altitude}m for {duration}s")
if not self.wait_for_ekf():
return False
if not self.set_mode('GUIDED'):
return False
if not self.arm(force=True):
return False
if not self.takeoff(altitude):
return False
print(f"[MISSION] Hovering for {duration} seconds...")
time.sleep(duration)
self.land()
self.disarm()
print("[MISSION] Hover mission complete!")
return True
def run_square_mission(self, altitude=5.0, side=5.0):
"""Fly a square pattern."""
print(f"\n[MISSION] Square pattern: {side}m sides at {altitude}m")
if not self.wait_for_ekf():
return False
if not self.set_mode('GUIDED'):
return False
if not self.arm(force=True):
return False
if not self.takeoff(altitude):
return False
# Square waypoints (NED: North, East, Down)
waypoints = [
(side, 0, -altitude), # North
(side, side, -altitude), # North-East
(0, side, -altitude), # East
(0, 0, -altitude), # Back to start
]
for i, (n, e, d) in enumerate(waypoints):
print(f"\n[MISSION] Waypoint {i+1}/4: N:{n} E:{e}")
self.goto_local(n, e, d)
time.sleep(0.5) # Let command process
self.wait_for_position(n, e, d, tolerance=1.5, timeout=20)
time.sleep(2) # Hover at waypoint
self.land()
self.disarm()
print("[MISSION] Square mission complete!")
return True
def run_circle_mission(self, altitude=5.0, radius=5.0, points=8):
"""Fly a circular pattern."""
print(f"\n[MISSION] Circle pattern: {radius}m radius at {altitude}m")
if not self.wait_for_ekf():
return False
if not self.set_mode('GUIDED'):
return False
if not self.arm(force=True):
return False
if not self.takeoff(altitude):
return False
# Generate circle waypoints
for i in range(points + 1): # +1 to complete the circle
angle = 2 * math.pi * i / points
north = radius * math.cos(angle)
east = radius * math.sin(angle)
print(f"\n[MISSION] Circle point {i+1}/{points+1}")
self.goto_local(north, east, -altitude)
time.sleep(0.5)
self.wait_for_position(north, east, -altitude, tolerance=1.5, timeout=15)
time.sleep(1)
# Return to center
self.goto_local(0, 0, -altitude)
self.wait_for_position(0, 0, -altitude, tolerance=1.5, timeout=15)
self.land()
self.disarm()
print("[MISSION] Circle mission complete!")
return True
def main():
parser = argparse.ArgumentParser(description='Autonomous UAV Controller')
parser.add_argument('--connection', default='tcp:127.0.0.1:5760',
help='MAVLink connection string')
parser.add_argument('--mission', choices=['hover', 'square', 'circle'],
default='hover', help='Mission type')
parser.add_argument('--altitude', type=float, default=5.0,
help='Flight altitude in meters')
parser.add_argument('--size', type=float, default=5.0,
help='Mission size (side length or radius)')
parser.add_argument('--duration', type=float, default=30.0,
help='Hover duration in seconds')
args = parser.parse_args()
print("=" * 50)
print(" Autonomous UAV Controller")
print("=" * 50)
print(f" Connection: {args.connection}")
print(f" Mission: {args.mission}")
print(f" Altitude: {args.altitude}m")
print("=" * 50)
print()
try:
controller = AutonomousController(args.connection)
if args.mission == 'hover':
controller.run_hover_mission(args.altitude, args.duration)
elif args.mission == 'square':
controller.run_square_mission(args.altitude, args.size)
elif args.mission == 'circle':
controller.run_circle_mission(args.altitude, args.size)
except KeyboardInterrupt:
print("\n[CTRL] Interrupted by user")
except Exception as e:
print(f"[ERROR] {e}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
main()