Aruco Code and UVG/UAV Logic Fixes

This commit is contained in:
2026-02-20 12:19:44 -05:00
parent 50ef3f0490
commit e2f805f3f3
22 changed files with 667 additions and 1143 deletions

View File

@@ -36,9 +36,13 @@ class Search:
self.running = True
self.landed = False
self._landing = False
self.landing_enabled = False
self.actions = actions or {}
self.land_ids = set(self.actions.get("land", []))
self.target_ids = set(self.actions.get("target", []))
self.on_target_found = None
self._dispatched_targets = set()
self.spiral_max_legs = 12
self.spiral_initial_leg = self.CAM_FOV_METERS
@@ -281,7 +285,15 @@ class Search:
f"distance:{d['distance']:.2f}m "
f"UAV at ({pos['x']:.1f}, {pos['y']:.1f})")
if marker_id in self.land_ids and not self._landing:
if marker_id in self.target_ids and marker_id not in self._dispatched_targets:
self._dispatched_targets.add(marker_id)
pos = self.found_markers[marker_id]['uav_position']
print(f"\n[SEARCH] Target ID:{marker_id} found! "
f"Dispatching UGV to ({pos['x']:.1f}, {pos['y']:.1f})")
if self.on_target_found:
self.on_target_found(marker_id, pos['x'], pos['y'])
if marker_id in self.land_ids and not self._landing and self.landing_enabled:
print(f"\n[SEARCH] Landing target ID:{marker_id} found! "
f"Starting approach.")
self._landing = True

View File

