Camera ArUco Tag Detection

This commit is contained in:
2026-02-12 15:53:36 -05:00
parent 92da41138b
commit c91ea920a8
20 changed files with 688 additions and 470 deletions

308
src/control/search.py Normal file
View File

@@ -0,0 +1,308 @@
#!/usr/bin/env python3
import math
import time
import numpy as np
from scipy.stats import levy, uniform
from time import sleep
from enum import Enum
from control.uav_controller import Controller
from vision.object_detector import ObjectDetector
from vision.visual_servoing import VisualServoing
from utils.helpers import distance_2d
class SearchMode(Enum):
    """Coverage patterns supported by :class:`Search`.

    The string values match the ``--search`` CLI choices and the
    ``mode`` argument accepted by ``Search.__init__``.
    """
    SPIRAL = "spiral"        # expanding square spiral from the start point
    LAWNMOWER = "lawnmower"  # parallel back-and-forth lanes
    LEVY = "levy"            # random-heading Lévy-flight steps
class Search:
    """Autonomous aerial search over a field of ArUco markers.

    Flies one of three coverage patterns (spiral, lawnmower, Lévy walk) at a
    fixed altitude while continuously scanning camera frames for markers.
    Each newly seen marker is recorded together with the UAV's local position
    at the moment of detection.  If a marker ID appears in the ``land`` list
    of ``actions``, the search is interrupted and control is handed to visual
    servoing for a precision landing on that marker.
    """

    POSITION_TOLERANCE = 1.0   # m: a waypoint counts as reached inside this radius
    CHECK_INTERVAL = 0.5       # s between state/detection polls while travelling
    MAX_TRAVEL_TIME = 30.0     # s allowed to reach a single waypoint
    CAM_FOV_METERS = 4.0       # approximate ground footprint of the camera (m)

    def __init__(self, ctrl: Controller, detector: ObjectDetector,
                 camera=None, mode: str = "spiral", altitude: float = 5.0,
                 actions: dict = None):
        """Set up the search.

        ctrl      -- UAV controller used for motion and state queries.
        detector  -- ObjectDetector run on every camera frame.
        camera    -- optional frame source exposing a ``frames`` dict keyed
                     by camera name ("downward" preferred, "gimbal" fallback).
        mode      -- one of "spiral" / "lawnmower" / "levy" (case-insensitive);
                     raises ValueError for anything else.
        altitude  -- search altitude in metres.
        actions   -- optional dict; ``actions["land"]`` lists marker IDs that
                     trigger a precision landing when detected.
        """
        self.ctrl = ctrl
        self.detector = detector
        self.camera = camera
        self.mode = SearchMode(mode.lower())
        self.altitude = altitude
        self.found_markers = {}  # marker id -> detection record (see check_for_markers)
        self.running = True      # cleared by stop() or a completed landing
        self.landed = False
        self.actions = actions or {}
        self.land_ids = set(self.actions.get("land", []))
        self.servoing = None
        if self.land_ids:
            # FIX: the original used list(set)[0], which picks an arbitrary
            # member because set order is undefined; min() is deterministic.
            target_id = min(self.land_ids)
            self.servoing = VisualServoing(ctrl, target_marker_id=target_id)
        # Spiral tuning: legs sized to the camera footprint with ~20% overlap.
        self.spiral_max_legs = 12
        self.spiral_initial_leg = self.CAM_FOV_METERS
        self.spiral_leg_increment = self.CAM_FOV_METERS * 0.8
        # Lawnmower tuning.
        self.lawn_width = 30.0
        self.lawn_height = 30.0
        self.lawn_lane_spacing = self.CAM_FOV_METERS * 0.8
        # NOTE(review): lawn_lanes is never read; the lane count is derived
        # from lawn_width / lawn_lane_spacing in lawnmower().  Kept because
        # configure() may set it from external config.
        self.lawn_lanes = 2
        # Lévy-walk tuning: heading uniform in [-180, 180) degrees.
        self.levy_max_steps = 20
        self.levy_field_size = 50.0
        self.levy_angle_dist = uniform(loc=-180, scale=360)

    def configure(self, **kwargs):
        """Override any existing attribute by name (e.g. spiral_max_legs=8).

        Unknown keys are silently ignored so arbitrary config dicts can be
        passed through.
        """
        for key, value in kwargs.items():
            if hasattr(self, key):
                setattr(self, key, value)

    def run(self):
        """Execute the configured pattern and return the found-marker dict."""
        print(f"\n{'=' * 50}")
        print(f" SEARCH: {self.mode.value.upper()} at {self.altitude}m")
        if self.land_ids:
            print(f" Landing targets: {self.land_ids}")
        print(f"{'=' * 50}\n")
        if self.mode == SearchMode.SPIRAL:
            return self.spiral()
        elif self.mode == SearchMode.LAWNMOWER:
            return self.lawnmower()
        elif self.mode == SearchMode.LEVY:
            return self.levy_walk()

    def get_camera_frame(self):
        """Return the latest frame, preferring the downward camera.

        Falls back to the gimbal camera; returns None when no camera is
        attached or no frame is available yet.
        """
        if self.camera is None:
            return None
        frame = self.camera.frames.get("downward")
        if frame is None:
            frame = self.camera.frames.get("gimbal")
        return frame

    def check_for_markers(self):
        """Run detection on the current frame; return newly seen detections.

        First sighting of a marker records the UAV position and, if the
        marker is a landing target, immediately starts the landing approach
        (which clears self.running on success).
        """
        frame = self.get_camera_frame()
        if frame is None:
            return []
        detections = self.detector.detect(frame)
        new_markers = []
        for d in detections:
            marker_id = d.get("id")
            if marker_id is None:
                continue
            if marker_id not in self.found_markers:
                self.ctrl.update_state()
                pos = self.ctrl.get_local_position()
                # FIX: read the distance defensively once; the original
                # printed d['distance'] and raised KeyError whenever the
                # detector omitted the field it had just defaulted to 0.
                distance = d.get("distance", 0)
                self.found_markers[marker_id] = {
                    "id": marker_id,
                    "uav_position": pos.copy(),
                    "distance": distance,
                    "timestamp": time.time(),
                }
                new_markers.append(d)
                print(f"\n[SEARCH] ArUco ID:{marker_id} detected! "
                      f"distance:{distance:.2f}m "
                      f"UAV at ({pos['x']:.1f}, {pos['y']:.1f})")
                if marker_id in self.land_ids:
                    print(f"\n[SEARCH] Landing target ID:{marker_id} found! Starting approach.")
                    self.execute_landing(detections)
                    return new_markers
        return new_markers

    def execute_landing(self, initial_detections):
        """Precision-land on the target marker via visual servoing.

        Falls back to an immediate plain land when no servoing controller
        was configured, and to landing in place when the 60 s approach
        window expires.  Sets self.landed and clears self.running on exit.
        """
        if self.servoing is None:
            self.ctrl.land()
            self.landed = True
            self.running = False
            return
        self.servoing.enable()
        self.servoing.process_detections(initial_detections)
        t0 = time.time()
        timeout = 60.0
        while time.time() - t0 < timeout and self.running:
            frame = self.get_camera_frame()
            if frame is None:
                sleep(self.CHECK_INTERVAL)
                continue
            detections = self.detector.detect(frame)
            landed = self.servoing.process_detections(detections)
            if landed:
                print("\n[SEARCH] Landed on target!")
                self.landed = True
                self.running = False
                return
            self.ctrl.update_state()
            # Treat very low altitude as touchdown even if servoing has not
            # reported a landing yet.
            if self.ctrl.altitude < 0.3:
                print("\n[SEARCH] Touchdown detected!")
                self.landed = True
                self.running = False
                return
            sleep(0.1)
        if not self.landed:
            print("\n[SEARCH] Landing approach timed out, landing at current position")
            self.ctrl.land()
            self.landed = True
            self.running = False

    def wait_for_position(self, target_x, target_y, timeout=None):
        """Block until the UAV is within POSITION_TOLERANCE of (x, y).

        Runs marker detection on every poll cycle, so a travel leg can be
        interrupted mid-flight by a landing target.  Returns True when the
        waypoint is reached, False on timeout or when the search stops.
        """
        if timeout is None:
            timeout = self.MAX_TRAVEL_TIME
        t0 = time.time()
        while time.time() - t0 < timeout and self.running:
            self.ctrl.update_state()
            self.check_for_markers()
            if not self.running:
                # A landing was triggered during the detection pass.
                return False
            pos = self.ctrl.get_local_position()
            dist = distance_2d(
                (pos['x'], pos['y']),
                (target_x, target_y)
            )
            elapsed = int(time.time() - t0)
            print(f"\r[SEARCH] Moving: {dist:.1f}m to target ({elapsed}s) "
                  f"markers found: {len(self.found_markers)} ",
                  end='', flush=True)
            if dist < self.POSITION_TOLERANCE:
                print()
                return True
            sleep(self.CHECK_INTERVAL)
        print()
        return False

    def move_to_local(self, x, y):
        """Command an absolute local-NED move at search altitude and wait.

        NED convention: down is positive, so z = -altitude.
        """
        z = -self.altitude
        self.ctrl.move_local_ned(x, y, z)
        return self.wait_for_position(x, y)

    def move_relative(self, dx, dy):
        """Command a horizontal move relative to the current position and wait."""
        self.ctrl.update_state()
        pos = self.ctrl.get_local_position()
        target_x = pos['x'] + dx
        target_y = pos['y'] + dy
        self.ctrl.move_pos_rel(dx, dy, 0)
        return self.wait_for_position(target_x, target_y)

    def spiral(self):
        """Expanding square spiral: alternate x/y legs, growing each leg.

        Direction flips every second leg so the path winds outward from the
        start point.  Returns the found-marker dict.
        """
        distance = self.spiral_initial_leg
        increment = self.spiral_leg_increment
        travel_x = True
        direction = 1
        for leg in range(self.spiral_max_legs):
            if not self.running:
                break
            self.ctrl.update_state()
            pos = self.ctrl.get_local_position()
            print(f"[SEARCH] Spiral leg {leg + 1}/{self.spiral_max_legs} "
                  f"pos:({pos['x']:.1f}, {pos['y']:.1f}) "
                  f"step:{distance:.1f}m markers:{len(self.found_markers)}")
            if travel_x:
                dx = distance * direction
                self.move_relative(dx, 0)
            else:
                dy = distance * direction
                self.move_relative(0, dy)
                direction *= -1  # reverse after each full x+y pair
            distance += increment
            travel_x = not travel_x
        print(f"[SEARCH] Spiral complete. Found {len(self.found_markers)} markers.")
        return self.found_markers

    def lawnmower(self):
        """Boustrophedon coverage: parallel lanes with alternating direction.

        Lane count is derived from lawn_width / lawn_lane_spacing; each lane
        runs lawn_height metres along y, then the UAV transits sideways to
        the next lane.  Returns the found-marker dict.
        """
        lane_spacing = self.lawn_lane_spacing
        height = self.lawn_height
        num_lanes = max(1, int(self.lawn_width / lane_spacing))
        self.ctrl.update_state()
        start_pos = self.ctrl.get_local_position()
        start_x = start_pos['x']
        start_y = start_pos['y']
        print(f"[SEARCH] Lawnmower: {num_lanes} lanes, "
              f"{lane_spacing:.1f}m spacing, {height:.1f}m height")
        for lane in range(num_lanes):
            if not self.running:
                break
            lane_x = start_x + lane * lane_spacing
            # Even lanes fly "up" the field, odd lanes fly back.
            if lane % 2 == 0:
                target_y = start_y + height
            else:
                target_y = start_y
            print(f"[SEARCH] Lane {lane + 1}/{num_lanes} "
                  f"x:{lane_x:.1f} markers:{len(self.found_markers)}")
            self.move_to_local(lane_x, target_y)
            if not self.running:
                break
            if lane < num_lanes - 1:
                # Sideways transit to the next lane at the current y.
                next_x = start_x + (lane + 1) * lane_spacing
                self.move_to_local(next_x, target_y)
        print(f"[SEARCH] Lawnmower complete. Found {len(self.found_markers)} markers.")
        return self.found_markers

    def levy_walk(self):
        """Lévy flight: random headings with heavy-tailed step lengths.

        Step length is drawn from scipy's levy distribution and clamped to
        [1, levy_field_size] metres.  Returns the found-marker dict.
        """
        field_size = self.levy_field_size
        for step in range(self.levy_max_steps):
            if not self.running:
                break
            angle_deg = self.levy_angle_dist.rvs()
            angle_rad = math.radians(angle_deg)
            raw_distance = levy.rvs(loc=1, scale=1)
            distance = min(max(raw_distance, 1.0), field_size)
            dx = distance * math.cos(angle_rad)
            dy = distance * math.sin(angle_rad)
            self.ctrl.update_state()
            pos = self.ctrl.get_local_position()
            print(f"[SEARCH] Lévy step {step + 1}/{self.levy_max_steps} "
                  f"angle:{angle_deg:.0f}° dist:{distance:.1f}m "
                  f"pos:({pos['x']:.1f}, {pos['y']:.1f}) "
                  f"markers:{len(self.found_markers)}")
            self.move_relative(dx, dy)
        print(f"[SEARCH] Lévy walk complete. Found {len(self.found_markers)} markers.")
        return self.found_markers

    def stop(self):
        """Request the search to terminate at the next poll point."""
        self.running = False

    def get_results(self):
        """Return a summary dict: mode, marker count, records, landed flag."""
        return {
            "mode": self.mode.value,
            "markers_found": len(self.found_markers),
            "markers": self.found_markers,
            "landed": self.landed,
        }

