From 42e4fa28a9468a10a6de2209d67b7cb350f3c800 Mon Sep 17 00:00:00 2001 From: SirBlobby Date: Fri, 13 Feb 2026 16:48:14 -0500 Subject: [PATCH] Preflight Planner Update --- .gitignore | 3 + config/properties.yaml | 36 --- config/search.yaml | 27 +- config/uav.yaml | 73 +---- config/ugv.yaml | 46 +-- scripts/generate_tags.py | 2 +- src/control/search.py | 540 ++++++++++++++++++------------- src/main.py | 84 ++++- src/navigation/flight_tracker.py | 80 ++++- src/utils/recorder.py | 293 +++++++++++++++++ 10 files changed, 781 insertions(+), 403 deletions(-) delete mode 100644 config/properties.yaml create mode 100644 src/utils/recorder.py diff --git a/.gitignore b/.gitignore index e64f8df..fb3d160 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,6 @@ htmlcov/ wsl_env.sh activate_venv.sh + +# Simulation results +results/ diff --git a/config/properties.yaml b/config/properties.yaml deleted file mode 100644 index 3aaea73..0000000 --- a/config/properties.yaml +++ /dev/null @@ -1,36 +0,0 @@ -simulation: - world: uav_ugv_search.sdf - software_render: auto - -physics: - max_step_size: 0.002 - real_time_factor: 1.0 - -sensor_noise: - enabled: true - position_stddev: 0.05 - velocity_stddev: 0.1 - gyro_stddev: 0.0002 - accel_stddev: 0.017 - -gps: - enabled: true - update_rate: 5 - noise_stddev: 2.0 - purpose: geofence_only - -initial_positions: - uav: - x: 0.0 - y: 0.0 - z: 0.195 - yaw: 90.0 - ugv: - x: 5.0 - y: 5.0 - z: 0.0 - yaw: 0.0 - -logging: - enabled: true - log_directory: logs diff --git a/config/search.yaml b/config/search.yaml index 6084c50..33dda45 100644 --- a/config/search.yaml +++ b/config/search.yaml @@ -1,5 +1,16 @@ -altitude: 5.0 +# ─── Mission / Search Configuration ────────────────────────── +# Single source of truth for all mission parameters. +# ── Flight ─────────────────────────────────────────────────── +altitude: 4.0 # Search altitude (meters) + +# ── ArUco Marker ───────────────────────────────────────────── +marker: + dictionary: DICT_4X4_50 + size: 0.5 # Physical marker size in meters + landing_ids: [0] # Marker IDs that trigger landing + +# ── Search Patterns ────────────────────────────────────────── spiral: max_legs: 12 initial_leg: 4.0 @@ -15,18 +26,16 @@ levy: max_steps: 20 field_size: 50.0 -actions: - land: - - 0 - +# ── Geofence ───────────────────────────────────────────────── geofence: enabled: true warning_distance: 3.0 min_altitude: 0.0 max_altitude: 15.0 # Polygon vertices in local NED (x=North, y=East) meters + # Centered on UAV start position (0, 0) points: - - [-5, -5] - - [-5, 25] - - [25, 25] - - [25, -5] + - [-15, -15] + - [-15, 15] + - [15, 15] + - [15, -15] diff --git a/config/uav.yaml b/config/uav.yaml index 66a6360..850a33e 100644 --- a/config/uav.yaml +++ b/config/uav.yaml @@ -1,74 +1,7 @@ +# ─── UAV Configuration ─────────────────────────────────────── +# UAV-specific settings. Mission config is in search.yaml. + connection: sim: "tcp:127.0.0.1:5760" real: "/dev/ttyAMA0" baud: 57600 - -flight: - takeoff_altitude: 5.0 - max_altitude: 15.0 - min_altitude: 3.0 - max_velocity: 2.0 - max_acceleration: 1.0 - max_climb_rate: 1.0 - max_yaw_rate: 45.0 - -navigation: - frame: LOCAL_NED - waypoint_radius: 0.5 - position_hold_radius: 0.2 - home_mode: local - -vision: - forward_camera: - enabled: true - frame_rate: 30 - resolution: [640, 480] - downward_camera: - enabled: true - frame_rate: 30 - resolution: [320, 240] - visual_odometry: - enabled: true - method: ORB - min_features: 100 - max_features: 500 - optical_flow: - enabled: true - method: Lucas-Kanade - window_size: 15 - min_altitude: 0.3 - max_altitude: 10.0 - landmark_detection: - enabled: true - method: ArUco - marker_size: 0.15 - -position_estimation: - method: EKF - update_rate: 50 - weights: - visual_odometry: 0.6 - optical_flow: 0.3 - imu: 0.1 - process_noise: - position: 0.1 - velocity: 0.5 - measurement_noise: - visual_odom: 0.05 - optical_flow: 0.1 - imu: 0.2 - -obstacle_avoidance: - enabled: true - detection_range: 5.0 - safety_margin: 1.0 - -safety: - geofence: - enabled: true - action_on_breach: RTL - max_altitude: 10 - radius: 20 - failsafe: - vision_loss_timeout: 5.0 - action_on_vision_loss: HOLD diff --git a/config/ugv.yaml b/config/ugv.yaml index 1beebea..84479e8 100644 --- a/config/ugv.yaml +++ b/config/ugv.yaml @@ -1,46 +1,8 @@ -vehicle: - wheelbase: 0.3 - track_width: 0.25 - wheel_radius: 0.05 - -connection: - sim: null - real: null +# ─── UGV Configuration ─────────────────────────────────────── +# UGV-specific settings. Mission config is in search.yaml. +# UGV spawn position in local NED (meters) +# Also used by run_autonomous.sh to place the UGV model in Gazebo position: x: 5.0 y: 5.0 - z: 0.0 - yaw: 0.0 - -navigation: - max_linear_velocity: 1.0 - max_angular_velocity: 1.5 - linear_acceleration: 0.5 - angular_acceleration: 1.0 - waypoint_radius: 0.3 - position_tolerance: 0.3 - -landing_pad: - marker_type: ArUco - marker_id: 0 - marker_size: 0.3 - pad_diameter: 1.0 - color: [255, 255, 0] - -vision: - detection_method: ArUco - camera: - enabled: true - frame_rate: 30 - resolution: [320, 240] - -safety: - geofence: - enabled: true - action_on_breach: STOP - failsafe: - vision_loss_timeout: 3.0 - action_on_vision_loss: STOP - collision_distance: 0.2 - action_on_collision: STOP diff --git a/scripts/generate_tags.py b/scripts/generate_tags.py index 331b94e..f7ee568 100644 --- a/scripts/generate_tags.py +++ b/scripts/generate_tags.py @@ -91,7 +91,7 @@ def get_landing_id(): if search_cfg.exists(): with open(search_cfg) as f: cfg = yaml.safe_load(f) or {} - land_ids = cfg.get("actions", {}).get("land", []) + land_ids = cfg.get("marker", {}).get("landing_ids", []) if land_ids: return land_ids[0] return 0 diff --git a/src/control/search.py b/src/control/search.py index 3281acf..50b2fd2 100644 --- a/src/control/search.py +++ b/src/control/search.py @@ -9,7 +9,6 @@ from enum import Enum from control.uav_controller import Controller from vision.object_detector import ObjectDetector -from vision.visual_servoing import VisualServoing from utils.helpers import distance_2d @@ -42,11 +41,7 @@ class Search: self.actions = actions or {} self.land_ids = set(self.actions.get("land", [])) - self.servoing = None - if self.land_ids: - target_id = list(self.land_ids)[0] - self.servoing = VisualServoing(ctrl, target_marker_id=target_id) - + # Search pattern parameters self.spiral_max_legs = 12 self.spiral_initial_leg = self.CAM_FOV_METERS self.spiral_leg_increment = self.CAM_FOV_METERS * 0.8 @@ -60,24 +55,236 @@ class Search: self.levy_field_size = 50.0 self.levy_angle_dist = uniform(loc=-180, scale=360) + # Pre-flight plan + self.waypoints = [] + self.current_wp = 0 + self._on_waypoint_change = None # callback: fn(index) + def configure(self, **kwargs): for key, value in kwargs.items(): if hasattr(self, key): setattr(self, key, value) + # ═══════════════════════════════════════════════════════════ + # PRE-FLIGHT PLANNING + # ═══════════════════════════════════════════════════════════ + + def plan(self, start_pos=(0.0, 0.0), geofence_points=None): + """Pre-compute all waypoints for the search pattern. + + Args: + start_pos: (x, y) starting position in local NED + geofence_points: list of (x, y) polygon vertices, or None + + Returns: + List of (x, y) absolute waypoints + """ + if self.mode == SearchMode.SPIRAL: + waypoints = self._plan_spiral(start_pos) + elif self.mode == SearchMode.LAWNMOWER: + waypoints = self._plan_lawnmower(start_pos) + elif self.mode == SearchMode.LEVY: + waypoints = self._plan_levy(start_pos) + else: + waypoints = [start_pos] + + # Clip to geofence + if geofence_points: + waypoints = self._clip_to_geofence(waypoints, geofence_points) + + self.waypoints = waypoints + self.current_wp = 0 + + print(f"\n[PLAN] {self.mode.value.upper()} flight plan: " + f"{len(waypoints)} waypoints") + for i, (wx, wy) in enumerate(waypoints): + print(f" WP{i:02d}: ({wx:+.1f}, {wy:+.1f})") + if geofence_points: + print(f"[PLAN] Clipped to geofence ({len(geofence_points)} vertices)") + + return waypoints + + def _plan_spiral(self, start): + """Expanding square spiral from center outward.""" + waypoints = [start] + x, y = start + + directions = [ + (0, 1), # East (+Y) + (1, 0), # North (+X) + (0, -1), # West (-Y) + (-1, 0), # South (-X) + ] + + distance = self.spiral_initial_leg + dir_idx = 0 + legs_at_this_dist = 0 + + for _ in range(self.spiral_max_legs): + dx_dir, dy_dir = directions[dir_idx % 4] + x += dx_dir * distance + y += dy_dir * distance + waypoints.append((x, y)) + + dir_idx += 1 + legs_at_this_dist += 1 + if legs_at_this_dist >= 2: + legs_at_this_dist = 0 + distance += self.spiral_leg_increment + + return waypoints + + def _plan_lawnmower(self, start): + """Lawnmower (boustrophedon) pattern.""" + waypoints = [start] + sx, sy = start + lane_spacing = self.lawn_lane_spacing + height = self.lawn_height + num_lanes = max(1, int(self.lawn_width / lane_spacing)) + + for lane in range(num_lanes): + lane_x = sx + lane * lane_spacing + if lane % 2 == 0: + target_y = sy + height + else: + target_y = sy + + waypoints.append((lane_x, target_y)) + + if lane < num_lanes - 1: + next_x = sx + (lane + 1) * lane_spacing + waypoints.append((next_x, target_y)) + + return waypoints + + def _plan_levy(self, start): + """Lévy walk — random but pre-computed.""" + waypoints = [start] + x, y = start + + for _ in range(self.levy_max_steps): + angle_deg = self.levy_angle_dist.rvs() + angle_rad = math.radians(angle_deg) + raw_distance = levy.rvs(loc=1, scale=1) + distance = min(max(raw_distance, 1.0), self.levy_field_size) + + x += distance * math.cos(angle_rad) + y += distance * math.sin(angle_rad) + waypoints.append((x, y)) + + return waypoints + + def _clip_to_geofence(self, waypoints, polygon): + """Clip waypoints to stay within the geofence polygon. + + If a waypoint is outside, it is projected to the nearest + point on the polygon boundary. Consecutive out-of-bounds + waypoints are collapsed. + """ + clipped = [] + for wx, wy in waypoints: + if self._point_in_polygon(wx, wy, polygon): + clipped.append((wx, wy)) + else: + nearest = self._nearest_point_on_polygon(wx, wy, polygon) + # Avoid duplicate consecutive points + if not clipped or distance_2d(clipped[-1], nearest) > 0.5: + clipped.append(nearest) + return clipped + + @staticmethod + def _point_in_polygon(px, py, polygon): + """Ray-casting point-in-polygon test.""" + n = len(polygon) + inside = False + j = n - 1 + for i in range(n): + xi, yi = polygon[i] + xj, yj = polygon[j] + if ((yi > py) != (yj > py)) and \ + (px < (xj - xi) * (py - yi) / (yj - yi) + xi): + inside = not inside + j = i + return inside + + @staticmethod + def _nearest_point_on_polygon(px, py, polygon): + """Find the nearest point on the polygon edge to (px, py).""" + min_dist = float('inf') + nearest = (px, py) + n = len(polygon) + for i in range(n): + x1, y1 = polygon[i] + x2, y2 = polygon[(i + 1) % n] + # Project point onto segment + dx, dy = x2 - x1, y2 - y1 + length_sq = dx * dx + dy * dy + if length_sq == 0: + proj = (x1, y1) + else: + t = max(0, min(1, ((px - x1) * dx + (py - y1) * dy) / length_sq)) + proj = (x1 + t * dx, y1 + t * dy) + dist = math.sqrt((px - proj[0])**2 + (py - proj[1])**2) + if dist < min_dist: + min_dist = dist + nearest = proj + return nearest + + # ═══════════════════════════════════════════════════════════ + # EXECUTION + # ═══════════════════════════════════════════════════════════ + def run(self): + """Execute the pre-planned search by following waypoints.""" + if not self.waypoints: + self.plan() + + n = len(self.waypoints) print(f"\n{'=' * 50}") print(f" SEARCH: {self.mode.value.upper()} at {self.altitude}m") + print(f" Waypoints: {n}") if self.land_ids: print(f" Landing targets: {self.land_ids}") print(f"{'=' * 50}\n") - if self.mode == SearchMode.SPIRAL: - return self.spiral() - elif self.mode == SearchMode.LAWNMOWER: - return self.lawnmower() - elif self.mode == SearchMode.LEVY: - return self.levy_walk() + for i, (wx, wy) in enumerate(self.waypoints): + if not self.running: + break + + self.current_wp = i + if self._on_waypoint_change: + self._on_waypoint_change(i) + + self.ctrl.update_state() + pos = self.ctrl.get_local_position() + dist = distance_2d((pos['x'], pos['y']), (wx, wy)) + + dir_label = "" + if i > 0: + prev = self.waypoints[i - 1] + dx = wx - prev[0] + dy = wy - prev[1] + if abs(dx) > abs(dy): + dir_label = "N" if dx > 0 else "S" + else: + dir_label = "E" if dy > 0 else "W" + + print(f"[SEARCH] WP {i + 1}/{n} " + f"→ ({wx:+.1f}, {wy:+.1f}) " + f"dist:{dist:.1f}m {dir_label} " + f"markers:{len(self.found_markers)}") + + self.move_to_local(wx, wy) + + if not self._landing: + print(f"\n[SEARCH] Search complete. " + f"Found {len(self.found_markers)} markers.") + + return self.found_markers + + # ═══════════════════════════════════════════════════════════ + # CAMERA & MARKER DETECTION + # ═══════════════════════════════════════════════════════════ def get_camera_frame(self): if self.camera is None: @@ -114,15 +321,21 @@ class Search: f"UAV at ({pos['x']:.1f}, {pos['y']:.1f})") if marker_id in self.land_ids and not self._landing: - print(f"\n[SEARCH] Landing target ID:{marker_id} found! Starting approach.") + print(f"\n[SEARCH] Landing target ID:{marker_id} found! " + f"Starting approach.") self._landing = True self.execute_landing(detections) return new_markers return new_markers + # ═══════════════════════════════════════════════════════════ + # LANDING + # ═══════════════════════════════════════════════════════════ + def execute_landing(self, initial_detections): - """Visual-servoing descent: keep the ArUco tag centered while descending.""" + """Visual-servoing descent: keep the ArUco tag centered + while descending.""" target_det = None for d in initial_detections: if d.get("id") in self.land_ids: @@ -137,102 +350,34 @@ class Search: print(f"[SEARCH] Target marker ID:{target_det['id']} " f"distance:{target_det['distance']:.2f}m") - # Camera parameters + # Camera parameters (must match model.sdf) IMG_W, IMG_H = 640, 480 IMG_CX, IMG_CY = IMG_W / 2, IMG_H / 2 - HFOV = 1.3962634 # radians (~80 degrees) + HFOV = 1.3962634 # 80° horizontal FOV # Landing parameters - DESCENT_STEP = 1.0 # descend 1m per step - LAND_ALT = 1.5 # switch to blind land at this altitude - CENTER_PX = 40 # centered if within this many pixels - MAX_CORRECTIONS = 15 # max correction iterations per altitude step - GAIN = 0.4 # damping factor for corrections + DESCENT_STEP = 1.0 + LAND_ALT = 1.5 + CENTER_PX = 50 + MAX_CORRECTIONS = 30 + MAX_LOST = 30 + GAIN = 0.35 - # Phase 1: Initial approach — fly toward marker using tvec as rough guide - tvec = target_det.get("tvec", [0, 0, 0]) + # Start visual servoing from current position self.ctrl.update_state() - pos = self.ctrl.get_local_position() + current_alt = self.ctrl.altitude - # For a downward camera (90° pitch): tvec[0]=right=East, tvec[1]=down=North - # But this is unreliable, so just use it as a rough initial move - rough_x = pos['x'] + tvec[1] - rough_y = pos['y'] + tvec[0] - - print(f"[SEARCH] Phase 1: Rough approach to ({rough_x:.1f}, {rough_y:.1f})") - self.ctrl.move_local_ned(rough_x, rough_y, -self.altitude) - self._wait_arrival(rough_x, rough_y, timeout=10.0) - sleep(1.0) # settle - - # Phase 2: Visual servoing descent - current_alt = self.altitude - lost_count = 0 - MAX_LOST = 10 # abort if marker lost this many consecutive times - - print(f"[SEARCH] Phase 2: Visual servoing descent from {current_alt:.1f}m") + print(f"[SEARCH] Visual servoing descent from {current_alt:.1f}m") while current_alt > LAND_ALT and self.running: - # Center over marker at current altitude - centered = False - for attempt in range(MAX_CORRECTIONS): - frame = self.get_camera_frame() - if frame is None: - sleep(0.2) - continue + centered = self._center_over_marker( + current_alt, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV, + CENTER_PX, MAX_CORRECTIONS, MAX_LOST, GAIN) - detections = self.detector.detect(frame) - target = None - for d in detections: - if d.get("id") in self.land_ids: - target = d - break - - if target is None: - lost_count += 1 - print(f"\r[SEARCH] Alt:{current_alt:.1f}m MARKER LOST ({lost_count}/{MAX_LOST}) ", - end='', flush=True) - if lost_count >= MAX_LOST: - print(f"\n[SEARCH] Marker lost too many times, aborting landing") - self._landing = False - return - sleep(0.3) - continue - - lost_count = 0 - - # Pixel error from image center - cx, cy = target["center_px"] - err_x = cx - IMG_CX # positive = marker is right of center - err_y = cy - IMG_CY # positive = marker is below center - - print(f"\r[SEARCH] Alt:{current_alt:.1f}m err=({err_x:+.0f},{err_y:+.0f})px " - f"dist:{target['distance']:.2f}m ({attempt+1}/{MAX_CORRECTIONS}) ", - end='', flush=True) - - if abs(err_x) < CENTER_PX and abs(err_y) < CENTER_PX: - centered = True - break - - # Convert pixel error to meters using FOV and altitude - # At current altitude, the ground plane width visible is: - # ground_width = 2 * alt * tan(HFOV/2) - self.ctrl.update_state() - alt = max(self.ctrl.altitude, 0.5) - ground_w = 2.0 * alt * math.tan(HFOV / 2.0) - m_per_px = ground_w / IMG_W - - # Apply correction (pixel right = East = +y, pixel down = North = +x) - correction_y = err_x * m_per_px * GAIN # East - correction_x = err_y * m_per_px * GAIN # North - - cur = self.ctrl.get_local_position() - new_x = cur['x'] + correction_x - new_y = cur['y'] + correction_y - self.ctrl.move_local_ned(new_x, new_y, -current_alt) - sleep(0.4) - - if not centered: - print(f"\n[SEARCH] Could not fully center at {current_alt:.1f}m, continuing descent") + if centered is None: + print(f"\n[SEARCH] Marker lost, aborting landing") + self._landing = False + return # Descend one step current_alt = max(current_alt - DESCENT_STEP, LAND_ALT) @@ -240,52 +385,94 @@ class Search: self.ctrl.update_state() cur = self.ctrl.get_local_position() self.ctrl.move_local_ned(cur['x'], cur['y'], -current_alt) - sleep(1.5) # wait for descent + sleep(2.0) - # Phase 3: Final centering at low altitude - print(f"[SEARCH] Phase 3: Final centering at {LAND_ALT:.1f}m") - for attempt in range(MAX_CORRECTIONS): + # Final precision centering + print(f"[SEARCH] Final centering at {LAND_ALT:.1f}m") + self._center_over_marker( + LAND_ALT, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV, + 35, MAX_CORRECTIONS, MAX_LOST, 0.25) + + # Land + print(f"\n[SEARCH] ===== LANDING =====") + self.ctrl.land() + self.landed = True + self.running = False + + def _center_over_marker(self, target_alt, img_w, img_h, img_cx, img_cy, + hfov, center_px, max_corrections, max_lost, gain): + """Center the UAV over the ArUco marker using pixel error. + + Returns True if centered, False if partial, None if lost. + """ + lost_count = 0 + centered = False + + for attempt in range(max_corrections): frame = self.get_camera_frame() if frame is None: - sleep(0.2) + sleep(0.3) continue + detections = self.detector.detect(frame) target = None for d in detections: if d.get("id") in self.land_ids: target = d break + if target is None: + lost_count += 1 + print(f"\r[SEARCH] Alt:{target_alt:.1f}m " + f"MARKER LOST ({lost_count}/{max_lost}) ", + end='', flush=True) + if lost_count >= max_lost: + return None sleep(0.3) continue - cx, cy = target["center_px"] - err_x = cx - IMG_CX - err_y = cy - IMG_CY + lost_count = 0 - print(f"\r[SEARCH] Final center: err=({err_x:+.0f},{err_y:+.0f})px ", + cx, cy = target["center_px"] + err_x = cx - img_cx + err_y = cy - img_cy + + print(f"\r[SEARCH] Alt:{target_alt:.1f}m " + f"err=({err_x:+.0f},{err_y:+.0f})px " + f"dist:{target['distance']:.2f}m " + f"({attempt+1}/{max_corrections}) ", end='', flush=True) - if abs(err_x) < 25 and abs(err_y) < 25: - print(f"\n[SEARCH] Centered! Landing now.") + if abs(err_x) < center_px and abs(err_y) < center_px: + centered = True + print(f"\n[SEARCH] Centered at {target_alt:.1f}m!") break self.ctrl.update_state() alt = max(self.ctrl.altitude, 0.5) - ground_w = 2.0 * alt * math.tan(HFOV / 2.0) - m_per_px = ground_w / IMG_W - correction_y = err_x * m_per_px * 0.3 - correction_x = err_y * m_per_px * 0.3 + ground_w = 2.0 * alt * math.tan(hfov / 2.0) + m_per_px = ground_w / img_w + + # Camera pitch-90° inverts image axes relative to NED + correction_y = -err_x * m_per_px * gain + correction_x = -err_y * m_per_px * gain + cur = self.ctrl.get_local_position() - self.ctrl.move_local_ned(cur['x'] + correction_x, - cur['y'] + correction_y, -LAND_ALT) + self.ctrl.move_local_ned( + cur['x'] + correction_x, + cur['y'] + correction_y, + -target_alt) sleep(0.5) - # Phase 4: Land - print(f"[SEARCH] ===== LANDING =====") - self.ctrl.land() - self.landed = True - self.running = False + if not centered: + print(f"\n[SEARCH] Partially centered at {target_alt:.1f}m, " + f"continuing") + + return centered + + # ═══════════════════════════════════════════════════════════ + # MOVEMENT + # ═══════════════════════════════════════════════════════════ def wait_for_position(self, target_x, target_y, timeout=None): if timeout is None: @@ -313,7 +500,7 @@ class Search: return False def _wait_arrival(self, target_x, target_y, timeout=15.0): - """Wait for position without checking markers (used during landing).""" + """Wait for position without checking markers (landing).""" t0 = time.time() while time.time() - t0 < timeout and self.running: self.ctrl.update_state() @@ -337,110 +524,9 @@ class Search: self.ctrl.move_local_ned(x, y, z) return self.wait_for_position(x, y) - def move_relative(self, dx, dy): - self.ctrl.update_state() - pos = self.ctrl.get_local_position() - target_x = pos['x'] + dx - target_y = pos['y'] + dy - self.ctrl.move_pos_rel(dx, dy, 0) - return self.wait_for_position(target_x, target_y) - - def spiral(self): - distance = self.spiral_initial_leg - increment = self.spiral_leg_increment - travel_x = True - direction = 1 - - for leg in range(self.spiral_max_legs): - if not self.running: - break - - self.ctrl.update_state() - pos = self.ctrl.get_local_position() - print(f"[SEARCH] Spiral leg {leg + 1}/{self.spiral_max_legs} " - f"pos:({pos['x']:.1f}, {pos['y']:.1f}) " - f"step:{distance:.1f}m markers:{len(self.found_markers)}") - - if travel_x: - dx = distance * direction - self.move_relative(dx, 0) - else: - dy = distance * direction - self.move_relative(0, dy) - direction *= -1 - distance += increment - - travel_x = not travel_x - - print(f"[SEARCH] Spiral complete. Found {len(self.found_markers)} markers.") - return self.found_markers - - def lawnmower(self): - lane_spacing = self.lawn_lane_spacing - height = self.lawn_height - num_lanes = max(1, int(self.lawn_width / lane_spacing)) - - self.ctrl.update_state() - start_pos = self.ctrl.get_local_position() - start_x = start_pos['x'] - start_y = start_pos['y'] - - print(f"[SEARCH] Lawnmower: {num_lanes} lanes, " - f"{lane_spacing:.1f}m spacing, {height:.1f}m height") - - for lane in range(num_lanes): - if not self.running: - break - - lane_x = start_x + lane * lane_spacing - - if lane % 2 == 0: - target_y = start_y + height - else: - target_y = start_y - - print(f"[SEARCH] Lane {lane + 1}/{num_lanes} " - f"x:{lane_x:.1f} markers:{len(self.found_markers)}") - - self.move_to_local(lane_x, target_y) - - if not self.running: - break - - if lane < num_lanes - 1: - next_x = start_x + (lane + 1) * lane_spacing - self.move_to_local(next_x, target_y) - - print(f"[SEARCH] Lawnmower complete. Found {len(self.found_markers)} markers.") - return self.found_markers - - def levy_walk(self): - field_size = self.levy_field_size - - for step in range(self.levy_max_steps): - if not self.running: - break - - angle_deg = self.levy_angle_dist.rvs() - angle_rad = math.radians(angle_deg) - - raw_distance = levy.rvs(loc=1, scale=1) - distance = min(max(raw_distance, 1.0), field_size) - - dx = distance * math.cos(angle_rad) - dy = distance * math.sin(angle_rad) - - self.ctrl.update_state() - pos = self.ctrl.get_local_position() - print(f"[SEARCH] Lévy step {step + 1}/{self.levy_max_steps} " - f"angle:{angle_deg:.0f}° dist:{distance:.1f}m " - f"pos:({pos['x']:.1f}, {pos['y']:.1f}) " - f"markers:{len(self.found_markers)}") - - self.move_relative(dx, dy) - - print(f"[SEARCH] Lévy walk complete. Found {len(self.found_markers)} markers.") - return self.found_markers + # ═══════════════════════════════════════════════════════════ + # RESULTS + # ═══════════════════════════════════════════════════════════ def stop(self): self.running = False diff --git a/src/main.py b/src/main.py index ae09960..9e2e862 100644 --- a/src/main.py +++ b/src/main.py @@ -15,6 +15,7 @@ from control.search import Search from vision.object_detector import ObjectDetector from vision.camera_processor import CameraProcessor from navigation.flight_tracker import FlightTracker +from utils.recorder import RunRecorder PROJECT_DIR = Path(__file__).resolve().parent.parent CONFIG_DIR = PROJECT_DIR / "config" @@ -61,14 +62,32 @@ def main(): parser.add_argument('--search', default='spiral', choices=['spiral', 'lawnmower', 'levy']) parser.add_argument('--altitude', type=float, default=None) parser.add_argument('--no-camera', action='store_true') + parser.add_argument('--no-record', action='store_true', help='Disable recording') args = parser.parse_args() + # ── Start recorder ────────────────────────────────────────── + recorder = None + if not args.no_record: + recorder = RunRecorder() + recorder.start_logging() + uav_cfg = load_config('uav.yaml') search_cfg = load_config('search.yaml') + ugv_cfg = load_config('ugv.yaml') - altitude = args.altitude or search_cfg.get('altitude', uav_cfg.get('flight', {}).get('takeoff_altitude', 5.0)) + altitude = args.altitude or search_cfg.get('altitude', 5.0) search_mode = args.search + # Marker config (single source of truth) + marker_cfg = search_cfg.get('marker', {}) + marker_dict = marker_cfg.get('dictionary', 'DICT_4X4_50') + marker_size = marker_cfg.get('size', 0.5) + landing_ids = marker_cfg.get('landing_ids', [0]) + + # UGV position + ugv_pos_cfg = ugv_cfg.get('position', {}) + ugv_position = (ugv_pos_cfg.get('x', 5.0), ugv_pos_cfg.get('y', 5.0)) + if args.connection: conn_str = args.connection elif args.device == 'real': @@ -79,8 +98,8 @@ def main(): ctrl = Controller(conn_str) detector = ObjectDetector( - marker_size=uav_cfg.get('vision', {}).get('landmark_detection', {}).get('marker_size', 0.15), - aruco_dict_name="DICT_4X4_50", + marker_size=marker_size, + aruco_dict_name=marker_dict, ) camera = None @@ -101,7 +120,7 @@ def main(): print(f"[MAIN] Camera unavailable: {e}") camera = None - actions = search_cfg.get('actions', {}) + actions = {'land': landing_ids} search = Search(ctrl, detector, camera=camera, mode=search_mode, altitude=altitude, actions=actions) @@ -109,29 +128,48 @@ def main(): if mode_cfg: search.configure(**{f"{search_mode}_{k}": v for k, v in mode_cfg.items()}) + # ── Pre-flight plan ───────────────────────────────────────── + geofence_cfg = search_cfg.get('geofence', {}) + geofence_points = None + if geofence_cfg.get('enabled', False): + raw_pts = geofence_cfg.get('points', []) + if raw_pts: + geofence_points = [tuple(p) for p in raw_pts] + + waypoints = search.plan(start_pos=(0.0, 0.0), + geofence_points=geofence_points) + print(f"[MAIN] Config loaded from {CONFIG_DIR}") print(f"[MAIN] Search: {search_mode} Altitude: {altitude}m") - if actions.get('land'): - print(f"[MAIN] Landing targets: {actions['land']}") + print(f"[MAIN] Marker: {marker_dict} size:{marker_size}m land:{landing_ids}") + + # ── Start flight tracker (show plan before takeoff) ───────── + tracker = FlightTracker( + window_size=600, + world_range=30.0, + ugv_position=ugv_position, + geofence=geofence_cfg, + ) + tracker.set_plan(waypoints) + tracker.start() setup_ardupilot(ctrl) ctrl.wait_for_gps() if not ctrl.arm(): print("[UAV] Cannot arm - aborting") + if recorder: + recorder.save_summary(search_mode=search_mode, altitude=altitude) + recorder.stop() return ctrl.takeoff(altitude) ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30) - # Start flight tracker - tracker = FlightTracker( - window_size=600, - world_range=30.0, - ugv_position=(5.0, 5.0), - geofence=search_cfg.get('geofence'), - ) - tracker.start() + # ── Start recording tracker + camera ──────────────────────── + if recorder: + recorder.start_recording(tracker=tracker, camera=camera) + recorder.snapshot_camera("pre_search") # Feed position updates to tracker during search original_check = search.check_for_markers @@ -148,9 +186,16 @@ def main(): return result search.check_for_markers = tracked_check + # Wire up waypoint progress to tracker + search._on_waypoint_change = lambda idx: tracker.set_active_waypoint(idx) + results = search.run() search_results = search.get_results() + # ── Snapshot on detection ─────────────────────────────────── + if recorder and results: + recorder.snapshot_camera("marker_detected") + print(f"\n{'=' * 50}") print(f" SEARCH COMPLETE") print(f"{'=' * 50}") @@ -170,6 +215,17 @@ def main(): else: wait_for_landing(ctrl) + # ── Finalize recording ────────────────────────────────────── + if recorder: + recorder.snapshot_camera("post_landing") + recorder.save_summary( + search_mode=search_mode, + altitude=altitude, + markers=results, + landed=search_results.get('landed', False), + ) + recorder.stop() + tracker.stop() print("[MAIN] Done.") diff --git a/src/navigation/flight_tracker.py b/src/navigation/flight_tracker.py index 0552629..35cd2cc 100644 --- a/src/navigation/flight_tracker.py +++ b/src/navigation/flight_tracker.py @@ -39,6 +39,9 @@ class FlightTracker: FENCE_COLOR = (0, 220, 255) # Yellow-cyan FENCE_WARN_COLOR = (0, 140, 200) # Darker yellow FENCE_BREACH_COLOR = (0, 0, 255) # Red + PLAN_COLOR = (180, 140, 50) # Muted cyan-blue + PLAN_ACTIVE_COLOR = (0, 180, 255) # Orange + PLAN_VISITED_COLOR = (60, 60, 60) # Dim gray def __init__(self, window_size=600, world_range=30.0, ugv_position=(5.0, 5.0), show_ugv=True, @@ -60,9 +63,13 @@ class FlightTracker: self.uav_pos = (0.0, 0.0) self.uav_alt = 0.0 self.uav_heading = 0.0 - self.trail = deque(maxlen=5000) + self.trail = deque() self.markers_found = {} + # Flight plan + self.plan_waypoints = [] + self.active_waypoint = 0 + # Geofence self.geofence = geofence self.fence_points = [] @@ -108,6 +115,17 @@ class FlightTracker: with self._lock: self.markers_found[marker_id] = (x, y) + def set_plan(self, waypoints): + """Set pre-computed flight plan waypoints for visualization.""" + with self._lock: + self.plan_waypoints = list(waypoints) + self.active_waypoint = 0 + + def set_active_waypoint(self, idx): + """Update which waypoint the UAV is currently targeting.""" + with self._lock: + self.active_waypoint = idx + def _point_in_polygon(self, px, py, polygon): """Simple ray-casting point-in-polygon test.""" n = len(polygon) @@ -159,6 +177,12 @@ class FlightTracker: uav_hdg = self.uav_heading ugv = self.ugv_pos markers = dict(self.markers_found) + plan_copy = list(self.plan_waypoints) + active_wp = self.active_waypoint + + # Draw plan UNDER the trail + if plan_copy: + self._draw_plan(frame, plan_copy, active_wp) self._draw_trail(frame, trail_copy) @@ -288,13 +312,61 @@ class FlightTracker: ey = int(y1 + dy * end) cv2.line(frame, (sx, sy), (ex, ey), color, thickness) + def _draw_plan(self, frame, waypoints, active_idx): + """Draw the pre-flight plan: dashed path with numbered waypoints.""" + if len(waypoints) < 2: + return + + pts = [self.world_to_pixel(x, y) for x, y in waypoints] + + # Draw connecting lines + for i in range(1, len(pts)): + if i <= active_idx: + color = self.PLAN_VISITED_COLOR + else: + color = self.PLAN_COLOR + self._draw_dashed_line(frame, pts[i - 1], pts[i], color, 1, 8) + + # Draw waypoint markers + for i, (px, py) in enumerate(pts): + if i < active_idx: + # Visited — small dim dot + cv2.circle(frame, (px, py), 3, self.PLAN_VISITED_COLOR, -1) + elif i == active_idx: + # Active target — bright ring + cv2.circle(frame, (px, py), 8, self.PLAN_ACTIVE_COLOR, 2) + cv2.circle(frame, (px, py), 3, self.PLAN_ACTIVE_COLOR, -1) + else: + # Future — small cyan dot + cv2.circle(frame, (px, py), 4, self.PLAN_COLOR, 1) + + # Waypoint number label + label = str(i) + cv2.putText(frame, label, (px + 7, py - 5), + cv2.FONT_HERSHEY_SIMPLEX, 0.28, self.PLAN_COLOR, 1) + def _draw_trail(self, frame, trail): - """Draw the flight path trail.""" + """Draw the flight path trail with persistent visibility.""" if len(trail) < 2: return + + # Minimum brightness so old trail never disappears + MIN_ALPHA = 0.3 + FADE_POINTS = 200 # only the newest N points fade + pts = [self.world_to_pixel(x, y) for x, y in trail] - for i in range(1, len(pts)): - alpha = i / len(pts) + n = len(pts) + fade_start = max(0, n - FADE_POINTS) + + for i in range(1, n): + if i <= fade_start: + # Old segments: solid dim color (never invisible) + alpha = MIN_ALPHA + else: + # Recent segments: fade from MIN_ALPHA to 1.0 + progress = (i - fade_start) / FADE_POINTS + alpha = MIN_ALPHA + (1.0 - MIN_ALPHA) * progress + color = ( int(self.UAV_TRAIL_COLOR[0] * alpha), int(self.UAV_TRAIL_COLOR[1] * alpha), diff --git a/src/utils/recorder.py b/src/utils/recorder.py new file mode 100644 index 0000000..b62e93e --- /dev/null +++ b/src/utils/recorder.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python3 +""" +Run Recorder — Captures logs, flight path, camera frames, and summary for each run. + +Creates timestamped folders in /results with: + flight_path.avi — Flight tracker video + camera.avi — Downward camera video + flight_path.png — Final flight tracker snapshot + log.txt — Full console output + summary.json — Run metadata and results +""" + +import os +import sys +import io +import json +import time +import threading +import cv2 +import numpy as np +from pathlib import Path +from datetime import datetime + + +class RunRecorder: + """Records all outputs from a simulation run.""" + + def __init__(self, results_dir=None, fps=5): + if results_dir is None: + project_dir = Path(__file__).resolve().parent.parent + results_dir = project_dir / "results" + + results_dir = Path(results_dir) + results_dir.mkdir(parents=True, exist_ok=True) + + # Find next sequential run number + existing = [d.name for d in results_dir.iterdir() + if d.is_dir() and d.name.startswith("run_")] + nums = [] + for name in existing: + try: + nums.append(int(name.split("_")[1])) + except (IndexError, ValueError): + pass + run_num = max(nums, default=0) + 1 + + self.run_dir = results_dir / f"run_{run_num}" + self.run_dir.mkdir(parents=True, exist_ok=True) + self.run_num = run_num + + self.fps = fps + self.start_time = time.time() + + # Log capture + self._log_path = self.run_dir / "log.txt" + self._log_file = open(self._log_path, "w") + self._original_stdout = sys.stdout + self._original_stderr = sys.stderr + self._tee_stdout = _TeeWriter(sys.stdout, self._log_file) + self._tee_stderr = _TeeWriter(sys.stderr, self._log_file) + + # Video writers (initialized lazily on first frame) + self._tracker_writer = None + self._camera_writer = None + self._tracker_size = None + self._camera_size = None + + # Frame counters + self._tracker_frames = 0 + self._camera_frames = 0 + + # Snapshot storage + self._last_tracker_frame = None + self._last_camera_frame = None + self._camera_snapshots = [] + + # Recording thread + self._recording = False + self._tracker_ref = None + self._camera_ref = None + self._record_thread = None + self._lock = threading.Lock() + + # Metadata + self.metadata = { + "run": run_num, + "start_time": datetime.now().isoformat(), + "run_dir": str(self.run_dir), + } + + print(f"[REC] Recording to: {self.run_dir}") + + def start_logging(self): + """Redirect stdout/stderr to both console and log file.""" + sys.stdout = self._tee_stdout + sys.stderr = self._tee_stderr + + def stop_logging(self): + """Restore original stdout/stderr.""" + sys.stdout = self._original_stdout + sys.stderr = self._original_stderr + + def start_recording(self, tracker=None, camera=None): + """Start background recording of tracker and camera frames.""" + self._tracker_ref = tracker + self._camera_ref = camera + self._recording = True + self._record_thread = threading.Thread( + target=self._record_loop, daemon=True + ) + self._record_thread.start() + + def _record_loop(self): + """Periodically capture frames from tracker and camera.""" + interval = 1.0 / self.fps + while self._recording: + t0 = time.time() + + # Capture tracker frame + if self._tracker_ref is not None: + try: + frame = self._tracker_ref.draw() + if frame is not None: + self._write_tracker_frame(frame) + self._last_tracker_frame = frame.copy() + except Exception: + pass + + # Capture camera frame + if self._camera_ref is not None: + try: + frame = self._camera_ref.frames.get("downward") + if frame is not None: + self._write_camera_frame(frame) + self._last_camera_frame = frame.copy() + except Exception: + pass + + elapsed = time.time() - t0 + sleep_time = max(0, interval - elapsed) + time.sleep(sleep_time) + + def _write_tracker_frame(self, frame): + """Write a frame to the tracker video.""" + h, w = frame.shape[:2] + if self._tracker_writer is None: + self._tracker_size = (w, h) + fourcc = cv2.VideoWriter_fourcc(*'XVID') + path = str(self.run_dir / "flight_path.avi") + self._tracker_writer = cv2.VideoWriter(path, fourcc, self.fps, (w, h)) + self._tracker_writer.write(frame) + self._tracker_frames += 1 + + def _write_camera_frame(self, frame): + """Write a frame to the camera video.""" + h, w = frame.shape[:2] + if self._camera_writer is None: + self._camera_size = (w, h) + fourcc = cv2.VideoWriter_fourcc(*'XVID') + path = str(self.run_dir / "camera.avi") + self._camera_writer = cv2.VideoWriter(path, fourcc, self.fps, (w, h)) + self._camera_writer.write(frame) + self._camera_frames += 1 + + def snapshot_camera(self, label="snapshot"): + """Save a named snapshot of the current camera frame.""" + if self._camera_ref is None: + return + frame = self._camera_ref.frames.get("downward") + if frame is None: + return + idx = len(self._camera_snapshots) + filename = f"camera_{idx:03d}_{label}.png" + path = self.run_dir / filename + cv2.imwrite(str(path), frame) + self._camera_snapshots.append(filename) + print(f"[REC] Snapshot: {filename}") + + def save_summary(self, search_mode="", altitude=0, markers=None, + landed=False, extra=None): + """Write the run summary JSON.""" + duration = time.time() - self.start_time + mins = int(duration // 60) + secs = int(duration % 60) + + summary = { + **self.metadata, + "end_time": datetime.now().isoformat(), + "duration_seconds": round(duration, 1), + "duration_display": f"{mins}m {secs}s", + "search_mode": search_mode, + "altitude": altitude, + "landed": landed, + "markers_found": {}, + "recordings": { + "log": "log.txt", + "flight_path_video": "flight_path.avi", + "flight_path_image": "flight_path.png", + "camera_video": "camera.avi", + "camera_snapshots": self._camera_snapshots, + }, + "frame_counts": { + "tracker": self._tracker_frames, + "camera": self._camera_frames, + }, + } + + if markers: + for mid, info in markers.items(): + pos = info.get('uav_position', {}) + summary["markers_found"][str(mid)] = { + "x": round(pos.get('x', 0), 2), + "y": round(pos.get('y', 0), 2), + "distance": round(info.get('distance', 0), 2), + } + + if extra: + summary.update(extra) + + path = self.run_dir / "summary.json" + with open(path, "w") as f: + json.dump(summary, f, indent=2) + print(f"[REC] Summary saved: {path}") + + def stop(self): + """Stop recording and finalize all outputs.""" + self._recording = False + if self._record_thread: + self._record_thread.join(timeout=3.0) + + # Save final flight path image + if self._last_tracker_frame is not None: + path = self.run_dir / "flight_path.png" + cv2.imwrite(str(path), self._last_tracker_frame) + print(f"[REC] Flight path saved: {path}") + + # Save final camera frame + if self._last_camera_frame is not None: + path = self.run_dir / "camera_final.png" + cv2.imwrite(str(path), self._last_camera_frame) + + # Release video writers + if self._tracker_writer: + self._tracker_writer.release() + if self._camera_writer: + self._camera_writer.release() + + # Stop log capture + self.stop_logging() + self._log_file.close() + + duration = time.time() - self.start_time + mins = int(duration // 60) + secs = int(duration % 60) + + # Print to original stdout since we stopped the tee + self._original_stdout.write( + f"\n[REC] Run recorded: {self.run_dir}\n" + f"[REC] Duration: {mins}m {secs}s | " + f"Tracker: {self._tracker_frames} frames | " + f"Camera: {self._camera_frames} frames\n" + ) + + +class _TeeWriter: + """Writes to both a stream and a file simultaneously.""" + + def __init__(self, stream, log_file): + self._stream = stream + self._log = log_file + + def write(self, data): + self._stream.write(data) + try: + # Strip ANSI escape codes for the log file + clean = data + self._log.write(clean) + self._log.flush() + except (ValueError, IOError): + pass + + def flush(self): + self._stream.flush() + try: + self._log.flush() + except (ValueError, IOError): + pass + + def fileno(self): + return self._stream.fileno() + + def isatty(self): + return self._stream.isatty()