Aruco Tag Landing and Flight Tracker

This commit is contained in:
2026-02-13 15:36:26 -05:00
parent c91ea920a8
commit 067c96ed28
11 changed files with 929 additions and 57 deletions

View File

@@ -18,3 +18,15 @@ levy:
actions:
land:
- 0
geofence:
enabled: true
warning_distance: 3.0
min_altitude: 0.0
max_altitude: 15.0
# Polygon vertices in local NED (x=North, y=East) meters
points:
- [-5, -5]
- [-5, 25]
- [25, 25]
- [25, -5]

211
scripts/generate_tags.py Normal file
View File

@@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""
Generate ArUco marker tags for use as Gazebo textures.
Usage:
python3 scripts/generate_tags.py # Generate default set
python3 scripts/generate_tags.py --ids 0 5 10 # Generate specific IDs
python3 scripts/generate_tags.py --dict DICT_6X6_250 --ids 0 1 2
python3 scripts/generate_tags.py --all # Generate all 50 tags in DICT_4X4_50
Output:
worlds/tags/aruco_<DICT>_<ID>.png (individual markers)
worlds/tags/land_tag.png (copy of the landing target, ID from config)
"""
import argparse
import sys
import os
from pathlib import Path
import cv2
import numpy as np
import yaml
PROJECT_DIR = Path(__file__).resolve().parent.parent
TAGS_DIR = PROJECT_DIR / "worlds" / "tags"
CONFIG_DIR = PROJECT_DIR / "config"
ARUCO_DICTS = {
"DICT_4X4_50": cv2.aruco.DICT_4X4_50,
"DICT_4X4_100": cv2.aruco.DICT_4X4_100,
"DICT_4X4_250": cv2.aruco.DICT_4X4_250,
"DICT_4X4_1000": cv2.aruco.DICT_4X4_1000,
"DICT_5X5_50": cv2.aruco.DICT_5X5_50,
"DICT_5X5_100": cv2.aruco.DICT_5X5_100,
"DICT_5X5_250": cv2.aruco.DICT_5X5_250,
"DICT_5X5_1000": cv2.aruco.DICT_5X5_1000,
"DICT_6X6_50": cv2.aruco.DICT_6X6_50,
"DICT_6X6_100": cv2.aruco.DICT_6X6_100,
"DICT_6X6_250": cv2.aruco.DICT_6X6_250,
"DICT_6X6_1000": cv2.aruco.DICT_6X6_1000,
"DICT_7X7_50": cv2.aruco.DICT_7X7_50,
"DICT_7X7_100": cv2.aruco.DICT_7X7_100,
"DICT_7X7_250": cv2.aruco.DICT_7X7_250,
"DICT_7X7_1000": cv2.aruco.DICT_7X7_1000,
}
DICT_SIZES = {
"DICT_4X4_50": 50,
"DICT_4X4_100": 100,
"DICT_4X4_250": 250,
"DICT_4X4_1000": 1000,
"DICT_5X5_50": 50,
"DICT_5X5_100": 100,
"DICT_5X5_250": 250,
"DICT_5X5_1000": 1000,
"DICT_6X6_50": 50,
"DICT_6X6_100": 100,
"DICT_6X6_250": 250,
"DICT_6X6_1000": 1000,
"DICT_7X7_50": 50,
"DICT_7X7_100": 100,
"DICT_7X7_250": 250,
"DICT_7X7_1000": 1000,
}
def generate_marker(dict_name, marker_id, pixel_size=600, border_bits=1):
"""Generate a single ArUco marker image with a white border."""
dict_id = ARUCO_DICTS[dict_name]
aruco_dict = cv2.aruco.getPredefinedDictionary(dict_id)
try:
marker_img = cv2.aruco.generateImageMarker(aruco_dict, marker_id, pixel_size)
except AttributeError:
marker_img = cv2.aruco.drawMarker(aruco_dict, marker_id, pixel_size)
border_px = pixel_size // 6
padded = np.ones(
(pixel_size + 2 * border_px, pixel_size + 2 * border_px),
dtype=np.uint8,
) * 255
padded[border_px:border_px + pixel_size, border_px:border_px + pixel_size] = marker_img
return padded
def get_landing_id():
"""Read the landing target ID from search.yaml config."""
search_cfg = CONFIG_DIR / "search.yaml"
if search_cfg.exists():
with open(search_cfg) as f:
cfg = yaml.safe_load(f) or {}
land_ids = cfg.get("actions", {}).get("land", [])
if land_ids:
return land_ids[0]
return 0
def main():
parser = argparse.ArgumentParser(
description="Generate ArUco marker tags for Gazebo simulation"
)
parser.add_argument(
"--dict", default="DICT_4X4_50",
choices=list(ARUCO_DICTS.keys()),
help="ArUco dictionary (default: DICT_4X4_50)",
)
parser.add_argument(
"--ids", type=int, nargs="+", default=None,
help="Marker IDs to generate (default: landing target from config)",
)
parser.add_argument(
"--all", action="store_true",
help="Generate all markers in the dictionary",
)
parser.add_argument(
"--size", type=int, default=600,
help="Marker image size in pixels (default: 600)",
)
parser.add_argument(
"--format", default="png", choices=["png", "jpg"],
help="Output image format (default: png)",
)
args = parser.parse_args()
TAGS_DIR.mkdir(parents=True, exist_ok=True)
dict_name = args.dict
landing_id = get_landing_id()
if args.all:
max_id = DICT_SIZES.get(dict_name, 50)
ids = list(range(max_id))
elif args.ids is not None:
ids = args.ids
else:
ids = [landing_id]
print(f"Generating {len(ids)} ArUco marker(s)")
print(f" Dictionary: {dict_name}")
print(f" Size: {args.size}px")
print(f" Output: {TAGS_DIR}/")
print(f" Landing target ID: {landing_id}")
print()
for marker_id in ids:
marker_img = generate_marker(dict_name, marker_id, args.size)
filename = f"aruco_{dict_name}_{marker_id}.{args.format}"
filepath = TAGS_DIR / filename
cv2.imwrite(str(filepath), marker_img)
print(f" {filename} ({marker_img.shape[0]}x{marker_img.shape[1]}px)")
if marker_id == landing_id:
land_path = TAGS_DIR / f"land_tag.{args.format}"
cv2.imwrite(str(land_path), marker_img)
print(f" land_tag.{args.format} (copy of ID {landing_id})")
# Verify generated tags are detectable
print("\nVerifying detection...")
aruco_dict = cv2.aruco.getPredefinedDictionary(ARUCO_DICTS[dict_name])
try:
params = cv2.aruco.DetectorParameters()
except AttributeError:
params = cv2.aruco.DetectorParameters_create()
params.minMarkerPerimeterRate = 0.01
try:
params.cornerRefinementMethod = cv2.aruco.CORNER_REFINE_SUBPIX
except AttributeError:
pass
try:
detector = cv2.aruco.ArucoDetector(aruco_dict, params)
_detect = lambda img: detector.detectMarkers(img)
except AttributeError:
_detect = lambda img: cv2.aruco.detectMarkers(img, aruco_dict, parameters=params)
ok = 0
fail = 0
for marker_id in ids:
filename = f"aruco_{dict_name}_{marker_id}.{args.format}"
filepath = TAGS_DIR / filename
img = cv2.imread(str(filepath), cv2.IMREAD_GRAYSCALE)
corners, detected_ids, _ = _detect(img)
if detected_ids is not None and marker_id in detected_ids.flatten():
ok += 1
else:
print(f" WARNING: {filename} failed detection check!")
fail += 1
# Also test at simulated camera distances
land_file = TAGS_DIR / f"aruco_{dict_name}_{landing_id}.{args.format}"
if land_file.exists():
land_img = cv2.imread(str(land_file), cv2.IMREAD_GRAYSCALE)
print(f"\nDistance detection test (ID {landing_id}):")
for sim_size in [120, 80, 60, 40]:
small = cv2.resize(land_img, (sim_size, sim_size), interpolation=cv2.INTER_AREA)
bg = np.full((480, 640), 180, dtype=np.uint8)
y, x = (480 - sim_size) // 2, (640 - sim_size) // 2
bg[y:y + sim_size, x:x + sim_size] = small
corners, detected_ids, _ = _detect(bg)
status = "OK" if detected_ids is not None else "FAIL"
print(f" {sim_size}x{sim_size}px in 640x480 frame: {status}")
print(f"\nDone. {ok}/{ok + fail} markers verified.")
if __name__ == "__main__":
main()

View File

@@ -37,6 +37,7 @@ class Search:
self.found_markers = {}
self.running = True
self.landed = False
self._landing = False
self.actions = actions or {}
self.land_ids = set(self.actions.get("land", []))
@@ -81,9 +82,9 @@ class Search:
def get_camera_frame(self):
if self.camera is None:
return None
frame = self.camera.frames.get("downward")
frame = self.camera.raw_frames.get("downward")
if frame is None:
frame = self.camera.frames.get("gimbal")
frame = self.camera.raw_frames.get("gimbal")
return frame
def check_for_markers(self):
@@ -112,51 +113,101 @@ class Search:
f"distance:{d['distance']:.2f}m "
f"UAV at ({pos['x']:.1f}, {pos['y']:.1f})")
if marker_id in self.land_ids:
if marker_id in self.land_ids and not self._landing:
print(f"\n[SEARCH] Landing target ID:{marker_id} found! Starting approach.")
self._landing = True
self.execute_landing(detections)
return new_markers
return new_markers
def execute_landing(self, initial_detections):
if self.servoing is None:
self.ctrl.land()
self.landed = True
self.running = False
"""Fly to the marker position, center over it, then land."""
target_det = None
for d in initial_detections:
if d.get("id") in self.land_ids:
target_det = d
break
if target_det is None:
print("[SEARCH] Lost landing target, resuming search")
return
self.servoing.enable()
self.servoing.process_detections(initial_detections)
# Phase 1: Fly to marker position using tvec offset
# tvec gives [right, down, forward] from camera in meters
tvec = target_det.get("tvec", [0, 0, 0])
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
t0 = time.time()
timeout = 60.0
while time.time() - t0 < timeout and self.running:
# Camera points down: tvec[0]=right=East(+y), tvec[1]=down, tvec[2]=forward=North(+x)
marker_x = pos['x'] + tvec[2]
marker_y = pos['y'] + tvec[0]
print(f"[SEARCH] Flying to marker at NED ({marker_x:.1f}, {marker_y:.1f})")
self.ctrl.move_local_ned(marker_x, marker_y, -self.altitude)
self._wait_arrival(marker_x, marker_y, timeout=15.0)
# Phase 2: Hover and refine position using camera feedback
print("[SEARCH] Centering over marker...")
center_attempts = 0
max_attempts = 30
centered_count = 0
while center_attempts < max_attempts and self.running:
center_attempts += 1
frame = self.get_camera_frame()
if frame is None:
sleep(self.CHECK_INTERVAL)
sleep(0.2)
continue
detections = self.detector.detect(frame)
landed = self.servoing.process_detections(detections)
target = None
for d in detections:
if d.get("id") in self.land_ids:
target = d
break
if landed:
print(f"\n[SEARCH] Landed on target!")
self.landed = True
self.running = False
return
if target is None:
print(f"\r[SEARCH] Centering: marker not visible ({center_attempts}/{max_attempts}) ",
end='', flush=True)
sleep(0.3)
centered_count = 0
continue
# Calculate pixel error from image center
center_px = target["center_px"]
img_cx, img_cy = 320, 240
error_x = center_px[0] - img_cx # positive = marker is right
error_y = center_px[1] - img_cy # positive = marker is below
print(f"\r[SEARCH] Centering: err=({error_x:.0f},{error_y:.0f})px "
f"dist={target['distance']:.2f}m ({center_attempts}/{max_attempts}) ",
end='', flush=True)
# Check if centered enough (within 30px of center)
if abs(error_x) < 30 and abs(error_y) < 30:
centered_count += 1
if centered_count >= 3:
print(f"\n[SEARCH] Centered over marker!")
break
else:
centered_count = 0
# Send small position corrections
# Convert pixel error to meters (rough: at 5m alt, 640px ~ 8m FOV)
meters_per_px = (self.altitude * 0.0025)
correction_y = error_x * meters_per_px # pixel right -> NED east (+y)
correction_x = error_y * meters_per_px # pixel down -> NED north (+x)
self.ctrl.update_state()
if self.ctrl.altitude < 0.3:
print(f"\n[SEARCH] Touchdown detected!")
self.landed = True
self.running = False
return
cur = self.ctrl.get_local_position()
new_x = cur['x'] + correction_x * 0.5 # dampen corrections
new_y = cur['y'] + correction_y * 0.5
self.ctrl.move_local_ned(new_x, new_y, -self.altitude)
sleep(0.1)
sleep(0.3)
if not self.landed:
print(f"\n[SEARCH] Landing approach timed out, landing at current position")
# Phase 3: Land
print(f"\n[SEARCH] Landing on target!")
self.ctrl.land()
self.landed = True
self.running = False
@@ -186,6 +237,26 @@ class Search:
print()
return False
def _wait_arrival(self, target_x, target_y, timeout=15.0):
"""Wait for position without checking markers (used during landing)."""
t0 = time.time()
while time.time() - t0 < timeout and self.running:
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
dist = distance_2d(
(pos['x'], pos['y']),
(target_x, target_y)
)
elapsed = int(time.time() - t0)
print(f"\r[SEARCH] Approaching marker: {dist:.1f}m ({elapsed}s) ",
end='', flush=True)
if dist < self.POSITION_TOLERANCE:
print()
return True
sleep(self.CHECK_INTERVAL)
print()
return False
def move_to_local(self, x, y):
z = -self.altitude
self.ctrl.move_local_ned(x, y, z)

View File

@@ -14,6 +14,7 @@ from control.uav_controller import Controller
from control.search import Search
from vision.object_detector import ObjectDetector
from vision.camera_processor import CameraProcessor
from navigation.flight_tracker import FlightTracker
PROJECT_DIR = Path(__file__).resolve().parent.parent
CONFIG_DIR = PROJECT_DIR / "config"
@@ -123,6 +124,30 @@ def main():
ctrl.takeoff(altitude)
ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30)
# Start flight tracker
tracker = FlightTracker(
window_size=600,
world_range=30.0,
ugv_position=(5.0, 5.0),
geofence=search_cfg.get('geofence'),
)
tracker.start()
# Feed position updates to tracker during search
original_check = search.check_for_markers
def tracked_check():
result = original_check()
ctrl.update_state()
pos = ctrl.get_local_position()
tracker.update_uav(pos['x'], pos['y'],
altitude=ctrl.altitude,
heading=ctrl.current_yaw)
for mid, info in search.found_markers.items():
p = info['uav_position']
tracker.add_marker(mid, p['x'], p['y'])
return result
search.check_for_markers = tracked_check
results = search.run()
search_results = search.get_results()
@@ -145,6 +170,7 @@ def main():
else:
wait_for_landing(ctrl)
tracker.stop()
print("[MAIN] Done.")

View File

@@ -0,0 +1,508 @@
#!/usr/bin/env python3
"""
Flight Tracker - Real-time 2D visualization of UAV and UGV positions.
Opens a separate OpenCV window showing:
- Grid with coordinates
- UAV position (green square) with flight path trail
- UGV position (red square)
- Geofence boundary (yellow dashed polygon with warning zone)
- Position data overlay
Can run standalone or be imported and updated by main.py.
"""
import sys
import os
import time
import math
import threading
import numpy as np
import cv2
from collections import deque
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
class FlightTracker:
# Colors (BGR)
BG_COLOR = (30, 30, 30)
GRID_COLOR = (60, 60, 60)
GRID_MAJOR_COLOR = (80, 80, 80)
AXIS_COLOR = (120, 120, 120)
UAV_COLOR = (0, 220, 100)
UAV_TRAIL_COLOR = (0, 180, 80)
UGV_COLOR = (60, 60, 255)
TEXT_COLOR = (200, 200, 200)
TEXT_DIM_COLOR = (120, 120, 120)
MARKER_COLOR = (180, 0, 220)
FENCE_COLOR = (0, 220, 255) # Yellow-cyan
FENCE_WARN_COLOR = (0, 140, 200) # Darker yellow
FENCE_BREACH_COLOR = (0, 0, 255) # Red
def __init__(self, window_size=600, world_range=30.0,
ugv_position=(5.0, 5.0), show_ugv=True,
geofence=None):
"""
Args:
window_size: Window dimension in pixels (square)
world_range: World coordinate range (-range to +range) in meters
ugv_position: Known UGV position in local NED (x, y)
show_ugv: Whether to draw the UGV
geofence: Dict with 'points' (list of [x,y]), 'warning_distance',
'min_altitude', 'max_altitude'
"""
self.window_size = window_size
self.world_range = world_range
self.ugv_pos = ugv_position
self.show_ugv = show_ugv
self.uav_pos = (0.0, 0.0)
self.uav_alt = 0.0
self.uav_heading = 0.0
self.trail = deque(maxlen=5000)
self.markers_found = {}
# Geofence
self.geofence = geofence
self.fence_points = []
self.fence_warn_dist = 3.0
self.fence_alt_min = 0.0
self.fence_alt_max = 50.0
if geofence and geofence.get('enabled', True):
self.fence_points = [tuple(p) for p in geofence.get('points', [])]
self.fence_warn_dist = geofence.get('warning_distance', 3.0)
self.fence_alt_min = geofence.get('min_altitude', 0.0)
self.fence_alt_max = geofence.get('max_altitude', 50.0)
self.running = False
self._thread = None
self._lock = threading.Lock()
self.start_time = time.time()
self.panel_width = 200
self.total_width = self.window_size + self.panel_width
def world_to_pixel(self, x, y):
"""Convert NED world coords to pixel coords on the map.
NED: x=North(up on screen), y=East(right on screen)"""
px = int(self.window_size / 2 + (y / self.world_range) * (self.window_size / 2))
py = int(self.window_size / 2 - (x / self.world_range) * (self.window_size / 2))
return (px, py)
def update_uav(self, x, y, altitude=0.0, heading=0.0):
"""Update UAV position (NED coords)."""
with self._lock:
self.uav_pos = (x, y)
self.uav_alt = altitude
self.uav_heading = heading
self.trail.append((x, y))
def update_ugv(self, x, y):
"""Update UGV position."""
with self._lock:
self.ugv_pos = (x, y)
def add_marker(self, marker_id, x, y):
"""Record a detected marker position."""
with self._lock:
self.markers_found[marker_id] = (x, y)
def _point_in_polygon(self, px, py, polygon):
"""Simple ray-casting point-in-polygon test."""
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, yi = polygon[i]
xj, yj = polygon[j]
if ((yi > py) != (yj > py)) and (px < (xj - xi) * (py - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
def _distance_to_polygon_edge(self, px, py, polygon):
"""Minimum distance from point to polygon edges."""
min_dist = float('inf')
n = len(polygon)
for i in range(n):
x1, y1 = polygon[i]
x2, y2 = polygon[(i + 1) % n]
# Point-to-segment distance
dx, dy = x2 - x1, y2 - y1
length_sq = dx * dx + dy * dy
if length_sq == 0:
dist = math.sqrt((px - x1) ** 2 + (py - y1) ** 2)
else:
t = max(0, min(1, ((px - x1) * dx + (py - y1) * dy) / length_sq))
proj_x = x1 + t * dx
proj_y = y1 + t * dy
dist = math.sqrt((px - proj_x) ** 2 + (py - proj_y) ** 2)
min_dist = min(min_dist, dist)
return min_dist
def draw(self):
"""Render the full tracker frame."""
frame = np.full((self.window_size, self.total_width, 3),
self.BG_COLOR, dtype=np.uint8)
self._draw_grid(frame)
# Draw geofence before other elements
if self.fence_points:
self._draw_geofence(frame)
with self._lock:
trail_copy = list(self.trail)
uav = self.uav_pos
uav_alt = self.uav_alt
uav_hdg = self.uav_heading
ugv = self.ugv_pos
markers = dict(self.markers_found)
self._draw_trail(frame, trail_copy)
if self.show_ugv:
self._draw_ugv(frame, ugv)
for mid, (mx, my) in markers.items():
px, py = self.world_to_pixel(mx, my)
cv2.drawMarker(frame, (px, py), self.MARKER_COLOR,
cv2.MARKER_DIAMOND, 12, 2)
cv2.putText(frame, f"ID:{mid}", (px + 8, py - 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.MARKER_COLOR, 1)
self._draw_uav(frame, uav, uav_hdg)
self._draw_panel(frame, uav, uav_alt, uav_hdg, ugv, trail_copy)
return frame
def _draw_grid(self, frame):
"""Draw coordinate grid."""
step = 5.0
r = self.world_range
n = int(r / step)
for i in range(-n, n + 1):
val = i * step
color = self.GRID_MAJOR_COLOR if i % 2 == 0 else self.GRID_COLOR
# Vertical lines (constant y)
px, _ = self.world_to_pixel(0, val)
cv2.line(frame, (px, 0), (px, self.window_size), color, 1)
# Horizontal lines (constant x)
_, py = self.world_to_pixel(val, 0)
cv2.line(frame, (0, py), (self.window_size, py), color, 1)
# Axis labels
if i != 0 and i % 2 == 0:
cv2.putText(frame, f"{val:.0f}",
(px + 2, self.window_size - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.3, self.TEXT_DIM_COLOR, 1)
cv2.putText(frame, f"{val:.0f}",
(5, py - 3),
cv2.FONT_HERSHEY_SIMPLEX, 0.3, self.TEXT_DIM_COLOR, 1)
# Draw axes (origin cross)
cx, cy = self.world_to_pixel(0, 0)
cv2.line(frame, (cx, 0), (cx, self.window_size), self.AXIS_COLOR, 1)
cv2.line(frame, (0, cy), (self.window_size, cy), self.AXIS_COLOR, 1)
# Axis labels at origin
cv2.putText(frame, "N", (cx + 4, 15),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1)
cv2.putText(frame, "E", (self.window_size - 15, cy - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.AXIS_COLOR, 1)
def _draw_geofence(self, frame):
"""Draw geofence boundary with warning zone and fill."""
pts = [self.fence_points[i] for i in range(len(self.fence_points))]
pixel_pts = np.array([self.world_to_pixel(x, y) for x, y in pts], dtype=np.int32)
# Semi-transparent fill inside the geofence
overlay = frame.copy()
cv2.fillPoly(overlay, [pixel_pts], (40, 50, 40))
cv2.addWeighted(overlay, 0.3, frame, 0.7, 0, frame)
# Draw warning zone (inner boundary)
if self.fence_warn_dist > 0:
# Inset the polygon by warning_distance to show warning zone
# Simple approach: draw dashed inner boundary
warn_pts = []
cx = sum(p[0] for p in pts) / len(pts)
cy = sum(p[1] for p in pts) / len(pts)
for x, y in pts:
dx = x - cx
dy = y - cy
length = math.sqrt(dx * dx + dy * dy)
if length > 0:
shrink = self.fence_warn_dist / length
warn_pts.append((x - dx * shrink, y - dy * shrink))
else:
warn_pts.append((x, y))
warn_pixel_pts = np.array(
[self.world_to_pixel(x, y) for x, y in warn_pts], dtype=np.int32
)
self._draw_dashed_polygon(frame, warn_pixel_pts, self.FENCE_WARN_COLOR, 1, 8)
# Draw main fence boundary (solid bright line)
cv2.polylines(frame, [pixel_pts], True, self.FENCE_COLOR, 2)
# Corner markers
for px, py in pixel_pts:
cv2.circle(frame, (int(px), int(py)), 4, self.FENCE_COLOR, -1)
# Label
top_px, top_py = pixel_pts[0]
cv2.putText(frame, "GEOFENCE", (int(top_px) + 6, int(top_py) - 8),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.FENCE_COLOR, 1)
def _draw_dashed_polygon(self, frame, pts, color, thickness, dash_len):
"""Draw a polygon with dashed lines."""
n = len(pts)
for i in range(n):
p1 = pts[i]
p2 = pts[(i + 1) % n]
self._draw_dashed_line(frame, tuple(p1), tuple(p2), color, thickness, dash_len)
def _draw_dashed_line(self, frame, pt1, pt2, color, thickness, dash_len):
"""Draw a dashed line between two points."""
x1, y1 = pt1
x2, y2 = pt2
dx = x2 - x1
dy = y2 - y1
length = math.sqrt(dx * dx + dy * dy)
if length == 0:
return
dx /= length
dy /= length
num_dashes = int(length / (dash_len * 2))
for i in range(num_dashes + 1):
start = i * dash_len * 2
end = min(start + dash_len, length)
sx = int(x1 + dx * start)
sy = int(y1 + dy * start)
ex = int(x1 + dx * end)
ey = int(y1 + dy * end)
cv2.line(frame, (sx, sy), (ex, ey), color, thickness)
def _draw_trail(self, frame, trail):
"""Draw the flight path trail."""
if len(trail) < 2:
return
pts = [self.world_to_pixel(x, y) for x, y in trail]
for i in range(1, len(pts)):
alpha = i / len(pts)
color = (
int(self.UAV_TRAIL_COLOR[0] * alpha),
int(self.UAV_TRAIL_COLOR[1] * alpha),
int(self.UAV_TRAIL_COLOR[2] * alpha),
)
cv2.line(frame, pts[i - 1], pts[i], color, 2)
def _draw_uav(self, frame, pos, heading):
"""Draw UAV as a green square with heading indicator."""
px, py = self.world_to_pixel(pos[0], pos[1])
size = 8
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size),
self.UAV_COLOR, -1)
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size),
(255, 255, 255), 1)
# Heading line
hx = int(px + 15 * math.sin(heading))
hy = int(py - 15 * math.cos(heading))
cv2.line(frame, (px, py), (hx, hy), self.UAV_COLOR, 2)
# Label
cv2.putText(frame, "UAV", (px + 12, py - 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UAV_COLOR, 1)
def _draw_ugv(self, frame, pos):
"""Draw UGV as a red square."""
px, py = self.world_to_pixel(pos[0], pos[1])
size = 10
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size),
self.UGV_COLOR, -1)
cv2.rectangle(frame, (px - size, py - size), (px + size, py + size),
(255, 255, 255), 1)
cv2.putText(frame, "UGV", (px + 14, py - 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.UGV_COLOR, 1)
def _draw_panel(self, frame, uav, alt, hdg, ugv, trail):
"""Draw the data panel on the right side."""
x0 = self.window_size + 10
y = 25
# Separator line
cv2.line(frame, (self.window_size, 0), (self.window_size, self.window_size),
self.GRID_MAJOR_COLOR, 1)
# Title
cv2.putText(frame, "FLIGHT TRACKER", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
y += 30
# Elapsed time
elapsed = time.time() - self.start_time
mins = int(elapsed // 60)
secs = int(elapsed % 60)
cv2.putText(frame, f"Time: {mins:02d}:{secs:02d}", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TEXT_DIM_COLOR, 1)
y += 30
# UAV section
cv2.putText(frame, "UAV", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UAV_COLOR, 1)
y += 22
cv2.putText(frame, f"X: {uav[0]:+.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Y: {uav[1]:+.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Alt: {alt:.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Hdg: {math.degrees(hdg):.0f} deg", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 30
# UGV section
if self.show_ugv:
cv2.putText(frame, "UGV", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.UGV_COLOR, 1)
y += 22
cv2.putText(frame, f"X: {ugv[0]:+.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Y: {ugv[1]:+.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
dist = np.sqrt((uav[0] - ugv[0])**2 + (uav[1] - ugv[1])**2)
cv2.putText(frame, f"Dist: {dist:.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 30
# Geofence section
if self.fence_points:
# Check fence status
inside = self._point_in_polygon(uav[0], uav[1], self.fence_points)
alt_ok = self.fence_alt_min <= alt <= self.fence_alt_max
fence_dist = self._distance_to_polygon_edge(uav[0], uav[1], self.fence_points)
if not inside or not alt_ok:
status = "BREACHED"
status_color = self.FENCE_BREACH_COLOR
elif fence_dist < self.fence_warn_dist:
status = "WARNING"
status_color = self.FENCE_WARN_COLOR
else:
status = "OK"
status_color = self.FENCE_COLOR
cv2.putText(frame, "GEOFENCE", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.FENCE_COLOR, 1)
y += 22
cv2.putText(frame, f"Status: {status}", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, status_color, 1)
y += 18
cv2.putText(frame, f"Edge: {fence_dist:.1f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 18
cv2.putText(frame, f"Alt lim: {self.fence_alt_min:.0f}-{self.fence_alt_max:.0f}m", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.38, self.TEXT_COLOR, 1)
y += 30
# Markers section
cv2.putText(frame, "MARKERS", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.MARKER_COLOR, 1)
y += 22
if self.markers_found:
for mid, (mx, my) in self.markers_found.items():
cv2.putText(frame, f"ID:{mid} ({mx:.1f},{my:.1f})", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_COLOR, 1)
y += 16
else:
cv2.putText(frame, "None", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1)
y += 16
y += 20
cv2.putText(frame, f"Trail: {len(trail)} pts", (x0, y),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, self.TEXT_DIM_COLOR, 1)
def start(self):
"""Start the display loop in a background thread."""
self.running = True
self._thread = threading.Thread(target=self._display_loop, daemon=True)
self._thread.start()
def _display_loop(self):
"""OpenCV display loop (runs in its own thread)."""
print("[TRACKER] Flight tracker window started")
while self.running:
frame = self.draw()
cv2.imshow("Flight Tracker", frame)
key = cv2.waitKey(50) & 0xFF
if key == ord('q'):
break
cv2.destroyWindow("Flight Tracker")
print("[TRACKER] Flight tracker stopped")
def stop(self):
"""Stop the display loop."""
self.running = False
if self._thread:
self._thread.join(timeout=2.0)
def main():
"""Standalone test mode - simulates a spiral flight path with geofence."""
geofence = {
'enabled': True,
'warning_distance': 3.0,
'min_altitude': 0.0,
'max_altitude': 15.0,
'points': [[-5, -5], [-5, 25], [25, 25], [25, -5]],
}
tracker = FlightTracker(
window_size=600,
world_range=30.0,
ugv_position=(5.0, 5.0),
geofence=geofence,
)
print("Flight Tracker - Standalone Test (with geofence)")
print("Press 'q' to quit")
x, y = 0.0, 0.0
t = 0.0
while True:
t += 0.05
radius = t * 0.5
x = radius * math.cos(t)
y = radius * math.sin(t)
heading = math.atan2(math.cos(t), -math.sin(t))
tracker.update_uav(x, y, altitude=5.0, heading=heading)
if t > 5 and 0 not in tracker.markers_found:
tracker.add_marker(0, 5.0, 5.0)
frame = tracker.draw()
cv2.imshow("Flight Tracker", frame)
key = cv2.waitKey(30) & 0xFF
if key == ord('q'):
break
cv2.destroyAllWindows()
if __name__ == "__main__":
main()

View File

@@ -24,6 +24,7 @@ class CameraProcessor:
self.node = Node()
self.show_gui = show_gui
self.frames = {}
self.raw_frames = {}
self.callbacks = {}
self.running = True
@@ -68,6 +69,7 @@ class CameraProcessor:
return
processed = self.process_image(bgr)
self.raw_frames[name] = processed.copy()
self.frames[name] = processed
for fn in self.callbacks.get(name, []):
@@ -143,6 +145,10 @@ def main():
proc = CameraProcessor(topics=topics, show_gui=show_gui)
try:
import os
src_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if src_dir not in sys.path:
sys.path.insert(0, src_dir)
from vision.object_detector import ObjectDetector
detector = ObjectDetector(aruco_dict_name="DICT_4X4_50")

View File

@@ -47,12 +47,36 @@ class ObjectDetector:
)
self.dist_coeffs = np.zeros(5)
dict_id = ARUCO_DICT.get(aruco_dict_name, cv2.aruco.DICT_6X6_250)
dict_id = ARUCO_DICT.get(aruco_dict_name, cv2.aruco.DICT_4X4_50)
self.aruco_dict = cv2.aruco.getPredefinedDictionary(dict_id)
# Support both old (<4.7) and new (>=4.7) OpenCV ArUco API
try:
self.aruco_params = cv2.aruco.DetectorParameters()
self._new_api = True
except AttributeError:
self.aruco_params = cv2.aruco.DetectorParameters_create()
self._new_api = False
self.aruco_params.minMarkerPerimeterRate = 0.01
self.aruco_params.maxMarkerPerimeterRate = 4.0
self.aruco_params.adaptiveThreshWinSizeMin = 3
self.aruco_params.adaptiveThreshWinSizeMax = 30
self.aruco_params.adaptiveThreshWinSizeStep = 5
self.aruco_params.adaptiveThreshConstant = 7
self.aruco_params.minCornerDistanceRate = 0.01
try:
self.aruco_params.cornerRefinementMethod = cv2.aruco.CORNER_REFINE_SUBPIX
except AttributeError:
pass
self.aruco_params.cornerRefinementWinSize = 5
if self._new_api:
self.aruco_detector = cv2.aruco.ArucoDetector(
self.aruco_dict, self.aruco_params
)
else:
self.aruco_detector = None
self.found_markers = {}
self.on_detection = None
@@ -68,7 +92,12 @@ class ObjectDetector:
else:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
if self._new_api:
corners, ids, rejected = self.aruco_detector.detectMarkers(gray)
else:
corners, ids, rejected = cv2.aruco.detectMarkers(
gray, self.aruco_dict, parameters=self.aruco_params
)
detections = []
if ids is None:
@@ -108,21 +137,30 @@ class ObjectDetector:
if not detections:
return annotated
overlay = annotated.copy()
for d in detections:
corners_arr = np.array(d["corners"], dtype=np.int32)
for j in range(4):
pt1 = tuple(corners_arr[j])
pt2 = tuple(corners_arr[(j + 1) % 4])
cv2.line(annotated, pt1, pt2, (0, 255, 0), 2)
# Fill marker area with purple
cv2.fillPoly(overlay, [corners_arr], (180, 0, 220))
# Draw purple border
cv2.polylines(annotated, [corners_arr], True, (180, 0, 220), 3)
# Center dot
cx = int(d["center_px"][0])
cy = int(d["center_px"][1])
cv2.circle(annotated, (cx, cy), 4, (0, 0, 255), -1)
cv2.circle(annotated, (cx, cy), 5, (255, 255, 255), -1)
# Label with ID and distance
label = f"ID:{d['id']} d:{d['distance']:.2f}m"
cv2.putText(annotated, label,
(int(corners_arr[0][0]), int(corners_arr[0][1]) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (180, 0, 220), 2)
# Blend overlay at 40% opacity
cv2.addWeighted(overlay, 0.4, annotated, 0.6, 0, annotated)
return annotated

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.0 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 12 KiB

BIN
worlds/tags/land_tag.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.0 KiB

View File

@@ -162,30 +162,30 @@
<!-- Main body (blue box) -->
<link name="base_link">
<pose>0 0 0.15 0 0 0</pose>
<pose>0 0 0.2 0 0 0</pose>
<visual name="body">
<geometry><box><size>0.6 0.4 0.2</size></box></geometry>
<geometry><box><size>1.0 0.7 0.3</size></box></geometry>
<material>
<ambient>0.2 0.2 0.7 1</ambient>
<diffuse>0.2 0.2 0.8 1</diffuse>
</material>
</visual>
<collision name="body_collision">
<geometry><box><size>0.6 0.4 0.2</size></box></geometry>
<geometry><box><size>1.0 0.7 0.3</size></box></geometry>
</collision>
</link>
<!-- ArUco landing tag on top -->
<link name="aruco_tag">
<pose>0 0 0.26 0 0 0</pose>
<pose>0 0 0.36 0 0 0</pose>
<visual name="aruco_visual">
<geometry><box><size>0.4 0.4 0.005</size></box></geometry>
<geometry><box><size>0.5 0.5 0.005</size></box></geometry>
<material>
<ambient>1 1 1 1</ambient>
<diffuse>1 1 1 1</diffuse>
<pbr>
<metal>
<albedo_map>tags/land_tag.jpg</albedo_map>
<albedo_map>tags/land_tag.png</albedo_map>
</metal>
</pbr>
</material>
@@ -194,26 +194,26 @@
<!-- Wheels (visual only since static) -->
<link name="wheel_fl">
<pose>0.2 0.22 0.06 1.5708 0 0</pose>
<visual name="v"><geometry><cylinder><radius>0.06</radius><length>0.04</length></cylinder></geometry>
<pose>0.35 0.38 0.1 1.5708 0 0</pose>
<visual name="v"><geometry><cylinder><radius>0.1</radius><length>0.06</length></cylinder></geometry>
<material><ambient>0.15 0.15 0.15 1</ambient><diffuse>0.15 0.15 0.15 1</diffuse></material>
</visual>
</link>
<link name="wheel_fr">
<pose>0.2 -0.22 0.06 1.5708 0 0</pose>
<visual name="v"><geometry><cylinder><radius>0.06</radius><length>0.04</length></cylinder></geometry>
<pose>0.35 -0.38 0.1 1.5708 0 0</pose>
<visual name="v"><geometry><cylinder><radius>0.1</radius><length>0.06</length></cylinder></geometry>
<material><ambient>0.15 0.15 0.15 1</ambient><diffuse>0.15 0.15 0.15 1</diffuse></material>
</visual>
</link>
<link name="wheel_rl">
<pose>-0.2 0.22 0.06 1.5708 0 0</pose>
<visual name="v"><geometry><cylinder><radius>0.06</radius><length>0.04</length></cylinder></geometry>
<pose>-0.35 0.38 0.1 1.5708 0 0</pose>
<visual name="v"><geometry><cylinder><radius>0.1</radius><length>0.06</length></cylinder></geometry>
<material><ambient>0.15 0.15 0.15 1</ambient><diffuse>0.15 0.15 0.15 1</diffuse></material>
</visual>
</link>
<link name="wheel_rr">
<pose>-0.2 -0.22 0.06 1.5708 0 0</pose>
<visual name="v"><geometry><cylinder><radius>0.06</radius><length>0.04</length></cylinder></geometry>
<pose>-0.35 -0.38 0.1 1.5708 0 0</pose>
<visual name="v"><geometry><cylinder><radius>0.1</radius><length>0.06</length></cylinder></geometry>
<material><ambient>0.15 0.15 0.15 1</ambient><diffuse>0.15 0.15 0.15 1</diffuse></material>
</visual>
</link>