View File

@@ -26,6 +26,14 @@ GUIDED_MODE = 4
GUIDED_NOGPS_MODE = 20
DEFAULT_WPNAV_SPEED = 150
DEFAULT_WPNAV_ACCEL = 100
DEFAULT_WPNAV_SPEED_UP = 100
DEFAULT_WPNAV_SPEED_DN = 75
DEFAULT_WPNAV_ACCEL_Z = 75
DEFAULT_LOIT_SPEED = 150
class Controller:
HOLD_ALT = HOLD_ALT
@@ -277,9 +285,31 @@ class Controller:
print("\n[UAV] GPS timeout")
return False
def configure_speed_limits(self,
                           wpnav_speed=DEFAULT_WPNAV_SPEED,
                           wpnav_accel=DEFAULT_WPNAV_ACCEL,
                           wpnav_speed_up=DEFAULT_WPNAV_SPEED_UP,
                           wpnav_speed_dn=DEFAULT_WPNAV_SPEED_DN,
                           wpnav_accel_z=DEFAULT_WPNAV_ACCEL_Z,
                           loit_speed=DEFAULT_LOIT_SPEED):
    """Push waypoint-navigation and loiter speed parameters to the autopilot.

    All values are forwarded verbatim via set_param() to the ArduPilot
    WPNAV_* / LOIT_SPEED parameters; per ArduPilot convention these are
    in cm/s (speeds) and cm/s^2 (accelerations).

    wpnav_speed    -- horizontal waypoint speed.
    wpnav_accel    -- horizontal acceleration.
    wpnav_speed_up -- climb speed.
    wpnav_speed_dn -- descent speed.
    wpnav_accel_z  -- vertical acceleration.
    loit_speed     -- max speed in Loiter mode.
    """
    params = {
        'WPNAV_SPEED': wpnav_speed,
        'WPNAV_ACCEL': wpnav_accel,
        'WPNAV_SPEED_UP': wpnav_speed_up,
        'WPNAV_SPEED_DN': wpnav_speed_dn,
        'WPNAV_ACCEL_Z': wpnav_accel_z,
        'LOIT_SPEED': loit_speed,
    }
    # One set_param call per parameter; no read-back verification here.
    for name, value in params.items():
        self.set_param(name, value)
    print(f"[UAV] Speed limits set: horiz={wpnav_speed}cm/s "
          f"accel={wpnav_accel}cm/s² up={wpnav_speed_up}cm/s "
          f"dn={wpnav_speed_dn}cm/s")
