Preflight Planner Update

This commit is contained in:
2026-02-13 16:48:14 -05:00
parent c7e9f81f55
commit 42e4fa28a9
10 changed files with 781 additions and 403 deletions

3
.gitignore vendored
View File

@@ -72,3 +72,6 @@ htmlcov/
wsl_env.sh
activate_venv.sh
# Simulation results
results/

View File

@@ -1,36 +0,0 @@
simulation:
world: uav_ugv_search.sdf
software_render: auto
physics:
max_step_size: 0.002
real_time_factor: 1.0
sensor_noise:
enabled: true
position_stddev: 0.05
velocity_stddev: 0.1
gyro_stddev: 0.0002
accel_stddev: 0.017
gps:
enabled: true
update_rate: 5
noise_stddev: 2.0
purpose: geofence_only
initial_positions:
uav:
x: 0.0
y: 0.0
z: 0.195
yaw: 90.0
ugv:
x: 5.0
y: 5.0
z: 0.0
yaw: 0.0
logging:
enabled: true
log_directory: logs

View File

@@ -1,5 +1,16 @@
altitude: 5.0
# ─── Mission / Search Configuration ──────────────────────────
# Single source of truth for all mission parameters.
# ── Flight ───────────────────────────────────────────────────
altitude: 4.0 # Search altitude (meters)
# ── ArUco Marker ─────────────────────────────────────────────
marker:
dictionary: DICT_4X4_50
size: 0.5 # Physical marker size in meters
landing_ids: [0] # Marker IDs that trigger landing
# ── Search Patterns ──────────────────────────────────────────
spiral:
max_legs: 12
initial_leg: 4.0
@@ -15,18 +26,16 @@ levy:
max_steps: 20
field_size: 50.0
actions:
land:
- 0
# ── Geofence ─────────────────────────────────────────────────
geofence:
enabled: true
warning_distance: 3.0
min_altitude: 0.0
max_altitude: 15.0
# Polygon vertices in local NED (x=North, y=East) meters
# Centered on UAV start position (0, 0)
points:
- [-5, -5]
- [-5, 25]
- [25, 25]
- [25, -5]
- [-15, -15]
- [-15, 15]
- [15, 15]
- [15, -15]

View File

@@ -1,74 +1,7 @@
# ─── UAV Configuration ───────────────────────────────────────
# UAV-specific settings. Mission config is in search.yaml.
connection:
sim: "tcp:127.0.0.1:5760"
real: "/dev/ttyAMA0"
baud: 57600
flight:
takeoff_altitude: 5.0
max_altitude: 15.0
min_altitude: 3.0
max_velocity: 2.0
max_acceleration: 1.0
max_climb_rate: 1.0
max_yaw_rate: 45.0
navigation:
frame: LOCAL_NED
waypoint_radius: 0.5
position_hold_radius: 0.2
home_mode: local
vision:
forward_camera:
enabled: true
frame_rate: 30
resolution: [640, 480]
downward_camera:
enabled: true
frame_rate: 30
resolution: [320, 240]
visual_odometry:
enabled: true
method: ORB
min_features: 100
max_features: 500
optical_flow:
enabled: true
method: Lucas-Kanade
window_size: 15
min_altitude: 0.3
max_altitude: 10.0
landmark_detection:
enabled: true
method: ArUco
marker_size: 0.15
position_estimation:
method: EKF
update_rate: 50
weights:
visual_odometry: 0.6
optical_flow: 0.3
imu: 0.1
process_noise:
position: 0.1
velocity: 0.5
measurement_noise:
visual_odom: 0.05
optical_flow: 0.1
imu: 0.2
obstacle_avoidance:
enabled: true
detection_range: 5.0
safety_margin: 1.0
safety:
geofence:
enabled: true
action_on_breach: RTL
max_altitude: 10
radius: 20
failsafe:
vision_loss_timeout: 5.0
action_on_vision_loss: HOLD

View File

@@ -1,46 +1,8 @@
vehicle:
wheelbase: 0.3
track_width: 0.25
wheel_radius: 0.05
connection:
sim: null
real: null
# ─── UGV Configuration ───────────────────────────────────────
# UGV-specific settings. Mission config is in search.yaml.
# UGV spawn position in local NED (meters)
# Also used by run_autonomous.sh to place the UGV model in Gazebo
position:
x: 5.0
y: 5.0
z: 0.0
yaw: 0.0
navigation:
max_linear_velocity: 1.0
max_angular_velocity: 1.5
linear_acceleration: 0.5
angular_acceleration: 1.0
waypoint_radius: 0.3
position_tolerance: 0.3
landing_pad:
marker_type: ArUco
marker_id: 0
marker_size: 0.3
pad_diameter: 1.0
color: [255, 255, 0]
vision:
detection_method: ArUco
camera:
enabled: true
frame_rate: 30
resolution: [320, 240]
safety:
geofence:
enabled: true
action_on_breach: STOP
failsafe:
vision_loss_timeout: 3.0
action_on_vision_loss: STOP
collision_distance: 0.2
action_on_collision: STOP

View File

@@ -91,7 +91,7 @@ def get_landing_id():
if search_cfg.exists():
with open(search_cfg) as f:
cfg = yaml.safe_load(f) or {}
land_ids = cfg.get("actions", {}).get("land", [])
land_ids = cfg.get("marker", {}).get("landing_ids", [])
if land_ids:
return land_ids[0]
return 0

View File

