Files
simulation/src/control/search.py

558 lines
19 KiB
Python

#!/usr/bin/env python3
import math
import time
import numpy as np
from scipy.stats import levy, uniform
from time import sleep
from enum import Enum
from control.uav_controller import Controller
from vision.object_detector import ObjectDetector
from utils.helpers import distance_2d
class SearchMode(Enum):
SPIRAL = "spiral"
LAWNMOWER = "lawnmower"
LEVY = "levy"
class Search:
POSITION_TOLERANCE = 0.2
CHECK_INTERVAL = 0.5
MAX_TRAVEL_TIME = 300.0
def __init__(self, ctrl: Controller, detector: ObjectDetector,
camera=None, mode: str = "spiral", altitude: float = 5.0,
actions: dict = None):
self.ctrl = ctrl
self.detector = detector
self.camera = camera
self.mode = SearchMode(mode.lower())
self.altitude = altitude
self.found_markers = {}
self.running = True
self.landed = False
self._landing = False
self.landing_enabled = False
self.actions = actions or {}
self.land_ids = set(self.actions.get("land", []))
self.target_ids = set(self.actions.get("target", []))
self.on_target_found = None
self._dispatched_targets = set()
hfov = 1.3962634 # 80 deg horizontal FOV
self.cam_fov_meters = 2.0 * self.altitude * math.tan(hfov / 2.0)
self.spiral_max_legs = 40
self.spiral_initial_leg = self.cam_fov_meters
self.spiral_leg_increment = self.cam_fov_meters * 0.8
self.lawn_width = 30.0
self.lawn_height = 30.0
self.lawn_lane_spacing = self.cam_fov_meters * 0.8
self.lawn_lanes = int(self.lawn_width / max(self.lawn_lane_spacing, 0.1)) + 1
self.levy_max_steps = 20
self.levy_field_size = 50.0
self.levy_angle_dist = uniform(loc=-180, scale=360)
self.waypoints = []
self.current_wp = 0
self._on_waypoint_change = None
def configure(self, **kwargs):
for key, value in kwargs.items():
if hasattr(self, key):
setattr(self, key, value)
def plan(self, start_pos=(0.0, 0.0), geofence_points=None):
if self.mode == SearchMode.SPIRAL:
waypoints = self._plan_spiral(start_pos)
elif self.mode == SearchMode.LAWNMOWER:
waypoints = self._plan_lawnmower(start_pos)
elif self.mode == SearchMode.LEVY:
waypoints = self._plan_levy(start_pos)
else:
waypoints = [start_pos]
if geofence_points:
waypoints = self._clip_to_geofence(waypoints, geofence_points)
self.waypoints = waypoints
self.current_wp = 0
print(f"\n[PLAN] {self.mode.value.upper()} flight plan: "
f"{len(waypoints)} waypoints")
for i, (wx, wy) in enumerate(waypoints):
print(f" WP{i:02d}: ({wx:+.1f}, {wy:+.1f})")
if geofence_points:
print(f"[PLAN] Clipped to geofence ({len(geofence_points)} vertices)")
return waypoints
def _plan_spiral(self, start):
waypoints = [start]
x, y = start
directions = [
(0, 1), # East (+Y)
(1, 0), # North (+X)
(0, -1), # West (-Y)
(-1, 0), # South (-X)
]
distance = self.spiral_initial_leg
dir_idx = 0
legs_at_this_dist = 0
for _ in range(self.spiral_max_legs):
dx_dir, dy_dir = directions[dir_idx % 4]
x += dx_dir * distance
y += dy_dir * distance
waypoints.append((x, y))
dir_idx += 1
legs_at_this_dist += 1
if legs_at_this_dist >= 2:
legs_at_this_dist = 0
distance += self.spiral_leg_increment
return waypoints
def _plan_lawnmower(self, start):
waypoints = [start]
sx, sy = start
lane_spacing = self.lawn_lane_spacing
height = self.lawn_height
num_lanes = max(1, int(self.lawn_width / lane_spacing))
for lane in range(num_lanes):
lane_x = sx + lane * lane_spacing
if lane % 2 == 0:
target_y = sy + height
else:
target_y = sy
waypoints.append((lane_x, target_y))
if lane < num_lanes - 1:
next_x = sx + (lane + 1) * lane_spacing
waypoints.append((next_x, target_y))
return waypoints
def _plan_levy(self, start):
waypoints = [start]
x, y = start
for _ in range(self.levy_max_steps):
angle_deg = self.levy_angle_dist.rvs()
angle_rad = math.radians(angle_deg)
raw_distance = levy.rvs(loc=1, scale=1)
distance = min(max(raw_distance, 1.0), self.levy_field_size)
x += distance * math.cos(angle_rad)
y += distance * math.sin(angle_rad)
waypoints.append((x, y))
return waypoints
def _clip_to_geofence(self, waypoints, polygon):
warn_dist = getattr(self, 'geofence_warn_dist', 3.0)
safe_polygon = []
if warn_dist > 0:
cx = sum(p[0] for p in polygon) / len(polygon)
cy = sum(p[1] for p in polygon) / len(polygon)
for x, y in polygon:
dx = x - cx
dy = y - cy
length = math.sqrt(dx*dx + dy*dy)
if length > 0:
shrink = warn_dist / length
safe_polygon.append((x - dx * shrink, y - dy * shrink))
else:
safe_polygon.append((x, y))
else:
safe_polygon = polygon
clipped = []
for wx, wy in waypoints:
if self._point_in_polygon(wx, wy, safe_polygon):
clipped.append((wx, wy))
else:
nearest = self._nearest_point_on_polygon(wx, wy, safe_polygon)
if not clipped or distance_2d(clipped[-1], nearest) > 0.5:
clipped.append(nearest)
return clipped
@staticmethod
def _point_in_polygon(px, py, polygon):
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, yi = polygon[i]
xj, yj = polygon[j]
if ((yi > py) != (yj > py)) and \
(px < (xj - xi) * (py - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
@staticmethod
def _nearest_point_on_polygon(px, py, polygon):
min_dist = float('inf')
nearest = (px, py)
n = len(polygon)
for i in range(n):
x1, y1 = polygon[i]
x2, y2 = polygon[(i + 1) % n]
dx, dy = x2 - x1, y2 - y1
length_sq = dx * dx + dy * dy
if length_sq == 0:
proj = (x1, y1)
else:
t = max(0, min(1, ((px - x1) * dx + (py - y1) * dy) / length_sq))
proj = (x1 + t * dx, y1 + t * dy)
dist = math.sqrt((px - proj[0])**2 + (py - proj[1])**2)
if dist < min_dist:
min_dist = dist
nearest = proj
return nearest
def run(self):
if not self.waypoints:
self.plan()
n = len(self.waypoints)
print(f"\n{'=' * 50}")
print(f" SEARCH: {self.mode.value.upper()} at {self.altitude}m")
print(f" Waypoints: {n}")
if self.land_ids:
print(f" Landing targets: {self.land_ids}")
print(f"{'=' * 50}\n")
for i, (wx, wy) in enumerate(self.waypoints):
if not self.running:
break
self.current_wp = i
if self._on_waypoint_change:
self._on_waypoint_change(i)
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
dist = distance_2d((pos['x'], pos['y']), (wx, wy))
dir_label = ""
if i > 0:
prev = self.waypoints[i - 1]
dx = wx - prev[0]
dy = wy - prev[1]
if abs(dx) > abs(dy):
dir_label = "N" if dx > 0 else "S"
else:
dir_label = "E" if dy > 0 else "W"
print(f"[SEARCH] WP {i + 1}/{n} "
f"→ ({wx:+.1f}, {wy:+.1f}) "
f"dist:{dist:.1f}m {dir_label} "
f"markers:{len(self.found_markers)}")
self.move_to_local(wx, wy)
if not self._landing:
print(f"\n[SEARCH] Search complete. "
f"Found {len(self.found_markers)} markers.")
return self.found_markers
def get_camera_frame(self):
if self.camera is None:
return None
frame = self.camera.raw_frames.get("downward")
if frame is None:
frame = self.camera.raw_frames.get("gimbal")
return frame
def check_for_markers(self):
frame = self.get_camera_frame()
if frame is None:
return []
detections = self.detector.detect(frame)
new_markers = []
for d in detections:
marker_id = d.get("id")
if marker_id is None:
continue
if marker_id not in self.found_markers:
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
self.found_markers[marker_id] = {
"id": marker_id,
"uav_position": pos.copy(),
"distance": d.get("distance", 0),
"timestamp": time.time(),
}
new_markers.append(d)
print(f"\n[SEARCH] ArUco ID:{marker_id} detected! "
f"distance:{d['distance']:.2f}m "
f"UAV at ({pos['x']:.1f}, {pos['y']:.1f})")
if marker_id in self.target_ids and marker_id not in self._dispatched_targets:
self._dispatched_targets.add(marker_id)
self.running = False
final_pos = self.center_over_target(marker_id)
print(f"\n[SEARCH] Target ID:{marker_id} centered! "
f"Dispatching UGV to ({final_pos['x']:.1f}, {final_pos['y']:.1f})")
if self.on_target_found:
self.on_target_found(marker_id, final_pos['x'], final_pos['y'])
if marker_id in self.land_ids and not self._landing and self.landing_enabled:
print(f"\n[SEARCH] Landing target ID:{marker_id} found! "
f"Starting approach.")
self._landing = True
self.execute_landing(detections)
return new_markers
return new_markers
def execute_landing(self, initial_detections):
target_det = None
for d in initial_detections:
if d.get("id") in self.land_ids:
target_det = d
break
if target_det is None:
print("[SEARCH] Lost landing target, resuming search")
return
print(f"\n[SEARCH] ===== LANDING SEQUENCE =====")
print(f"[SEARCH] Target marker ID:{target_det['id']} "
f"distance:{target_det['distance']:.2f}m")
IMG_W, IMG_H = 640, 480
IMG_CX, IMG_CY = IMG_W / 2, IMG_H / 2
HFOV = 1.3962634 # 80° horizontal FOV
self.ctrl.update_state()
current_alt = self.ctrl.altitude
DESCENT_STEP = max(0.5, current_alt * 0.25)
LAND_ALT = min(1.0, current_alt - 0.2)
CENTER_PX = 50
MAX_CORRECTIONS = 200
MAX_LOST = 30
GAIN = 0.35
self.ctrl.update_state()
current_alt = self.ctrl.altitude
print(f"[SEARCH] Visual servoing descent from {current_alt:.1f}m")
while current_alt > LAND_ALT and self.running:
centered = self._center_over_marker(
current_alt, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV,
CENTER_PX, MAX_CORRECTIONS, MAX_LOST, GAIN)
if centered is None:
print(f"\n[SEARCH] Marker lost, aborting landing")
self._landing = False
return
current_alt = max(current_alt - DESCENT_STEP, LAND_ALT)
print(f"\n[SEARCH] Descending to {current_alt:.1f}m")
self.ctrl.update_state()
cur = self.ctrl.get_local_position()
self.ctrl.move_local_ned(cur['x'], cur['y'], -current_alt)
sleep(2.0)
print(f"[SEARCH] Final centering at {LAND_ALT:.1f}m")
self._center_over_marker(
LAND_ALT, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV,
35, MAX_CORRECTIONS, MAX_LOST, 0.25)
print(f"\n[SEARCH] ===== LANDING =====")
self.ctrl.land()
self.landed = True
self.running = False
def center_over_target(self, target_id):
print(f"\n[SEARCH] ===== CENTERING OVER TARGET {target_id} =====")
if hasattr(self, 'uav_min_cms'):
print(f"[SEARCH] Reducing speed to minimum ({self.uav_min_cms} cm/s) for centering.")
self.ctrl.configure_speed_limits(wpnav_speed=self.uav_min_cms, loit_speed=self.uav_min_cms)
IMG_W, IMG_H = 640, 480
IMG_CX, IMG_CY = IMG_W / 2, IMG_H / 2
HFOV = 1.3962634 # 80° horizontal FOV
CENTER_PX = 30
MAX_CORRECTIONS = 200
MAX_LOST = 30
GAIN = 0.35
self.ctrl.update_state()
current_alt = self.ctrl.altitude
centered = self._center_over_marker(
current_alt, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV,
CENTER_PX, MAX_CORRECTIONS, MAX_LOST, GAIN, allowed_ids=[target_id])
if not centered:
print(f"\n[SEARCH] Failed to fully center, using best estimate.")
else:
print(f"\n[SEARCH] Successfully centered over target {target_id}.")
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
return pos
def _center_over_marker(self, target_alt, img_w, img_h, img_cx, img_cy,
hfov, center_px, max_corrections, max_lost, gain, allowed_ids=None):
if allowed_ids is None:
allowed_ids = self.land_ids
lost_count = 0
centered = False
for attempt in range(max_corrections):
frame = self.get_camera_frame()
if frame is None:
sleep(0.3)
continue
detections = self.detector.detect(frame)
target = None
for d in detections:
if d.get("id") in allowed_ids:
target = d
break
if target is None:
lost_count += 1
print(f"\r[SEARCH] Alt:{target_alt:.1f}m "
f"MARKER LOST ({lost_count}/{max_lost}) ",
end='', flush=True)
if lost_count >= max_lost:
return None
sleep(0.3)
continue
lost_count = 0
cx, cy = target["center_px"]
err_x = cx - img_cx
err_y = cy - img_cy
print(f"\r[SEARCH] Alt:{target_alt:.1f}m "
f"err=({err_x:+.0f},{err_y:+.0f})px "
f"dist:{target['distance']:.2f}m "
f"({attempt+1}/{max_corrections}) ",
end='', flush=True)
if abs(err_x) < center_px and abs(err_y) < center_px:
centered = True
print(f"\n[SEARCH] Centered at {target_alt:.1f}m!")
break
self.ctrl.update_state()
alt = max(self.ctrl.altitude, 0.5)
ground_w = 2.0 * alt * math.tan(hfov / 2.0)
m_per_px = ground_w / img_w
# Calculate movement in the UAV's body frame
correction_forward = -err_y * m_per_px * gain
correction_right = err_x * m_per_px * gain
self.ctrl.update_state()
yaw = self.ctrl.current_yaw
# Rotate body correction (Forward, Right) into world NED (North, East)
correction_n = correction_forward * math.cos(yaw) - correction_right * math.sin(yaw)
correction_e = correction_forward * math.sin(yaw) + correction_right * math.cos(yaw)
cur = self.ctrl.get_local_position()
self.ctrl.move_local_ned(
cur['x'] + correction_n,
cur['y'] + correction_e,
-target_alt)
sleep(0.5)
if not centered:
print(f"\n[SEARCH] Partially centered at {target_alt:.1f}m, continuing")
return centered
def wait_for_position(self, target_x, target_y, timeout=None):
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
dist_total = distance_2d((pos['x'], pos['y']), (target_x, target_y))
if timeout is None:
# Dynamic timeout: 30s base time + 3 seconds per meter
timeout = 30.0 + (dist_total * 3.0)
t0 = time.time()
while time.time() - t0 < timeout and self.running:
self.ctrl.update_state()
self.check_for_markers()
if not self.running:
return False
pos = self.ctrl.get_local_position()
dist = distance_2d(
(pos['x'], pos['y']),
(target_x, target_y)
)
elapsed = int(time.time() - t0)
print(f"\r[SEARCH] Moving: {dist:.1f}m to target ({elapsed}s) "
f"markers found: {len(self.found_markers)} ",
end='', flush=True)
if dist < self.POSITION_TOLERANCE:
print()
return True
sleep(self.CHECK_INTERVAL)
print()
return False
def _wait_arrival(self, target_x, target_y, timeout=15.0):
t0 = time.time()
while time.time() - t0 < timeout and self.running:
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
dist = distance_2d(
(pos['x'], pos['y']),
(target_x, target_y)
)
elapsed = int(time.time() - t0)
print(f"\r[SEARCH] Approaching marker: {dist:.1f}m ({elapsed}s) ",
end='', flush=True)
if dist < self.POSITION_TOLERANCE:
print()
return True
sleep(self.CHECK_INTERVAL)
print()
return False
def move_to_local(self, x, y):
z = -self.altitude
self.ctrl.move_local_ned(x, y, z)
return self.wait_for_position(x, y)
def stop(self):
self.running = False
def get_results(self):
return {
"mode": self.mode.value,
"markers_found": len(self.found_markers),
"markers": self.found_markers,
"landed": self.landed,
}