def set_max_velocity(self, speed):
self.conn.mav.command_long_send(
self.conn.target_system,
self.conn.target_component,
mavlink.MAV_CMD_DO_CHANGE_SPEED,
0, speed, -1, 0, 0, 0, 0, 0)
0, 1, speed, -1, 0, 0, 0, 0)

View File

@@ -11,9 +11,9 @@ from pathlib import Path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from control.uav_controller import Controller
from control.ugv_controller import UGVController
from utils.helpers import clamp, distance_2d, PIDController, LowPassFilter
from utils.transforms import normalize_angle, body_to_world, world_to_body
from control.search import Search
from vision.object_detector import ObjectDetector
from vision.camera_processor import CameraProcessor
PROJECT_DIR = Path(__file__).resolve().parent.parent
CONFIG_DIR = PROJECT_DIR / "config"
@@ -28,151 +28,15 @@ def load_config(name: str) -> dict:
return yaml.safe_load(f) or {}
def load_mission(name: str) -> dict:
    """Load config/missions/<name>.yaml; return {} if missing or empty."""
    path = CONFIG_DIR / "missions" / f"{name}.yaml"
    if path.exists():
        with path.open('r') as fh:
            return yaml.safe_load(fh) or {}
    print(f"[WARN] Mission not found: {path}")
    return {}