@@ -9,7 +9,6 @@ from enum import Enum
from control.uav_controller import Controller
from vision.object_detector import ObjectDetector
from vision.visual_servoing import VisualServoing
from utils.helpers import distance_2d
@@ -42,11 +41,7 @@ class Search:
self.actions = actions or {}
self.land_ids = set(self.actions.get("land", []))
self.servoing = None
if self.land_ids:
target_id = list(self.land_ids)[0]
self.servoing = VisualServoing(ctrl, target_marker_id=target_id)
# Search pattern parameters
self.spiral_max_legs = 12
self.spiral_initial_leg = self.CAM_FOV_METERS
self.spiral_leg_increment = self.CAM_FOV_METERS * 0.8
@@ -60,24 +55,236 @@ class Search:
self.levy_field_size = 50.0
self.levy_angle_dist = uniform(loc=-180, scale=360)
# Pre-flight plan
self.waypoints = []
self.current_wp = 0
self._on_waypoint_change = None # callback: fn(index)
def configure(self, **kwargs):
for key, value in kwargs.items():
if hasattr(self, key):
setattr(self, key, value)
# ═══════════════════════════════════════════════════════════
# PRE-FLIGHT PLANNING
# ═══════════════════════════════════════════════════════════
def plan(self, start_pos=(0.0, 0.0), geofence_points=None):
"""Pre-compute all waypoints for the search pattern.
Args:
start_pos: (x, y) starting position in local NED
geofence_points: list of (x, y) polygon vertices, or None
Returns:
List of (x, y) absolute waypoints
"""
if self.mode == SearchMode.SPIRAL:
waypoints = self._plan_spiral(start_pos)
elif self.mode == SearchMode.LAWNMOWER:
waypoints = self._plan_lawnmower(start_pos)
elif self.mode == SearchMode.LEVY:
waypoints = self._plan_levy(start_pos)
else:
waypoints = [start_pos]
# Clip to geofence
if geofence_points:
waypoints = self._clip_to_geofence(waypoints, geofence_points)
self.waypoints = waypoints
self.current_wp = 0
print(f"\n[PLAN] {self.mode.value.upper()} flight plan: "
f"{len(waypoints)} waypoints")
for i, (wx, wy) in enumerate(waypoints):
print(f" WP{i:02d}: ({wx:+.1f}, {wy:+.1f})")
if geofence_points:
print(f"[PLAN] Clipped to geofence ({len(geofence_points)} vertices)")
return waypoints
def _plan_spiral(self, start):
"""Expanding square spiral from center outward."""
waypoints = [start]
x, y = start
directions = [
(0, 1), # East (+Y)
(1, 0), # North (+X)
(0, -1), # West (-Y)
(-1, 0), # South (-X)
]
distance = self.spiral_initial_leg
dir_idx = 0
legs_at_this_dist = 0
for _ in range(self.spiral_max_legs):
dx_dir, dy_dir = directions[dir_idx % 4]
x += dx_dir * distance
y += dy_dir * distance
waypoints.append((x, y))
dir_idx += 1
legs_at_this_dist += 1
if legs_at_this_dist >= 2:
legs_at_this_dist = 0
distance += self.spiral_leg_increment
return waypoints
def _plan_lawnmower(self, start):
"""Lawnmower (boustrophedon) pattern."""
waypoints = [start]
sx, sy = start
lane_spacing = self.lawn_lane_spacing
height = self.lawn_height
num_lanes = max(1, int(self.lawn_width / lane_spacing))
for lane in range(num_lanes):
lane_x = sx + lane * lane_spacing
if lane % 2 == 0:
target_y = sy + height
else:
target_y = sy
waypoints.append((lane_x, target_y))
if lane < num_lanes - 1:
next_x = sx + (lane + 1) * lane_spacing
waypoints.append((next_x, target_y))
return waypoints
def _plan_levy(self, start):
"""Lévy walk — random but pre-computed."""
waypoints = [start]
x, y = start
for _ in range(self.levy_max_steps):
angle_deg = self.levy_angle_dist.rvs()
angle_rad = math.radians(angle_deg)
raw_distance = levy.rvs(loc=1, scale=1)
distance = min(max(raw_distance, 1.0), self.levy_field_size)
x += distance * math.cos(angle_rad)
y += distance * math.sin(angle_rad)
waypoints.append((x, y))
return waypoints
def _clip_to_geofence(self, waypoints, polygon):
"""Clip waypoints to stay within the geofence polygon.
If a waypoint is outside, it is projected to the nearest
point on the polygon boundary. Consecutive out-of-bounds
waypoints are collapsed.
"""
clipped = []
for wx, wy in waypoints:
if self._point_in_polygon(wx, wy, polygon):
clipped.append((wx, wy))
else:
nearest = self._nearest_point_on_polygon(wx, wy, polygon)
# Avoid duplicate consecutive points
if not clipped or distance_2d(clipped[-1], nearest) > 0.5:
clipped.append(nearest)
return clipped
@staticmethod
def _point_in_polygon(px, py, polygon):
"""Ray-casting point-in-polygon test."""
n = len(polygon)
inside = False
j = n - 1
for i in range(n):
xi, yi = polygon[i]
xj, yj = polygon[j]
if ((yi > py) != (yj > py)) and \
(px < (xj - xi) * (py - yi) / (yj - yi) + xi):
inside = not inside
j = i
return inside
@staticmethod
def _nearest_point_on_polygon(px, py, polygon):
"""Find the nearest point on the polygon edge to (px, py)."""
min_dist = float('inf')
nearest = (px, py)
n = len(polygon)
for i in range(n):
x1, y1 = polygon[i]
x2, y2 = polygon[(i + 1) % n]
# Project point onto segment
dx, dy = x2 - x1, y2 - y1
length_sq = dx * dx + dy * dy
if length_sq == 0:
proj = (x1, y1)
else:
t = max(0, min(1, ((px - x1) * dx + (py - y1) * dy) / length_sq))
proj = (x1 + t * dx, y1 + t * dy)
dist = math.sqrt((px - proj[0])**2 + (py - proj[1])**2)
if dist < min_dist:
min_dist = dist
nearest = proj
return nearest
# ═══════════════════════════════════════════════════════════
# EXECUTION
# ═══════════════════════════════════════════════════════════
def run(self):
"""Execute the pre-planned search by following waypoints."""
if not self.waypoints:
self.plan()
n = len(self.waypoints)
print(f"\n{'=' * 50}")
print(f" SEARCH: {self.mode.value.upper()} at {self.altitude}m")
print(f" Waypoints: {n}")
if self.land_ids:
print(f" Landing targets: {self.land_ids}")
print(f"{'=' * 50}\n")
if self.mode == SearchMode.SPIRAL:
return self.spiral()
elif self.mode == SearchMode.LAWNMOWER:
return self.lawnmower()
elif self.mode == SearchMode.LEVY:
return self.levy_walk()
for i, (wx, wy) in enumerate(self.waypoints):
if not self.running:
break
self.current_wp = i
if self._on_waypoint_change:
self._on_waypoint_change(i)
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
dist = distance_2d((pos['x'], pos['y']), (wx, wy))
dir_label = ""
if i > 0:
prev = self.waypoints[i - 1]
dx = wx - prev[0]
dy = wy - prev[1]
if abs(dx) > abs(dy):
dir_label = "N" if dx > 0 else "S"
else:
dir_label = "E" if dy > 0 else "W"
print(f"[SEARCH] WP {i + 1}/{n} "
f"→ ({wx:+.1f}, {wy:+.1f}) "
f"dist:{dist:.1f}m {dir_label} "
f"markers:{len(self.found_markers)}")
self.move_to_local(wx, wy)
if not self._landing:
print(f"\n[SEARCH] Search complete. "
f"Found {len(self.found_markers)} markers.")
return self.found_markers
# ═══════════════════════════════════════════════════════════
# CAMERA & MARKER DETECTION
# ═══════════════════════════════════════════════════════════
def get_camera_frame(self):
if self.camera is None:
@@ -114,15 +321,21 @@ class Search:
f"UAV at ({pos['x']:.1f}, {pos['y']:.1f})")
if marker_id in self.land_ids and not self._landing:
print(f"\n[SEARCH] Landing target ID:{marker_id} found! Starting approach.")
print(f"\n[SEARCH] Landing target ID:{marker_id} found! "
f"Starting approach.")
self._landing = True
self.execute_landing(detections)
return new_markers
return new_markers
# ═══════════════════════════════════════════════════════════
# LANDING
# ═══════════════════════════════════════════════════════════
def execute_landing(self, initial_detections):
"""Visual-servoing descent: keep the ArUco tag centered while descending."""
"""Visual-servoing descent: keep the ArUco tag centered
while descending."""
target_det = None
for d in initial_detections:
if d.get("id") in self.land_ids:
@@ -137,102 +350,34 @@ class Search:
print(f"[SEARCH] Target marker ID:{target_det['id']} "
f"distance:{target_det['distance']:.2f}m")
# Camera parameters
# Camera parameters (must match model.sdf)
IMG_W, IMG_H = 640, 480
IMG_CX, IMG_CY = IMG_W / 2, IMG_H / 2
HFOV = 1.3962634 # radians (~80 degrees)
HFOV = 1.3962634 # 80° horizontal FOV
# Landing parameters
DESCENT_STEP = 1.0 # descend 1m per step
LAND_ALT = 1.5 # switch to blind land at this altitude
CENTER_PX = 40 # centered if within this many pixels
MAX_CORRECTIONS = 15 # max correction iterations per altitude step
GAIN = 0.4 # damping factor for corrections
DESCENT_STEP = 1.0
LAND_ALT = 1.5
CENTER_PX = 50
MAX_CORRECTIONS = 30
MAX_LOST = 30
GAIN = 0.35
# Phase 1: Initial approach — fly toward marker using tvec as rough guide
tvec = target_det.get("tvec", [0, 0, 0])
# Start visual servoing from current position
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
current_alt = self.ctrl.altitude
# For a downward camera (90° pitch): tvec[0]=right=East, tvec[1]=down=North
# But this is unreliable, so just use it as a rough initial move
rough_x = pos['x'] + tvec[1]
rough_y = pos['y'] + tvec[0]
print(f"[SEARCH] Phase 1: Rough approach to ({rough_x:.1f}, {rough_y:.1f})")
self.ctrl.move_local_ned(rough_x, rough_y, -self.altitude)
self._wait_arrival(rough_x, rough_y, timeout=10.0)
sleep(1.0) # settle
# Phase 2: Visual servoing descent
current_alt = self.altitude
lost_count = 0
MAX_LOST = 10 # abort if marker lost this many consecutive times
print(f"[SEARCH] Phase 2: Visual servoing descent from {current_alt:.1f}m")
print(f"[SEARCH] Visual servoing descent from {current_alt:.1f}m")
while current_alt > LAND_ALT and self.running:
# Center over marker at current altitude
centered = False
for attempt in range(MAX_CORRECTIONS):
frame = self.get_camera_frame()
if frame is None:
sleep(0.2)
continue
centered = self._center_over_marker(
current_alt, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV,
CENTER_PX, MAX_CORRECTIONS, MAX_LOST, GAIN)
detections = self.detector.detect(frame)
target = None
for d in detections:
if d.get("id") in self.land_ids:
target = d
break
if target is None:
lost_count += 1
print(f"\r[SEARCH] Alt:{current_alt:.1f}m MARKER LOST ({lost_count}/{MAX_LOST}) ",
end='', flush=True)
if lost_count >= MAX_LOST:
print(f"\n[SEARCH] Marker lost too many times, aborting landing")
self._landing = False
return
sleep(0.3)
continue
lost_count = 0
# Pixel error from image center
cx, cy = target["center_px"]
err_x = cx - IMG_CX # positive = marker is right of center
err_y = cy - IMG_CY # positive = marker is below center
print(f"\r[SEARCH] Alt:{current_alt:.1f}m err=({err_x:+.0f},{err_y:+.0f})px "
f"dist:{target['distance']:.2f}m ({attempt+1}/{MAX_CORRECTIONS}) ",
end='', flush=True)
if abs(err_x) < CENTER_PX and abs(err_y) < CENTER_PX:
centered = True
break
# Convert pixel error to meters using FOV and altitude
# At current altitude, the ground plane width visible is:
# ground_width = 2 * alt * tan(HFOV/2)
self.ctrl.update_state()
alt = max(self.ctrl.altitude, 0.5)
ground_w = 2.0 * alt * math.tan(HFOV / 2.0)
m_per_px = ground_w / IMG_W
# Apply correction (pixel right = East = +y, pixel down = North = +x)
correction_y = err_x * m_per_px * GAIN # East
correction_x = err_y * m_per_px * GAIN # North
cur = self.ctrl.get_local_position()
new_x = cur['x'] + correction_x
new_y = cur['y'] + correction_y
self.ctrl.move_local_ned(new_x, new_y, -current_alt)
sleep(0.4)
if not centered:
print(f"\n[SEARCH] Could not fully center at {current_alt:.1f}m, continuing descent")
if centered is None:
print(f"\n[SEARCH] Marker lost, aborting landing")
self._landing = False
return
# Descend one step
current_alt = max(current_alt - DESCENT_STEP, LAND_ALT)
@@ -240,52 +385,94 @@ class Search:
self.ctrl.update_state()
cur = self.ctrl.get_local_position()
self.ctrl.move_local_ned(cur['x'], cur['y'], -current_alt)
sleep(1.5) # wait for descent
sleep(2.0)
# Phase 3: Final centering at low altitude
print(f"[SEARCH] Phase 3: Final centering at {LAND_ALT:.1f}m")
for attempt in range(MAX_CORRECTIONS):
# Final precision centering
print(f"[SEARCH] Final centering at {LAND_ALT:.1f}m")
self._center_over_marker(
LAND_ALT, IMG_W, IMG_H, IMG_CX, IMG_CY, HFOV,
35, MAX_CORRECTIONS, MAX_LOST, 0.25)
# Land
print(f"\n[SEARCH] ===== LANDING =====")
self.ctrl.land()
self.landed = True
self.running = False
def _center_over_marker(self, target_alt, img_w, img_h, img_cx, img_cy,
hfov, center_px, max_corrections, max_lost, gain):
"""Center the UAV over the ArUco marker using pixel error.
Returns True if centered, False if partial, None if lost.
"""
lost_count = 0
centered = False
for attempt in range(max_corrections):
frame = self.get_camera_frame()
if frame is None:
sleep(0.2)
sleep(0.3)
continue
detections = self.detector.detect(frame)
target = None
for d in detections:
if d.get("id") in self.land_ids:
target = d
break
if target is None:
lost_count += 1
print(f"\r[SEARCH] Alt:{target_alt:.1f}m "
f"MARKER LOST ({lost_count}/{max_lost}) ",
end='', flush=True)
if lost_count >= max_lost:
return None
sleep(0.3)
continue
cx, cy = target["center_px"]
err_x = cx - IMG_CX
err_y = cy - IMG_CY
lost_count = 0
print(f"\r[SEARCH] Final center: err=({err_x:+.0f},{err_y:+.0f})px ",
cx, cy = target["center_px"]
err_x = cx - img_cx
err_y = cy - img_cy
print(f"\r[SEARCH] Alt:{target_alt:.1f}m "
f"err=({err_x:+.0f},{err_y:+.0f})px "
f"dist:{target['distance']:.2f}m "
f"({attempt+1}/{max_corrections}) ",
end='', flush=True)
if abs(err_x) < 25 and abs(err_y) < 25:
print(f"\n[SEARCH] Centered! Landing now.")
if abs(err_x) < center_px and abs(err_y) < center_px:
centered = True
print(f"\n[SEARCH] Centered at {target_alt:.1f}m!")
break
self.ctrl.update_state()
alt = max(self.ctrl.altitude, 0.5)
ground_w = 2.0 * alt * math.tan(HFOV / 2.0)
m_per_px = ground_w / IMG_W
correction_y = err_x * m_per_px * 0.3
correction_x = err_y * m_per_px * 0.3
ground_w = 2.0 * alt * math.tan(hfov / 2.0)
m_per_px = ground_w / img_w
# Camera pitch-90° inverts image axes relative to NED
correction_y = -err_x * m_per_px * gain
correction_x = -err_y * m_per_px * gain
cur = self.ctrl.get_local_position()
self.ctrl.move_local_ned(cur['x'] + correction_x,
cur['y'] + correction_y, -LAND_ALT)
self.ctrl.move_local_ned(
cur['x'] + correction_x,
cur['y'] + correction_y,
-target_alt)
sleep(0.5)
# Phase 4: Land
print(f"[SEARCH] ===== LANDING =====")
self.ctrl.land()
self.landed = True
self.running = False
if not centered:
print(f"\n[SEARCH] Partially centered at {target_alt:.1f}m, "
f"continuing")
return centered
# ═══════════════════════════════════════════════════════════
# MOVEMENT
# ═══════════════════════════════════════════════════════════
def wait_for_position(self, target_x, target_y, timeout=None):
if timeout is None:
@@ -313,7 +500,7 @@ class Search:
return False
def _wait_arrival(self, target_x, target_y, timeout=15.0):
"""Wait for position without checking markers (used during landing)."""
"""Wait for position without checking markers (landing)."""
t0 = time.time()
while time.time() - t0 < timeout and self.running:
self.ctrl.update_state()
@@ -337,110 +524,9 @@ class Search:
self.ctrl.move_local_ned(x, y, z)
return self.wait_for_position(x, y)
def move_relative(self, dx, dy):
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
target_x = pos['x'] + dx
target_y = pos['y'] + dy
self.ctrl.move_pos_rel(dx, dy, 0)
return self.wait_for_position(target_x, target_y)
def spiral(self):
distance = self.spiral_initial_leg
increment = self.spiral_leg_increment
travel_x = True
direction = 1
for leg in range(self.spiral_max_legs):
if not self.running:
break
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
print(f"[SEARCH] Spiral leg {leg + 1}/{self.spiral_max_legs} "
f"pos:({pos['x']:.1f}, {pos['y']:.1f}) "
f"step:{distance:.1f}m markers:{len(self.found_markers)}")
if travel_x:
dx = distance * direction
self.move_relative(dx, 0)
else:
dy = distance * direction
self.move_relative(0, dy)
direction *= -1
distance += increment
travel_x = not travel_x
print(f"[SEARCH] Spiral complete. Found {len(self.found_markers)} markers.")
return self.found_markers
def lawnmower(self):
lane_spacing = self.lawn_lane_spacing
height = self.lawn_height
num_lanes = max(1, int(self.lawn_width / lane_spacing))
self.ctrl.update_state()
start_pos = self.ctrl.get_local_position()
start_x = start_pos['x']
start_y = start_pos['y']
print(f"[SEARCH] Lawnmower: {num_lanes} lanes, "
f"{lane_spacing:.1f}m spacing, {height:.1f}m height")
for lane in range(num_lanes):
if not self.running:
break
lane_x = start_x + lane * lane_spacing
if lane % 2 == 0:
target_y = start_y + height
else:
target_y = start_y
print(f"[SEARCH] Lane {lane + 1}/{num_lanes} "
f"x:{lane_x:.1f} markers:{len(self.found_markers)}")
self.move_to_local(lane_x, target_y)
if not self.running:
break
if lane < num_lanes - 1:
next_x = start_x + (lane + 1) * lane_spacing
self.move_to_local(next_x, target_y)
print(f"[SEARCH] Lawnmower complete. Found {len(self.found_markers)} markers.")
return self.found_markers
def levy_walk(self):
field_size = self.levy_field_size
for step in range(self.levy_max_steps):
if not self.running:
break
angle_deg = self.levy_angle_dist.rvs()
angle_rad = math.radians(angle_deg)
raw_distance = levy.rvs(loc=1, scale=1)
distance = min(max(raw_distance, 1.0), field_size)
dx = distance * math.cos(angle_rad)
dy = distance * math.sin(angle_rad)
self.ctrl.update_state()
pos = self.ctrl.get_local_position()
print(f"[SEARCH] Lévy step {step + 1}/{self.levy_max_steps} "
f"angle:{angle_deg:.0f}° dist:{distance:.1f}m "
f"pos:({pos['x']:.1f}, {pos['y']:.1f}) "
f"markers:{len(self.found_markers)}")
self.move_relative(dx, dy)
print(f"[SEARCH] Lévy walk complete. Found {len(self.found_markers)} markers.")
return self.found_markers
# ═══════════════════════════════════════════════════════════
# RESULTS
# ═══════════════════════════════════════════════════════════
def stop(self):
self.running = False

View File

@@ -15,6 +15,7 @@ from control.search import Search
from vision.object_detector import ObjectDetector
from vision.camera_processor import CameraProcessor
from navigation.flight_tracker import FlightTracker
from utils.recorder import RunRecorder
PROJECT_DIR = Path(__file__).resolve().parent.parent
CONFIG_DIR = PROJECT_DIR / "config"
@@ -61,14 +62,32 @@ def main():
parser.add_argument('--search', default='spiral', choices=['spiral', 'lawnmower', 'levy'])
parser.add_argument('--altitude', type=float, default=None)
parser.add_argument('--no-camera', action='store_true')
parser.add_argument('--no-record', action='store_true', help='Disable recording')
args = parser.parse_args()
# ── Start recorder ──────────────────────────────────────────
recorder = None
if not args.no_record:
recorder = RunRecorder()
recorder.start_logging()
uav_cfg = load_config('uav.yaml')
search_cfg = load_config('search.yaml')
ugv_cfg = load_config('ugv.yaml')
altitude = args.altitude or search_cfg.get('altitude', uav_cfg.get('flight', {}).get('takeoff_altitude', 5.0))
altitude = args.altitude or search_cfg.get('altitude', 5.0)
search_mode = args.search
# Marker config (single source of truth)
marker_cfg = search_cfg.get('marker', {})
marker_dict = marker_cfg.get('dictionary', 'DICT_4X4_50')
marker_size = marker_cfg.get('size', 0.5)
landing_ids = marker_cfg.get('landing_ids', [0])
# UGV position
ugv_pos_cfg = ugv_cfg.get('position', {})
ugv_position = (ugv_pos_cfg.get('x', 5.0), ugv_pos_cfg.get('y', 5.0))
if args.connection:
conn_str = args.connection
elif args.device == 'real':
@@ -79,8 +98,8 @@ def main():
ctrl = Controller(conn_str)
detector = ObjectDetector(
marker_size=uav_cfg.get('vision', {}).get('landmark_detection', {}).get('marker_size', 0.15),
aruco_dict_name="DICT_4X4_50",
marker_size=marker_size,
aruco_dict_name=marker_dict,
)
camera = None
@@ -101,7 +120,7 @@ def main():
print(f"[MAIN] Camera unavailable: {e}")
camera = None
actions = search_cfg.get('actions', {})
actions = {'land': landing_ids}
search = Search(ctrl, detector, camera=camera, mode=search_mode,
altitude=altitude, actions=actions)
@@ -109,29 +128,48 @@ def main():
if mode_cfg:
search.configure(**{f"{search_mode}_{k}": v for k, v in mode_cfg.items()})
# ── Pre-flight plan ─────────────────────────────────────────
geofence_cfg = search_cfg.get('geofence', {})
geofence_points = None
if geofence_cfg.get('enabled', False):
raw_pts = geofence_cfg.get('points', [])
if raw_pts:
geofence_points = [tuple(p) for p in raw_pts]
waypoints = search.plan(start_pos=(0.0, 0.0),
geofence_points=geofence_points)
print(f"[MAIN] Config loaded from {CONFIG_DIR}")
print(f"[MAIN] Search: {search_mode} Altitude: {altitude}m")
if actions.get('land'):
print(f"[MAIN] Landing targets: {actions['land']}")
print(f"[MAIN] Marker: {marker_dict} size:{marker_size}m land:{landing_ids}")
# ── Start flight tracker (show plan before takeoff) ─────────
tracker = FlightTracker(
window_size=600,
world_range=30.0,
ugv_position=ugv_position,
geofence=geofence_cfg,
)
tracker.set_plan(waypoints)
tracker.start()
setup_ardupilot(ctrl)
ctrl.wait_for_gps()
if not ctrl.arm():
print("[UAV] Cannot arm - aborting")
if recorder:
recorder.save_summary(search_mode=search_mode, altitude=altitude)
recorder.stop()
return
ctrl.takeoff(altitude)
ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30)
# Start flight tracker
tracker = FlightTracker(
window_size=600,
world_range=30.0,
ugv_position=(5.0, 5.0),
geofence=search_cfg.get('geofence'),
)
tracker.start()
# ── Start recording tracker + camera ────────────────────────
if recorder:
recorder.start_recording(tracker=tracker, camera=camera)
recorder.snapshot_camera("pre_search")
# Feed position updates to tracker during search
original_check = search.check_for_markers
@@ -148,9 +186,16 @@ def main():
return result
search.check_for_markers = tracked_check
# Wire up waypoint progress to tracker
search._on_waypoint_change = lambda idx: tracker.set_active_waypoint(idx)
results = search.run()
search_results = search.get_results()
# ── Snapshot on detection ───────────────────────────────────
if recorder and results:
recorder.snapshot_camera("marker_detected")
print(f"\n{'=' * 50}")
print(f" SEARCH COMPLETE")
print(f"{'=' * 50}")
@@ -170,6 +215,17 @@ def main():
else:
wait_for_landing(ctrl)
# ── Finalize recording ──────────────────────────────────────
if recorder:
recorder.snapshot_camera("post_landing")
recorder.save_summary(
search_mode=search_mode,
altitude=altitude,
markers=results,
landed=search_results.get('landed', False),
)
recorder.stop()
tracker.stop()
print("[MAIN] Done.")

