#!/usr/bin/env python3 import math import time import numpy as np from scipy.stats import levy, uniform from time import sleep from enum import Enum from control.uav_controller import Controller from vision.object_detector import ObjectDetector from utils.helpers import distance_2d from safety.geofence import clip_to_geofence from navigation.patterns.spiral import plan_spiral from navigation.patterns.lawnmower import plan_lawnmower from navigation.patterns.levy import plan_levy class SearchMode(Enum): SPIRAL = "spiral" LAWNMOWER = "lawnmower" LEVY = "levy" class Search: POSITION_TOLERANCE = 0.05 CHECK_INTERVAL = 0.5 MAX_TRAVEL_TIME = 300.0 def __init__(self, ctrl: Controller, detector: ObjectDetector, camera=None, mode: str = "spiral", altitude: float = 5.0, actions: dict = None): self.ctrl = ctrl self.detector = detector self.camera = camera self.mode = SearchMode(mode.lower()) self.altitude = altitude self.found_markers = {} self.running = True self.landed = False self._landing = False self.landing_enabled = False self.actions = actions or {} self.land_ids = set(self.actions.get("land", [])) self.target_ids = set(self.actions.get("target", [])) self.on_target_found = None self._dispatched_targets = set() hfov = 1.3962634 self.cam_fov_meters = 2.0 * self.altitude * math.tan(hfov / 2.0) self.spiral_max_legs = 40 overlap_spacing = self.cam_fov_meters * 0.8 self.spiral_initial_leg = overlap_spacing self.spiral_leg_increment = overlap_spacing self.lawn_width = 30.0 self.lawn_height = 30.0 self.lawn_lane_spacing = self.cam_fov_meters * 0.8 self.lawn_lanes = int(self.lawn_width / max(self.lawn_lane_spacing, 0.1)) + 1 self.levy_max_steps = 20 self.levy_field_size = 50.0 self.levy_angle_dist = uniform(loc=-180, scale=360) self.waypoints = [] self.current_wp = 0 self._on_waypoint_change = None def configure(self, **kwargs): for key, value in kwargs.items(): if hasattr(self, key): setattr(self, key, value) def plan(self, start_pos=(0.0, 0.0), geofence_points=None): if geofence_points: warn_dist = getattr(self, 'geofence_warn_dist', 3.0) xs = [p[0] for p in geofence_points] ys = [p[1] for p in geofence_points] safe_w = (max(xs) - min(xs)) - 2 * warn_dist safe_h = (max(ys) - min(ys)) - 2 * warn_dist if safe_w > 0 and safe_h > 0: self.lawn_width = safe_w self.lawn_height = safe_h self.levy_field_size = min(safe_w, safe_h) hfov = 1.3962634 self.cam_fov_meters = 2.0 * self.altitude * math.tan(hfov / 2.0) overlap_spacing = self.cam_fov_meters * 0.8 self.spiral_initial_leg = overlap_spacing self.spiral_leg_increment = overlap_spacing self.lawn_lane_spacing = overlap_spacing self.lawn_lanes = int(self.lawn_width / max(self.lawn_lane_spacing, 0.1)) + 1 if self.mode == SearchMode.SPIRAL: waypoints = plan_spiral(start_pos, self.spiral_initial_leg, self.spiral_leg_increment, self.spiral_max_legs) elif self.mode == SearchMode.LAWNMOWER: waypoints = plan_lawnmower(start_pos, self.lawn_width, self.lawn_height, self.lawn_lane_spacing) elif self.mode == SearchMode.LEVY: waypoints = plan_levy(start_pos, self.levy_max_steps, self.levy_field_size, overlap_spacing, self.levy_angle_dist) else: waypoints = [start_pos] if geofence_points: warn_dist = getattr(self, 'geofence_warn_dist', 3.0) stop_on_leave = (self.mode == SearchMode.SPIRAL) waypoints = clip_to_geofence(waypoints, geofence_points, warn_dist, stop_on_leave) self.waypoints = waypoints self.current_wp = 0 print(f"\n[PLAN] {self.mode.value.upper()} flight plan: " f"{len(waypoints)} waypoints") for i, (wx, wy) in enumerate(waypoints): print(f" WP{i:02d}: ({wx:+.1f}, {wy:+.1f})") if geofence_points: print(f"[PLAN] Clipped to geofence ({len(geofence_points)} vertices)") return waypoints def run(self): if not self.waypoints: self.plan() n = len(self.waypoints) print(f"\n{'=' * 50}") print(f" SEARCH: {self.mode.value.upper()} at {self.altitude}m") print(f" Waypoints: {n}") if self.land_ids: print(f" Landing targets: {self.land_ids}") print(f"{'=' * 50}\n") for i, (wx, wy) in enumerate(self.waypoints): if not self.running: break self.current_wp = i if self._on_waypoint_change: self._on_waypoint_change(i) self.ctrl.update_state() pos = self.ctrl.get_local_position() dist = distance_2d((pos['x'], pos['y']), (wx, wy)) dir_label = "" if i > 0: prev = self.waypoints[i - 1] dx = wx - prev[0] dy = wy - prev[1] if abs(dx) > abs(dy): dir_label = "N" if dx > 0 else "S" else: dir_label = "E" if dy > 0 else "W" print(f"[SEARCH] WP {i + 1}/{n} " f"→ ({wx:+.1f}, {wy:+.1f}) " f"dist:{dist:.1f}m {dir_label} " f"markers:{len(self.found_markers)}") self.move_to_local(wx, wy) if not self._landing: print(f"\n[SEARCH] Search complete. " f"Found {len(self.found_markers)} markers.") return self.found_markers def get_camera_frame(self): if self.camera is None: return None frame = self.camera.raw_frames.get("downward") if frame is None: frame = self.camera.raw_frames.get("gimbal") return frame def check_for_markers(self): frame = self.get_camera_frame() if frame is None: return [] detections = self.detector.detect(frame) new_markers = [] for d in detections: marker_id = d.get("id") if marker_id is None: continue if marker_id not in self.found_markers: self.ctrl.update_state() pos = self.ctrl.get_local_position() self.found_markers[marker_id] = { "id": marker_id, "uav_position": pos.copy(), "distance": d.get("distance", 0), "timestamp": time.time(), } new_markers.append(d) print(f"\n[SEARCH] ArUco ID:{marker_id} detected! " f"distance:{d['distance']:.2f}m " f"UAV at ({pos['x']:.1f}, {pos['y']:.1f})") if marker_id in self.target_ids and marker_id not in self._dispatched_targets: self._dispatched_targets.add(marker_id) self.running = False final_pos = self.center_over_target(marker_id) print(f"\n[SEARCH] Target ID:{marker_id} centered! " f"Dispatching UGV to ({final_pos['x']:.1f}, {final_pos['y']:.1f})") if self.on_target_found: self.on_target_found(marker_id, final_pos['x'], final_pos['y']) if marker_id in self.land_ids and not self._landing and self.landing_enabled: print(f"\n[SEARCH] Landing target ID:{marker_id} found! " f"Starting approach.") self._landing = True self.execute_landing(detections) return new_markers return new_markers def execute_landing(self, initial_detections): target_det = None for d in initial_detections: if d.get("id") in self.land_ids: target_det = d break if target_det is None: print("[SEARCH] Lost landing target, resuming search") return print(f"\n[SEARCH] ===== LANDING SEQUENCE =====") print(f"[SEARCH] Target marker ID:{target_det['id']} " f"distance:{target_det['distance']:.2f}m") IMG_W, IMG_H = 640, 480 IMG_CX, IMG_CY = IMG_W / 2, IMG_H / 2 HFOV = 1.3962634 self.ctrl.update_state() current_alt = self.ctrl.altitude DESCENT_STEP = max(0.5, current_alt * 0.25) LAND_ALT = min(1.0, current_alt - 0.2) CENTER_PX = 50 MAX_CORRECTIONS = 200 MAX_LOST = 30 GAIN = 0.35 self.ctrl.update_state() current_alt = self.ctrl.altitude print(f"[SEARCH] Visual servoing descent from {current_alt:.1f}m") while current_alt > LAND_ALT and self.running: centered = self._center_over_marker( current_alt, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV, CENTER_PX, MAX_CORRECTIONS, MAX_LOST, GAIN) if centered is None: print(f"\n[SEARCH] Marker lost, aborting landing") self._landing = False return current_alt = max(current_alt - DESCENT_STEP, LAND_ALT) print(f"\n[SEARCH] Descending to {current_alt:.1f}m") self.ctrl.update_state() cur = self.ctrl.get_local_position() self.ctrl.move_local_ned(cur['x'], cur['y'], -current_alt) sleep(2.0) print(f"[SEARCH] Final centering at {LAND_ALT:.1f}m") self._center_over_marker( LAND_ALT, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV, 35, MAX_CORRECTIONS, MAX_LOST, 0.25) print(f"\n[SEARCH] ===== LANDING =====") self.ctrl.land() self.landed = True self.running = False def center_over_target(self, target_id): print(f"\n[SEARCH] ===== CENTERING OVER TARGET {target_id} =====") if hasattr(self, 'uav_min_cms'): print(f"[SEARCH] Reducing speed to minimum ({self.uav_min_cms} cm/s) for centering.") self.ctrl.configure_speed_limits(wpnav_speed=self.uav_min_cms, loit_speed=self.uav_min_cms) IMG_W, IMG_H = 640, 480 IMG_CX, IMG_CY = IMG_W / 2, IMG_H / 2 HFOV = 1.3962634 CENTER_PX = 30 MAX_CORRECTIONS = 200 MAX_LOST = 30 GAIN = 0.35 self.ctrl.update_state() current_alt = self.ctrl.altitude centered = self._center_over_marker( current_alt, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV, CENTER_PX, MAX_CORRECTIONS, MAX_LOST, GAIN, allowed_ids=[target_id]) if not centered: print(f"\n[SEARCH] Failed to fully center, using best estimate.") else: print(f"\n[SEARCH] Successfully centered over target {target_id}.") self.ctrl.update_state() pos = self.ctrl.get_local_position() return pos def _center_over_marker(self, target_alt, img_w, img_h, img_cx, img_cy, hfov, center_px, max_corrections, max_lost, gain, allowed_ids=None): if allowed_ids is None: allowed_ids = self.land_ids lost_count = 0 centered = False for attempt in range(max_corrections): frame = self.get_camera_frame() if frame is None: sleep(0.3) continue detections = self.detector.detect(frame) target = None for d in detections: if d.get("id") in allowed_ids: target = d break if target is None: lost_count += 1 print(f"\r[SEARCH] Alt:{target_alt:.1f}m " f"MARKER LOST ({lost_count}/{max_lost}) ", end='', flush=True) if lost_count >= max_lost: return None sleep(0.3) continue lost_count = 0 cx, cy = target["center_px"] err_x = cx - img_cx err_y = cy - img_cy print(f"\r[SEARCH] Alt:{target_alt:.1f}m " f"err=({err_x:+.0f},{err_y:+.0f})px " f"dist:{target['distance']:.2f}m " f"({attempt+1}/{max_corrections}) ", end='', flush=True) if abs(err_x) < center_px and abs(err_y) < center_px: centered = True print(f"\n[SEARCH] Centered at {target_alt:.1f}m!") break self.ctrl.update_state() alt = max(self.ctrl.altitude, 0.5) ground_w = 2.0 * alt * math.tan(hfov / 2.0) m_per_px = ground_w / img_w correction_forward = -err_y * m_per_px * gain correction_right = err_x * m_per_px * gain self.ctrl.update_state() yaw = self.ctrl.current_yaw correction_n = correction_forward * math.cos(yaw) - correction_right * math.sin(yaw) correction_e = correction_forward * math.sin(yaw) + correction_right * math.cos(yaw) cur = self.ctrl.get_local_position() self.ctrl.move_local_ned( cur['x'] + correction_n, cur['y'] + correction_e, -target_alt) sleep(0.5) if not centered: print(f"\n[SEARCH] Partially centered at {target_alt:.1f}m, continuing") return centered def wait_for_position(self, target_x, target_y, timeout=None): self.ctrl.update_state() pos = self.ctrl.get_local_position() dist_total = distance_2d((pos['x'], pos['y']), (target_x, target_y)) if timeout is None: timeout = 30.0 + (dist_total * 3.0) t0 = time.time() while time.time() - t0 < timeout and self.running: self.ctrl.update_state() self.check_for_markers() if not self.running: return False pos = self.ctrl.get_local_position() dist = distance_2d( (pos['x'], pos['y']), (target_x, target_y) ) elapsed = int(time.time() - t0) print(f"\r[SEARCH] Moving: {dist:.1f}m to target ({elapsed}s) " f"markers found: {len(self.found_markers)} ", end='', flush=True) if dist < self.POSITION_TOLERANCE: print() return True sleep(self.CHECK_INTERVAL) print() return False return False def move_to_local(self, x, y): z = -self.altitude self.ctrl.move_local_ned(x, y, z) return self.wait_for_position(x, y) def stop(self): self.running = False def get_results(self): return { "mode": self.mode.value, "markers_found": len(self.found_markers), "markers": self.found_markers, "landed": self.landed, }