def setup_ardupilot(ctrl: Controller):
    """Apply simulation-friendly autopilot settings before flight.

    Disables arming checks and failsafes, raises the scheduler rate,
    pushes the default speed limits, then pauses briefly so the
    parameters settle.
    """
    prearm_params = (
        ('ARMING_CHECK', 0),      # skip pre-arm checks (sim only)
        ('SCHED_LOOP_RATE', 200),
        ('FS_THR_ENABLE', 0),     # throttle failsafe off
        ('FS_GCS_ENABLE', 0),     # GCS failsafe off
    )
    for param_name, param_value in prearm_params:
        ctrl.set_param(param_name, param_value)
    ctrl.configure_speed_limits()
    sleep(2)
def mission_hover(ctrl: Controller, uav_cfg: dict, mission_cfg: dict):
    """Take off, hover at a fixed altitude for a set duration, then land.

    altitude falls back to the UAV config takeoff altitude and duration
    to 30 s when absent from mission_cfg.
    """
    altitude = mission_cfg.get('altitude', uav_cfg['flight']['takeoff_altitude'])
    duration = mission_cfg.get('duration', 30.0)
    print("\n" + "=" * 50)
    print(f" HOVER MISSION: {altitude}m for {duration}s")
    print("=" * 50 + "\n")
    setup_ardupilot(ctrl)
    ctrl.wait_for_gps()
    if not ctrl.arm():
        print("[UAV] Cannot arm - aborting mission")
        return
    ctrl.takeoff(altitude)
    ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30)
    print(f"[UAV] Hovering for {duration}s ...")
    t0 = time.time()
    while time.time() - t0 < duration:
        ctrl.update_state()
        remaining = duration - (time.time() - t0)
        # \r keeps the countdown on a single console line.
        print(f"\r[UAV] Hovering: {remaining:.0f}s remaining Alt: {ctrl.altitude:.1f}m ",
              end='', flush=True)
        sleep(0.5)
    print("\n[UAV] Hover complete")
    ctrl.land()
    wait_for_landing(ctrl)