View File

@@ -39,6 +39,9 @@ class FlightTracker:
FENCE_COLOR = (0, 220, 255) # Yellow-cyan
FENCE_WARN_COLOR = (0, 140, 200) # Darker yellow
FENCE_BREACH_COLOR = (0, 0, 255) # Red
PLAN_COLOR = (180, 140, 50) # Muted cyan-blue
PLAN_ACTIVE_COLOR = (0, 180, 255) # Orange
PLAN_VISITED_COLOR = (60, 60, 60) # Dim gray
def __init__(self, window_size=600, world_range=30.0,
ugv_position=(5.0, 5.0), show_ugv=True,
@@ -60,9 +63,13 @@ class FlightTracker:
self.uav_pos = (0.0, 0.0)
self.uav_alt = 0.0
self.uav_heading = 0.0
self.trail = deque(maxlen=5000)
self.trail = deque()
self.markers_found = {}
# Flight plan
self.plan_waypoints = []
self.active_waypoint = 0
# Geofence
self.geofence = geofence
self.fence_points = []
@@ -108,6 +115,17 @@ class FlightTracker:
with self._lock:
self.markers_found[marker_id] = (x, y)
def set_plan(self, waypoints):
"""Set pre-computed flight plan waypoints for visualization."""
with self._lock:
self.plan_waypoints = list(waypoints)
self.active_waypoint = 0
def set_active_waypoint(self, idx):
"""Update which waypoint the UAV is currently targeting."""
with self._lock:
self.active_waypoint = idx
def _point_in_polygon(self, px, py, polygon):
"""Simple ray-casting point-in-polygon test."""
n = len(polygon)
@@ -159,6 +177,12 @@ class FlightTracker:
uav_hdg = self.uav_heading
ugv = self.ugv_pos
markers = dict(self.markers_found)
plan_copy = list(self.plan_waypoints)
active_wp = self.active_waypoint
# Draw plan UNDER the trail
if plan_copy:
self._draw_plan(frame, plan_copy, active_wp)
self._draw_trail(frame, trail_copy)
@@ -288,13 +312,61 @@ class FlightTracker:
ey = int(y1 + dy * end)
cv2.line(frame, (sx, sy), (ex, ey), color, thickness)
def _draw_plan(self, frame, waypoints, active_idx):
"""Draw the pre-flight plan: dashed path with numbered waypoints."""
if len(waypoints) < 2:
return
pts = [self.world_to_pixel(x, y) for x, y in waypoints]
# Draw connecting lines
for i in range(1, len(pts)):
if i <= active_idx:
color = self.PLAN_VISITED_COLOR
else:
color = self.PLAN_COLOR
self._draw_dashed_line(frame, pts[i - 1], pts[i], color, 1, 8)
# Draw waypoint markers
for i, (px, py) in enumerate(pts):
if i < active_idx:
# Visited — small dim dot
cv2.circle(frame, (px, py), 3, self.PLAN_VISITED_COLOR, -1)
elif i == active_idx:
# Active target — bright ring
cv2.circle(frame, (px, py), 8, self.PLAN_ACTIVE_COLOR, 2)
cv2.circle(frame, (px, py), 3, self.PLAN_ACTIVE_COLOR, -1)
else:
# Future — small cyan dot
cv2.circle(frame, (px, py), 4, self.PLAN_COLOR, 1)
# Waypoint number label
label = str(i)
cv2.putText(frame, label, (px + 7, py - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.28, self.PLAN_COLOR, 1)
def _draw_trail(self, frame, trail):
"""Draw the flight path trail."""
"""Draw the flight path trail with persistent visibility."""
if len(trail) < 2:
return
# Minimum brightness so old trail never disappears
MIN_ALPHA = 0.3
FADE_POINTS = 200 # only the newest N points fade
pts = [self.world_to_pixel(x, y) for x, y in trail]
for i in range(1, len(pts)):
alpha = i / len(pts)
n = len(pts)
fade_start = max(0, n - FADE_POINTS)
for i in range(1, n):
if i <= fade_start:
# Old segments: solid dim color (never invisible)
alpha = MIN_ALPHA
else:
# Recent segments: fade from MIN_ALPHA to 1.0
progress = (i - fade_start) / FADE_POINTS
alpha = MIN_ALPHA + (1.0 - MIN_ALPHA) * progress
color = (
int(self.UAV_TRAIL_COLOR[0] * alpha),
int(self.UAV_TRAIL_COLOR[1] * alpha),

293
src/utils/recorder.py Normal file
View File

@@ -0,0 +1,293 @@
#!/usr/bin/env python3
"""
Run Recorder — Captures logs, flight path, camera frames, and summary for each run.
Creates timestamped folders in /results with:
flight_path.avi — Flight tracker video
camera.avi — Downward camera video
flight_path.png — Final flight tracker snapshot
log.txt — Full console output
summary.json — Run metadata and results
"""
import os
import sys
import io
import json
import time
import threading
import cv2
import numpy as np
from pathlib import Path
from datetime import datetime
class RunRecorder:
"""Records all outputs from a simulation run."""
def __init__(self, results_dir=None, fps=5):
if results_dir is None:
project_dir = Path(__file__).resolve().parent.parent
results_dir = project_dir / "results"
results_dir = Path(results_dir)
results_dir.mkdir(parents=True, exist_ok=True)
# Find next sequential run number
existing = [d.name for d in results_dir.iterdir()
if d.is_dir() and d.name.startswith("run_")]
nums = []
for name in existing:
try:
nums.append(int(name.split("_")[1]))
except (IndexError, ValueError):
pass
run_num = max(nums, default=0) + 1
self.run_dir = results_dir / f"run_{run_num}"
self.run_dir.mkdir(parents=True, exist_ok=True)
self.run_num = run_num
self.fps = fps
self.start_time = time.time()
# Log capture
self._log_path = self.run_dir / "log.txt"
self._log_file = open(self._log_path, "w")
self._original_stdout = sys.stdout
self._original_stderr = sys.stderr
self._tee_stdout = _TeeWriter(sys.stdout, self._log_file)
self._tee_stderr = _TeeWriter(sys.stderr, self._log_file)
# Video writers (initialized lazily on first frame)
self._tracker_writer = None
self._camera_writer = None
self._tracker_size = None
self._camera_size = None
# Frame counters
self._tracker_frames = 0
self._camera_frames = 0
# Snapshot storage
self._last_tracker_frame = None
self._last_camera_frame = None
self._camera_snapshots = []
# Recording thread
self._recording = False
self._tracker_ref = None
self._camera_ref = None
self._record_thread = None
self._lock = threading.Lock()
# Metadata
self.metadata = {
"run": run_num,
"start_time": datetime.now().isoformat(),
"run_dir": str(self.run_dir),
}
print(f"[REC] Recording to: {self.run_dir}")
def start_logging(self):
"""Redirect stdout/stderr to both console and log file."""
sys.stdout = self._tee_stdout
sys.stderr = self._tee_stderr
def stop_logging(self):
"""Restore original stdout/stderr."""
sys.stdout = self._original_stdout
sys.stderr = self._original_stderr
def start_recording(self, tracker=None, camera=None):
"""Start background recording of tracker and camera frames."""
self._tracker_ref = tracker
self._camera_ref = camera
self._recording = True
self._record_thread = threading.Thread(
target=self._record_loop, daemon=True
)
self._record_thread.start()
def _record_loop(self):
"""Periodically capture frames from tracker and camera."""
interval = 1.0 / self.fps
while self._recording:
t0 = time.time()
# Capture tracker frame
if self._tracker_ref is not None:
try:
frame = self._tracker_ref.draw()
if frame is not None:
self._write_tracker_frame(frame)
self._last_tracker_frame = frame.copy()
except Exception:
pass
# Capture camera frame
if self._camera_ref is not None:
try:
frame = self._camera_ref.frames.get("downward")
if frame is not None:
self._write_camera_frame(frame)
self._last_camera_frame = frame.copy()
except Exception:
pass
elapsed = time.time() - t0
sleep_time = max(0, interval - elapsed)
time.sleep(sleep_time)
def _write_tracker_frame(self, frame):
"""Write a frame to the tracker video."""
h, w = frame.shape[:2]
if self._tracker_writer is None:
self._tracker_size = (w, h)
fourcc = cv2.VideoWriter_fourcc(*'XVID')
path = str(self.run_dir / "flight_path.avi")
self._tracker_writer = cv2.VideoWriter(path, fourcc, self.fps, (w, h))
self._tracker_writer.write(frame)
self._tracker_frames += 1
def _write_camera_frame(self, frame):
"""Write a frame to the camera video."""
h, w = frame.shape[:2]
if self._camera_writer is None:
self._camera_size = (w, h)
fourcc = cv2.VideoWriter_fourcc(*'XVID')
path = str(self.run_dir / "camera.avi")
self._camera_writer = cv2.VideoWriter(path, fourcc, self.fps, (w, h))
self._camera_writer.write(frame)
self._camera_frames += 1
def snapshot_camera(self, label="snapshot"):
"""Save a named snapshot of the current camera frame."""
if self._camera_ref is None:
return
frame = self._camera_ref.frames.get("downward")
if frame is None:
return
idx = len(self._camera_snapshots)
filename = f"camera_{idx:03d}_{label}.png"
path = self.run_dir / filename
cv2.imwrite(str(path), frame)
self._camera_snapshots.append(filename)
print(f"[REC] Snapshot: {filename}")
def save_summary(self, search_mode="", altitude=0, markers=None,
landed=False, extra=None):
"""Write the run summary JSON."""
duration = time.time() - self.start_time
mins = int(duration // 60)
secs = int(duration % 60)
summary = {
**self.metadata,
"end_time": datetime.now().isoformat(),
"duration_seconds": round(duration, 1),
"duration_display": f"{mins}m {secs}s",
"search_mode": search_mode,
"altitude": altitude,
"landed": landed,
"markers_found": {},
"recordings": {
"log": "log.txt",
"flight_path_video": "flight_path.avi",
"flight_path_image": "flight_path.png",
"camera_video": "camera.avi",
"camera_snapshots": self._camera_snapshots,
},
"frame_counts": {
"tracker": self._tracker_frames,
"camera": self._camera_frames,
},
}
if markers:
for mid, info in markers.items():
pos = info.get('uav_position', {})
summary["markers_found"][str(mid)] = {
"x": round(pos.get('x', 0), 2),
"y": round(pos.get('y', 0), 2),
"distance": round(info.get('distance', 0), 2),
}
if extra:
summary.update(extra)
path = self.run_dir / "summary.json"
with open(path, "w") as f:
json.dump(summary, f, indent=2)
print(f"[REC] Summary saved: {path}")
def stop(self):
"""Stop recording and finalize all outputs."""
self._recording = False
if self._record_thread:
self._record_thread.join(timeout=3.0)
# Save final flight path image
if self._last_tracker_frame is not None:
path = self.run_dir / "flight_path.png"
cv2.imwrite(str(path), self._last_tracker_frame)
print(f"[REC] Flight path saved: {path}")
# Save final camera frame
if self._last_camera_frame is not None:
path = self.run_dir / "camera_final.png"
cv2.imwrite(str(path), self._last_camera_frame)
# Release video writers
if self._tracker_writer:
self._tracker_writer.release()
if self._camera_writer:
self._camera_writer.release()
# Stop log capture
self.stop_logging()
self._log_file.close()
duration = time.time() - self.start_time
mins = int(duration // 60)
secs = int(duration % 60)
# Print to original stdout since we stopped the tee
self._original_stdout.write(
f"\n[REC] Run recorded: {self.run_dir}\n"
f"[REC] Duration: {mins}m {secs}s | "
f"Tracker: {self._tracker_frames} frames | "
f"Camera: {self._camera_frames} frames\n"
)
class _TeeWriter:
"""Writes to both a stream and a file simultaneously."""
def __init__(self, stream, log_file):
self._stream = stream
self._log = log_file
def write(self, data):
self._stream.write(data)
try:
# Strip ANSI escape codes for the log file
clean = data
self._log.write(clean)
self._log.flush()
except (ValueError, IOError):
pass
def flush(self):
self._stream.flush()
try:
self._log.flush()
except (ValueError, IOError):
pass
def fileno(self):
return self._stream.fileno()
def isatty(self):
return self._stream.isatty()