#!/usr/bin/env python3 import time import math import argparse import numpy as np from time import sleep, perf_counter from typing import Tuple from pymavlink import mavutil import pymavlink.dialects.v20.ardupilotmega as mavlink from pymavlink.dialects.v20.ardupilotmega import ( MAVLink_set_position_target_local_ned_message, MAV_FRAME_BODY_OFFSET_NED, MAV_FRAME_LOCAL_NED, MAV_CMD_DO_SET_MODE, MAV_CMD_NAV_TAKEOFF, MAV_CMD_NAV_LAND, ) HOLD_ALT = 5.0 LOWEST_ALT = 3.0 GUIDED_MODE = 4 GUIDED_NOGPS_MODE = 20 DEFAULT_WPNAV_SPEED = 150 DEFAULT_WPNAV_ACCEL = 100 DEFAULT_WPNAV_SPEED_UP = 100 DEFAULT_WPNAV_SPEED_DN = 75 DEFAULT_WPNAV_ACCEL_Z = 75 DEFAULT_LOIT_SPEED = 150 class Controller: HOLD_ALT = HOLD_ALT LOWEST_ALT = LOWEST_ALT def __init__(self, connection_string: str): print(f"[UAV] Connecting to {connection_string} ...") self.conn = mavutil.mavlink_connection(connection_string) self.conn.wait_heartbeat() print("[UAV] Heartbeat from system %u component %u" % (self.conn.target_system, self.conn.target_component)) self.conn.mav.request_data_stream_send( self.conn.target_system, self.conn.target_component, mavutil.mavlink.MAV_DATA_STREAM_ALL, 50, 1) self.current_yaw = 0.0 self.altitude = 0.0 self.position = {'x': 0.0, 'y': 0.0, 'z': 0.0} self.armed = False def _drain_messages(self): while True: msg = self.conn.recv_match(blocking=False) if msg is None: break mtype = msg.get_type() if mtype == 'ATTITUDE': self.current_yaw = msg.yaw elif mtype == 'LOCAL_POSITION_NED': self.position = {'x': msg.x, 'y': msg.y, 'z': msg.z} self.altitude = -msg.z elif mtype == 'GLOBAL_POSITION_INT': if self.altitude == 0: self.altitude = msg.relative_alt / 1000.0 elif mtype == 'HEARTBEAT': self.armed = bool(msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) elif mtype == 'STATUSTEXT': text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore') text = text.strip() if text: print(f"[SITL] {text}") def update_state(self): self._drain_messages() def get_local_position(self) -> dict: self.update_state() return self.position.copy() def get_altitude(self) -> float: self.update_state() return self.altitude def set_param(self, name: str, value: float): self.conn.mav.param_set_send( self.conn.target_system, self.conn.target_component, name.encode('utf-8'), value, mavutil.mavlink.MAV_PARAM_TYPE_REAL32) sleep(0.1) self._drain_messages() def configure_ekf_fast_converge(self): """Tune EKF parameters for faster initial convergence in SITL.""" params = { 'EK3_CHECK_SCALE': 200, # Relax EKF health checks (default 100) 'EK3_GPS_CHECK': 0, # Disable GPS preflight checks in SITL 'EK3_POSNE_M_NSE': 0.5, # Trust GPS position more 'EK3_VELD_M_NSE': 0.2, # Trust GPS vertical vel more (default 0.4) 'EK3_VELNE_M_NSE': 0.1, # Trust GPS horizontal vel more (default 0.3) 'INS_GYRO_CAL': 0, # Skip gyro cal on boot (SITL doesn't need it) 'ARMING_CHECK': 0, # Disable ALL preflight checks (SITL only) } for name, value in params.items(): self.set_param(name, value) print("[UAV] EKF fast-converge params applied") def set_mode_guided(self): self.conn.mav.command_long_send( self.conn.target_system, self.conn.target_component, MAV_CMD_DO_SET_MODE, 0, 89, GUIDED_MODE, 0, 0, 0, 0, 0) sleep(1) self._drain_messages() print("[UAV] Mode -> GUIDED") def set_mode_guided_nogps(self): self.conn.mav.command_long_send( self.conn.target_system, self.conn.target_component, MAV_CMD_DO_SET_MODE, 0, 89, GUIDED_NOGPS_MODE, 0, 0, 0, 0, 0) sleep(1) self._drain_messages() print("[UAV] Mode -> GUIDED_NOGPS") def wait_for_ekf(self, timeout: float = 120.0) -> bool: """Wait until EKF has actual GPS-fused position estimate.""" print("[UAV] Waiting for EKF position estimate ...") t0 = time.time() while time.time() - t0 < timeout: self._drain_messages() msg = self.conn.recv_match(type='EKF_STATUS_REPORT', blocking=True, timeout=2) if msg is not None: # bit 0x08 = pos_horiz_abs (actual GPS-fused position) # bit 0x20 = pred_pos_horiz_abs (predicted, not enough for arming) pos_horiz_abs = bool(msg.flags & 0x08) if pos_horiz_abs: print(f"\n[UAV] EKF has position estimate (flags=0x{msg.flags:04x})") return True elapsed = int(time.time() - t0) print(f"\r[UAV] Waiting for EKF ... {elapsed}s ", end='', flush=True) print("\n[UAV] EKF wait timed out") return False def arm(self, retries: int = 15): self.set_mode_guided() # Wait for EKF before burning through arm attempts if not self.wait_for_ekf(timeout=120.0): print("[UAV] Aborting arm: EKF never became healthy") return False for i in range(retries): print(f"[UAV] Arm attempt {i+1}/{retries}...") self.conn.mav.command_long_send( self.conn.target_system, self.conn.target_component, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, 1, 21196, 0, 0, 0, 0, 0) armed = False for _ in range(20): self._drain_messages() if self.conn.motors_armed(): armed = True break time.sleep(0.2) if armed: self.armed = True print("[UAV] Armed") return True print(f"[UAV] Arm attempt {i+1} failed, retrying in 2s...") time.sleep(2) return False def disarm(self): self.conn.arducopter_disarm() print("[UAV] Disarmed") def takeoff(self, altitude: float = HOLD_ALT): self.conn.mav.command_long_send( self.conn.target_system, self.conn.target_component, MAV_CMD_NAV_TAKEOFF, 0, 0, 0, 0, 0, 0, 0, altitude) print(f"[UAV] Takeoff -> {altitude}m") def land(self): self.conn.mav.command_long_send( self.conn.target_system, self.conn.target_component, MAV_CMD_DO_SET_MODE, 0, 89, 9, 0, 0, 0, 0, 0) print("[UAV] Landing (LAND mode)") def move_pos_rel(self, x: float, y: float, z: float): move_msg = MAVLink_set_position_target_local_ned_message( int(perf_counter() * 1000), self.conn.target_system, self.conn.target_component, MAV_FRAME_BODY_OFFSET_NED, 3576, x, y, z, 0, 0, 0, 0, 0, 0, 0, 0) self.conn.mav.send(move_msg) def move_pos_rel_yaw(self, x: float, y: float, z: float, yaw: float): move_msg = MAVLink_set_position_target_local_ned_message( int(perf_counter() * 1000), self.conn.target_system, self.conn.target_component, MAV_FRAME_BODY_OFFSET_NED, 3576, x, y, z, 0, 0, 0, 0, 0, 0, yaw, 0) self.conn.mav.send(move_msg) def move_vel_rel_alt(self, vx: float, vy: float, z: float): move_msg = MAVLink_set_position_target_local_ned_message( int(perf_counter() * 1000), self.conn.target_system, self.conn.target_component, MAV_FRAME_BODY_OFFSET_NED, 3555, 0, 0, z, vx, vy, 0, 0, 0, 0, 0, 0) self.conn.mav.send(move_msg) def move_local_ned(self, x: float, y: float, z: float): move_msg = MAVLink_set_position_target_local_ned_message( int(perf_counter() * 1000), self.conn.target_system, self.conn.target_component, MAV_FRAME_LOCAL_NED, 3576, x, y, z, 0, 0, 0, 0, 0, 0, 0, 0) self.conn.mav.send(move_msg) def distance(self, p1: Tuple, p2: Tuple) -> float: return float(np.linalg.norm(np.asarray(p1) - np.asarray(p2))) def wait_altitude(self, target_alt: float, tolerance: float = 0.5, timeout: float = 30.0): t0 = time.time() while time.time() - t0 < timeout: self.update_state() if abs(self.altitude - target_alt) < tolerance: print(f"\n[UAV] Altitude reached: {self.altitude:.1f}m") return True print(f"\r[UAV] Climbing: {self.altitude:.1f}m / {target_alt:.1f}m ", end='', flush=True) sleep(0.2) return False def configure_speed_limits(self, wpnav_speed=DEFAULT_WPNAV_SPEED, wpnav_accel=DEFAULT_WPNAV_ACCEL, wpnav_speed_up=DEFAULT_WPNAV_SPEED_UP, wpnav_speed_dn=DEFAULT_WPNAV_SPEED_DN, wpnav_accel_z=DEFAULT_WPNAV_ACCEL_Z, loit_speed=DEFAULT_LOIT_SPEED): params = { 'WPNAV_SPEED': wpnav_speed, 'WPNAV_ACCEL': wpnav_accel, 'WPNAV_SPEED_UP': wpnav_speed_up, 'WPNAV_SPEED_DN': wpnav_speed_dn, 'WPNAV_ACCEL_Z': wpnav_accel_z, 'LOIT_SPEED': loit_speed, } for name, value in params.items(): self.set_param(name, value) print(f"[UAV] Speed limits set: horiz={wpnav_speed}cm/s " f"accel={wpnav_accel}cm/s² up={wpnav_speed_up}cm/s " f"dn={wpnav_speed_dn}cm/s") def set_max_velocity(self, speed): self.conn.mav.command_long_send( self.conn.target_system, self.conn.target_component, mavlink.MAV_CMD_DO_CHANGE_SPEED, 0, 1, speed, -1, 0, 0, 0, 0)