def mission_square(ctrl: Controller, uav_cfg: dict, mission_cfg: dict):
    """Fly a square of relative moves at a fixed altitude, then land.

    Side length comes from mission_cfg ('side_length', default 5 m).
    Uses open-loop timing (sleep between waypoints) rather than position
    feedback.
    """
    altitude = mission_cfg.get('altitude', uav_cfg['flight']['takeoff_altitude'])
    side = mission_cfg.get('side_length', 5.0)
    print("\n" + "=" * 50)
    print(f" SQUARE MISSION: {side}m sides at {altitude}m")
    print("=" * 50 + "\n")
    setup_ardupilot(ctrl)
    ctrl.wait_for_gps()
    if not ctrl.arm():
        print("[UAV] Cannot arm - aborting mission")
        return
    ctrl.takeoff(altitude)
    ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30)
    # Four relative legs tracing a closed square back to the start.
    waypoints = [
        (side, 0, 0),
        (0, side, 0),
        (-side, 0, 0),
        (0, -side, 0),
    ]
    for i, (x, y, z) in enumerate(waypoints):
        print(f"[UAV] Waypoint {i+1}/4: move ({x}, {y}, {z})")
        ctrl.move_pos_rel(x, y, z)
        sleep(5)  # fixed travel allowance per leg
    print("[UAV] Square complete")
    ctrl.land()
    wait_for_landing(ctrl)
def mission_search(ctrl: Controller, ugv: UGVController,
                   uav_cfg: dict, mission_cfg: dict):
    """Fly an expanding-square search for the UGV and land on it if found.

    Spiral parameters (initial_leg, leg_increment, max_legs,
    detection_radius) come from mission_cfg['search'] with defaults.
    "Detection" here is purely geometric: the UAV lands when its local
    position is within detection_radius of the UGV's reported position.
    """
    altitude = mission_cfg.get('altitude', uav_cfg['flight']['takeoff_altitude'])
    search_cfg = mission_cfg.get('search', {})
    initial_leg = search_cfg.get('initial_leg', 3.0)
    leg_increment = search_cfg.get('leg_increment', 2.0)
    max_legs = search_cfg.get('max_legs', 12)
    detection_radius = search_cfg.get('detection_radius', 2.0)
    print("\n" + "=" * 50)
    print(f" SEARCH MISSION at {altitude}m")
    print("=" * 50 + "\n")
    setup_ardupilot(ctrl)
    ctrl.wait_for_gps()
    if not ctrl.arm():
        print("[UAV] Cannot arm - aborting mission")
        return
    ctrl.takeoff(altitude)
    ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30)
    # UGV position is read once; assumes a static target during the search.
    ugv_pos = ugv.get_position()
    print(f"[UAV] UGV target at ({ugv_pos['x']:.1f}, {ugv_pos['y']:.1f})")
    # Expanding square: alternate x/y legs, flip direction each leg,
    # grow the step so the path spirals outward.
    distance_step = initial_leg
    travel_x = True
    direction = 1
    for leg in range(max_legs):
        ctrl.update_state()
        uav_pos = ctrl.get_local_position()
        dist_to_ugv = distance_2d(
            (uav_pos['x'], uav_pos['y']),
            (ugv_pos['x'], ugv_pos['y'])
        )
        print(f"[UAV] Spiral leg {leg+1}/{max_legs} dist_to_ugv: {dist_to_ugv:.1f}m")
        if dist_to_ugv < detection_radius:
            print("[UAV] UGV found! Landing.")
            ctrl.land()
            wait_for_landing(ctrl)
            print("[UAV] Landed on UGV!")
            return
        if travel_x:
            ctrl.move_pos_rel(distance_step * direction, 0, 0)
        else:
            ctrl.move_pos_rel(0, distance_step * direction, 0)
            direction *= -1
        distance_step += leg_increment
        travel_x = not travel_x
        sleep(4)  # fixed travel allowance per leg (no position feedback)
    print("[UAV] Search complete - UGV not found, landing")
    ctrl.land()
    wait_for_landing(ctrl)
