diff --git a/scripts/generate_tags.py b/scripts/generate_tags.py index f7ee568..daf4861 100644 --- a/scripts/generate_tags.py +++ b/scripts/generate_tags.py @@ -1,23 +1,9 @@ #!/usr/bin/env python3 -""" -Generate ArUco marker tags for use as Gazebo textures. - -Usage: - python3 scripts/generate_tags.py # Generate default set - python3 scripts/generate_tags.py --ids 0 5 10 # Generate specific IDs - python3 scripts/generate_tags.py --dict DICT_6X6_250 --ids 0 1 2 - python3 scripts/generate_tags.py --all # Generate all 50 tags in DICT_4X4_50 - -Output: - worlds/tags/aruco__.png (individual markers) - worlds/tags/land_tag.png (copy of the landing target, ID from config) -""" import argparse import sys import os from pathlib import Path - import cv2 import numpy as np import yaml @@ -66,7 +52,6 @@ DICT_SIZES = { def generate_marker(dict_name, marker_id, pixel_size=600, border_bits=1): - """Generate a single ArUco marker image with a white border.""" dict_id = ARUCO_DICTS[dict_name] aruco_dict = cv2.aruco.getPredefinedDictionary(dict_id) @@ -76,17 +61,12 @@ def generate_marker(dict_name, marker_id, pixel_size=600, border_bits=1): marker_img = cv2.aruco.drawMarker(aruco_dict, marker_id, pixel_size) border_px = pixel_size // 6 - padded = np.ones( - (pixel_size + 2 * border_px, pixel_size + 2 * border_px), - dtype=np.uint8, - ) * 255 + padded = np.ones((pixel_size + 2 * border_px, pixel_size + 2 * border_px), dtype=np.uint8) * 255 padded[border_px:border_px + pixel_size, border_px:border_px + pixel_size] = marker_img - return padded def get_landing_id(): - """Read the landing target ID from search.yaml config.""" search_cfg = CONFIG_DIR / "search.yaml" if search_cfg.exists(): with open(search_cfg) as f: @@ -98,72 +78,42 @@ def get_landing_id(): def main(): - parser = argparse.ArgumentParser( - description="Generate ArUco marker tags for Gazebo simulation" - ) - parser.add_argument( - "--dict", default="DICT_4X4_50", - choices=list(ARUCO_DICTS.keys()), - help="ArUco dictionary (default: DICT_4X4_50)", - ) - parser.add_argument( - "--ids", type=int, nargs="+", default=None, - help="Marker IDs to generate (default: landing target from config)", - ) - parser.add_argument( - "--all", action="store_true", - help="Generate all markers in the dictionary", - ) - parser.add_argument( - "--size", type=int, default=600, - help="Marker image size in pixels (default: 600)", - ) - parser.add_argument( - "--format", default="png", choices=["png", "jpg"], - help="Output image format (default: png)", - ) + parser = argparse.ArgumentParser(description="Generate ArUco marker tags for Gazebo simulation") + parser.add_argument("--dict", default="DICT_4X4_50", choices=list(ARUCO_DICTS.keys())) + parser.add_argument("--ids", type=int, nargs="+", default=None) + parser.add_argument("--all", action="store_true") + parser.add_argument("--size", type=int, default=600) + parser.add_argument("--format", default="png", choices=["png", "jpg"]) args = parser.parse_args() TAGS_DIR.mkdir(parents=True, exist_ok=True) - - dict_name = args.dict landing_id = get_landing_id() if args.all: - max_id = DICT_SIZES.get(dict_name, 50) - ids = list(range(max_id)) + ids = list(range(DICT_SIZES.get(args.dict, 50))) elif args.ids is not None: ids = args.ids else: ids = [landing_id] - print(f"Generating {len(ids)} ArUco marker(s)") - print(f" Dictionary: {dict_name}") - print(f" Size: {args.size}px") - print(f" Output: {TAGS_DIR}/") - print(f" Landing target ID: {landing_id}") - print() + print(f"Generating {len(ids)} ArUco marker(s) in {TAGS_DIR}/") for marker_id in ids: - marker_img = generate_marker(dict_name, marker_id, args.size) - - filename = f"aruco_{dict_name}_{marker_id}.{args.format}" - filepath = TAGS_DIR / filename - cv2.imwrite(str(filepath), marker_img) - print(f" {filename} ({marker_img.shape[0]}x{marker_img.shape[1]}px)") + marker_img = generate_marker(args.dict, marker_id, args.size) + filename = f"aruco_{args.dict}_{marker_id}.{args.format}" + cv2.imwrite(str(TAGS_DIR / filename), marker_img) + print(f" {filename}") if marker_id == landing_id: - land_path = TAGS_DIR / f"land_tag.{args.format}" - cv2.imwrite(str(land_path), marker_img) - print(f" land_tag.{args.format} (copy of ID {landing_id})") + cv2.imwrite(str(TAGS_DIR / f"land_tag.{args.format}"), marker_img) - # Verify generated tags are detectable print("\nVerifying detection...") - aruco_dict = cv2.aruco.getPredefinedDictionary(ARUCO_DICTS[dict_name]) + aruco_dict = cv2.aruco.getPredefinedDictionary(ARUCO_DICTS[args.dict]) try: params = cv2.aruco.DetectorParameters() except AttributeError: params = cv2.aruco.DetectorParameters_create() + params.minMarkerPerimeterRate = 0.01 try: params.cornerRefinementMethod = cv2.aruco.CORNER_REFINE_SUBPIX @@ -177,34 +127,13 @@ def main(): _detect = lambda img: cv2.aruco.detectMarkers(img, aruco_dict, parameters=params) ok = 0 - fail = 0 for marker_id in ids: - filename = f"aruco_{dict_name}_{marker_id}.{args.format}" - filepath = TAGS_DIR / filename - img = cv2.imread(str(filepath), cv2.IMREAD_GRAYSCALE) - corners, detected_ids, _ = _detect(img) - + img = cv2.imread(str(TAGS_DIR / f"aruco_{args.dict}_{marker_id}.{args.format}"), cv2.IMREAD_GRAYSCALE) + _, detected_ids, _ = _detect(img) if detected_ids is not None and marker_id in detected_ids.flatten(): ok += 1 - else: - print(f" WARNING: {filename} failed detection check!") - fail += 1 - - # Also test at simulated camera distances - land_file = TAGS_DIR / f"aruco_{dict_name}_{landing_id}.{args.format}" - if land_file.exists(): - land_img = cv2.imread(str(land_file), cv2.IMREAD_GRAYSCALE) - print(f"\nDistance detection test (ID {landing_id}):") - for sim_size in [120, 80, 60, 40]: - small = cv2.resize(land_img, (sim_size, sim_size), interpolation=cv2.INTER_AREA) - bg = np.full((480, 640), 180, dtype=np.uint8) - y, x = (480 - sim_size) // 2, (640 - sim_size) // 2 - bg[y:y + sim_size, x:x + sim_size] = small - corners, detected_ids, _ = _detect(bg) - status = "OK" if detected_ids is not None else "FAIL" - print(f" {sim_size}x{sim_size}px in 640x480 frame: {status}") - - print(f"\nDone. {ok}/{ok + fail} markers verified.") + + print(f"Done. {ok}/{len(ids)} markers verified.") if __name__ == "__main__": diff --git a/src/control/search.py b/src/control/search.py index 50b2fd2..ac7ba87 100644 --- a/src/control/search.py +++ b/src/control/search.py @@ -19,7 +19,6 @@ class SearchMode(Enum): class Search: - POSITION_TOLERANCE = 1.0 CHECK_INTERVAL = 0.5 MAX_TRAVEL_TIME = 30.0 @@ -41,7 +40,6 @@ class Search: self.actions = actions or {} self.land_ids = set(self.actions.get("land", [])) - # Search pattern parameters self.spiral_max_legs = 12 self.spiral_initial_leg = self.CAM_FOV_METERS self.spiral_leg_increment = self.CAM_FOV_METERS * 0.8 @@ -55,30 +53,16 @@ class Search: self.levy_field_size = 50.0 self.levy_angle_dist = uniform(loc=-180, scale=360) - # Pre-flight plan self.waypoints = [] self.current_wp = 0 - self._on_waypoint_change = None # callback: fn(index) + self._on_waypoint_change = None def configure(self, **kwargs): for key, value in kwargs.items(): if hasattr(self, key): setattr(self, key, value) - # ═══════════════════════════════════════════════════════════ - # PRE-FLIGHT PLANNING - # ═══════════════════════════════════════════════════════════ - def plan(self, start_pos=(0.0, 0.0), geofence_points=None): - """Pre-compute all waypoints for the search pattern. - - Args: - start_pos: (x, y) starting position in local NED - geofence_points: list of (x, y) polygon vertices, or None - - Returns: - List of (x, y) absolute waypoints - """ if self.mode == SearchMode.SPIRAL: waypoints = self._plan_spiral(start_pos) elif self.mode == SearchMode.LAWNMOWER: @@ -88,7 +72,6 @@ class Search: else: waypoints = [start_pos] - # Clip to geofence if geofence_points: waypoints = self._clip_to_geofence(waypoints, geofence_points) @@ -105,7 +88,6 @@ class Search: return waypoints def _plan_spiral(self, start): - """Expanding square spiral from center outward.""" waypoints = [start] x, y = start @@ -135,7 +117,6 @@ class Search: return waypoints def _plan_lawnmower(self, start): - """Lawnmower (boustrophedon) pattern.""" waypoints = [start] sx, sy = start lane_spacing = self.lawn_lane_spacing @@ -158,7 +139,6 @@ class Search: return waypoints def _plan_levy(self, start): - """Lévy walk — random but pre-computed.""" waypoints = [start] x, y = start @@ -175,26 +155,18 @@ class Search: return waypoints def _clip_to_geofence(self, waypoints, polygon): - """Clip waypoints to stay within the geofence polygon. - - If a waypoint is outside, it is projected to the nearest - point on the polygon boundary. Consecutive out-of-bounds - waypoints are collapsed. - """ clipped = [] for wx, wy in waypoints: if self._point_in_polygon(wx, wy, polygon): clipped.append((wx, wy)) else: nearest = self._nearest_point_on_polygon(wx, wy, polygon) - # Avoid duplicate consecutive points if not clipped or distance_2d(clipped[-1], nearest) > 0.5: clipped.append(nearest) return clipped @staticmethod def _point_in_polygon(px, py, polygon): - """Ray-casting point-in-polygon test.""" n = len(polygon) inside = False j = n - 1 @@ -209,14 +181,12 @@ class Search: @staticmethod def _nearest_point_on_polygon(px, py, polygon): - """Find the nearest point on the polygon edge to (px, py).""" min_dist = float('inf') nearest = (px, py) n = len(polygon) for i in range(n): x1, y1 = polygon[i] x2, y2 = polygon[(i + 1) % n] - # Project point onto segment dx, dy = x2 - x1, y2 - y1 length_sq = dx * dx + dy * dy if length_sq == 0: @@ -230,12 +200,7 @@ class Search: nearest = proj return nearest - # ═══════════════════════════════════════════════════════════ - # EXECUTION - # ═══════════════════════════════════════════════════════════ - def run(self): - """Execute the pre-planned search by following waypoints.""" if not self.waypoints: self.plan() @@ -282,10 +247,6 @@ class Search: return self.found_markers - # ═══════════════════════════════════════════════════════════ - # CAMERA & MARKER DETECTION - # ═══════════════════════════════════════════════════════════ - def get_camera_frame(self): if self.camera is None: return None @@ -329,13 +290,7 @@ class Search: return new_markers - # ═══════════════════════════════════════════════════════════ - # LANDING - # ═══════════════════════════════════════════════════════════ - def execute_landing(self, initial_detections): - """Visual-servoing descent: keep the ArUco tag centered - while descending.""" target_det = None for d in initial_detections: if d.get("id") in self.land_ids: @@ -350,12 +305,10 @@ class Search: print(f"[SEARCH] Target marker ID:{target_det['id']} " f"distance:{target_det['distance']:.2f}m") - # Camera parameters (must match model.sdf) IMG_W, IMG_H = 640, 480 IMG_CX, IMG_CY = IMG_W / 2, IMG_H / 2 HFOV = 1.3962634 # 80° horizontal FOV - # Landing parameters DESCENT_STEP = 1.0 LAND_ALT = 1.5 CENTER_PX = 50 @@ -363,7 +316,6 @@ class Search: MAX_LOST = 30 GAIN = 0.35 - # Start visual servoing from current position self.ctrl.update_state() current_alt = self.ctrl.altitude @@ -379,7 +331,6 @@ class Search: self._landing = False return - # Descend one step current_alt = max(current_alt - DESCENT_STEP, LAND_ALT) print(f"\n[SEARCH] Descending to {current_alt:.1f}m") self.ctrl.update_state() @@ -387,13 +338,11 @@ class Search: self.ctrl.move_local_ned(cur['x'], cur['y'], -current_alt) sleep(2.0) - # Final precision centering print(f"[SEARCH] Final centering at {LAND_ALT:.1f}m") self._center_over_marker( LAND_ALT, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV, 35, MAX_CORRECTIONS, MAX_LOST, 0.25) - # Land print(f"\n[SEARCH] ===== LANDING =====") self.ctrl.land() self.landed = True @@ -401,10 +350,6 @@ class Search: def _center_over_marker(self, target_alt, img_w, img_h, img_cx, img_cy, hfov, center_px, max_corrections, max_lost, gain): - """Center the UAV over the ArUco marker using pixel error. - - Returns True if centered, False if partial, None if lost. - """ lost_count = 0 centered = False @@ -465,15 +410,10 @@ class Search: sleep(0.5) if not centered: - print(f"\n[SEARCH] Partially centered at {target_alt:.1f}m, " - f"continuing") + print(f"\n[SEARCH] Partially centered at {target_alt:.1f}m, continuing") return centered - # ═══════════════════════════════════════════════════════════ - # MOVEMENT - # ═══════════════════════════════════════════════════════════ - def wait_for_position(self, target_x, target_y, timeout=None): if timeout is None: timeout = self.MAX_TRAVEL_TIME @@ -500,7 +440,6 @@ class Search: return False def _wait_arrival(self, target_x, target_y, timeout=15.0): - """Wait for position without checking markers (landing).""" t0 = time.time() while time.time() - t0 < timeout and self.running: self.ctrl.update_state() @@ -524,10 +463,6 @@ class Search: self.ctrl.move_local_ned(x, y, z) return self.wait_for_position(x, y) - # ═══════════════════════════════════════════════════════════ - # RESULTS - # ═══════════════════════════════════════════════════════════ - def stop(self): self.running = False diff --git a/src/control/uav_controller.py b/src/control/uav_controller.py index 8a6581b..2a701ed 100644 --- a/src/control/uav_controller.py +++ b/src/control/uav_controller.py @@ -6,7 +6,6 @@ import argparse import numpy as np from time import sleep, perf_counter from typing import Tuple - from pymavlink import mavutil import pymavlink.dialects.v20.ardupilotmega as mavlink from pymavlink.dialects.v20.ardupilotmega import ( @@ -25,7 +24,6 @@ LOWEST_ALT = 3.0 GUIDED_MODE = 4 GUIDED_NOGPS_MODE = 20 - DEFAULT_WPNAV_SPEED = 150 DEFAULT_WPNAV_ACCEL = 100 DEFAULT_WPNAV_SPEED_UP = 100 @@ -35,7 +33,6 @@ DEFAULT_LOIT_SPEED = 150 class Controller: - HOLD_ALT = HOLD_ALT LOWEST_ALT = LOWEST_ALT @@ -133,23 +130,14 @@ class Controller: def arm(self, retries: int = 30): self.set_mode_guided() - for i in range(retries): print(f"[UAV] Arm attempt {i+1}/{retries}...") - - # Force arm: param2=21196 bypasses pre-arm checks - # (equivalent to MAVProxy's "arm throttle force") self.conn.mav.command_long_send( self.conn.target_system, self.conn.target_component, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, - 0, - 1, # param1: 1 = arm - 21196, # param2: force arm magic number - 0, 0, 0, 0, 0) + 0, 1, 21196, 0, 0, 0, 0, 0) - # Wait up to 4 seconds for armed status instead of - # blocking forever with motors_armed_wait() armed = False for _ in range(20): self._drain_messages() @@ -157,16 +145,12 @@ class Controller: armed = True break time.sleep(0.2) - if armed: self.armed = True print("[UAV] Armed") return True - print(f"[UAV] Arm attempt {i+1} failed, retrying...") time.sleep(2) - - print("[UAV] FAILED to arm after all retries") return False def disarm(self): @@ -264,7 +248,6 @@ class Controller: print(f"\r[UAV] Climbing: {self.altitude:.1f}m / {target_alt:.1f}m ", end='', flush=True) sleep(0.2) - print(f"\n[UAV] Altitude timeout at {self.altitude:.1f}m") return False def wait_for_gps(self, timeout: float = 120.0): @@ -282,7 +265,6 @@ class Controller: return True elapsed = int(time.time() - t0) print(f"\r[UAV] Waiting for GPS ... {elapsed}s ", end='', flush=True) - print("\n[UAV] GPS timeout") return False def configure_speed_limits(self, @@ -292,7 +274,6 @@ class Controller: wpnav_speed_dn=DEFAULT_WPNAV_SPEED_DN, wpnav_accel_z=DEFAULT_WPNAV_ACCEL_Z, loit_speed=DEFAULT_LOIT_SPEED): - params = { 'WPNAV_SPEED': wpnav_speed, 'WPNAV_ACCEL': wpnav_accel, diff --git a/src/main.py b/src/main.py index 9e2e862..ad80436 100644 --- a/src/main.py +++ b/src/main.py @@ -65,7 +65,6 @@ def main(): parser.add_argument('--no-record', action='store_true', help='Disable recording') args = parser.parse_args() - # ── Start recorder ────────────────────────────────────────── recorder = None if not args.no_record: recorder = RunRecorder() @@ -78,13 +77,11 @@ def main(): altitude = args.altitude or search_cfg.get('altitude', 5.0) search_mode = args.search - # Marker config (single source of truth) marker_cfg = search_cfg.get('marker', {}) marker_dict = marker_cfg.get('dictionary', 'DICT_4X4_50') marker_size = marker_cfg.get('size', 0.5) landing_ids = marker_cfg.get('landing_ids', [0]) - # UGV position ugv_pos_cfg = ugv_cfg.get('position', {}) ugv_position = (ugv_pos_cfg.get('x', 5.0), ugv_pos_cfg.get('y', 5.0)) @@ -128,7 +125,6 @@ def main(): if mode_cfg: search.configure(**{f"{search_mode}_{k}": v for k, v in mode_cfg.items()}) - # ── Pre-flight plan ───────────────────────────────────────── geofence_cfg = search_cfg.get('geofence', {}) geofence_points = None if geofence_cfg.get('enabled', False): @@ -143,7 +139,6 @@ def main(): print(f"[MAIN] Search: {search_mode} Altitude: {altitude}m") print(f"[MAIN] Marker: {marker_dict} size:{marker_size}m land:{landing_ids}") - # ── Start flight tracker (show plan before takeoff) ───────── tracker = FlightTracker( window_size=600, world_range=30.0, @@ -166,12 +161,10 @@ def main(): ctrl.takeoff(altitude) ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30) - # ── Start recording tracker + camera ──────────────────────── if recorder: recorder.start_recording(tracker=tracker, camera=camera) recorder.snapshot_camera("pre_search") - # Feed position updates to tracker during search original_check = search.check_for_markers def tracked_check(): result = original_check() @@ -186,13 +179,11 @@ def main(): return result search.check_for_markers = tracked_check - # Wire up waypoint progress to tracker search._on_waypoint_change = lambda idx: tracker.set_active_waypoint(idx) results = search.run() search_results = search.get_results() - # ── Snapshot on detection ─────────────────────────────────── if recorder and results: recorder.snapshot_camera("marker_detected") @@ -215,7 +206,6 @@ def main(): else: wait_for_landing(ctrl) - # ── Finalize recording ────────────────────────────────────── if recorder: recorder.snapshot_camera("post_landing") recorder.save_summary( @@ -226,8 +216,20 @@ def main(): ) recorder.stop() - tracker.stop() - print("[MAIN] Done.") + print(f"\n{'=' * 50}") + print(f" Simulation finished.") + print(f" Tracker and Gazebo windows are still open.") + print(f" Press Ctrl+C to exit.") + print(f"{'=' * 50}\n") + + try: + while True: + sleep(1) + except KeyboardInterrupt: + pass + finally: + tracker.stop() + print("[MAIN] Done.") if __name__ == '__main__': diff --git a/src/navigation/flight_tracker.py b/src/navigation/flight_tracker.py index 35cd2cc..22f57e8 100644 --- a/src/navigation/flight_tracker.py +++ b/src/navigation/flight_tracker.py @@ -1,16 +1,4 @@ #!/usr/bin/env python3 -""" -Flight Tracker - Real-time 2D visualization of UAV and UGV positions. - -Opens a separate OpenCV window showing: -- Grid with coordinates -- UAV position (green square) with flight path trail -- UGV position (red square) -- Geofence boundary (yellow dashed polygon with warning zone) -- Position data overlay - -Can run standalone or be imported and updated by main.py. -""" import sys import os @@ -25,7 +13,6 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) class FlightTracker: - # Colors (BGR) BG_COLOR = (30, 30, 30) GRID_COLOR = (60, 60, 60) GRID_MAJOR_COLOR = (80, 80, 80) @@ -36,25 +23,16 @@ class FlightTracker: TEXT_COLOR = (200, 200, 200) TEXT_DIM_COLOR = (120, 120, 120) MARKER_COLOR = (180, 0, 220) - FENCE_COLOR = (0, 220, 255) # Yellow-cyan - FENCE_WARN_COLOR = (0, 140, 200) # Darker yellow - FENCE_BREACH_COLOR = (0, 0, 255) # Red - PLAN_COLOR = (180, 140, 50) # Muted cyan-blue - PLAN_ACTIVE_COLOR = (0, 180, 255) # Orange - PLAN_VISITED_COLOR = (60, 60, 60) # Dim gray + FENCE_COLOR = (0, 220, 255) + FENCE_WARN_COLOR = (0, 140, 200) + FENCE_BREACH_COLOR = (0, 0, 255) + PLAN_COLOR = (180, 140, 50) + PLAN_ACTIVE_COLOR = (0, 180, 255) + PLAN_VISITED_COLOR = (60, 60, 60) def __init__(self, window_size=600, world_range=30.0, ugv_position=(5.0, 5.0), show_ugv=True, geofence=None): - """ - Args: - window_size: Window dimension in pixels (square) - world_range: World coordinate range (-range to +range) in meters - ugv_position: Known UGV position in local NED (x, y) - show_ugv: Whether to draw the UGV - geofence: Dict with 'points' (list of [x,y]), 'warning_distance', - 'min_altitude', 'max_altitude' - """ self.window_size = window_size self.world_range = world_range self.ugv_pos = ugv_position @@ -66,11 +44,9 @@ class FlightTracker: self.trail = deque() self.markers_found = {} - # Flight plan self.plan_waypoints = [] self.active_waypoint = 0 - # Geofence self.geofence = geofence self.fence_points = [] self.fence_warn_dist = 3.0 @@ -91,14 +67,11 @@ class FlightTracker: self.total_width = self.window_size + self.panel_width def world_to_pixel(self, x, y): - """Convert NED world coords to pixel coords on the map. - NED: x=North(up on screen), y=East(right on screen)""" px = int(self.window_size / 2 + (y / self.world_range) * (self.window_size / 2)) py = int(self.window_size / 2 - (x / self.world_range) * (self.window_size / 2)) return (px, py) def update_uav(self, x, y, altitude=0.0, heading=0.0): - """Update UAV position (NED coords).""" with self._lock: self.uav_pos = (x, y) self.uav_alt = altitude @@ -106,28 +79,23 @@ class FlightTracker: self.trail.append((x, y)) def update_ugv(self, x, y): - """Update UGV position.""" with self._lock: self.ugv_pos = (x, y) def add_marker(self, marker_id, x, y): - """Record a detected marker position.""" with self._lock: self.markers_found[marker_id] = (x, y) def set_plan(self, waypoints): - """Set pre-computed flight plan waypoints for visualization.""" with self._lock: self.plan_waypoints = list(waypoints) self.active_waypoint = 0 def set_active_waypoint(self, idx): - """Update which waypoint the UAV is currently targeting.""" with self._lock: self.active_waypoint = idx def _point_in_polygon(self, px, py, polygon): - """Simple ray-casting point-in-polygon test.""" n = len(polygon) inside = False j = n - 1 @@ -140,13 +108,11 @@ class FlightTracker: return inside def _distance_to_polygon_edge(self, px, py, polygon): - """Minimum distance from point to polygon edges.""" min_dist = float('inf') n = len(polygon) for i in range(n): x1, y1 = polygon[i] x2, y2 = polygon[(i + 1) % n] - # Point-to-segment distance dx, dy = x2 - x1, y2 - y1 length_sq = dx * dx + dy * dy if length_sq == 0: @@ -160,13 +126,11 @@ class FlightTracker: return min_dist def draw(self): - """Render the full tracker frame.""" frame = np.full((self.window_size, self.total_width, 3), self.BG_COLOR, dtype=np.uint8) self._draw_grid(frame) - # Draw geofence before other elements if self.fence_points: self._draw_geofence(frame) @@ -180,7 +144,6 @@ class FlightTracker: plan_copy = list(self.plan_waypoints) active_wp = self.active_waypoint - # Draw plan UNDER the trail if plan_copy: self._draw_plan(frame, plan_copy, active_wp) @@ -202,57 +165,37 @@ class FlightTracker: return frame def _draw_grid(self, frame): - """Draw coordinate grid.""" step = 5.0 r = self.world_range - n = int(r / step) for i in range(-n, n + 1): val = i * step color = self.GRID_MAJOR_COLOR if i % 2 == 0 else self.GRID_COLOR - - # Vertical lines (constant y) px, _ = self.world_to_pixel(0, val) cv2.line(frame, (px, 0), (px, self.window_size), color, 1) - - # Horizontal lines (constant x) _, py = self.world_to_pixel(val, 0) cv2.line(frame, (0, py), (self.window_size, py), color, 1) - - # Axis labels if i != 0 and i % 2 == 0: - cv2.putText(frame, f"{val:.0f}", - (px + 2, self.window_size - 5), + cv2.putText(frame, f"{val:.0f}", (px + 2, self.window_size - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.3, self.TEXT_DIM_COLOR, 1) - cv2.putText(frame, f"{val:.0f}", - (5, py - 3), + cv2.putText(frame, f"{val:.0f}", (5, py - 3), cv2.FONT_HERSHEY_SIMPLEX, 0.3, self.TEXT_DIM_COLOR, 1) - # Draw axes (origin cross) cx, cy = self.world_to_pixel(0, 0) cv2.line(frame, (cx, 0), (cx, self.window_size), self.AXIS_COLOR, 1) cv2.line(frame, (0, cy), (self.window_size, cy), self.AXIS_COLOR, 1) - - # Axis labels at origin - cv2.putText(frame, "N", (cx + 4, 15), - cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1) - cv2.putText(frame, "E", (self.window_size - 15, cy - 5), - cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1) + cv2.putText(frame, "N", (cx + 4, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1) + cv2.putText(frame, "E", (self.window_size - 15, cy - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1) def _draw_geofence(self, frame): - """Draw geofence boundary with warning zone and fill.""" pts = [self.fence_points[i] for i in range(len(self.fence_points))] pixel_pts = np.array([self.world_to_pixel(x, y) for x, y in pts], dtype=np.int32) - # Semi-transparent fill inside the geofence overlay = frame.copy() cv2.fillPoly(overlay, [pixel_pts], (40, 50, 40)) cv2.addWeighted(overlay, 0.3, frame, 0.7, 0, frame) - # Draw warning zone (inner boundary) if self.fence_warn_dist > 0: - # Inset the polygon by warning_distance to show warning zone - # Simple approach: draw dashed inner boundary warn_pts = [] cx = sum(p[0] for p in pts) / len(pts) cy = sum(p[1] for p in pts) / len(pts) @@ -266,25 +209,17 @@ class FlightTracker: else: warn_pts.append((x, y)) - warn_pixel_pts = np.array( - [self.world_to_pixel(x, y) for x, y in warn_pts], dtype=np.int32 - ) + warn_pixel_pts = np.array([self.world_to_pixel(x, y) for x, y in warn_pts], dtype=np.int32) self._draw_dashed_polygon(frame, warn_pixel_pts, self.FENCE_WARN_COLOR, 1, 8) - # Draw main fence boundary (solid bright line) cv2.polylines(frame, [pixel_pts], True, self.FENCE_COLOR, 2) - - # Corner markers for px, py in pixel_pts: cv2.circle(frame, (int(px), int(py)), 4, self.FENCE_COLOR, -1) - - # Label top_px, top_py = pixel_pts[0] cv2.putText(frame, "GEOFENCE", (int(top_px) + 6, int(top_py) - 8), cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.FENCE_COLOR, 1) def _draw_dashed_polygon(self, frame, pts, color, thickness, dash_len): - """Draw a polygon with dashed lines.""" n = len(pts) for i in range(n): p1 = pts[i] @@ -292,7 +227,6 @@ class FlightTracker: self._draw_dashed_line(frame, tuple(p1), tuple(p2), color, thickness, dash_len) def _draw_dashed_line(self, frame, pt1, pt2, color, thickness, dash_len): - """Draw a dashed line between two points.""" x1, y1 = pt1 x2, y2 = pt2 dx = x2 - x1 @@ -313,268 +247,142 @@ class FlightTracker: cv2.line(frame, (sx, sy), (ex, ey), color, thickness) def _draw_plan(self, frame, waypoints, active_idx): - """Draw the pre-flight plan: dashed path with numbered waypoints.""" if len(waypoints) < 2: return - pts = [self.world_to_pixel(x, y) for x, y in waypoints] - - # Draw connecting lines for i in range(1, len(pts)): - if i <= active_idx: - color = self.PLAN_VISITED_COLOR - else: - color = self.PLAN_COLOR + color = self.PLAN_VISITED_COLOR if i <= active_idx else self.PLAN_COLOR self._draw_dashed_line(frame, pts[i - 1], pts[i], color, 1, 8) - # Draw waypoint markers for i, (px, py) in enumerate(pts): if i < active_idx: - # Visited — small dim dot cv2.circle(frame, (px, py), 3, self.PLAN_VISITED_COLOR, -1) elif i == active_idx: - # Active target — bright ring cv2.circle(frame, (px, py), 8, self.PLAN_ACTIVE_COLOR, 2) cv2.circle(frame, (px, py), 3, self.PLAN_ACTIVE_COLOR, -1) else: - # Future — small cyan dot cv2.circle(frame, (px, py), 4, self.PLAN_COLOR, 1) - - # Waypoint number label - label = str(i) - cv2.putText(frame, label, (px + 7, py - 5), + cv2.putText(frame, str(i), (px + 7, py - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.28, self.PLAN_COLOR, 1) def _draw_trail(self, frame, trail): - """Draw the flight path trail with persistent visibility.""" if len(trail) < 2: return - - # Minimum brightness so old trail never disappears MIN_ALPHA = 0.3 - FADE_POINTS = 200 # only the newest N points fade - + FADE_POINTS = 200 pts = [self.world_to_pixel(x, y) for x, y in trail] n = len(pts) fade_start = max(0, n - FADE_POINTS) for i in range(1, n): if i <= fade_start: - # Old segments: solid dim color (never invisible) alpha = MIN_ALPHA else: - # Recent segments: fade from MIN_ALPHA to 1.0 progress = (i - fade_start) / FADE_POINTS alpha = MIN_ALPHA + (1.0 - MIN_ALPHA) * progress - color = ( - int(self.UAV_TRAIL_COLOR[0] * alpha), - int(self.UAV_TRAIL_COLOR[1] * alpha), - int(self.UAV_TRAIL_COLOR[2] * alpha), - ) + color = (int(self.UAV_TRAIL_COLOR[0] * alpha), + int(self.UAV_TRAIL_COLOR[1] * alpha), + int(self.UAV_TRAIL_COLOR[2] * alpha)) cv2.line(frame, pts[i - 1], pts[i], color, 2) def _draw_uav(self, frame, pos, heading): - """Draw UAV as a green square with heading indicator.""" px, py = self.world_to_pixel(pos[0], pos[1]) size = 8 - cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), - self.UAV_COLOR, -1) - cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), - (255, 255, 255), 1) - - # Heading line + cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), self.UAV_COLOR, -1) + cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), (255, 255, 255), 1) hx = int(px + 15 * math.sin(heading)) hy = int(py - 15 * math.cos(heading)) cv2.line(frame, (px, py), (hx, hy), self.UAV_COLOR, 2) - - # Label - cv2.putText(frame, "UAV", (px + 12, py - 4), - cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UAV_COLOR, 1) + cv2.putText(frame, "UAV", (px + 12, py - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UAV_COLOR, 1) def _draw_ugv(self, frame, pos): - """Draw UGV as a red square.""" px, py = self.world_to_pixel(pos[0], pos[1]) size = 10 - cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), - self.UGV_COLOR, -1) - cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), - (255, 255, 255), 1) - cv2.putText(frame, "UGV", (px + 14, py - 4), - cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UGV_COLOR, 1) + cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), self.UGV_COLOR, -1) + cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), (255, 255, 255), 1) + cv2.putText(frame, "UGV", (px + 14, py - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UGV_COLOR, 1) def _draw_panel(self, frame, uav, alt, hdg, ugv, trail): - """Draw the data panel on the right side.""" x0 = self.window_size + 10 y = 25 - - # Separator line - cv2.line(frame, (self.window_size, 0), (self.window_size, self.window_size), - self.GRID_MAJOR_COLOR, 1) - - # Title - cv2.putText(frame, "FLIGHT TRACKER", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + cv2.line(frame, (self.window_size, 0), (self.window_size, self.window_size), self.GRID_MAJOR_COLOR, 1) + cv2.putText(frame, "FLIGHT TRACKER", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) y += 30 - - # Elapsed time elapsed = time.time() - self.start_time - mins = int(elapsed // 60) - secs = int(elapsed % 60) - cv2.putText(frame, f"Time: {mins:02d}:{secs:02d}", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TEXT_DIM_COLOR, 1) + mins, secs = int(elapsed // 60), int(elapsed % 60) + cv2.putText(frame, f"Time: {mins:02d}:{secs:02d}", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TEXT_DIM_COLOR, 1) y += 30 - - # UAV section - cv2.putText(frame, "UAV", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UAV_COLOR, 1) + cv2.putText(frame, "UAV", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UAV_COLOR, 1) y += 22 - cv2.putText(frame, f"X: {uav[0]:+.1f}m", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) + cv2.putText(frame, f"X: {uav[0]:+.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) y += 18 - cv2.putText(frame, f"Y: {uav[1]:+.1f}m", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) + cv2.putText(frame, f"Y: {uav[1]:+.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) y += 18 - cv2.putText(frame, f"Alt: {alt:.1f}m", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) + cv2.putText(frame, f"Alt: {alt:.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) y += 18 - cv2.putText(frame, f"Hdg: {math.degrees(hdg):.0f} deg", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) + cv2.putText(frame, f"Hdg: {math.degrees(hdg):.0f} deg", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) y += 30 - # UGV section if self.show_ugv: - cv2.putText(frame, "UGV", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UGV_COLOR, 1) + cv2.putText(frame, "UGV", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UGV_COLOR, 1) y += 22 - cv2.putText(frame, f"X: {ugv[0]:+.1f}m", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) + cv2.putText(frame, f"X: {ugv[0]:+.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) y += 18 - cv2.putText(frame, f"Y: {ugv[1]:+.1f}m", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) + cv2.putText(frame, f"Y: {ugv[1]:+.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) y += 18 dist = np.sqrt((uav[0] - ugv[0])**2 + (uav[1] - ugv[1])**2) - cv2.putText(frame, f"Dist: {dist:.1f}m", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) + cv2.putText(frame, f"Dist: {dist:.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) y += 30 - # Geofence section if self.fence_points: - # Check fence status inside = self._point_in_polygon(uav[0], uav[1], self.fence_points) alt_ok = self.fence_alt_min <= alt <= self.fence_alt_max fence_dist = self._distance_to_polygon_edge(uav[0], uav[1], self.fence_points) - if not inside or not alt_ok: - status = "BREACHED" - status_color = self.FENCE_BREACH_COLOR + status, status_color = "BREACHED", self.FENCE_BREACH_COLOR elif fence_dist < self.fence_warn_dist: - status = "WARNING" - status_color = self.FENCE_WARN_COLOR + status, status_color = "WARNING", self.FENCE_WARN_COLOR else: - status = "OK" - status_color = self.FENCE_COLOR + status, status_color = "OK", self.FENCE_COLOR - cv2.putText(frame, "GEOFENCE", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.FENCE_COLOR, 1) + cv2.putText(frame, "GEOFENCE", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.FENCE_COLOR, 1) y += 22 - cv2.putText(frame, f"Status: {status}", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, status_color, 1) + cv2.putText(frame, f"Status: {status}", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, status_color, 1) y += 18 - cv2.putText(frame, f"Edge: {fence_dist:.1f}m", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) + cv2.putText(frame, f"Edge: {fence_dist:.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) y += 18 - cv2.putText(frame, f"Alt lim: {self.fence_alt_min:.0f}-{self.fence_alt_max:.0f}m", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) + cv2.putText(frame, f"Alt lim: {self.fence_alt_min:.0f}-{self.fence_alt_max:.0f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1) y += 30 - # Markers section - cv2.putText(frame, "MARKERS", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.MARKER_COLOR, 1) + cv2.putText(frame, "MARKERS", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.MARKER_COLOR, 1) y += 22 if self.markers_found: for mid, (mx, my) in self.markers_found.items(): - cv2.putText(frame, f"ID:{mid} ({mx:.1f},{my:.1f})", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_COLOR, 1) + cv2.putText(frame, f"ID:{mid} ({mx:.1f},{my:.1f})", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_COLOR, 1) y += 16 else: - cv2.putText(frame, "None", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1) + cv2.putText(frame, "None", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1) y += 16 - y += 20 - cv2.putText(frame, f"Trail: {len(trail)} pts", (x0, y), - cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1) + cv2.putText(frame, f"Trail: {len(trail)} pts", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1) def start(self): - """Start the display loop in a background thread.""" self.running = True self._thread = threading.Thread(target=self._display_loop, daemon=True) self._thread.start() def _display_loop(self): - """OpenCV display loop (runs in its own thread).""" print("[TRACKER] Flight tracker window started") while self.running: frame = self.draw() cv2.imshow("Flight Tracker", frame) - key = cv2.waitKey(50) & 0xFF - if key == ord('q'): + if cv2.waitKey(50) & 0xFF == ord('q'): break cv2.destroyWindow("Flight Tracker") print("[TRACKER] Flight tracker stopped") def stop(self): - """Stop the display loop.""" self.running = False if self._thread: self._thread.join(timeout=2.0) - - -def main(): - """Standalone test mode - simulates a spiral flight path with geofence.""" - - geofence = { - 'enabled': True, - 'warning_distance': 3.0, - 'min_altitude': 0.0, - 'max_altitude': 15.0, - 'points': [[-5, -5], [-5, 25], [25, 25], [25, -5]], - } - - tracker = FlightTracker( - window_size=600, - world_range=30.0, - ugv_position=(5.0, 5.0), - geofence=geofence, - ) - - print("Flight Tracker - Standalone Test (with geofence)") - print("Press 'q' to quit") - - x, y = 0.0, 0.0 - t = 0.0 - - while True: - t += 0.05 - radius = t * 0.5 - x = radius * math.cos(t) - y = radius * math.sin(t) - heading = math.atan2(math.cos(t), -math.sin(t)) - - tracker.update_uav(x, y, altitude=5.0, heading=heading) - - if t > 5 and 0 not in tracker.markers_found: - tracker.add_marker(0, 5.0, 5.0) - - frame = tracker.draw() - cv2.imshow("Flight Tracker", frame) - key = cv2.waitKey(30) & 0xFF - if key == ord('q'): - break - - cv2.destroyAllWindows() - - -if __name__ == "__main__": - main() diff --git a/src/utils/helpers.py b/src/utils/helpers.py index 8c3d7db..1a2e935 100644 --- a/src/utils/helpers.py +++ b/src/utils/helpers.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -"""Utility helper functions.""" import numpy as np import yaml @@ -77,32 +76,19 @@ class PIDController: self.ki = ki self.kd = kd self.output_limits = output_limits - self.integral = 0.0 self.last_error = 0.0 self.last_time = None def compute(self, error, current_time): - if self.last_time is None: - dt = 0.0 - else: - dt = current_time - self.last_time - + dt = 0.0 if self.last_time is None else current_time - self.last_time self.integral += error * dt - - if dt > 0: - derivative = (error - self.last_error) / dt - else: - derivative = 0.0 - + derivative = (error - self.last_error) / dt if dt > 0 else 0.0 output = self.kp * error + self.ki * self.integral + self.kd * derivative - if self.output_limits: output = clamp(output, self.output_limits[0], self.output_limits[1]) - self.last_error = error self.last_time = current_time - return output def reset(self): diff --git a/src/utils/recorder.py b/src/utils/recorder.py index b62e93e..1954bdf 100644 --- a/src/utils/recorder.py +++ b/src/utils/recorder.py @@ -1,14 +1,4 @@ #!/usr/bin/env python3 -""" -Run Recorder — Captures logs, flight path, camera frames, and summary for each run. - -Creates timestamped folders in /results with: - flight_path.avi — Flight tracker video - camera.avi — Downward camera video - flight_path.png — Final flight tracker snapshot - log.txt — Full console output - summary.json — Run metadata and results -""" import os import sys @@ -23,8 +13,6 @@ from datetime import datetime class RunRecorder: - """Records all outputs from a simulation run.""" - def __init__(self, results_dir=None, fps=5): if results_dir is None: project_dir = Path(__file__).resolve().parent.parent @@ -33,7 +21,6 @@ class RunRecorder: results_dir = Path(results_dir) results_dir.mkdir(parents=True, exist_ok=True) - # Find next sequential run number existing = [d.name for d in results_dir.iterdir() if d.is_dir() and d.name.startswith("run_")] nums = [] @@ -51,7 +38,6 @@ class RunRecorder: self.fps = fps self.start_time = time.time() - # Log capture self._log_path = self.run_dir / "log.txt" self._log_file = open(self._log_path, "w") self._original_stdout = sys.stdout @@ -59,29 +45,24 @@ class RunRecorder: self._tee_stdout = _TeeWriter(sys.stdout, self._log_file) self._tee_stderr = _TeeWriter(sys.stderr, self._log_file) - # Video writers (initialized lazily on first frame) self._tracker_writer = None self._camera_writer = None self._tracker_size = None self._camera_size = None - # Frame counters self._tracker_frames = 0 self._camera_frames = 0 - # Snapshot storage self._last_tracker_frame = None self._last_camera_frame = None self._camera_snapshots = [] - # Recording thread self._recording = False self._tracker_ref = None self._camera_ref = None self._record_thread = None self._lock = threading.Lock() - # Metadata self.metadata = { "run": run_num, "start_time": datetime.now().isoformat(), @@ -91,17 +72,14 @@ class RunRecorder: print(f"[REC] Recording to: {self.run_dir}") def start_logging(self): - """Redirect stdout/stderr to both console and log file.""" sys.stdout = self._tee_stdout sys.stderr = self._tee_stderr def stop_logging(self): - """Restore original stdout/stderr.""" sys.stdout = self._original_stdout sys.stderr = self._original_stderr def start_recording(self, tracker=None, camera=None): - """Start background recording of tracker and camera frames.""" self._tracker_ref = tracker self._camera_ref = camera self._recording = True @@ -111,12 +89,10 @@ class RunRecorder: self._record_thread.start() def _record_loop(self): - """Periodically capture frames from tracker and camera.""" interval = 1.0 / self.fps while self._recording: t0 = time.time() - # Capture tracker frame if self._tracker_ref is not None: try: frame = self._tracker_ref.draw() @@ -126,7 +102,6 @@ class RunRecorder: except Exception: pass - # Capture camera frame if self._camera_ref is not None: try: frame = self._camera_ref.frames.get("downward") @@ -141,7 +116,6 @@ class RunRecorder: time.sleep(sleep_time) def _write_tracker_frame(self, frame): - """Write a frame to the tracker video.""" h, w = frame.shape[:2] if self._tracker_writer is None: self._tracker_size = (w, h) @@ -152,7 +126,6 @@ class RunRecorder: self._tracker_frames += 1 def _write_camera_frame(self, frame): - """Write a frame to the camera video.""" h, w = frame.shape[:2] if self._camera_writer is None: self._camera_size = (w, h) @@ -163,7 +136,6 @@ class RunRecorder: self._camera_frames += 1 def snapshot_camera(self, label="snapshot"): - """Save a named snapshot of the current camera frame.""" if self._camera_ref is None: return frame = self._camera_ref.frames.get("downward") @@ -178,7 +150,6 @@ class RunRecorder: def save_summary(self, search_mode="", altitude=0, markers=None, landed=False, extra=None): - """Write the run summary JSON.""" duration = time.time() - self.start_time mins = int(duration // 60) secs = int(duration % 60) @@ -223,29 +194,24 @@ class RunRecorder: print(f"[REC] Summary saved: {path}") def stop(self): - """Stop recording and finalize all outputs.""" self._recording = False if self._record_thread: self._record_thread.join(timeout=3.0) - # Save final flight path image if self._last_tracker_frame is not None: path = self.run_dir / "flight_path.png" cv2.imwrite(str(path), self._last_tracker_frame) print(f"[REC] Flight path saved: {path}") - # Save final camera frame if self._last_camera_frame is not None: path = self.run_dir / "camera_final.png" cv2.imwrite(str(path), self._last_camera_frame) - # Release video writers if self._tracker_writer: self._tracker_writer.release() if self._camera_writer: self._camera_writer.release() - # Stop log capture self.stop_logging() self._log_file.close() @@ -253,7 +219,6 @@ class RunRecorder: mins = int(duration // 60) secs = int(duration % 60) - # Print to original stdout since we stopped the tee self._original_stdout.write( f"\n[REC] Run recorded: {self.run_dir}\n" f"[REC] Duration: {mins}m {secs}s | " @@ -263,8 +228,6 @@ class RunRecorder: class _TeeWriter: - """Writes to both a stream and a file simultaneously.""" - def __init__(self, stream, log_file): self._stream = stream self._log = log_file @@ -272,9 +235,7 @@ class _TeeWriter: def write(self, data): self._stream.write(data) try: - # Strip ANSI escape codes for the log file - clean = data - self._log.write(clean) + self._log.write(data) self._log.flush() except (ValueError, IOError): pass diff --git a/src/vision/camera_processor.py b/src/vision/camera_processor.py index 6933a59..1efc6a2 100644 --- a/src/vision/camera_processor.py +++ b/src/vision/camera_processor.py @@ -1,9 +1,4 @@ #!/usr/bin/env python3 -""" -Camera Processor - Handles Gazebo camera feeds for GPS-denied navigation. -Subscribes to gz-transport topics, processes frames, and displays via OpenCV. -Downstream modules (ArUco detector, optical flow) register as callbacks. -""" import sys import signal @@ -13,7 +8,6 @@ import cv2 from gz.transport13 import Node from gz.msgs10.image_pb2 import Image -# PixelFormatType enum values (from gz.msgs10 Image proto) PF_RGB_INT8 = 3 PF_BGR_INT8 = 8 PF_R_FLOAT32 = 13 @@ -35,7 +29,6 @@ class CameraProcessor: } self.topics = topics - for name, topic in self.topics.items(): self.callbacks[name] = [] ok = self.node.subscribe(Image, topic, self._make_gz_callback(name)) @@ -50,19 +43,12 @@ class CameraProcessor: def cb(msg): fmt = msg.pixel_format_type if fmt == PF_RGB_INT8: - arr = np.frombuffer(msg.data, dtype=np.uint8).reshape( - msg.height, msg.width, 3 - ) + arr = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, 3) bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR) elif fmt == PF_BGR_INT8: - arr = np.frombuffer(msg.data, dtype=np.uint8).reshape( - msg.height, msg.width, 3 - ) - bgr = arr + bgr = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, 3) elif fmt == PF_R_FLOAT32: - arr = np.frombuffer(msg.data, dtype=np.float32).reshape( - msg.height, msg.width - ) + arr = np.frombuffer(msg.data, dtype=np.float32).reshape(msg.height, msg.width) normalized = cv2.normalize(arr, None, 0, 255, cv2.NORM_MINMAX) bgr = cv2.cvtColor(normalized.astype(np.uint8), cv2.COLOR_GRAY2BGR) else: @@ -71,10 +57,8 @@ class CameraProcessor: processed = self.process_image(bgr) self.raw_frames[name] = processed.copy() self.frames[name] = processed - for fn in self.callbacks.get(name, []): fn(name, processed) - return cb def process_image(self, image): @@ -82,40 +66,26 @@ class CameraProcessor: clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) lab = cv2.cvtColor(processed, cv2.COLOR_BGR2LAB) lab[:, :, 0] = clahe.apply(lab[:, :, 0]) - processed = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR) - return processed + return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR) def register_callback(self, camera_name, fn): if camera_name in self.callbacks: self.callbacks[camera_name].append(fn) - print(f"[CAM] Registered callback for {camera_name}: {fn.__name__}") def stop(self): self.running = False def spin(self): - print("[CAM] Running. Press 'q' to quit, 's' to screenshot.") while self.running: for name, frame in list(self.frames.items()): cv2.imshow(name, frame) - - key = cv2.waitKey(33) & 0xFF - if key == ord("q"): + if cv2.waitKey(33) & 0xFF == ord("q"): break - elif key == ord("s"): - for name, frame in self.frames.items(): - fname = f"{name}_{int(time.time())}.png" - cv2.imwrite(fname, frame) - print(f"[CAM] Saved {fname}") - cv2.destroyAllWindows() - print("[CAM] Stopped.") def spin_headless(self): - print("[CAM] Running headless (no GUI).") while self.running: time.sleep(0.1) - print("[CAM] Stopped.") def get_frame(self, camera_name): return self.frames.get(camera_name) @@ -124,7 +94,6 @@ class CameraProcessor: def main(): cameras = "both" show_gui = True - if len(sys.argv) > 1: cameras = sys.argv[1].lower() if "--headless" in sys.argv: @@ -143,7 +112,6 @@ def main(): topics = all_topics proc = CameraProcessor(topics=topics, show_gui=show_gui) - try: import os src_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -155,13 +123,12 @@ def main(): def detection_overlay(camera_name, frame): detections = detector.detect(frame) if detections: - annotated = detector.annotate_frame(frame, detections) - proc.frames[camera_name] = annotated + proc.frames[camera_name] = detector.annotate_frame(frame, detections) for cam_name in topics: proc.register_callback(cam_name, detection_overlay) except Exception as e: - print(f"[CAM] ArUco detection unavailable: {e}") + print(f"[CAM] Detection unavailable: {e}") if show_gui: proc.spin() diff --git a/src/vision/object_detector.py b/src/vision/object_detector.py index 3077370..0a706f9 100644 --- a/src/vision/object_detector.py +++ b/src/vision/object_detector.py @@ -32,13 +32,11 @@ H_RES = 480 class ObjectDetector: - CAM_DEG_FOV = 110 def __init__(self, marker_size=0.15, camera_matrix=None, aruco_dict_name="DICT_4X4_50"): self.marker_size = marker_size - if camera_matrix is not None: self.camera_matrix = np.array(camera_matrix).reshape(3, 3) else: @@ -46,11 +44,9 @@ class ObjectDetector: [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]] ) self.dist_coeffs = np.zeros(5) - dict_id = ARUCO_DICT.get(aruco_dict_name, cv2.aruco.DICT_4X4_50) self.aruco_dict = cv2.aruco.getPredefinedDictionary(dict_id) - # Support both old (<4.7) and new (>=4.7) OpenCV ArUco API try: self.aruco_params = cv2.aruco.DetectorParameters() self._new_api = True @@ -80,17 +76,12 @@ class ObjectDetector: self.found_markers = {} self.on_detection = None - print(f"[DET] ObjectDetector initialized ({aruco_dict_name})") def detect(self, frame): if frame is None: return [] - - if len(frame.shape) == 2: - gray = frame - else: - gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + gray = frame if len(frame.shape) == 2 else cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if self._new_api: corners, ids, rejected = self.aruco_detector.detectMarkers(gray) @@ -110,7 +101,6 @@ class ObjectDetector: for i, marker_id in enumerate(ids.flatten()): tvec = tvecs[i][0] center = np.mean(corners[i][0], axis=0) - detection = { "id": int(marker_id), "corners": corners[i][0].tolist(), @@ -129,39 +119,26 @@ class ObjectDetector: if detections and self.on_detection: self.on_detection(detections) - return detections def annotate_frame(self, frame, detections): annotated = frame.copy() if not detections: return annotated - overlay = annotated.copy() for d in detections: corners_arr = np.array(d["corners"], dtype=np.int32) - - # Fill marker area with purple cv2.fillPoly(overlay, [corners_arr], (180, 0, 220)) - - # Draw purple border cv2.polylines(annotated, [corners_arr], True, (180, 0, 220), 3) - - # Center dot - cx = int(d["center_px"][0]) - cy = int(d["center_px"][1]) + cx, cy = int(d["center_px"][0]), int(d["center_px"][1]) cv2.circle(annotated, (cx, cy), 5, (255, 255, 255), -1) - - # Label with ID and distance label = f"ID:{d['id']} d:{d['distance']:.2f}m" cv2.putText(annotated, label, (int(corners_arr[0][0]), int(corners_arr[0][1]) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (180, 0, 220), 2) - # Blend overlay at 40% opacity cv2.addWeighted(overlay, 0.4, annotated, 0.6, 0, annotated) - return annotated def get_found_markers(self):