@@ -105,9 +105,24 @@ class Controller:
name.encode('utf-8'),
value,
mavutil.mavlink.MAV_PARAM_TYPE_REAL32)
sleep(0.5)
sleep(0.1)
self._drain_messages()
def configure_ekf_fast_converge(self):
"""Tune EKF parameters for faster initial convergence in SITL."""
params = {
'EK3_CHECK_SCALE': 200, # Relax EKF health checks (default 100)
'EK3_GPS_CHECK': 0, # Disable GPS preflight checks in SITL
'EK3_POSNE_M_NSE': 0.5, # Trust GPS position more
'EK3_VELD_M_NSE': 0.2, # Trust GPS vertical vel more (default 0.4)
'EK3_VELNE_M_NSE': 0.1, # Trust GPS horizontal vel more (default 0.3)
'INS_GYRO_CAL': 0, # Skip gyro cal on boot (SITL doesn't need it)
'ARMING_CHECK': 0, # Disable ALL preflight checks (SITL only)
}
for name, value in params.items():
self.set_param(name, value)
print("[UAV] EKF fast-converge params applied")
def set_mode_guided(self):
self.conn.mav.command_long_send(
self.conn.target_system,
@@ -128,8 +143,33 @@ class Controller:
self._drain_messages()
print("[UAV] Mode -> GUIDED_NOGPS")
def arm(self, retries: int = 30):
def wait_for_ekf(self, timeout: float = 120.0) -> bool:
"""Wait until EKF has actual GPS-fused position estimate."""
print("[UAV] Waiting for EKF position estimate ...")
t0 = time.time()
while time.time() - t0 < timeout:
self._drain_messages()
msg = self.conn.recv_match(type='EKF_STATUS_REPORT', blocking=True, timeout=2)
if msg is not None:
# bit 0x08 = pos_horiz_abs (actual GPS-fused position)
# bit 0x20 = pred_pos_horiz_abs (predicted, not enough for arming)
pos_horiz_abs = bool(msg.flags & 0x08)
if pos_horiz_abs:
print(f"\n[UAV] EKF has position estimate (flags=0x{msg.flags:04x})")
return True
elapsed = int(time.time() - t0)
print(f"\r[UAV] Waiting for EKF ... {elapsed}s ", end='', flush=True)
print("\n[UAV] EKF wait timed out")
return False
def arm(self, retries: int = 15):
self.set_mode_guided()
# Wait for EKF before burning through arm attempts
if not self.wait_for_ekf(timeout=120.0):
print("[UAV] Aborting arm: EKF never became healthy")
return False
for i in range(retries):
print(f"[UAV] Arm attempt {i+1}/{retries}...")
self.conn.mav.command_long_send(
@@ -149,7 +189,7 @@ class Controller:
self.armed = True
print("[UAV] Armed")
return True
print(f"[UAV] Arm attempt {i+1} failed, retrying...")
print(f"[UAV] Arm attempt {i+1} failed, retrying in 2s...")
time.sleep(2)
return False
@@ -259,7 +299,8 @@ class Controller:
if msg and msg.fix_type >= 3:
print(f"\n[UAV] GPS fix: {msg.fix_type} sats: {msg.satellites_visible}")
print("[UAV] Waiting for EKF to settle ...")
for _ in range(15):
# Wait briefly; let the arming loop handle the rest
for _ in range(2):
self._drain_messages()
sleep(1)
return True

View File

@@ -1,94 +1,149 @@
#!/usr/bin/env python3
"""UGV controller using Gazebo transport (gz.transport13).
Publishes Twist to /ugv/cmd_vel, subscribes to /ugv/odometry for
position feedback. Drives a proportional turn-then-drive controller.
"""
import math
import numpy as np
from time import sleep, perf_counter
from typing import Tuple, Optional
import time
import threading
from time import sleep
from gz.transport13 import Node
from gz.msgs10.twist_pb2 import Twist
from gz.msgs10.odometry_pb2 import Odometry
try:
from pymavlink import mavutil
import pymavlink.dialects.v20.ardupilotmega as mavlink
from pymavlink.dialects.v20.ardupilotmega import (
MAVLink_set_position_target_local_ned_message,
MAV_FRAME_BODY_OFFSET_NED,
MAV_FRAME_LOCAL_NED,
)
HAS_MAVLINK = True
except ImportError:
HAS_MAVLINK = False
MAX_LINEAR_VEL = 1.0
MAX_ANGULAR_VEL = 1.5
POSITION_TOL = 0.3
MAX_ANGULAR_VEL = 2.0
POSITION_TOL = 0.5
YAW_TOL = 0.15
class UGVController:
def __init__(self, connection_string: Optional[str] = None,
static_pos: Tuple[float, float] = (10.0, 5.0)):
self.conn = None
self.backend = 'passive'
self.position = {'x': static_pos[0], 'y': static_pos[1], 'z': 0.0}
def __init__(self, cmd_topic="/ugv/cmd_vel", odom_topic="/ugv/odometry"):
self.node = Node()
self.pub = self.node.advertise(cmd_topic, Twist)
self.position = {'x': 0.0, 'y': 0.0, 'z': 0.0}
self.yaw = 0.0
self._lock = threading.Lock()
self._driving = False
self._drive_thread = None
if connection_string and HAS_MAVLINK:
try:
print(f"[UGV] Connecting via pymavlink: {connection_string}")
self.conn = mavutil.mavlink_connection(connection_string)
self.conn.wait_heartbeat(timeout=10)
print("[UGV] Heartbeat received (system %u component %u)" %
(self.conn.target_system, self.conn.target_component))
self.backend = 'mavlink'
except Exception as e:
print(f"[UGV] MAVLink connection failed: {e}")
self.conn = None
ok = self.node.subscribe(Odometry, odom_topic, self._odom_cb)
if ok:
print(f"[UGV] Subscribed to {odom_topic}")
else:
print(f"[UGV] WARNING: Failed to subscribe to {odom_topic}")
if self.backend == 'passive':
print(f"[UGV] Passive mode - static at ({static_pos[0]}, {static_pos[1]})")
print(f"[UGV] Publishing commands to {cmd_topic}")
def update_state(self):
if self.backend == 'mavlink' and self.conn:
while True:
msg = self.conn.recv_match(blocking=False)
if msg is None:
break
mtype = msg.get_type()
if mtype == 'LOCAL_POSITION_NED':
self.position = {'x': msg.x, 'y': msg.y, 'z': msg.z}
elif mtype == 'ATTITUDE':
self.yaw = msg.yaw
def _odom_cb(self, msg):
with self._lock:
self.position['x'] = msg.pose.position.x
self.position['y'] = msg.pose.position.y
self.position['z'] = msg.pose.position.z
q = msg.pose.orientation
siny_cosp = 2.0 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1.0 - 2.0 * (q.y * q.y + q.z * q.z)
self.yaw = math.atan2(siny_cosp, cosy_cosp)
def get_position(self) -> dict:
self.update_state()
return self.position.copy()
def get_position(self):
with self._lock:
return self.position.copy()
def move_to(self, x: float, y: float):
if self.backend == 'mavlink' and self.conn:
move_msg = MAVLink_set_position_target_local_ned_message(
int(perf_counter() * 1000),
self.conn.target_system,
self.conn.target_component,
MAV_FRAME_LOCAL_NED,
3576, x, y, 0,
0, 0, 0, 0, 0, 0, 0, 0)
self.conn.mav.send(move_msg)
print(f"[UGV] Moving to ({x:.1f}, {y:.1f})")
def get_yaw(self):
with self._lock:
return self.yaw
def send_velocity(self, vx: float, vy: float, yaw_rate: float = 0.0):
if self.backend == 'mavlink' and self.conn:
move_msg = MAVLink_set_position_target_local_ned_message(
int(perf_counter() * 1000),
self.conn.target_system,
self.conn.target_component,
MAV_FRAME_BODY_OFFSET_NED,
3527, 0, 0, 0,
vx, vy, 0, 0, 0, 0, 0, yaw_rate)
self.conn.mav.send(move_msg)
def send_cmd_vel(self, linear_x, angular_z):
msg = Twist()
msg.linear.x = max(-MAX_LINEAR_VEL, min(MAX_LINEAR_VEL, linear_x))
msg.angular.z = max(-MAX_ANGULAR_VEL, min(MAX_ANGULAR_VEL, angular_z))
self.pub.publish(msg)
def stop(self):
self.send_velocity(0, 0, 0)
self.send_cmd_vel(0.0, 0.0)
def distance_to(self, x: float, y: float) -> float:
dx = x - self.position['x']
dy = y - self.position['y']
def distance_to(self, x, y):
pos = self.get_position()
dx = x - pos['x']
dy = y - pos['y']
return math.sqrt(dx**2 + dy**2)
def drive_to(self, target_x, target_y, callback=None):
"""Start driving to (target_x, target_y) in a background thread."""
if self._driving:
print("[UGV] Already driving, ignoring new target")
return
self._driving = True
self._drive_thread = threading.Thread(
target=self._drive_loop,
args=(target_x, target_y, callback),
daemon=True
)
self._drive_thread.start()
def _drive_loop(self, target_x, target_y, callback):
print(f"[UGV] Driving to ({target_x:.1f}, {target_y:.1f})")
t0 = time.time()
timeout = 120.0
while self._driving and (time.time() - t0) < timeout:
pos = self.get_position()
yaw = self.get_yaw()
dx = target_x - pos['x']
dy = target_y - pos['y']
dist = math.sqrt(dx**2 + dy**2)
if dist < POSITION_TOL:
self.stop()
elapsed = int(time.time() - t0)
print(f"\n[UGV] Arrived at ({target_x:.1f}, {target_y:.1f}) "
f"in {elapsed}s")
self._driving = False
if callback:
callback(target_x, target_y)
return
target_yaw = math.atan2(dy, dx)
yaw_error = target_yaw - yaw
while yaw_error > math.pi:
yaw_error -= 2 * math.pi
while yaw_error < -math.pi:
yaw_error += 2 * math.pi
if abs(yaw_error) > YAW_TOL:
angular = max(-MAX_ANGULAR_VEL,
min(MAX_ANGULAR_VEL, yaw_error * 2.0))
linear = 0.0 if abs(yaw_error) > 0.5 else 0.3
else:
linear = min(MAX_LINEAR_VEL, dist * 0.5)
angular = yaw_error * 1.0
self.send_cmd_vel(linear, angular)
elapsed = int(time.time() - t0)
print(f"\r[UGV] dist:{dist:.1f}m yaw_err:"
f"{math.degrees(yaw_error):+.0f}° "
f"v:{linear:.2f} ω:{angular:.2f} ({elapsed}s) ",
end='', flush=True)
sleep(0.1)
self.stop()
self._driving = False
print(f"\n[UGV] Drive timeout or stopped")
def cancel_drive(self):
self._driving = False
self.stop()
@property
def is_driving(self):
return self._driving
@property
def has_arrived(self):
return not self._driving and self._drive_thread is not None