def wait_for_landing(ctrl: Controller, timeout: float = 60.0):
print("[UAV] Waiting for landing ...")
t0 = time.time()
@@ -193,20 +57,16 @@ def main():
parser = argparse.ArgumentParser(description='UAV-UGV Simulation')
parser.add_argument('--device', default='sim', choices=['sim', 'real'])
parser.add_argument('--connection', default=None)
parser.add_argument('--mission', default='hover')
parser.add_argument('--search', default='spiral', choices=['spiral', 'lawnmower', 'levy'])
parser.add_argument('--altitude', type=float, default=None)
parser.add_argument('--duration', type=float, default=None)
parser.add_argument('--no-camera', action='store_true')
args = parser.parse_args()
uav_cfg = load_config('uav.yaml')
ugv_cfg = load_config('ugv.yaml')
props = load_config('properties.yaml')
mission_cfg = load_mission(args.mission)
search_cfg = load_config('search.yaml')
if args.altitude is not None:
mission_cfg['altitude'] = args.altitude
if args.duration is not None:
mission_cfg['duration'] = args.duration
altitude = args.altitude or search_cfg.get('altitude', uav_cfg.get('flight', {}).get('takeoff_altitude', 5.0))
search_mode = args.search
if args.connection:
conn_str = args.connection
@@ -215,32 +75,77 @@ def main():
else:
conn_str = uav_cfg.get('connection', {}).get('sim', 'tcp:127.0.0.1:5760')
ugv_pos = ugv_cfg.get('position', {})
ugv = UGVController(
connection_string=ugv_cfg.get('connection', {}).get('sim'),
static_pos=(ugv_pos.get('x', 10.0), ugv_pos.get('y', 5.0)),
)
ctrl = Controller(conn_str)
detector = ObjectDetector(
marker_size=uav_cfg.get('vision', {}).get('landmark_detection', {}).get('marker_size', 0.15),
aruco_dict_name="DICT_4X4_50",
)
camera = None
if not args.no_camera:
try:
camera = CameraProcessor(show_gui=False)
print("[MAIN] Camera processor started")
def detection_overlay(camera_name, frame):
detections = detector.detect(frame)
if detections:
annotated = detector.annotate_frame(frame, detections)
camera.frames[camera_name] = annotated
camera.register_callback("downward", detection_overlay)
camera.register_callback("gimbal", detection_overlay)
except Exception as e:
print(f"[MAIN] Camera unavailable: {e}")
camera = None
actions = search_cfg.get('actions', {})
search = Search(ctrl, detector, camera=camera, mode=search_mode,
altitude=altitude, actions=actions)
mode_cfg = search_cfg.get(search_mode, {})
if mode_cfg:
search.configure(**{f"{search_mode}_{k}": v for k, v in mode_cfg.items()})
print(f"[MAIN] Config loaded from {CONFIG_DIR}")
print(f"[MAIN] Mission: {args.mission}")
print(f"[MAIN] Search: {search_mode} Altitude: {altitude}m")
if actions.get('land'):
print(f"[MAIN] Landing targets: {actions['land']}")
missions = {
'hover': lambda: mission_hover(ctrl, uav_cfg, mission_cfg),
'square': lambda: mission_square(ctrl, uav_cfg, mission_cfg),
'search': lambda: mission_search(ctrl, ugv, uav_cfg, mission_cfg),
}
setup_ardupilot(ctrl)
ctrl.wait_for_gps()
runner = missions.get(args.mission)
if runner:
runner()
else:
print(f"[MAIN] Unknown mission: {args.mission}")
print(f"[MAIN] Available: {list(missions.keys())}")
if not ctrl.arm():
print("[UAV] Cannot arm - aborting")
return
print("[MAIN] Mission finished.")
ctrl.takeoff(altitude)
ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30)
results = search.run()
search_results = search.get_results()
print(f"\n{'=' * 50}")
print(f" SEARCH COMPLETE")
print(f"{'=' * 50}")
print(f" Mode: {search_mode}")
print(f" Markers found: {len(results)}")
for marker_id, info in results.items():
pos = info['uav_position']
print(f" ID:{marker_id} at ({pos['x']:.1f}, {pos['y']:.1f}) "
f"distance:{info['distance']:.2f}m")
if search_results.get('landed'):
print(f" Landed on target: YES")
print(f"{'=' * 50}\n")
if not search_results.get('landed'):
ctrl.land()
wait_for_landing(ctrl)
else:
wait_for_landing(ctrl)
print("[MAIN] Done.")
if __name__ == '__main__':

View File

@@ -142,6 +142,21 @@ def main():
proc = CameraProcessor(topics=topics, show_gui=show_gui)
try:
from vision.object_detector import ObjectDetector
detector = ObjectDetector(aruco_dict_name="DICT_4X4_50")
def detection_overlay(camera_name, frame):
detections = detector.detect(frame)
if detections:
annotated = detector.annotate_frame(frame, detections)
proc.frames[camera_name] = annotated
for cam_name in topics:
proc.register_callback(cam_name, detection_overlay)
except Exception as e:
print(f"[CAM] ArUco detection unavailable: {e}")
if show_gui:
proc.spin()
else:

View File

