Simulation Exit Fixes

This commit is contained in:
2026-02-13 17:10:41 -05:00
parent 42e4fa28a9
commit 50ef3f0490
9 changed files with 97 additions and 551 deletions

View File

@@ -1,23 +1,9 @@
#!/usr/bin/env python3
"""
Generate ArUco marker tags for use as Gazebo textures.
Usage:
python3 scripts/generate_tags.py # Generate default set
python3 scripts/generate_tags.py --ids 0 5 10 # Generate specific IDs
python3 scripts/generate_tags.py --dict DICT_6X6_250 --ids 0 1 2
python3 scripts/generate_tags.py --all # Generate all 50 tags in DICT_4X4_50
Output:
worlds/tags/aruco_<DICT>_<ID>.png (individual markers)
worlds/tags/land_tag.png (copy of the landing target, ID from config)
"""
import argparse
import sys
import os
from pathlib import Path
import cv2
import numpy as np
import yaml
@@ -66,7 +52,6 @@ DICT_SIZES = {
def generate_marker(dict_name, marker_id, pixel_size=600, border_bits=1):
"""Generate a single ArUco marker image with a white border."""
dict_id = ARUCO_DICTS[dict_name]
aruco_dict = cv2.aruco.getPredefinedDictionary(dict_id)
@@ -76,17 +61,12 @@ def generate_marker(dict_name, marker_id, pixel_size=600, border_bits=1):
marker_img = cv2.aruco.drawMarker(aruco_dict, marker_id, pixel_size)
border_px = pixel_size // 6
padded = np.ones(
(pixel_size + 2 * border_px, pixel_size + 2 * border_px),
dtype=np.uint8,
) * 255
padded = np.ones((pixel_size + 2 * border_px, pixel_size + 2 * border_px), dtype=np.uint8) * 255
padded[border_px:border_px + pixel_size, border_px:border_px + pixel_size] = marker_img
return padded
def get_landing_id():
"""Read the landing target ID from search.yaml config."""
search_cfg = CONFIG_DIR / "search.yaml"
if search_cfg.exists():
with open(search_cfg) as f:
@@ -98,72 +78,42 @@ def get_landing_id():
def main():
parser = argparse.ArgumentParser(
description="Generate ArUco marker tags for Gazebo simulation"
)
parser.add_argument(
"--dict", default="DICT_4X4_50",
choices=list(ARUCO_DICTS.keys()),
help="ArUco dictionary (default: DICT_4X4_50)",
)
parser.add_argument(
"--ids", type=int, nargs="+", default=None,
help="Marker IDs to generate (default: landing target from config)",
)
parser.add_argument(
"--all", action="store_true",
help="Generate all markers in the dictionary",
)
parser.add_argument(
"--size", type=int, default=600,
help="Marker image size in pixels (default: 600)",
)
parser.add_argument(
"--format", default="png", choices=["png", "jpg"],
help="Output image format (default: png)",
)
parser = argparse.ArgumentParser(description="Generate ArUco marker tags for Gazebo simulation")
parser.add_argument("--dict", default="DICT_4X4_50", choices=list(ARUCO_DICTS.keys()))
parser.add_argument("--ids", type=int, nargs="+", default=None)
parser.add_argument("--all", action="store_true")
parser.add_argument("--size", type=int, default=600)
parser.add_argument("--format", default="png", choices=["png", "jpg"])
args = parser.parse_args()
TAGS_DIR.mkdir(parents=True, exist_ok=True)
dict_name = args.dict
landing_id = get_landing_id()
if args.all:
max_id = DICT_SIZES.get(dict_name, 50)
ids = list(range(max_id))
ids = list(range(DICT_SIZES.get(args.dict, 50)))
elif args.ids is not None:
ids = args.ids
else:
ids = [landing_id]
print(f"Generating {len(ids)} ArUco marker(s)")
print(f" Dictionary: {dict_name}")
print(f" Size: {args.size}px")
print(f" Output: {TAGS_DIR}/")
print(f" Landing target ID: {landing_id}")
print()
print(f"Generating {len(ids)} ArUco marker(s) in {TAGS_DIR}/")
for marker_id in ids:
marker_img = generate_marker(dict_name, marker_id, args.size)
filename = f"aruco_{dict_name}_{marker_id}.{args.format}"
filepath = TAGS_DIR / filename
cv2.imwrite(str(filepath), marker_img)
print(f" {filename} ({marker_img.shape[0]}x{marker_img.shape[1]}px)")
marker_img = generate_marker(args.dict, marker_id, args.size)
filename = f"aruco_{args.dict}_{marker_id}.{args.format}"
cv2.imwrite(str(TAGS_DIR / filename), marker_img)
print(f" {filename}")
if marker_id == landing_id:
land_path = TAGS_DIR / f"land_tag.{args.format}"
cv2.imwrite(str(land_path), marker_img)
print(f" land_tag.{args.format} (copy of ID {landing_id})")
cv2.imwrite(str(TAGS_DIR / f"land_tag.{args.format}"), marker_img)
# Verify generated tags are detectable
print("\nVerifying detection...")
aruco_dict = cv2.aruco.getPredefinedDictionary(ARUCO_DICTS[dict_name])
aruco_dict = cv2.aruco.getPredefinedDictionary(ARUCO_DICTS[args.dict])
try:
params = cv2.aruco.DetectorParameters()
except AttributeError:
params = cv2.aruco.DetectorParameters_create()
params.minMarkerPerimeterRate = 0.01
try:
params.cornerRefinementMethod = cv2.aruco.CORNER_REFINE_SUBPIX
@@ -177,34 +127,13 @@ def main():
_detect = lambda img: cv2.aruco.detectMarkers(img, aruco_dict, parameters=params)
ok = 0
fail = 0
for marker_id in ids:
filename = f"aruco_{dict_name}_{marker_id}.{args.format}"
filepath = TAGS_DIR / filename
img = cv2.imread(str(filepath), cv2.IMREAD_GRAYSCALE)
corners, detected_ids, _ = _detect(img)
img = cv2.imread(str(TAGS_DIR / f"aruco_{args.dict}_{marker_id}.{args.format}"), cv2.IMREAD_GRAYSCALE)
_, detected_ids, _ = _detect(img)
if detected_ids is not None and marker_id in detected_ids.flatten():
ok += 1
else:
print(f" WARNING: {filename} failed detection check!")
fail += 1
# Also test at simulated camera distances
land_file = TAGS_DIR / f"aruco_{dict_name}_{landing_id}.{args.format}"
if land_file.exists():
land_img = cv2.imread(str(land_file), cv2.IMREAD_GRAYSCALE)
print(f"\nDistance detection test (ID {landing_id}):")
for sim_size in [120, 80, 60, 40]:
small = cv2.resize(land_img, (sim_size, sim_size), interpolation=cv2.INTER_AREA)
bg = np.full((480, 640), 180, dtype=np.uint8)
y, x = (480 - sim_size) // 2, (640 - sim_size) // 2
bg[y:y + sim_size, x:x + sim_size] = small
corners, detected_ids, _ = _detect(bg)
status = "OK" if detected_ids is not None else "FAIL"
print(f" {sim_size}x{sim_size}px in 640x480 frame: {status}")
print(f"\nDone. {ok}/{ok + fail} markers verified.")
print(f"Done. {ok}/{len(ids)} markers verified.")
if __name__ == "__main__":

View File

@@ -19,7 +19,6 @@ class SearchMode(Enum):
class Search:
POSITION_TOLERANCE = 1.0
CHECK_INTERVAL = 0.5
MAX_TRAVEL_TIME = 30.0
@@ -41,7 +40,6 @@ class Search:
self.actions = actions or {}
self.land_ids = set(self.actions.get("land", []))
# Search pattern parameters
self.spiral_max_legs = 12
self.spiral_initial_leg = self.CAM_FOV_METERS
self.spiral_leg_increment = self.CAM_FOV_METERS * 0.8
@@ -55,30 +53,16 @@ class Search:
self.levy_field_size = 50.0
self.levy_angle_dist = uniform(loc=-180, scale=360)
# Pre-flight plan
self.waypoints = []
self.current_wp = 0
self._on_waypoint_change = None # callback: fn(index)
self._on_waypoint_change = None
def configure(self, **kwargs):
for key, value in kwargs.items():
if hasattr(self, key):
setattr(self, key, value)
# ═══════════════════════════════════════════════════════════
# PRE-FLIGHT PLANNING
# ═══════════════════════════════════════════════════════════
def plan(self, start_pos=(0.0, 0.0), geofence_points=None):
"""Pre-compute all waypoints for the search pattern.
Args:
start_pos: (x, y) starting position in local NED
geofence_points: list of (x, y) polygon vertices, or None
Returns:
List of (x, y) absolute waypoints
"""
if self.mode == SearchMode.SPIRAL:
waypoints = self._plan_spiral(start_pos)
elif self.mode == SearchMode.LAWNMOWER:
@@ -88,7 +72,6 @@ class Search:
else:
waypoints = [start_pos]
# Clip to geofence
if geofence_points:
waypoints = self._clip_to_geofence(waypoints, geofence_points)
@@ -105,7 +88,6 @@ class Search:
return waypoints
def _plan_spiral(self, start):
"""Expanding square spiral from center outward."""
waypoints = [start]
x, y = start
@@ -135,7 +117,6 @@ class Search:
return waypoints
def _plan_lawnmower(self, start):
"""Lawnmower (boustrophedon) pattern."""
waypoints = [start]
sx, sy = start
lane_spacing = self.lawn_lane_spacing
@@ -158,7 +139,6 @@ class Search:
return waypoints
def _plan_levy(self, start):
"""Lévy walk — random but pre-computed."""
waypoints = [start]
x, y = start
@@ -175,26 +155,18 @@ class Search:
return waypoints
def _clip_to_geofence(self, waypoints, polygon):
"""Clip waypoints to stay within the geofence polygon.
If a waypoint is outside, it is projected to the nearest
point on the polygon boundary. Consecutive out-of-bounds
waypoints are collapsed.
"""
clipped = []
for wx, wy in waypoints:
if self._point_in_polygon(wx, wy, polygon):
clipped.append((wx, wy))
else:
nearest = self._nearest_point_on_polygon(wx, wy, polygon)
# Avoid duplicate consecutive points
if not clipped or distance_2d(clipped[-1], nearest) > 0.5:
clipped.append(nearest)
return clipped
@staticmethod
def _point_in_polygon(px, py, polygon):
"""Ray-casting point-in-polygon test."""
n = len(polygon)
inside = False
j = n - 1
@@ -209,14 +181,12 @@ class Search:
@staticmethod
def _nearest_point_on_polygon(px, py, polygon):
"""Find the nearest point on the polygon edge to (px, py)."""
min_dist = float('inf')
nearest = (px, py)
n = len(polygon)
for i in range(n):
x1, y1 = polygon[i]
x2, y2 = polygon[(i + 1) % n]
# Project point onto segment
dx, dy = x2 - x1, y2 - y1
length_sq = dx * dx + dy * dy
if length_sq == 0:
@@ -230,12 +200,7 @@ class Search:
nearest = proj
return nearest
# ═══════════════════════════════════════════════════════════
# EXECUTION
# ═══════════════════════════════════════════════════════════
def run(self):
"""Execute the pre-planned search by following waypoints."""
if not self.waypoints:
self.plan()
@@ -282,10 +247,6 @@ class Search:
return self.found_markers
# ═══════════════════════════════════════════════════════════
# CAMERA & MARKER DETECTION
# ═══════════════════════════════════════════════════════════
def get_camera_frame(self):
if self.camera is None:
return None
@@ -329,13 +290,7 @@ class Search:
return new_markers
# ═══════════════════════════════════════════════════════════
# LANDING
# ═══════════════════════════════════════════════════════════
def execute_landing(self, initial_detections):
"""Visual-servoing descent: keep the ArUco tag centered
while descending."""
target_det = None
for d in initial_detections:
if d.get("id") in self.land_ids:
@@ -350,12 +305,10 @@ class Search:
print(f"[SEARCH] Target marker ID:{target_det['id']} "
f"distance:{target_det['distance']:.2f}m")
# Camera parameters (must match model.sdf)
IMG_W, IMG_H = 640, 480
IMG_CX, IMG_CY = IMG_W / 2, IMG_H / 2
HFOV = 1.3962634 # 80° horizontal FOV
# Landing parameters
DESCENT_STEP = 1.0
LAND_ALT = 1.5
CENTER_PX = 50
@@ -363,7 +316,6 @@ class Search:
MAX_LOST = 30
GAIN = 0.35
# Start visual servoing from current position
self.ctrl.update_state()
current_alt = self.ctrl.altitude
@@ -379,7 +331,6 @@ class Search:
self._landing = False
return
# Descend one step
current_alt = max(current_alt - DESCENT_STEP, LAND_ALT)
print(f"\n[SEARCH] Descending to {current_alt:.1f}m")
self.ctrl.update_state()
@@ -387,13 +338,11 @@ class Search:
self.ctrl.move_local_ned(cur['x'], cur['y'], -current_alt)
sleep(2.0)
# Final precision centering
print(f"[SEARCH] Final centering at {LAND_ALT:.1f}m")
self._center_over_marker(
LAND_ALT, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV,
35, MAX_CORRECTIONS, MAX_LOST, 0.25)
# Land
print(f"\n[SEARCH] ===== LANDING =====")
self.ctrl.land()
self.landed = True
@@ -401,10 +350,6 @@ class Search:
def _center_over_marker(self, target_alt, img_w, img_h, img_cx, img_cy,
hfov, center_px, max_corrections, max_lost, gain):
"""Center the UAV over the ArUco marker using pixel error.
Returns True if centered, False if partial, None if lost.
"""
lost_count = 0
centered = False
@@ -465,15 +410,10 @@ class Search:
sleep(0.5)
if not centered:
print(f"\n[SEARCH] Partially centered at {target_alt:.1f}m, "
f"continuing")
print(f"\n[SEARCH] Partially centered at {target_alt:.1f}m, continuing")
return centered
# ═══════════════════════════════════════════════════════════
# MOVEMENT
# ═══════════════════════════════════════════════════════════
def wait_for_position(self, target_x, target_y, timeout=None):
if timeout is None:
timeout = self.MAX_TRAVEL_TIME
@@ -500,7 +440,6 @@ class Search:
return False
def _wait_arrival(self, target_x, target_y, timeout=15.0):
"""Wait for position without checking markers (landing)."""
t0 = time.time()
while time.time() - t0 < timeout and self.running:
self.ctrl.update_state()
@@ -524,10 +463,6 @@ class Search:
self.ctrl.move_local_ned(x, y, z)
return self.wait_for_position(x, y)
# ═══════════════════════════════════════════════════════════
# RESULTS
# ═══════════════════════════════════════════════════════════
def stop(self):
self.running = False

View File

@@ -6,7 +6,6 @@ import argparse
import numpy as np
from time import sleep, perf_counter
from typing import Tuple
from pymavlink import mavutil
import pymavlink.dialects.v20.ardupilotmega as mavlink
from pymavlink.dialects.v20.ardupilotmega import (
@@ -25,7 +24,6 @@ LOWEST_ALT = 3.0
GUIDED_MODE = 4
GUIDED_NOGPS_MODE = 20
DEFAULT_WPNAV_SPEED = 150
DEFAULT_WPNAV_ACCEL = 100
DEFAULT_WPNAV_SPEED_UP = 100
@@ -35,7 +33,6 @@ DEFAULT_LOIT_SPEED = 150
class Controller:
HOLD_ALT = HOLD_ALT
LOWEST_ALT = LOWEST_ALT
@@ -133,23 +130,14 @@ class Controller:
def arm(self, retries: int = 30):
self.set_mode_guided()
for i in range(retries):
print(f"[UAV] Arm attempt {i+1}/{retries}...")
# Force arm: param2=21196 bypasses pre-arm checks
# (equivalent to MAVProxy's "arm throttle force")
self.conn.mav.command_long_send(
self.conn.target_system,
self.conn.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0,
1, # param1: 1 = arm
21196, # param2: force arm magic number
0, 0, 0, 0, 0)
0, 1, 21196, 0, 0, 0, 0, 0)
# Wait up to 4 seconds for armed status instead of
# blocking forever with motors_armed_wait()
armed = False
for _ in range(20):
self._drain_messages()
@@ -157,16 +145,12 @@ class Controller:
armed = True
break
time.sleep(0.2)
if armed:
self.armed = True
print("[UAV] Armed")
return True
print(f"[UAV] Arm attempt {i+1} failed, retrying...")
time.sleep(2)
print("[UAV] FAILED to arm after all retries")
return False
def disarm(self):
@@ -264,7 +248,6 @@ class Controller:
print(f"\r[UAV] Climbing: {self.altitude:.1f}m / {target_alt:.1f}m ",
end='', flush=True)
sleep(0.2)
print(f"\n[UAV] Altitude timeout at {self.altitude:.1f}m")
return False
def wait_for_gps(self, timeout: float = 120.0):
@@ -282,7 +265,6 @@ class Controller:
return True
elapsed = int(time.time() - t0)
print(f"\r[UAV] Waiting for GPS ... {elapsed}s ", end='', flush=True)
print("\n[UAV] GPS timeout")
return False
def configure_speed_limits(self,
@@ -292,7 +274,6 @@ class Controller:
wpnav_speed_dn=DEFAULT_WPNAV_SPEED_DN,
wpnav_accel_z=DEFAULT_WPNAV_ACCEL_Z,
loit_speed=DEFAULT_LOIT_SPEED):
params = {
'WPNAV_SPEED': wpnav_speed,
'WPNAV_ACCEL': wpnav_accel,

View File

@@ -65,7 +65,6 @@ def main():
parser.add_argument('--no-record', action='store_true', help='Disable recording')
args = parser.parse_args()
# ── Start recorder ──────────────────────────────────────────
recorder = None
if not args.no_record:
recorder = RunRecorder()
@@ -78,13 +77,11 @@ def main():
altitude = args.altitude or search_cfg.get('altitude', 5.0)
search_mode = args.search
# Marker config (single source of truth)
marker_cfg = search_cfg.get('marker', {})
marker_dict = marker_cfg.get('dictionary', 'DICT_4X4_50')
marker_size = marker_cfg.get('size', 0.5)
landing_ids = marker_cfg.get('landing_ids', [0])
# UGV position
ugv_pos_cfg = ugv_cfg.get('position', {})
ugv_position = (ugv_pos_cfg.get('x', 5.0), ugv_pos_cfg.get('y', 5.0))
@@ -128,7 +125,6 @@ def main():
if mode_cfg:
search.configure(**{f"{search_mode}_{k}": v for k, v in mode_cfg.items()})
# ── Pre-flight plan ─────────────────────────────────────────
geofence_cfg = search_cfg.get('geofence', {})
geofence_points = None
if geofence_cfg.get('enabled', False):
@@ -143,7 +139,6 @@ def main():
print(f"[MAIN] Search: {search_mode} Altitude: {altitude}m")
print(f"[MAIN] Marker: {marker_dict} size:{marker_size}m land:{landing_ids}")
# ── Start flight tracker (show plan before takeoff) ─────────
tracker = FlightTracker(
window_size=600,
world_range=30.0,
@@ -166,12 +161,10 @@ def main():
ctrl.takeoff(altitude)
ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30)
# ── Start recording tracker + camera ────────────────────────
if recorder:
recorder.start_recording(tracker=tracker, camera=camera)
recorder.snapshot_camera("pre_search")
# Feed position updates to tracker during search
original_check = search.check_for_markers
def tracked_check():
result = original_check()
@@ -186,13 +179,11 @@ def main():
return result
search.check_for_markers = tracked_check
# Wire up waypoint progress to tracker
search._on_waypoint_change = lambda idx: tracker.set_active_waypoint(idx)
results = search.run()
search_results = search.get_results()
# ── Snapshot on detection ───────────────────────────────────
if recorder and results:
recorder.snapshot_camera("marker_detected")
@@ -215,7 +206,6 @@ def main():
else:
wait_for_landing(ctrl)
# ── Finalize recording ──────────────────────────────────────
if recorder:
recorder.snapshot_camera("post_landing")
recorder.save_summary(
@@ -226,6 +216,18 @@ def main():
)
recorder.stop()
print(f"\n{'=' * 50}")
print(f" Simulation finished.")
print(f" Tracker and Gazebo windows are still open.")
print(f" Press Ctrl+C to exit.")
print(f"{'=' * 50}\n")
try:
while True:
sleep(1)
except KeyboardInterrupt:
pass
finally:
tracker.stop()
print("[MAIN] Done.")

View File

@@ -1,16 +1,4 @@
#!/usr/bin/env python3
"""
Flight Tracker - Real-time 2D visualization of UAV and UGV positions.
Opens a separate OpenCV window showing:
- Grid with coordinates
- UAV position (green square) with flight path trail
- UGV position (red square)
- Geofence boundary (yellow dashed polygon with warning zone)
- Position data overlay
Can run standalone or be imported and updated by main.py.
"""
import sys
import os
@@ -25,7 +13,6 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
class FlightTracker:
# Colors (BGR)
BG_COLOR = (30, 30, 30)
GRID_COLOR = (60, 60, 60)
GRID_MAJOR_COLOR = (80, 80, 80)
@@ -36,25 +23,16 @@ class FlightTracker:
TEXT_COLOR = (200, 200, 200)
TEXT_DIM_COLOR = (120, 120, 120)
MARKER_COLOR = (180, 0, 220)
FENCE_COLOR = (0, 220, 255) # Yellow-cyan
FENCE_WARN_COLOR = (0, 140, 200) # Darker yellow
FENCE_BREACH_COLOR = (0, 0, 255) # Red
PLAN_COLOR = (180, 140, 50) # Muted cyan-blue
PLAN_ACTIVE_COLOR = (0, 180, 255) # Orange
PLAN_VISITED_COLOR = (60, 60, 60) # Dim gray
FENCE_COLOR = (0, 220, 255)
FENCE_WARN_COLOR = (0, 140, 200)
FENCE_BREACH_COLOR = (0, 0, 255)
PLAN_COLOR = (180, 140, 50)
PLAN_ACTIVE_COLOR = (0, 180, 255)
PLAN_VISITED_COLOR = (60, 60, 60)
def __init__(self, window_size=600, world_range=30.0,
ugv_position=(5.0, 5.0), show_ugv=True,
geofence=None):
"""
Args:
window_size: Window dimension in pixels (square)
world_range: World coordinate range (-range to +range) in meters
ugv_position: Known UGV position in local NED (x, y)
show_ugv: Whether to draw the UGV
geofence: Dict with 'points' (list of [x,y]), 'warning_distance',
'min_altitude', 'max_altitude'
"""
self.window_size = window_size
self.world_range = world_range
self.ugv_pos = ugv_position
@@ -66,11 +44,9 @@ class FlightTracker:
self.trail = deque()
self.markers_found = {}
# Flight plan
self.plan_waypoints = []
self.active_waypoint = 0
# Geofence
self.geofence = geofence
self.fence_points = []
self.fence_warn_dist = 3.0
@@ -91,14 +67,11 @@ class FlightTracker:
self.total_width = self.window_size + self.panel_width
def world_to_pixel(self, x, y):
"""Convert NED world coords to pixel coords on the map.
NED: x=North(up on screen), y=East(right on screen)"""
px = int(self.window_size / 2 + (y / self.world_range) * (self.window_size / 2))
py = int(self.window_size / 2 - (x / self.world_range) * (self.window_size / 2))
return (px, py)
def update_uav(self, x, y, altitude=0.0, heading=0.0):
"""Update UAV position (NED coords)."""
with self._lock:
self.uav_pos = (x, y)
self.uav_alt = altitude
@@ -106,28 +79,23 @@ class FlightTracker:
self.trail.append((x, y))
def update_ugv(self, x, y):
"""Update UGV position."""
with self._lock:
self.ugv_pos = (x, y)
def add_marker(self, marker_id, x, y):
"""Record a detected marker position."""
with self._lock:
self.markers_found[marker_id] = (x, y)
def set_plan(self, waypoints):
"""Set pre-computed flight plan waypoints for visualization."""
with self._lock:
self.plan_waypoints = list(waypoints)
self.active_waypoint = 0
def set_active_waypoint(self, idx):
"""Update which waypoint the UAV is currently targeting."""
with self._lock:
self.active_waypoint = idx
def _point_in_polygon(self, px, py, polygon):
"""Simple ray-casting point-in-polygon test."""
n = len(polygon)
inside = False
j = n - 1
@@ -140,13 +108,11 @@ class FlightTracker:
return inside
def _distance_to_polygon_edge(self, px, py, polygon):
"""Minimum distance from point to polygon edges."""
min_dist = float('inf')
n = len(polygon)
for i in range(n):
x1, y1 = polygon[i]
x2, y2 = polygon[(i + 1) % n]
# Point-to-segment distance
dx, dy = x2 - x1, y2 - y1
length_sq = dx * dx + dy * dy
if length_sq == 0:
@@ -160,13 +126,11 @@ class FlightTracker:
return min_dist
def draw(self):
"""Render the full tracker frame."""
frame = np.full((self.window_size, self.total_width, 3),
self.BG_COLOR, dtype=np.uint8)
self._draw_grid(frame)
# Draw geofence before other elements
if self.fence_points:
self._draw_geofence(frame)
@@ -180,7 +144,6 @@ class FlightTracker:
plan_copy = list(self.plan_waypoints)
active_wp = self.active_waypoint
# Draw plan UNDER the trail
if plan_copy:
self._draw_plan(frame, plan_copy, active_wp)
@@ -202,57 +165,37 @@ class FlightTracker:
return frame
def _draw_grid(self, frame):
"""Draw coordinate grid."""
step = 5.0
r = self.world_range
n = int(r / step)
for i in range(-n, n + 1):
val = i * step
color = self.GRID_MAJOR_COLOR if i % 2 == 0 else self.GRID_COLOR
# Vertical lines (constant y)
px, _ = self.world_to_pixel(0, val)
cv2.line(frame, (px, 0), (px, self.window_size), color, 1)
# Horizontal lines (constant x)
_, py = self.world_to_pixel(val, 0)
cv2.line(frame, (0, py), (self.window_size, py), color, 1)
# Axis labels
if i != 0 and i % 2 == 0:
cv2.putText(frame, f"{val:.0f}",
(px + 2, self.window_size - 5),
cv2.putText(frame, f"{val:.0f}", (px + 2, self.window_size - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.3, self.TEXT_DIM_COLOR, 1)
cv2.putText(frame, f"{val:.0f}",
(5, py - 3),
cv2.putText(frame, f"{val:.0f}", (5, py - 3),
cv2.FONT_HERSHEY_SIMPLEX, 0.3, self.TEXT_DIM_COLOR, 1)
# Draw axes (origin cross)
cx, cy = self.world_to_pixel(0, 0)
cv2.line(frame, (cx, 0), (cx, self.window_size), self.AXIS_COLOR, 1)
cv2.line(frame, (0, cy), (self.window_size, cy), self.AXIS_COLOR, 1)
# Axis labels at origin
cv2.putText(frame, "N", (cx + 4, 15),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1)
cv2.putText(frame, "E", (self.window_size - 15, cy - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1)
cv2.putText(frame, "N", (cx + 4, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1)
cv2.putText(frame, "E", (self.window_size - 15, cy - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1)
def _draw_geofence(self, frame):
"""Draw geofence boundary with warning zone and fill."""
pts = [self.fence_points[i] for i in range(len(self.fence_points))]
pixel_pts = np.array([self.world_to_pixel(x, y) for x, y in pts], dtype=np.int32)
# Semi-transparent fill inside the geofence
overlay = frame.copy()
cv2.fillPoly(overlay, [pixel_pts], (40, 50, 40))
cv2.addWeighted(overlay, 0.3, frame, 0.7, 0, frame)
# Draw warning zone (inner boundary)
if self.fence_warn_dist > 0:
# Inset the polygon by warning_distance to show warning zone
# Simple approach: draw dashed inner boundary
warn_pts = []
cx = sum(p[0] for p in pts) / len(pts)
cy = sum(p[1] for p in pts) / len(pts)
@@ -266,25 +209,17 @@ class FlightTracker:
else:
warn_pts.append((x, y))
warn_pixel_pts = np.array(
[self.world_to_pixel(x, y) for x, y in warn_pts], dtype=np.int32
)
warn_pixel_pts = np.array([self.world_to_pixel(x, y) for x, y in warn_pts], dtype=np.int32)
self._draw_dashed_polygon(frame, warn_pixel_pts, self.FENCE_WARN_COLOR, 1, 8)
# Draw main fence boundary (solid bright line)
cv2.polylines(frame, [pixel_pts], True, self.FENCE_COLOR, 2)
# Corner markers
for px, py in pixel_pts:
cv2.circle(frame, (int(px), int(py)), 4, self.FENCE_COLOR, -1)
# Label
top_px, top_py = pixel_pts[0]
cv2.putText(frame, "GEOFENCE", (int(top_px) + 6, int(top_py) - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.FENCE_COLOR, 1)
def _draw_dashed_polygon(self, frame, pts, color, thickness, dash_len):
"""Draw a polygon with dashed lines."""
n = len(pts)
for i in range(n):
p1 = pts[i]
@@ -292,7 +227,6 @@ class FlightTracker:
self._draw_dashed_line(frame, tuple(p1), tuple(p2), color, thickness, dash_len)
def _draw_dashed_line(self, frame, pt1, pt2, color, thickness, dash_len):
"""Draw a dashed line between two points."""
x1, y1 = pt1
x2, y2 = pt2
dx = x2 - x1
@@ -313,268 +247,142 @@ class FlightTracker:
cv2.line(frame, (sx, sy), (ex, ey), color, thickness)
def _draw_plan(self, frame, waypoints, active_idx):
"""Draw the pre-flight plan: dashed path with numbered waypoints."""
if len(waypoints) < 2:
return
pts = [self.world_to_pixel(x, y) for x, y in waypoints]
# Draw connecting lines
for i in range(1, len(pts)):
if i <= active_idx:
color = self.PLAN_VISITED_COLOR
else:
color = self.PLAN_COLOR
color = self.PLAN_VISITED_COLOR if i <= active_idx else self.PLAN_COLOR
self._draw_dashed_line(frame, pts[i - 1], pts[i], color, 1, 8)
# Draw waypoint markers
for i, (px, py) in enumerate(pts):
if i < active_idx:
# Visited — small dim dot
cv2.circle(frame, (px, py), 3, self.PLAN_VISITED_COLOR, -1)
elif i == active_idx:
# Active target — bright ring
cv2.circle(frame, (px, py), 8, self.PLAN_ACTIVE_COLOR, 2)
cv2.circle(frame, (px, py), 3, self.PLAN_ACTIVE_COLOR, -1)
else:
# Future — small cyan dot
cv2.circle(frame, (px, py), 4, self.PLAN_COLOR, 1)
# Waypoint number label
label = str(i)
cv2.putText(frame, label, (px + 7, py - 5),
cv2.putText(frame, str(i), (px + 7, py - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.28, self.PLAN_COLOR, 1)
def _draw_trail(self, frame, trail):
"""Draw the flight path trail with persistent visibility."""
if len(trail) < 2:
return
# Minimum brightness so old trail never disappears
MIN_ALPHA = 0.3
FADE_POINTS = 200 # only the newest N points fade
FADE_POINTS = 200
pts = [self.world_to_pixel(x, y) for x, y in trail]
n = len(pts)
fade_start = max(0, n - FADE_POINTS)
for i in range(1, n):
if i <= fade_start:
# Old segments: solid dim color (never invisible)
alpha = MIN_ALPHA
else:
# Recent segments: fade from MIN_ALPHA to 1.0
progress = (i - fade_start) / FADE_POINTS
alpha = MIN_ALPHA + (1.0 - MIN_ALPHA) * progress
color = (
int(self.UAV_TRAIL_COLOR[0] * alpha),
color = (int(self.UAV_TRAIL_COLOR[0] * alpha),
int(self.UAV_TRAIL_COLOR[1] * alpha),
int(self.UAV_TRAIL_COLOR[2] * alpha),
)
int(self.UAV_TRAIL_COLOR[2] * alpha))
cv2.line(frame, pts[i - 1], pts[i], color, 2)
def _draw_uav(self, frame, pos, heading):
"""Draw UAV as a green square with heading indicator."""
px, py = self.world_to_pixel(pos[0], pos[1])
size = 8
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size),
self.UAV_COLOR, -1)
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size),
(255, 255, 255), 1)
# Heading line
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), self.UAV_COLOR, -1)
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), (255, 255, 255), 1)
hx = int(px + 15 * math.sin(heading))
hy = int(py - 15 * math.cos(heading))
cv2.line(frame, (px, py), (hx, hy), self.UAV_COLOR, 2)
# Label
cv2.putText(frame, "UAV", (px + 12, py - 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UAV_COLOR, 1)
cv2.putText(frame, "UAV", (px + 12, py - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UAV_COLOR, 1)
def _draw_ugv(self, frame, pos):
"""Draw UGV as a red square."""
px, py = self.world_to_pixel(pos[0], pos[1])
size = 10
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size),
self.UGV_COLOR, -1)
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size),
(255, 255, 255), 1)
cv2.putText(frame, "UGV", (px + 14, py - 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UGV_COLOR, 1)
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), self.UGV_COLOR, -1)
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size), (255, 255, 255), 1)
cv2.putText(frame, "UGV", (px + 14, py - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UGV_COLOR, 1)
def _draw_panel(self, frame, uav, alt, hdg, ugv, trail):
"""Draw the data panel on the right side."""
x0 = self.window_size + 10
y = 25
# Separator line
cv2.line(frame, (self.window_size, 0), (self.window_size, self.window_size),
self.GRID_MAJOR_COLOR, 1)
# Title
cv2.putText(frame, "FLIGHT TRACKER", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
cv2.line(frame, (self.window_size, 0), (self.window_size, self.window_size), self.GRID_MAJOR_COLOR, 1)
cv2.putText(frame, "FLIGHT TRACKER", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
y += 30
# Elapsed time
elapsed = time.time() - self.start_time
mins = int(elapsed // 60)
secs = int(elapsed % 60)
cv2.putText(frame, f"Time: {mins:02d}:{secs:02d}", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TEXT_DIM_COLOR, 1)
mins, secs = int(elapsed // 60), int(elapsed % 60)
cv2.putText(frame, f"Time: {mins:02d}:{secs:02d}", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TEXT_DIM_COLOR, 1)
y += 30
# UAV section
cv2.putText(frame, "UAV", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UAV_COLOR, 1)
cv2.putText(frame, "UAV", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UAV_COLOR, 1)
y += 22
cv2.putText(frame, f"X: {uav[0]:+.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
cv2.putText(frame, f"X: {uav[0]:+.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Y: {uav[1]:+.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
cv2.putText(frame, f"Y: {uav[1]:+.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Alt: {alt:.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
cv2.putText(frame, f"Alt: {alt:.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Hdg: {math.degrees(hdg):.0f} deg", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
cv2.putText(frame, f"Hdg: {math.degrees(hdg):.0f} deg", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 30
# UGV section
if self.show_ugv:
cv2.putText(frame, "UGV", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UGV_COLOR, 1)
cv2.putText(frame, "UGV", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UGV_COLOR, 1)
y += 22
cv2.putText(frame, f"X: {ugv[0]:+.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
cv2.putText(frame, f"X: {ugv[0]:+.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Y: {ugv[1]:+.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
cv2.putText(frame, f"Y: {ugv[1]:+.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
dist = np.sqrt((uav[0] - ugv[0])**2 + (uav[1] - ugv[1])**2)
cv2.putText(frame, f"Dist: {dist:.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
cv2.putText(frame, f"Dist: {dist:.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 30
# Geofence section
if self.fence_points:
# Check fence status
inside = self._point_in_polygon(uav[0], uav[1], self.fence_points)
alt_ok = self.fence_alt_min <= alt <= self.fence_alt_max
fence_dist = self._distance_to_polygon_edge(uav[0], uav[1], self.fence_points)
if not inside or not alt_ok:
status = "BREACHED"
status_color = self.FENCE_BREACH_COLOR
status, status_color = "BREACHED", self.FENCE_BREACH_COLOR
elif fence_dist < self.fence_warn_dist:
status = "WARNING"
status_color = self.FENCE_WARN_COLOR
status, status_color = "WARNING", self.FENCE_WARN_COLOR
else:
status = "OK"
status_color = self.FENCE_COLOR
status, status_color = "OK", self.FENCE_COLOR
cv2.putText(frame, "GEOFENCE", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.FENCE_COLOR, 1)
cv2.putText(frame, "GEOFENCE", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.FENCE_COLOR, 1)
y += 22
cv2.putText(frame, f"Status: {status}", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, status_color, 1)
cv2.putText(frame, f"Status: {status}", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, status_color, 1)
y += 18
cv2.putText(frame, f"Edge: {fence_dist:.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
cv2.putText(frame, f"Edge: {fence_dist:.1f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Alt lim: {self.fence_alt_min:.0f}-{self.fence_alt_max:.0f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
cv2.putText(frame, f"Alt lim: {self.fence_alt_min:.0f}-{self.fence_alt_max:.0f}m", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 30
# Markers section
cv2.putText(frame, "MARKERS", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.MARKER_COLOR, 1)
cv2.putText(frame, "MARKERS", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.MARKER_COLOR, 1)
y += 22
if self.markers_found:
for mid, (mx, my) in self.markers_found.items():
cv2.putText(frame, f"ID:{mid} ({mx:.1f},{my:.1f})", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_COLOR, 1)
cv2.putText(frame, f"ID:{mid} ({mx:.1f},{my:.1f})", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_COLOR, 1)
y += 16
else:
cv2.putText(frame, "None", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1)
cv2.putText(frame, "None", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1)
y += 16
y += 20
cv2.putText(frame, f"Trail: {len(trail)} pts", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1)
cv2.putText(frame, f"Trail: {len(trail)} pts", (x0, y), cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1)
def start(self):
"""Start the display loop in a background thread."""
self.running = True
self._thread = threading.Thread(target=self._display_loop, daemon=True)
self._thread.start()
def _display_loop(self):
"""OpenCV display loop (runs in its own thread)."""
print("[TRACKER] Flight tracker window started")
while self.running:
frame = self.draw()
cv2.imshow("Flight Tracker", frame)
key = cv2.waitKey(50) & 0xFF
if key == ord('q'):
if cv2.waitKey(50) & 0xFF == ord('q'):
break
cv2.destroyWindow("Flight Tracker")
print("[TRACKER] Flight tracker stopped")
def stop(self):
"""Stop the display loop."""
self.running = False
if self._thread:
self._thread.join(timeout=2.0)
def main():
"""Standalone test mode - simulates a spiral flight path with geofence."""
geofence = {
'enabled': True,
'warning_distance': 3.0,
'min_altitude': 0.0,
'max_altitude': 15.0,
'points': [[-5, -5], [-5, 25], [25, 25], [25, -5]],
}
tracker = FlightTracker(
window_size=600,
world_range=30.0,
ugv_position=(5.0, 5.0),
geofence=geofence,
)
print("Flight Tracker - Standalone Test (with geofence)")
print("Press 'q' to quit")
x, y = 0.0, 0.0
t = 0.0
while True:
t += 0.05
radius = t * 0.5
x = radius * math.cos(t)
y = radius * math.sin(t)
heading = math.atan2(math.cos(t), -math.sin(t))
tracker.update_uav(x, y, altitude=5.0, heading=heading)
if t > 5 and 0 not in tracker.markers_found:
tracker.add_marker(0, 5.0, 5.0)
frame = tracker.draw()
cv2.imshow("Flight Tracker", frame)
key = cv2.waitKey(30) & 0xFF
if key == ord('q'):
break
cv2.destroyAllWindows()
if __name__ == "__main__":
main()

View File

@@ -1,5 +1,4 @@
#!/usr/bin/env python3
"""Utility helper functions."""
import numpy as np
import yaml
@@ -77,32 +76,19 @@ class PIDController:
self.ki = ki
self.kd = kd
self.output_limits = output_limits
self.integral = 0.0
self.last_error = 0.0
self.last_time = None
def compute(self, error, current_time):
if self.last_time is None:
dt = 0.0
else:
dt = current_time - self.last_time
dt = 0.0 if self.last_time is None else current_time - self.last_time
self.integral += error * dt
if dt > 0:
derivative = (error - self.last_error) / dt
else:
derivative = 0.0
derivative = (error - self.last_error) / dt if dt > 0 else 0.0
output = self.kp * error + self.ki * self.integral + self.kd * derivative
if self.output_limits:
output = clamp(output, self.output_limits[0], self.output_limits[1])
self.last_error = error
self.last_time = current_time
return output
def reset(self):

View File

@@ -1,14 +1,4 @@
#!/usr/bin/env python3
"""
Run Recorder — Captures logs, flight path, camera frames, and summary for each run.
Creates timestamped folders in /results with:
flight_path.avi — Flight tracker video
camera.avi — Downward camera video
flight_path.png — Final flight tracker snapshot
log.txt — Full console output
summary.json — Run metadata and results
"""
import os
import sys
@@ -23,8 +13,6 @@ from datetime import datetime
class RunRecorder:
"""Records all outputs from a simulation run."""
def __init__(self, results_dir=None, fps=5):
if results_dir is None:
project_dir = Path(__file__).resolve().parent.parent
@@ -33,7 +21,6 @@ class RunRecorder:
results_dir = Path(results_dir)
results_dir.mkdir(parents=True, exist_ok=True)
# Find next sequential run number
existing = [d.name for d in results_dir.iterdir()
if d.is_dir() and d.name.startswith("run_")]
nums = []
@@ -51,7 +38,6 @@ class RunRecorder:
self.fps = fps
self.start_time = time.time()
# Log capture
self._log_path = self.run_dir / "log.txt"
self._log_file = open(self._log_path, "w")
self._original_stdout = sys.stdout
@@ -59,29 +45,24 @@ class RunRecorder:
self._tee_stdout = _TeeWriter(sys.stdout, self._log_file)
self._tee_stderr = _TeeWriter(sys.stderr, self._log_file)
# Video writers (initialized lazily on first frame)
self._tracker_writer = None
self._camera_writer = None
self._tracker_size = None
self._camera_size = None
# Frame counters
self._tracker_frames = 0
self._camera_frames = 0
# Snapshot storage
self._last_tracker_frame = None
self._last_camera_frame = None
self._camera_snapshots = []
# Recording thread
self._recording = False
self._tracker_ref = None
self._camera_ref = None
self._record_thread = None
self._lock = threading.Lock()
# Metadata
self.metadata = {
"run": run_num,
"start_time": datetime.now().isoformat(),
@@ -91,17 +72,14 @@ class RunRecorder:
print(f"[REC] Recording to: {self.run_dir}")
def start_logging(self):
"""Redirect stdout/stderr to both console and log file."""
sys.stdout = self._tee_stdout
sys.stderr = self._tee_stderr
def stop_logging(self):
"""Restore original stdout/stderr."""
sys.stdout = self._original_stdout
sys.stderr = self._original_stderr
def start_recording(self, tracker=None, camera=None):
"""Start background recording of tracker and camera frames."""
self._tracker_ref = tracker
self._camera_ref = camera
self._recording = True
@@ -111,12 +89,10 @@ class RunRecorder:
self._record_thread.start()
def _record_loop(self):
"""Periodically capture frames from tracker and camera."""
interval = 1.0 / self.fps
while self._recording:
t0 = time.time()
# Capture tracker frame
if self._tracker_ref is not None:
try:
frame = self._tracker_ref.draw()
@@ -126,7 +102,6 @@ class RunRecorder:
except Exception:
pass
# Capture camera frame
if self._camera_ref is not None:
try:
frame = self._camera_ref.frames.get("downward")
@@ -141,7 +116,6 @@ class RunRecorder:
time.sleep(sleep_time)
def _write_tracker_frame(self, frame):
"""Write a frame to the tracker video."""
h, w = frame.shape[:2]
if self._tracker_writer is None:
self._tracker_size = (w, h)
@@ -152,7 +126,6 @@ class RunRecorder:
self._tracker_frames += 1
def _write_camera_frame(self, frame):
"""Write a frame to the camera video."""
h, w = frame.shape[:2]
if self._camera_writer is None:
self._camera_size = (w, h)
@@ -163,7 +136,6 @@ class RunRecorder:
self._camera_frames += 1
def snapshot_camera(self, label="snapshot"):
"""Save a named snapshot of the current camera frame."""
if self._camera_ref is None:
return
frame = self._camera_ref.frames.get("downward")
@@ -178,7 +150,6 @@ class RunRecorder:
def save_summary(self, search_mode="", altitude=0, markers=None,
landed=False, extra=None):
"""Write the run summary JSON."""
duration = time.time() - self.start_time
mins = int(duration // 60)
secs = int(duration % 60)
@@ -223,29 +194,24 @@ class RunRecorder:
print(f"[REC] Summary saved: {path}")
def stop(self):
"""Stop recording and finalize all outputs."""
self._recording = False
if self._record_thread:
self._record_thread.join(timeout=3.0)
# Save final flight path image
if self._last_tracker_frame is not None:
path = self.run_dir / "flight_path.png"
cv2.imwrite(str(path), self._last_tracker_frame)
print(f"[REC] Flight path saved: {path}")
# Save final camera frame
if self._last_camera_frame is not None:
path = self.run_dir / "camera_final.png"
cv2.imwrite(str(path), self._last_camera_frame)
# Release video writers
if self._tracker_writer:
self._tracker_writer.release()
if self._camera_writer:
self._camera_writer.release()
# Stop log capture
self.stop_logging()
self._log_file.close()
@@ -253,7 +219,6 @@ class RunRecorder:
mins = int(duration // 60)
secs = int(duration % 60)
# Print to original stdout since we stopped the tee
self._original_stdout.write(
f"\n[REC] Run recorded: {self.run_dir}\n"
f"[REC] Duration: {mins}m {secs}s | "
@@ -263,8 +228,6 @@ class RunRecorder:
class _TeeWriter:
"""Writes to both a stream and a file simultaneously."""
def __init__(self, stream, log_file):
self._stream = stream
self._log = log_file
@@ -272,9 +235,7 @@ class _TeeWriter:
def write(self, data):
self._stream.write(data)
try:
# Strip ANSI escape codes for the log file
clean = data
self._log.write(clean)
self._log.write(data)
self._log.flush()
except (ValueError, IOError):
pass

View File

@@ -1,9 +1,4 @@
#!/usr/bin/env python3
"""
Camera Processor - Handles Gazebo camera feeds for GPS-denied navigation.
Subscribes to gz-transport topics, processes frames, and displays via OpenCV.
Downstream modules (ArUco detector, optical flow) register as callbacks.
"""
import sys
import signal
@@ -13,7 +8,6 @@ import cv2
from gz.transport13 import Node
from gz.msgs10.image_pb2 import Image
# PixelFormatType enum values (from gz.msgs10 Image proto)
PF_RGB_INT8 = 3
PF_BGR_INT8 = 8
PF_R_FLOAT32 = 13
@@ -35,7 +29,6 @@ class CameraProcessor:
}
self.topics = topics
for name, topic in self.topics.items():
self.callbacks[name] = []
ok = self.node.subscribe(Image, topic, self._make_gz_callback(name))
@@ -50,19 +43,12 @@ class CameraProcessor:
def cb(msg):
fmt = msg.pixel_format_type
if fmt == PF_RGB_INT8:
arr = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)
arr = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, 3)
bgr = cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)
elif fmt == PF_BGR_INT8:
arr = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)
bgr = arr
bgr = np.frombuffer(msg.data, dtype=np.uint8).reshape(msg.height, msg.width, 3)
elif fmt == PF_R_FLOAT32:
arr = np.frombuffer(msg.data, dtype=np.float32).reshape(
msg.height, msg.width
)
arr = np.frombuffer(msg.data, dtype=np.float32).reshape(msg.height, msg.width)
normalized = cv2.normalize(arr, None, 0, 255, cv2.NORM_MINMAX)
bgr = cv2.cvtColor(normalized.astype(np.uint8), cv2.COLOR_GRAY2BGR)
else:
@@ -71,10 +57,8 @@ class CameraProcessor:
processed = self.process_image(bgr)
self.raw_frames[name] = processed.copy()
self.frames[name] = processed
for fn in self.callbacks.get(name, []):
fn(name, processed)
return cb
def process_image(self, image):
@@ -82,40 +66,26 @@ class CameraProcessor:
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
lab = cv2.cvtColor(processed, cv2.COLOR_BGR2LAB)
lab[:, :, 0] = clahe.apply(lab[:, :, 0])
processed = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
return processed
return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
def register_callback(self, camera_name, fn):
if camera_name in self.callbacks:
self.callbacks[camera_name].append(fn)
print(f"[CAM] Registered callback for {camera_name}: {fn.__name__}")
def stop(self):
self.running = False
def spin(self):
print("[CAM] Running. Press 'q' to quit, 's' to screenshot.")
while self.running:
for name, frame in list(self.frames.items()):
cv2.imshow(name, frame)
key = cv2.waitKey(33) & 0xFF
if key == ord("q"):
if cv2.waitKey(33) & 0xFF == ord("q"):
break
elif key == ord("s"):
for name, frame in self.frames.items():
fname = f"{name}_{int(time.time())}.png"
cv2.imwrite(fname, frame)
print(f"[CAM] Saved {fname}")
cv2.destroyAllWindows()
print("[CAM] Stopped.")
def spin_headless(self):
print("[CAM] Running headless (no GUI).")
while self.running:
time.sleep(0.1)
print("[CAM] Stopped.")
def get_frame(self, camera_name):
return self.frames.get(camera_name)
@@ -124,7 +94,6 @@ class CameraProcessor:
def main():
cameras = "both"
show_gui = True
if len(sys.argv) > 1:
cameras = sys.argv[1].lower()
if "--headless" in sys.argv:
@@ -143,7 +112,6 @@ def main():
topics = all_topics
proc = CameraProcessor(topics=topics, show_gui=show_gui)
try:
import os
src_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@@ -155,13 +123,12 @@ def main():
def detection_overlay(camera_name, frame):
detections = detector.detect(frame)
if detections:
annotated = detector.annotate_frame(frame, detections)
proc.frames[camera_name] = annotated
proc.frames[camera_name] = detector.annotate_frame(frame, detections)
for cam_name in topics:
proc.register_callback(cam_name, detection_overlay)
except Exception as e:
print(f"[CAM] ArUco detection unavailable: {e}")
print(f"[CAM] Detection unavailable: {e}")
if show_gui:
proc.spin()

View File

@@ -32,13 +32,11 @@ H_RES = 480
class ObjectDetector:
CAM_DEG_FOV = 110
def __init__(self, marker_size=0.15, camera_matrix=None,
aruco_dict_name="DICT_4X4_50"):
self.marker_size = marker_size
if camera_matrix is not None:
self.camera_matrix = np.array(camera_matrix).reshape(3, 3)
else:
@@ -46,11 +44,9 @@ class ObjectDetector:
[[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]]
)
self.dist_coeffs = np.zeros(5)
dict_id = ARUCO_DICT.get(aruco_dict_name, cv2.aruco.DICT_4X4_50)
self.aruco_dict = cv2.aruco.getPredefinedDictionary(dict_id)
# Support both old (<4.7) and new (>=4.7) OpenCV ArUco API
try:
self.aruco_params = cv2.aruco.DetectorParameters()
self._new_api = True
@@ -80,17 +76,12 @@ class ObjectDetector:
self.found_markers = {}
self.on_detection = None
print(f"[DET] ObjectDetector initialized ({aruco_dict_name})")
def detect(self, frame):
if frame is None:
return []
if len(frame.shape) == 2:
gray = frame
else:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = frame if len(frame.shape) == 2 else cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
if self._new_api:
corners, ids, rejected = self.aruco_detector.detectMarkers(gray)
@@ -110,7 +101,6 @@ class ObjectDetector:
for i, marker_id in enumerate(ids.flatten()):
tvec = tvecs[i][0]
center = np.mean(corners[i][0], axis=0)
detection = {
"id": int(marker_id),
"corners": corners[i][0].tolist(),
@@ -129,39 +119,26 @@ class ObjectDetector:
if detections and self.on_detection:
self.on_detection(detections)
return detections
def annotate_frame(self, frame, detections):
annotated = frame.copy()
if not detections:
return annotated
overlay = annotated.copy()
for d in detections:
corners_arr = np.array(d["corners"], dtype=np.int32)
# Fill marker area with purple
cv2.fillPoly(overlay, [corners_arr], (180, 0, 220))
# Draw purple border
cv2.polylines(annotated, [corners_arr], True, (180, 0, 220), 3)
# Center dot
cx = int(d["center_px"][0])
cy = int(d["center_px"][1])
cx, cy = int(d["center_px"][0]), int(d["center_px"][1])
cv2.circle(annotated, (cx, cy), 5, (255, 255, 255), -1)
# Label with ID and distance
label = f"ID:{d['id']} d:{d['distance']:.2f}m"
cv2.putText(annotated, label,
(int(corners_arr[0][0]), int(corners_arr[0][1]) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (180, 0, 220), 2)
# Blend overlay at 40% opacity
cv2.addWeighted(overlay, 0.4, annotated, 0.6, 0, annotated)
return annotated
def get_found_markers(self):