@@ -1,17 +1,42 @@
#!/usr/bin/env python3
"""
Object Detector - ArUco marker and feature detection.
Registers as a callback on CameraProcessor to receive processed frames.
"""
import cv2
import numpy as np
from scipy.spatial.transform import Rotation
ARUCO_DICT = {
"DICT_4X4_50": cv2.aruco.DICT_4X4_50,
"DICT_4X4_100": cv2.aruco.DICT_4X4_100,
"DICT_4X4_250": cv2.aruco.DICT_4X4_250,
"DICT_4X4_1000": cv2.aruco.DICT_4X4_1000,
"DICT_5X5_50": cv2.aruco.DICT_5X5_50,
"DICT_5X5_100": cv2.aruco.DICT_5X5_100,
"DICT_5X5_250": cv2.aruco.DICT_5X5_250,
"DICT_5X5_1000": cv2.aruco.DICT_5X5_1000,
"DICT_6X6_50": cv2.aruco.DICT_6X6_50,
"DICT_6X6_100": cv2.aruco.DICT_6X6_100,
"DICT_6X6_250": cv2.aruco.DICT_6X6_250,
"DICT_6X6_1000": cv2.aruco.DICT_6X6_1000,
"DICT_7X7_50": cv2.aruco.DICT_7X7_50,
"DICT_7X7_100": cv2.aruco.DICT_7X7_100,
"DICT_7X7_250": cv2.aruco.DICT_7X7_250,
"DICT_7X7_1000": cv2.aruco.DICT_7X7_1000,
"DICT_ARUCO_ORIGINAL": cv2.aruco.DICT_ARUCO_ORIGINAL,
"DICT_APRILTAG_16h5": cv2.aruco.DICT_APRILTAG_16h5,
"DICT_APRILTAG_25h9": cv2.aruco.DICT_APRILTAG_25h9,
"DICT_APRILTAG_36h10": cv2.aruco.DICT_APRILTAG_36h10,
"DICT_APRILTAG_36h11": cv2.aruco.DICT_APRILTAG_36h11,
}
W_RES = 640
H_RES = 480
class ObjectDetector:
def __init__(self, marker_size=0.15, camera_matrix=None, detection_method="ArUco"):
self.detection_method = detection_method
CAM_DEG_FOV = 110
def __init__(self, marker_size=0.15, camera_matrix=None,
aruco_dict_name="DICT_4X4_50"):
self.marker_size = marker_size
if camera_matrix is not None:
@@ -22,105 +47,87 @@ class ObjectDetector:
)
self.dist_coeffs = np.zeros(5)
if self.detection_method == "ArUco":
self.aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
self.aruco_params = cv2.aruco.DetectorParameters()
self.aruco_detector = cv2.aruco.ArucoDetector(
self.aruco_dict, self.aruco_params
)
dict_id = ARUCO_DICT.get(aruco_dict_name, cv2.aruco.DICT_6X6_250)
self.aruco_dict = cv2.aruco.getPredefinedDictionary(dict_id)
self.aruco_params = cv2.aruco.DetectorParameters()
self.aruco_detector = cv2.aruco.ArucoDetector(
self.aruco_dict, self.aruco_params
)
self.last_detections = []
self.found_markers = {}
self.on_detection = None
print(f"[DET] Object Detector initialized ({self.detection_method})")
print(f"[DET] ObjectDetector initialized ({aruco_dict_name})")
def detect(self, camera_name, frame):
if self.detection_method == "ArUco":
detections, annotated = self.detect_aruco(frame)
def detect(self, frame):
if frame is None:
return []
if len(frame.shape) == 2:
gray = frame
else:
detections, annotated = self.detect_orb(frame)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
self.last_detections = detections
corners, ids, rejected = self.aruco_detector.detectMarkers(gray)
detections = []
if ids is None:
return detections
rvecs, tvecs, _ = cv2.aruco.estimatePoseSingleMarkers(
corners, self.marker_size, self.camera_matrix, self.dist_coeffs
)
for i, marker_id in enumerate(ids.flatten()):
tvec = tvecs[i][0]
center = np.mean(corners[i][0], axis=0)
detection = {
"id": int(marker_id),
"corners": corners[i][0].tolist(),
"center_px": center.tolist(),
"tvec": tvec.tolist(),
"distance": float(np.linalg.norm(tvec)),
}
detections.append(detection)
if int(marker_id) not in self.found_markers:
self.found_markers[int(marker_id)] = {
"id": int(marker_id),
"times_seen": 0,
}
self.found_markers[int(marker_id)]["times_seen"] += 1
if detections and self.on_detection:
self.on_detection(detections)
if annotated is not None:
cv2.imshow(f"{camera_name} [detections]", annotated)
return detections
def detect_aruco(self, frame):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
corners, ids, rejected = self.aruco_detector.detectMarkers(gray)
detections = []
def annotate_frame(self, frame, detections):
annotated = frame.copy()
if not detections:
return annotated
if ids is not None:
rvecs, tvecs, _ = cv2.aruco.estimatePoseSingleMarkers(
corners, self.marker_size, self.camera_matrix, self.dist_coeffs
)
for d in detections:
corners_arr = np.array(d["corners"], dtype=np.int32)
for j in range(4):
pt1 = tuple(corners_arr[j])
pt2 = tuple(corners_arr[(j + 1) % 4])
cv2.line(annotated, pt1, pt2, (0, 255, 0), 2)
for i, marker_id in enumerate(ids.flatten()):
tvec = tvecs[i][0]
rvec = rvecs[i][0]
cx = int(d["center_px"][0])
cy = int(d["center_px"][1])
cv2.circle(annotated, (cx, cy), 4, (0, 0, 255), -1)
rotation_matrix, _ = cv2.Rodrigues(rvec)
r = Rotation.from_matrix(rotation_matrix)
quat = r.as_quat()
label = f"ID:{d['id']} d:{d['distance']:.2f}m"
cv2.putText(annotated, label,
(int(corners_arr[0][0]), int(corners_arr[0][1]) - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
center = np.mean(corners[i][0], axis=0)
return annotated
detection = {
"id": int(marker_id),
"position": tvec.tolist(),
"orientation_quat": quat.tolist(),
"rvec": rvec.tolist(),
"tvec": tvec.tolist(),
"corners": corners[i][0].tolist(),
"center_px": center.tolist(),
"distance": float(np.linalg.norm(tvec)),
}
detections.append(detection)
def get_found_markers(self):
return self.found_markers
cv2.aruco.drawDetectedMarkers(annotated, corners, ids)
for i in range(len(rvecs)):
cv2.drawFrameAxes(
annotated,
self.camera_matrix,
self.dist_coeffs,
rvecs[i],
tvecs[i],
self.marker_size * 0.5,
)
d = detections[i]
label = f"ID:{d['id']} d:{d['distance']:.2f}m"
pos = (int(d["center_px"][0]), int(d["center_px"][1]) - 10)
cv2.putText(
annotated, label, pos, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2
)
return detections, annotated
def detect_orb(self, frame):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
orb = cv2.ORB_create(nfeatures=500)
keypoints, descriptors = orb.detectAndCompute(gray, None)
detections = []
for kp in keypoints[:50]:
detections.append(
{
"type": "feature",
"center_px": [kp.pt[0], kp.pt[1]],
"size": kp.size,
"response": kp.response,
}
)
annotated = cv2.drawKeypoints(
frame, keypoints[:50], None, color=(0, 255, 0), flags=0
)
return detections, annotated
def reset(self):
self.found_markers = {}

View File

@@ -1,26 +1,24 @@
#!/usr/bin/env python3
"""
Visual Servoing - Vision-based control for precision landing on ArUco marker.
Uses ObjectDetector detections + pymavlink to send velocity commands.
"""
import numpy as np
class VisualServoing:
def __init__(self, controller, target_marker_id=0, desired_distance=1.0):
def __init__(self, controller, target_marker_id=0, land_altitude=0.5):
self.controller = controller
self.target_marker_id = target_marker_id
self.desired_distance = desired_distance
self.land_altitude = land_altitude
self.kp_xy = 0.5
self.kp_z = 0.3
self.max_velocity = 1.0
self.max_velocity = 0.5
self.center_tolerance = 30
self.land_distance = 1.0
self.enabled = False
self.target_acquired = False
self.centered = False
self.image_center = (320, 240)
self.last_detection = None
print(f"[VS] Visual Servoing initialized (target marker: {target_marker_id})")
@@ -33,9 +31,9 @@ class VisualServoing:
self.enabled = False
print("[VS] Disabled")
def on_detections(self, detections):
def process_detections(self, detections):
if not self.enabled:
return
return False
target = None
for d in detections:
@@ -45,32 +43,45 @@ class VisualServoing:
if target is None:
self.target_acquired = False
return
return False
self.target_acquired = True
self.last_detection = target
self.compute_control(target)
return self.compute_control(target)
def compute_control(self, detection):
center_px = detection["center_px"]
distance = detection["distance"]
position = detection["position"]
error_x = self.image_center[0] - center_px[0]
error_y = self.image_center[1] - center_px[1]
error_z = distance - self.desired_distance
self.centered = (abs(error_x) < self.center_tolerance and
abs(error_y) < self.center_tolerance)
if self.centered and distance < self.land_distance:
print(f"\n[VS] Centered and close enough ({distance:.2f}m) - landing")
self.controller.land()
return True
vx = np.clip(self.kp_xy * error_y / 100.0, -self.max_velocity, self.max_velocity)
vy = np.clip(self.kp_xy * error_x / 100.0, -self.max_velocity, self.max_velocity)
vz = np.clip(-self.kp_z * error_z, -self.max_velocity, self.max_velocity)
self.controller.send_velocity(vx, vy, vz)
descend_rate = 0.0
if self.centered:
descend_rate = min(self.kp_z * distance, self.max_velocity)
self.controller.update_state()
target_z = -(self.controller.altitude - descend_rate)
self.controller.move_vel_rel_alt(vx, vy, target_z)
print(
f"\r[VS] Target ID:{detection['id']} "
f"\r[VS] ID:{detection['id']} "
f"d:{distance:.2f}m "
f"err:({error_x:.0f},{error_y:.0f}) "
f"vel:({vx:.2f},{vy:.2f},{vz:.2f})",
f"vel:({vx:.2f},{vy:.2f}) "
f"{'CENTERED' if self.centered else 'ALIGNING'}",
end="",
flush=True,
)
return False