Initial Commit

This commit is contained in:
2026-02-09 03:39:49 +00:00
commit a756be4bf7
71 changed files with 6705 additions and 0 deletions

4
src/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
"""UAV-UGV Simulation Package - GPS-Denied Navigation with Geofencing."""
__version__ = "1.0.0"
__author__ = "Developer"

1
src/control/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Control module for UAV and UGV controllers."""

View File

@@ -0,0 +1,236 @@
#!/usr/bin/env python3
"""Mission Planner - Coordinates missions using local waypoints."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from nav_msgs.msg import Path
from std_msgs.msg import String, Bool
import yaml
from enum import Enum
class MissionState(Enum):
IDLE = 0
LOADING = 1
EXECUTING = 2
PAUSED = 3
COMPLETED = 4
ABORTED = 5
class MissionPlanner(Node):
def __init__(self):
super().__init__('mission_planner')
self.declare_parameter('mission_file', '')
self.mission = []
self.current_action_idx = 0
self.state = MissionState.IDLE
self.uav_goal_reached = False
self.ugv_goal_reached = False
self.command_sub = self.create_subscription(
String, '/mission/command', self.command_callback, 10)
self.uav_status_sub = self.create_subscription(
String, '/uav/controller/status', self.uav_status_callback, 10)
self.ugv_status_sub = self.create_subscription(
String, '/ugv/controller/status', self.ugv_status_callback, 10)
self.uav_reached_sub = self.create_subscription(
Bool, '/uav/waypoint_follower/waypoint_reached', self.uav_reached_callback, 10)
self.ugv_reached_sub = self.create_subscription(
Bool, '/ugv/goal_reached', self.ugv_reached_callback, 10)
self.uav_cmd_pub = self.create_publisher(String, '/uav/controller/command', 10)
self.ugv_cmd_pub = self.create_publisher(String, '/ugv/controller/command', 10)
self.uav_waypoint_pub = self.create_publisher(PoseStamped, '/uav/setpoint_position', 10)
self.ugv_goal_pub = self.create_publisher(PoseStamped, '/ugv/goal_pose', 10)
self.uav_path_pub = self.create_publisher(Path, '/waypoint_follower/set_path', 10)
self.status_pub = self.create_publisher(String, '/mission/status', 10)
self.mission_timer = self.create_timer(0.5, self.mission_loop)
self.get_logger().info('Mission Planner Started - LOCAL coordinates only')
def command_callback(self, msg):
cmd = msg.data.lower().split(':')
action = cmd[0]
if action == 'load':
if len(cmd) > 1:
self.load_mission_file(cmd[1])
else:
self.load_demo_mission()
elif action == 'start':
self.start_mission()
elif action == 'pause':
self.state = MissionState.PAUSED
elif action == 'resume':
self.state = MissionState.EXECUTING
elif action == 'abort':
self.abort_mission()
elif action == 'clear':
self.mission = []
self.current_action_idx = 0
self.state = MissionState.IDLE
def uav_status_callback(self, msg):
pass
def ugv_status_callback(self, msg):
pass
def uav_reached_callback(self, msg):
if msg.data:
self.uav_goal_reached = True
def ugv_reached_callback(self, msg):
if msg.data:
self.ugv_goal_reached = True
def load_demo_mission(self):
self.mission = [
{'type': 'uav_command', 'command': 'arm'},
{'type': 'uav_command', 'command': 'takeoff'},
{'type': 'wait', 'duration': 3.0},
{'type': 'uav_waypoint', 'x': 10.0, 'y': 0.0, 'z': 5.0},
{'type': 'uav_waypoint', 'x': 10.0, 'y': 10.0, 'z': 5.0},
{'type': 'ugv_goal', 'x': 5.0, 'y': 5.0},
{'type': 'wait_for_vehicles'},
{'type': 'uav_waypoint', 'x': 0.0, 'y': 0.0, 'z': 5.0},
{'type': 'ugv_goal', 'x': 0.0, 'y': 0.0},
{'type': 'wait_for_vehicles'},
{'type': 'uav_command', 'command': 'land'},
]
self.current_action_idx = 0
self.state = MissionState.IDLE
self.get_logger().info(f'Demo mission loaded: {len(self.mission)} actions')
def load_mission_file(self, filepath):
try:
with open(filepath, 'r') as f:
data = yaml.safe_load(f)
self.mission = data.get('mission', [])
self.current_action_idx = 0
self.state = MissionState.IDLE
self.get_logger().info(f'Mission loaded from {filepath}')
except Exception as e:
self.get_logger().error(f'Failed to load mission: {e}')
def start_mission(self):
if len(self.mission) == 0:
self.get_logger().warn('No mission loaded')
return
self.current_action_idx = 0
self.state = MissionState.EXECUTING
self.uav_goal_reached = False
self.ugv_goal_reached = False
self.get_logger().info('Mission started')
def abort_mission(self):
self.state = MissionState.ABORTED
stop_msg = String()
stop_msg.data = 'hold'
self.uav_cmd_pub.publish(stop_msg)
stop_msg.data = 'stop'
self.ugv_cmd_pub.publish(stop_msg)
self.get_logger().warn('Mission aborted')
def mission_loop(self):
status_msg = String()
status_msg.data = f'{self.state.name}|{self.current_action_idx}/{len(self.mission)}'
self.status_pub.publish(status_msg)
if self.state != MissionState.EXECUTING:
return
if self.current_action_idx >= len(self.mission):
self.state = MissionState.COMPLETED
self.get_logger().info('Mission completed!')
return
action = self.mission[self.current_action_idx]
completed = self.execute_action(action)
if completed:
self.current_action_idx += 1
self.uav_goal_reached = False
self.ugv_goal_reached = False
def execute_action(self, action):
action_type = action.get('type', '')
if action_type == 'uav_command':
cmd_msg = String()
cmd_msg.data = action['command']
self.uav_cmd_pub.publish(cmd_msg)
return True
elif action_type == 'ugv_command':
cmd_msg = String()
cmd_msg.data = action['command']
self.ugv_cmd_pub.publish(cmd_msg)
return True
elif action_type == 'uav_waypoint':
pose = PoseStamped()
pose.header.stamp = self.get_clock().now().to_msg()
pose.header.frame_id = 'odom'
pose.pose.position.x = float(action['x'])
pose.pose.position.y = float(action['y'])
pose.pose.position.z = float(action['z'])
pose.pose.orientation.w = 1.0
self.uav_waypoint_pub.publish(pose)
return self.uav_goal_reached
elif action_type == 'ugv_goal':
pose = PoseStamped()
pose.header.stamp = self.get_clock().now().to_msg()
pose.header.frame_id = 'odom'
pose.pose.position.x = float(action['x'])
pose.pose.position.y = float(action['y'])
pose.pose.orientation.w = 1.0
self.ugv_goal_pub.publish(pose)
return self.ugv_goal_reached
elif action_type == 'wait':
if not hasattr(self, '_wait_start'):
self._wait_start = self.get_clock().now()
elapsed = (self.get_clock().now() - self._wait_start).nanoseconds / 1e9
if elapsed >= action['duration']:
delattr(self, '_wait_start')
return True
return False
elif action_type == 'wait_for_vehicles':
return self.uav_goal_reached and self.ugv_goal_reached
return True
def main(args=None):
rclpy.init(args=args)
node = MissionPlanner()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,249 @@
#!/usr/bin/env python3
"""UAV Controller - Flight control using local position only."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, TwistStamped, Twist
from mavros_msgs.srv import CommandBool, SetMode, CommandTOL
from mavros_msgs.msg import State
from std_msgs.msg import String, Bool
import numpy as np
from enum import Enum
class FlightState(Enum):
DISARMED = 0
ARMED = 1
TAKING_OFF = 2
HOVERING = 3
NAVIGATING = 4
LANDING = 5
EMERGENCY = 6
class UAVController(Node):
def __init__(self):
super().__init__('uav_controller')
self.declare_parameter('takeoff_altitude', 5.0)
self.declare_parameter('position_tolerance', 0.3)
self.declare_parameter('namespace', '/uav')
self.takeoff_altitude = self.get_parameter('takeoff_altitude').value
self.position_tolerance = self.get_parameter('position_tolerance').value
ns = self.get_parameter('namespace').value
self.state = FlightState.DISARMED
self.mavros_state = None
self.current_pose = None
self.target_pose = None
self.home_position = None
self.state_sub = self.create_subscription(
State, f'{ns}/mavros/state', self.state_callback, 10)
self.local_pose_sub = self.create_subscription(
PoseStamped, f'{ns}/mavros/local_position/pose', self.pose_callback, 10)
self.cmd_sub = self.create_subscription(
String, f'{ns}/controller/command', self.command_callback, 10)
self.setpoint_sub = self.create_subscription(
PoseStamped, f'{ns}/setpoint_position', self.setpoint_callback, 10)
self.vel_sub = self.create_subscription(
Twist, f'{ns}/cmd_vel_safe', self.velocity_callback, 10)
self.setpoint_pub = self.create_publisher(
PoseStamped, f'{ns}/mavros/setpoint_position/local', 10)
self.vel_pub = self.create_publisher(
TwistStamped, f'{ns}/mavros/setpoint_velocity/cmd_vel', 10)
self.status_pub = self.create_publisher(
String, f'{ns}/controller/status', 10)
self.arming_client = self.create_client(CommandBool, f'{ns}/mavros/cmd/arming')
self.set_mode_client = self.create_client(SetMode, f'{ns}/mavros/set_mode')
self.takeoff_client = self.create_client(CommandTOL, f'{ns}/mavros/cmd/takeoff')
self.land_client = self.create_client(CommandTOL, f'{ns}/mavros/cmd/land')
self.setpoint_timer = self.create_timer(0.05, self.publish_setpoint)
self.status_timer = self.create_timer(0.5, self.publish_status)
self.get_logger().info('UAV Controller Started - GPS-denied mode')
def state_callback(self, msg):
self.mavros_state = msg
if msg.armed and self.state == FlightState.DISARMED:
self.state = FlightState.ARMED
def pose_callback(self, msg):
self.current_pose = msg
if self.home_position is None and self.mavros_state and self.mavros_state.armed:
self.home_position = np.array([
msg.pose.position.x,
msg.pose.position.y,
msg.pose.position.z
])
self.get_logger().info(f'Home position set: {self.home_position}')
def setpoint_callback(self, msg):
self.target_pose = msg
def velocity_callback(self, msg):
if self.state not in [FlightState.NAVIGATING, FlightState.HOVERING]:
return
vel_msg = TwistStamped()
vel_msg.header.stamp = self.get_clock().now().to_msg()
vel_msg.header.frame_id = 'base_link'
vel_msg.twist = msg
self.vel_pub.publish(vel_msg)
def command_callback(self, msg):
cmd = msg.data.lower()
if cmd == 'arm':
self.arm()
elif cmd == 'disarm':
self.disarm()
elif cmd == 'takeoff':
self.takeoff()
elif cmd == 'land':
self.land()
elif cmd == 'rtl':
self.return_to_launch()
elif cmd == 'hold':
self.hold_position()
elif cmd == 'guided':
self.set_mode('GUIDED')
def arm(self):
if not self.arming_client.wait_for_service(timeout_sec=5.0):
self.get_logger().error('Arming service not available')
return False
req = CommandBool.Request()
req.value = True
future = self.arming_client.call_async(req)
future.add_done_callback(self.arm_response)
return True
def arm_response(self, future):
try:
result = future.result()
if result.success:
self.state = FlightState.ARMED
self.get_logger().info('Vehicle armed')
else:
self.get_logger().error('Arming failed')
except Exception as e:
self.get_logger().error(f'Arming error: {e}')
def disarm(self):
req = CommandBool.Request()
req.value = False
future = self.arming_client.call_async(req)
self.state = FlightState.DISARMED
def set_mode(self, mode):
if not self.set_mode_client.wait_for_service(timeout_sec=5.0):
return False
req = SetMode.Request()
req.custom_mode = mode
future = self.set_mode_client.call_async(req)
return True
def takeoff(self):
self.set_mode('GUIDED')
self.arm()
if self.current_pose is not None:
self.target_pose = PoseStamped()
self.target_pose.header.frame_id = 'odom'
self.target_pose.pose.position.x = self.current_pose.pose.position.x
self.target_pose.pose.position.y = self.current_pose.pose.position.y
self.target_pose.pose.position.z = self.takeoff_altitude
self.target_pose.pose.orientation.w = 1.0
self.state = FlightState.TAKING_OFF
self.get_logger().info(f'Taking off to {self.takeoff_altitude}m')
def land(self):
if self.current_pose is not None:
self.target_pose = PoseStamped()
self.target_pose.header.frame_id = 'odom'
self.target_pose.pose.position.x = self.current_pose.pose.position.x
self.target_pose.pose.position.y = self.current_pose.pose.position.y
self.target_pose.pose.position.z = 0.0
self.target_pose.pose.orientation.w = 1.0
self.state = FlightState.LANDING
self.get_logger().info('Landing')
def return_to_launch(self):
if self.home_position is not None:
self.target_pose = PoseStamped()
self.target_pose.header.frame_id = 'odom'
self.target_pose.pose.position.x = float(self.home_position[0])
self.target_pose.pose.position.y = float(self.home_position[1])
self.target_pose.pose.position.z = self.takeoff_altitude
self.target_pose.pose.orientation.w = 1.0
self.state = FlightState.NAVIGATING
self.get_logger().info('Returning to local home position')
def hold_position(self):
if self.current_pose is not None:
self.target_pose = PoseStamped()
self.target_pose.header = self.current_pose.header
self.target_pose.pose = self.current_pose.pose
self.state = FlightState.HOVERING
def publish_setpoint(self):
if self.target_pose is None:
return
self.target_pose.header.stamp = self.get_clock().now().to_msg()
self.setpoint_pub.publish(self.target_pose)
if self.current_pose is not None:
error = np.array([
self.target_pose.pose.position.x - self.current_pose.pose.position.x,
self.target_pose.pose.position.y - self.current_pose.pose.position.y,
self.target_pose.pose.position.z - self.current_pose.pose.position.z
])
distance = np.linalg.norm(error)
if self.state == FlightState.TAKING_OFF and distance < self.position_tolerance:
self.state = FlightState.HOVERING
self.get_logger().info('Takeoff complete')
elif self.state == FlightState.LANDING and self.current_pose.pose.position.z < 0.2:
self.state = FlightState.ARMED
self.get_logger().info('Landing complete')
def publish_status(self):
status_msg = String()
armed = 'ARMED' if (self.mavros_state and self.mavros_state.armed) else 'DISARMED'
mode = self.mavros_state.mode if self.mavros_state else 'UNKNOWN'
status_msg.data = f'{self.state.name}|{armed}|{mode}'
self.status_pub.publish(status_msg)
def main(args=None):
rclpy.init(args=args)
node = UAVController()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,198 @@
#!/usr/bin/env python3
"""UGV Controller - Ground vehicle control using local position."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, PoseStamped
from nav_msgs.msg import Odometry
from std_msgs.msg import String, Bool
import numpy as np
from enum import Enum
class UGVState(Enum):
IDLE = 0
MOVING = 1
ROTATING = 2
STOPPED = 3
class UGVController(Node):
def __init__(self):
super().__init__('ugv_controller')
self.declare_parameter('max_linear_velocity', 1.0)
self.declare_parameter('max_angular_velocity', 1.5)
self.declare_parameter('kp_linear', 0.8)
self.declare_parameter('kp_angular', 1.5)
self.declare_parameter('position_tolerance', 0.3)
self.declare_parameter('angle_tolerance', 0.1)
self.max_linear_vel = self.get_parameter('max_linear_velocity').value
self.max_angular_vel = self.get_parameter('max_angular_velocity').value
self.kp_linear = self.get_parameter('kp_linear').value
self.kp_angular = self.get_parameter('kp_angular').value
self.position_tolerance = self.get_parameter('position_tolerance').value
self.angle_tolerance = self.get_parameter('angle_tolerance').value
self.state = UGVState.IDLE
self.current_odom = None
self.target_pose = None
self.home_position = None
self.odom_sub = self.create_subscription(
Odometry, '/ugv/odom', self.odom_callback, 10)
self.goal_sub = self.create_subscription(
PoseStamped, '/ugv/goal_pose', self.goal_callback, 10)
self.cmd_sub = self.create_subscription(
String, '/ugv/controller/command', self.command_callback, 10)
self.vel_sub = self.create_subscription(
Twist, '/ugv/cmd_vel_safe', self.velocity_callback, 10)
self.cmd_vel_pub = self.create_publisher(Twist, '/ugv/cmd_vel', 10)
self.status_pub = self.create_publisher(String, '/ugv/controller/status', 10)
self.goal_reached_pub = self.create_publisher(Bool, '/ugv/goal_reached', 10)
self.control_timer = self.create_timer(0.05, self.control_loop)
self.status_timer = self.create_timer(0.5, self.publish_status)
self.get_logger().info('UGV Controller Started - GPS-denied mode')
def odom_callback(self, msg):
self.current_odom = msg
if self.home_position is None:
self.home_position = np.array([
msg.pose.pose.position.x,
msg.pose.pose.position.y
])
self.get_logger().info(f'Home position set: {self.home_position}')
def goal_callback(self, msg):
self.target_pose = msg
self.state = UGVState.MOVING
self.get_logger().info(
f'Goal received: [{msg.pose.position.x:.2f}, {msg.pose.position.y:.2f}]')
def command_callback(self, msg):
cmd = msg.data.lower()
if cmd == 'stop':
self.stop()
self.state = UGVState.STOPPED
elif cmd == 'resume':
self.state = UGVState.MOVING
elif cmd == 'rtl':
self.return_to_launch()
def velocity_callback(self, msg):
if self.state == UGVState.MOVING:
self.cmd_vel_pub.publish(msg)
def control_loop(self):
if self.state != UGVState.MOVING:
return
if self.current_odom is None or self.target_pose is None:
return
current_pos = np.array([
self.current_odom.pose.pose.position.x,
self.current_odom.pose.pose.position.y
])
target_pos = np.array([
self.target_pose.pose.position.x,
self.target_pose.pose.position.y
])
error = target_pos - current_pos
distance = np.linalg.norm(error)
if distance < self.position_tolerance:
self.stop()
self.state = UGVState.IDLE
reached_msg = Bool()
reached_msg.data = True
self.goal_reached_pub.publish(reached_msg)
self.get_logger().info('Goal reached!')
return
target_angle = np.arctan2(error[1], error[0])
quat = self.current_odom.pose.pose.orientation
current_yaw = np.arctan2(
2.0 * (quat.w * quat.z + quat.x * quat.y),
1.0 - 2.0 * (quat.y * quat.y + quat.z * quat.z)
)
angle_error = self.normalize_angle(target_angle - current_yaw)
cmd = Twist()
if abs(angle_error) > self.angle_tolerance:
cmd.angular.z = np.clip(
self.kp_angular * angle_error,
-self.max_angular_vel,
self.max_angular_vel
)
if abs(angle_error) < np.pi / 4:
speed = self.kp_linear * distance * (1.0 - abs(angle_error) / np.pi)
cmd.linear.x = np.clip(speed, 0.0, self.max_linear_vel)
else:
cmd.linear.x = np.clip(
self.kp_linear * distance,
0.0,
self.max_linear_vel
)
self.cmd_vel_pub.publish(cmd)
def stop(self):
cmd = Twist()
self.cmd_vel_pub.publish(cmd)
def return_to_launch(self):
if self.home_position is not None:
self.target_pose = PoseStamped()
self.target_pose.header.frame_id = 'odom'
self.target_pose.pose.position.x = float(self.home_position[0])
self.target_pose.pose.position.y = float(self.home_position[1])
self.target_pose.pose.orientation.w = 1.0
self.state = UGVState.MOVING
self.get_logger().info('Returning to local home')
def normalize_angle(self, angle):
while angle > np.pi:
angle -= 2 * np.pi
while angle < -np.pi:
angle += 2 * np.pi
return angle
def publish_status(self):
status_msg = String()
status_msg.data = self.state.name
self.status_pub.publish(status_msg)
def main(args=None):
rclpy.init(args=args)
node = UGVController()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1 @@
"""Localization module for sensor fusion and position estimation."""

View File

@@ -0,0 +1,202 @@
#!/usr/bin/env python3
"""EKF Fusion - Extended Kalman Filter for sensor fusion."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, TwistStamped, PoseWithCovarianceStamped
from sensor_msgs.msg import Imu
from nav_msgs.msg import Odometry
import numpy as np
from filterpy.kalman import ExtendedKalmanFilter
class EKFFusionNode(Node):
def __init__(self):
super().__init__('ekf_fusion_node')
self.declare_parameter('process_noise_pos', 0.1)
self.declare_parameter('process_noise_vel', 0.5)
self.declare_parameter('measurement_noise_vo', 0.05)
self.declare_parameter('measurement_noise_of', 0.1)
self.declare_parameter('update_rate', 50.0)
self.ekf = ExtendedKalmanFilter(dim_x=9, dim_z=6)
self.ekf.x = np.zeros(9)
self.ekf.P = np.eye(9) * 1.0
process_noise_pos = self.get_parameter('process_noise_pos').value
process_noise_vel = self.get_parameter('process_noise_vel').value
self.ekf.Q = np.diag([
process_noise_pos, process_noise_pos, process_noise_pos,
process_noise_vel, process_noise_vel, process_noise_vel,
0.1, 0.1, 0.1
])
meas_noise_vo = self.get_parameter('measurement_noise_vo').value
meas_noise_of = self.get_parameter('measurement_noise_of').value
self.ekf.R = np.diag([
meas_noise_vo, meas_noise_vo, meas_noise_vo,
meas_noise_of, meas_noise_of, meas_noise_of
])
self.prev_time = None
self.orientation = np.array([0.0, 0.0, 0.0, 1.0])
self.vo_sub = self.create_subscription(
PoseStamped, '/uav/visual_odometry/pose', self.vo_callback, 10)
self.of_sub = self.create_subscription(
TwistStamped, '/uav/optical_flow/velocity', self.of_callback, 10)
self.imu_sub = self.create_subscription(
Imu, '/uav/imu/data', self.imu_callback, 10)
self.pose_pub = self.create_publisher(
PoseWithCovarianceStamped, '/uav/ekf/pose', 10)
self.odom_pub = self.create_publisher(
Odometry, '/uav/ekf/odom', 10)
update_rate = self.get_parameter('update_rate').value
self.timer = self.create_timer(1.0 / update_rate, self.predict_step)
self.get_logger().info('EKF Fusion Node Started')
def state_transition(self, x, dt):
F = np.eye(9)
F[0, 3] = dt
F[1, 4] = dt
F[2, 5] = dt
F[3, 6] = dt
F[4, 7] = dt
F[5, 8] = dt
return F @ x
def jacobian_F(self, dt):
F = np.eye(9)
F[0, 3] = dt
F[1, 4] = dt
F[2, 5] = dt
F[3, 6] = dt
F[4, 7] = dt
F[5, 8] = dt
return F
def predict_step(self):
current_time = self.get_clock().now()
if self.prev_time is not None:
dt = (current_time.nanoseconds - self.prev_time.nanoseconds) / 1e9
if dt <= 0 or dt > 1.0:
dt = 0.02
else:
dt = 0.02
self.prev_time = current_time
F = self.jacobian_F(dt)
self.ekf.F = F
self.ekf.predict()
self.publish_state()
def vo_callback(self, msg):
z = np.array([
msg.pose.position.x,
msg.pose.position.y,
msg.pose.position.z,
0.0, 0.0, 0.0
])
self.orientation = np.array([
msg.pose.orientation.x,
msg.pose.orientation.y,
msg.pose.orientation.z,
msg.pose.orientation.w
])
H = np.zeros((6, 9))
H[0, 0] = 1.0
H[1, 1] = 1.0
H[2, 2] = 1.0
self.ekf.update(z, HJacobian=lambda x: H, Hx=lambda x: H @ x)
def of_callback(self, msg):
z = np.array([
self.ekf.x[0],
self.ekf.x[1],
self.ekf.x[2],
msg.twist.linear.x,
msg.twist.linear.y,
0.0
])
H = np.zeros((6, 9))
H[0, 0] = 1.0
H[1, 1] = 1.0
H[2, 2] = 1.0
H[3, 3] = 1.0
H[4, 4] = 1.0
self.ekf.update(z, HJacobian=lambda x: H, Hx=lambda x: H @ x)
def imu_callback(self, msg):
accel = np.array([
msg.linear_acceleration.x,
msg.linear_acceleration.y,
msg.linear_acceleration.z - 9.81
])
self.ekf.x[6:9] = accel
def publish_state(self):
pose_msg = PoseWithCovarianceStamped()
pose_msg.header.stamp = self.get_clock().now().to_msg()
pose_msg.header.frame_id = 'odom'
pose_msg.pose.pose.position.x = float(self.ekf.x[0])
pose_msg.pose.pose.position.y = float(self.ekf.x[1])
pose_msg.pose.pose.position.z = float(self.ekf.x[2])
pose_msg.pose.pose.orientation.x = float(self.orientation[0])
pose_msg.pose.pose.orientation.y = float(self.orientation[1])
pose_msg.pose.pose.orientation.z = float(self.orientation[2])
pose_msg.pose.pose.orientation.w = float(self.orientation[3])
cov = self.ekf.P[:6, :6].flatten().tolist()
pose_msg.pose.covariance = cov
self.pose_pub.publish(pose_msg)
odom_msg = Odometry()
odom_msg.header = pose_msg.header
odom_msg.child_frame_id = 'base_link'
odom_msg.pose = pose_msg.pose
odom_msg.twist.twist.linear.x = float(self.ekf.x[3])
odom_msg.twist.twist.linear.y = float(self.ekf.x[4])
odom_msg.twist.twist.linear.z = float(self.ekf.x[5])
self.odom_pub.publish(odom_msg)
def main(args=None):
rclpy.init(args=args)
node = EKFFusionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""Landmark Tracker - Tracks visual landmarks for position correction."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseArray, PoseStamped, Point
from visualization_msgs.msg import Marker, MarkerArray
from std_msgs.msg import Float32MultiArray
import numpy as np
class LandmarkTracker(Node):
def __init__(self):
super().__init__('landmark_tracker')
self.declare_parameter('max_landmarks', 50)
self.declare_parameter('matching_threshold', 0.5)
self.declare_parameter('landmark_timeout', 5.0)
self.max_landmarks = self.get_parameter('max_landmarks').value
self.matching_threshold = self.get_parameter('matching_threshold').value
self.landmark_timeout = self.get_parameter('landmark_timeout').value
self.landmarks = {}
self.landmark_id_counter = 0
self.current_pose = None
self.detection_sub = self.create_subscription(
PoseArray, '/uav/landmarks/poses', self.detection_callback, 10)
self.marker_ids_sub = self.create_subscription(
Float32MultiArray, '/uav/landmarks/ids', self.marker_ids_callback, 10)
self.pose_sub = self.create_subscription(
PoseStamped, '/uav/local_position/pose', self.pose_callback, 10)
self.landmark_map_pub = self.create_publisher(
MarkerArray, '/landmarks/map', 10)
self.position_correction_pub = self.create_publisher(
PoseStamped, '/landmarks/position_correction', 10)
self.timer = self.create_timer(1.0, self.cleanup_landmarks)
self.viz_timer = self.create_timer(0.5, self.publish_landmark_map)
self.get_logger().info('Landmark Tracker Started')
def pose_callback(self, msg):
self.current_pose = msg
def marker_ids_callback(self, msg):
pass
def detection_callback(self, msg):
if self.current_pose is None:
return
robot_pos = np.array([
self.current_pose.pose.position.x,
self.current_pose.pose.position.y,
self.current_pose.pose.position.z
])
for pose in msg.poses:
local_pos = np.array([pose.position.x, pose.position.y, pose.position.z])
global_pos = robot_pos + local_pos
matched_id = self.match_landmark(global_pos)
if matched_id is not None:
self.update_landmark(matched_id, global_pos)
else:
self.add_landmark(global_pos)
def match_landmark(self, position):
for lm_id, lm_data in self.landmarks.items():
distance = np.linalg.norm(position - lm_data['position'])
if distance < self.matching_threshold:
return lm_id
return None
def add_landmark(self, position):
if len(self.landmarks) >= self.max_landmarks:
oldest_id = min(self.landmarks.keys(),
key=lambda k: self.landmarks[k]['last_seen'])
del self.landmarks[oldest_id]
self.landmarks[self.landmark_id_counter] = {
'position': position.copy(),
'observations': 1,
'last_seen': self.get_clock().now(),
'covariance': np.eye(3) * 0.5
}
self.landmark_id_counter += 1
def update_landmark(self, lm_id, position):
lm = self.landmarks[lm_id]
alpha = 1.0 / (lm['observations'] + 1)
lm['position'] = (1 - alpha) * lm['position'] + alpha * position
lm['observations'] += 1
lm['last_seen'] = self.get_clock().now()
lm['covariance'] *= 0.95
def cleanup_landmarks(self):
current_time = self.get_clock().now()
expired = []
for lm_id, lm_data in self.landmarks.items():
dt = (current_time - lm_data['last_seen']).nanoseconds / 1e9
if dt > self.landmark_timeout:
expired.append(lm_id)
for lm_id in expired:
del self.landmarks[lm_id]
def publish_landmark_map(self):
marker_array = MarkerArray()
for lm_id, lm_data in self.landmarks.items():
marker = Marker()
marker.header.stamp = self.get_clock().now().to_msg()
marker.header.frame_id = 'odom'
marker.ns = 'landmarks'
marker.id = lm_id
marker.type = Marker.SPHERE
marker.action = Marker.ADD
marker.pose.position.x = float(lm_data['position'][0])
marker.pose.position.y = float(lm_data['position'][1])
marker.pose.position.z = float(lm_data['position'][2])
marker.pose.orientation.w = 1.0
scale = 0.1 + 0.02 * min(lm_data['observations'], 20)
marker.scale.x = scale
marker.scale.y = scale
marker.scale.z = scale
marker.color.r = 0.0
marker.color.g = 0.8
marker.color.b = 0.2
marker.color.a = 0.8
marker_array.markers.append(marker)
self.landmark_map_pub.publish(marker_array)
def main(args=None):
rclpy.init(args=args)
node = LandmarkTracker()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""Position Estimator - Fuses sensors for local position estimate."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, TwistStamped, PoseWithCovarianceStamped
from sensor_msgs.msg import Imu
from nav_msgs.msg import Odometry
import numpy as np
from scipy.spatial.transform import Rotation
class PositionEstimator(Node):
def __init__(self):
super().__init__('position_estimator')
self.declare_parameter('fusion_method', 'weighted_average')
self.declare_parameter('vo_weight', 0.6)
self.declare_parameter('optical_flow_weight', 0.3)
self.declare_parameter('imu_weight', 0.1)
self.declare_parameter('update_rate', 50.0)
self.fusion_method = self.get_parameter('fusion_method').value
self.vo_weight = self.get_parameter('vo_weight').value
self.of_weight = self.get_parameter('optical_flow_weight').value
self.imu_weight = self.get_parameter('imu_weight').value
self.position = np.zeros(3)
self.velocity = np.zeros(3)
self.orientation = np.array([0.0, 0.0, 0.0, 1.0])
self.vo_pose = None
self.of_velocity = None
self.imu_data = None
self.prev_time = None
self.vo_sub = self.create_subscription(
PoseStamped, '/uav/visual_odometry/pose', self.vo_callback, 10)
self.of_sub = self.create_subscription(
TwistStamped, '/uav/optical_flow/velocity', self.of_callback, 10)
self.imu_sub = self.create_subscription(
Imu, '/uav/imu/data', self.imu_callback, 10)
self.pose_pub = self.create_publisher(
PoseWithCovarianceStamped, '/uav/local_position/pose', 10)
self.odom_pub = self.create_publisher(
Odometry, '/uav/local_position/odom', 10)
update_rate = self.get_parameter('update_rate').value
self.timer = self.create_timer(1.0 / update_rate, self.update_estimate)
self.get_logger().info(f'Position Estimator Started - {self.fusion_method}')
def vo_callback(self, msg):
self.vo_pose = msg
def of_callback(self, msg):
self.of_velocity = msg
def imu_callback(self, msg):
self.imu_data = msg
def update_estimate(self):
current_time = self.get_clock().now()
if self.prev_time is not None:
dt = (current_time.nanoseconds - self.prev_time.nanoseconds) / 1e9
if dt <= 0:
dt = 0.02
else:
dt = 0.02
self.prev_time = current_time
if self.fusion_method == 'weighted_average':
self.weighted_average_fusion(dt)
else:
self.simple_fusion(dt)
self.publish_estimate()
def weighted_average_fusion(self, dt):
total_weight = 0.0
weighted_pos = np.zeros(3)
weighted_vel = np.zeros(3)
if self.vo_pose is not None:
vo_pos = np.array([
self.vo_pose.pose.position.x,
self.vo_pose.pose.position.y,
self.vo_pose.pose.position.z
])
weighted_pos += self.vo_weight * vo_pos
total_weight += self.vo_weight
self.orientation = np.array([
self.vo_pose.pose.orientation.x,
self.vo_pose.pose.orientation.y,
self.vo_pose.pose.orientation.z,
self.vo_pose.pose.orientation.w
])
if self.of_velocity is not None:
of_vel = np.array([
self.of_velocity.twist.linear.x,
self.of_velocity.twist.linear.y,
0.0
])
weighted_vel += self.of_weight * of_vel
if self.imu_data is not None:
imu_accel = np.array([
self.imu_data.linear_acceleration.x,
self.imu_data.linear_acceleration.y,
self.imu_data.linear_acceleration.z - 9.81
])
self.velocity += imu_accel * dt * self.imu_weight
if total_weight > 0:
self.position = weighted_pos / total_weight
if self.of_velocity is not None:
self.velocity = weighted_vel
def simple_fusion(self, dt):
if self.vo_pose is not None:
self.position = np.array([
self.vo_pose.pose.position.x,
self.vo_pose.pose.position.y,
self.vo_pose.pose.position.z
])
def publish_estimate(self):
pose_msg = PoseWithCovarianceStamped()
pose_msg.header.stamp = self.get_clock().now().to_msg()
pose_msg.header.frame_id = 'odom'
pose_msg.pose.pose.position.x = float(self.position[0])
pose_msg.pose.pose.position.y = float(self.position[1])
pose_msg.pose.pose.position.z = float(self.position[2])
pose_msg.pose.pose.orientation.x = float(self.orientation[0])
pose_msg.pose.pose.orientation.y = float(self.orientation[1])
pose_msg.pose.pose.orientation.z = float(self.orientation[2])
pose_msg.pose.pose.orientation.w = float(self.orientation[3])
pose_msg.pose.covariance = [
0.1, 0, 0, 0, 0, 0,
0, 0.1, 0, 0, 0, 0,
0, 0, 0.1, 0, 0, 0,
0, 0, 0, 0.05, 0, 0,
0, 0, 0, 0, 0.05, 0,
0, 0, 0, 0, 0, 0.05
]
self.pose_pub.publish(pose_msg)
odom_msg = Odometry()
odom_msg.header = pose_msg.header
odom_msg.child_frame_id = 'base_link'
odom_msg.pose = pose_msg.pose
odom_msg.twist.twist.linear.x = float(self.velocity[0])
odom_msg.twist.twist.linear.y = float(self.velocity[1])
odom_msg.twist.twist.linear.z = float(self.velocity[2])
self.odom_pub.publish(odom_msg)
def main(args=None):
rclpy.init(args=args)
node = PositionEstimator()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1 @@
"""Navigation module for local path planning and waypoint following."""

View File

@@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""Local Planner - Path planning using relative coordinates only."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, Twist, Point
from nav_msgs.msg import Path, Odometry
from visualization_msgs.msg import Marker
import numpy as np
class LocalPlanner(Node):
def __init__(self):
super().__init__('local_planner')
self.declare_parameter('max_velocity', 2.0)
self.declare_parameter('max_acceleration', 1.0)
self.declare_parameter('lookahead_distance', 1.0)
self.declare_parameter('goal_tolerance', 0.3)
self.max_velocity = self.get_parameter('max_velocity').value
self.max_acceleration = self.get_parameter('max_acceleration').value
self.lookahead_distance = self.get_parameter('lookahead_distance').value
self.goal_tolerance = self.get_parameter('goal_tolerance').value
self.current_pose = None
self.current_velocity = np.zeros(3)
self.path = []
self.current_waypoint_idx = 0
self.odom_sub = self.create_subscription(
Odometry, '/uav/local_position/odom', self.odom_callback, 10)
self.goal_sub = self.create_subscription(
PoseStamped, '/uav/goal_pose', self.goal_callback, 10)
self.path_sub = self.create_subscription(
Path, '/uav/planned_path', self.path_callback, 10)
self.cmd_vel_pub = self.create_publisher(Twist, '/uav/cmd_vel', 10)
self.setpoint_pub = self.create_publisher(PoseStamped, '/uav/setpoint_position', 10)
self.path_viz_pub = self.create_publisher(Path, '/uav/local_path', 10)
self.timer = self.create_timer(0.05, self.control_loop)
self.get_logger().info('Local Planner Started - GPS-denied navigation')
def odom_callback(self, msg):
self.current_pose = msg.pose.pose
self.current_velocity = np.array([
msg.twist.twist.linear.x,
msg.twist.twist.linear.y,
msg.twist.twist.linear.z
])
def goal_callback(self, msg):
goal = np.array([
msg.pose.position.x,
msg.pose.position.y,
msg.pose.position.z
])
self.path = [goal]
self.current_waypoint_idx = 0
self.get_logger().info(f'New goal received: [{goal[0]:.2f}, {goal[1]:.2f}, {goal[2]:.2f}]')
def path_callback(self, msg):
self.path = []
for pose_stamped in msg.poses:
waypoint = np.array([
pose_stamped.pose.position.x,
pose_stamped.pose.position.y,
pose_stamped.pose.position.z
])
self.path.append(waypoint)
self.current_waypoint_idx = 0
self.get_logger().info(f'Path received with {len(self.path)} waypoints')
def control_loop(self):
if self.current_pose is None or len(self.path) == 0:
return
if self.current_waypoint_idx >= len(self.path):
self.stop()
return
current_pos = np.array([
self.current_pose.position.x,
self.current_pose.position.y,
self.current_pose.position.z
])
target = self.path[self.current_waypoint_idx]
distance = np.linalg.norm(target - current_pos)
if distance < self.goal_tolerance:
self.current_waypoint_idx += 1
if self.current_waypoint_idx >= len(self.path):
self.get_logger().info('Path complete!')
self.stop()
return
target = self.path[self.current_waypoint_idx]
direction = target - current_pos
distance = np.linalg.norm(direction)
if distance > 0:
direction = direction / distance
speed = min(self.max_velocity, distance * 0.5)
velocity = direction * speed
self.publish_command(velocity, target)
def publish_command(self, velocity, target):
cmd = Twist()
cmd.linear.x = float(velocity[0])
cmd.linear.y = float(velocity[1])
cmd.linear.z = float(velocity[2])
self.cmd_vel_pub.publish(cmd)
setpoint = PoseStamped()
setpoint.header.stamp = self.get_clock().now().to_msg()
setpoint.header.frame_id = 'odom'
setpoint.pose.position.x = float(target[0])
setpoint.pose.position.y = float(target[1])
setpoint.pose.position.z = float(target[2])
setpoint.pose.orientation.w = 1.0
self.setpoint_pub.publish(setpoint)
def stop(self):
cmd = Twist()
self.cmd_vel_pub.publish(cmd)
def main(args=None):
rclpy.init(args=args)
node = LocalPlanner()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""Obstacle Avoidance - Vision-based obstacle detection and avoidance."""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, LaserScan
from geometry_msgs.msg import Twist, TwistStamped, Vector3
from std_msgs.msg import Bool
from cv_bridge import CvBridge
import cv2
import numpy as np
class ObstacleAvoidance(Node):
def __init__(self):
super().__init__('obstacle_avoidance')
self.bridge = CvBridge()
self.declare_parameter('detection_range', 5.0)
self.declare_parameter('safety_margin', 1.0)
self.declare_parameter('avoidance_gain', 1.0)
self.declare_parameter('enable', True)
self.detection_range = self.get_parameter('detection_range').value
self.safety_margin = self.get_parameter('safety_margin').value
self.avoidance_gain = self.get_parameter('avoidance_gain').value
self.enabled = self.get_parameter('enable').value
self.obstacle_detected = False
self.obstacle_direction = np.zeros(3)
self.obstacle_distance = float('inf')
self.depth_sub = self.create_subscription(
Image, '/uav/camera/depth/image_raw', self.depth_callback, 10)
self.scan_sub = self.create_subscription(
LaserScan, '/uav/scan', self.scan_callback, 10)
self.cmd_vel_sub = self.create_subscription(
Twist, '/uav/cmd_vel', self.cmd_vel_callback, 10)
self.enable_sub = self.create_subscription(
Bool, '/obstacle_avoidance/enable', self.enable_callback, 10)
self.cmd_vel_pub = self.create_publisher(Twist, '/uav/cmd_vel_safe', 10)
self.obstacle_pub = self.create_publisher(Bool, '/obstacle_avoidance/obstacle_detected', 10)
self.avoidance_vec_pub = self.create_publisher(Vector3, '/obstacle_avoidance/avoidance_vector', 10)
self.current_cmd = Twist()
self.timer = self.create_timer(0.05, self.avoidance_loop)
self.get_logger().info('Obstacle Avoidance Node Started')
def enable_callback(self, msg):
self.enabled = msg.data
def depth_callback(self, msg):
try:
depth_image = self.bridge.imgmsg_to_cv2(msg, 'passthrough')
height, width = depth_image.shape
center_region = depth_image[height//3:2*height//3, width//3:2*width//3]
valid_depths = center_region[center_region > 0]
if len(valid_depths) > 0:
min_distance = np.min(valid_depths)
if min_distance < self.detection_range:
self.obstacle_detected = True
self.obstacle_distance = min_distance
left_region = depth_image[:, :width//3]
right_region = depth_image[:, 2*width//3:]
left_clear = np.mean(left_region[left_region > 0]) if np.any(left_region > 0) else float('inf')
right_clear = np.mean(right_region[right_region > 0]) if np.any(right_region > 0) else float('inf')
if left_clear > right_clear:
self.obstacle_direction = np.array([0.0, 1.0, 0.0])
else:
self.obstacle_direction = np.array([0.0, -1.0, 0.0])
else:
self.obstacle_detected = False
self.obstacle_distance = float('inf')
except Exception as e:
self.get_logger().error(f'Depth processing error: {e}')
def scan_callback(self, msg):
ranges = np.array(msg.ranges)
valid_ranges = ranges[np.isfinite(ranges)]
if len(valid_ranges) > 0:
min_range = np.min(valid_ranges)
min_idx = np.argmin(ranges)
if min_range < self.detection_range:
self.obstacle_detected = True
self.obstacle_distance = min_range
angle = msg.angle_min + min_idx * msg.angle_increment
self.obstacle_direction = np.array([
-np.cos(angle),
-np.sin(angle),
0.0
])
else:
self.obstacle_detected = False
def cmd_vel_callback(self, msg):
self.current_cmd = msg
def avoidance_loop(self):
obstacle_msg = Bool()
obstacle_msg.data = self.obstacle_detected
self.obstacle_pub.publish(obstacle_msg)
if not self.enabled:
self.cmd_vel_pub.publish(self.current_cmd)
return
safe_cmd = Twist()
if self.obstacle_detected and self.obstacle_distance < self.safety_margin:
repulsion = self.avoidance_gain * (self.safety_margin / self.obstacle_distance)
avoidance = self.obstacle_direction * repulsion
safe_cmd.linear.x = self.current_cmd.linear.x + float(avoidance[0])
safe_cmd.linear.y = self.current_cmd.linear.y + float(avoidance[1])
safe_cmd.linear.z = self.current_cmd.linear.z + float(avoidance[2])
safe_cmd.angular = self.current_cmd.angular
vec_msg = Vector3()
vec_msg.x = float(avoidance[0])
vec_msg.y = float(avoidance[1])
vec_msg.z = float(avoidance[2])
self.avoidance_vec_pub.publish(vec_msg)
else:
safe_cmd = self.current_cmd
self.cmd_vel_pub.publish(safe_cmd)
def main(args=None):
rclpy.init(args=args)
node = ObstacleAvoidance()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,183 @@
#!/usr/bin/env python3
"""Waypoint Follower - Follows relative waypoints without GPS."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, Twist
from nav_msgs.msg import Odometry, Path
from std_msgs.msg import Int32, String, Bool
import numpy as np
from enum import Enum
class WaypointState(Enum):
IDLE = 0
NAVIGATING = 1
REACHED = 2
PAUSED = 3
class WaypointFollower(Node):
def __init__(self):
super().__init__('waypoint_follower')
self.declare_parameter('waypoint_radius', 0.5)
self.declare_parameter('max_velocity', 2.0)
self.declare_parameter('kp_linear', 0.8)
self.declare_parameter('kp_angular', 1.0)
self.waypoint_radius = self.get_parameter('waypoint_radius').value
self.max_velocity = self.get_parameter('max_velocity').value
self.kp_linear = self.get_parameter('kp_linear').value
self.kp_angular = self.get_parameter('kp_angular').value
self.waypoints = []
self.current_idx = 0
self.state = WaypointState.IDLE
self.current_pose = None
self.odom_sub = self.create_subscription(
Odometry, '/uav/local_position/odom', self.odom_callback, 10)
self.waypoint_sub = self.create_subscription(
PoseStamped, '/waypoint_follower/add_waypoint', self.add_waypoint_callback, 10)
self.path_sub = self.create_subscription(
Path, '/waypoint_follower/set_path', self.set_path_callback, 10)
self.command_sub = self.create_subscription(
String, '/waypoint_follower/command', self.command_callback, 10)
self.cmd_vel_pub = self.create_publisher(Twist, '/uav/cmd_vel', 10)
self.current_waypoint_pub = self.create_publisher(Int32, '/waypoint_follower/current_index', 10)
self.status_pub = self.create_publisher(String, '/waypoint_follower/status', 10)
self.reached_pub = self.create_publisher(Bool, '/waypoint_follower/waypoint_reached', 10)
self.timer = self.create_timer(0.05, self.control_loop)
self.get_logger().info('Waypoint Follower Started - Using LOCAL coordinates only')
def odom_callback(self, msg):
self.current_pose = msg.pose.pose
def add_waypoint_callback(self, msg):
waypoint = np.array([
msg.pose.position.x,
msg.pose.position.y,
msg.pose.position.z
])
self.waypoints.append(waypoint)
self.get_logger().info(f'Added waypoint {len(self.waypoints)}: {waypoint}')
def set_path_callback(self, msg):
self.waypoints = []
for pose_stamped in msg.poses:
waypoint = np.array([
pose_stamped.pose.position.x,
pose_stamped.pose.position.y,
pose_stamped.pose.position.z
])
self.waypoints.append(waypoint)
self.current_idx = 0
self.get_logger().info(f'Set path with {len(self.waypoints)} waypoints')
def command_callback(self, msg):
cmd = msg.data.lower()
if cmd == 'start':
if len(self.waypoints) > 0:
self.state = WaypointState.NAVIGATING
self.get_logger().info('Starting waypoint navigation')
elif cmd == 'pause':
self.state = WaypointState.PAUSED
self.stop()
elif cmd == 'resume':
self.state = WaypointState.NAVIGATING
elif cmd == 'stop':
self.state = WaypointState.IDLE
self.stop()
elif cmd == 'clear':
self.waypoints = []
self.current_idx = 0
self.state = WaypointState.IDLE
elif cmd == 'next':
if self.current_idx < len(self.waypoints) - 1:
self.current_idx += 1
elif cmd == 'prev':
if self.current_idx > 0:
self.current_idx -= 1
def control_loop(self):
self.publish_status()
if self.state != WaypointState.NAVIGATING:
return
if self.current_pose is None or len(self.waypoints) == 0:
return
if self.current_idx >= len(self.waypoints):
self.state = WaypointState.IDLE
self.stop()
self.get_logger().info('All waypoints reached!')
return
current_pos = np.array([
self.current_pose.position.x,
self.current_pose.position.y,
self.current_pose.position.z
])
target = self.waypoints[self.current_idx]
error = target - current_pos
distance = np.linalg.norm(error)
if distance < self.waypoint_radius:
reached_msg = Bool()
reached_msg.data = True
self.reached_pub.publish(reached_msg)
self.get_logger().info(f'Reached waypoint {self.current_idx + 1}/{len(self.waypoints)}')
self.current_idx += 1
return
direction = error / distance
speed = min(self.kp_linear * distance, self.max_velocity)
velocity = direction * speed
cmd = Twist()
cmd.linear.x = float(velocity[0])
cmd.linear.y = float(velocity[1])
cmd.linear.z = float(velocity[2])
self.cmd_vel_pub.publish(cmd)
def stop(self):
cmd = Twist()
self.cmd_vel_pub.publish(cmd)
def publish_status(self):
idx_msg = Int32()
idx_msg.data = self.current_idx
self.current_waypoint_pub.publish(idx_msg)
status_msg = String()
status_msg.data = f'{self.state.name}|{self.current_idx}/{len(self.waypoints)}'
self.status_pub.publish(status_msg)
def main(args=None):
rclpy.init(args=args)
node = WaypointFollower()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

1
src/nodes/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""ROS 2 Node implementations."""

View File

@@ -0,0 +1,7 @@
#!/usr/bin/env python3
"""Geofence Node - ROS 2 wrapper."""
from src.safety.geofence_monitor import GeofenceMonitor, main
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,112 @@
#!/usr/bin/env python3
"""Multi-Vehicle Coordination Node."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, TwistStamped
from std_msgs.msg import String, Bool
import numpy as np
class MultiVehicleCoordinator(Node):
def __init__(self):
super().__init__('multi_vehicle_coord')
self.declare_parameter('min_separation', 3.0)
self.declare_parameter('coordination_mode', 'leader_follower')
self.min_separation = self.get_parameter('min_separation').value
self.coord_mode = self.get_parameter('coordination_mode').value
self.uav_pose = None
self.ugv_pose = None
self.uav_pose_sub = self.create_subscription(
PoseStamped, '/uav/local_position/pose', self.uav_pose_callback, 10)
self.ugv_pose_sub = self.create_subscription(
PoseStamped, '/ugv/local_position/pose', self.ugv_pose_callback, 10)
self.uav_cmd_pub = self.create_publisher(String, '/uav/controller/command', 10)
self.ugv_cmd_pub = self.create_publisher(String, '/ugv/controller/command', 10)
self.uav_goal_pub = self.create_publisher(PoseStamped, '/uav/setpoint_position', 10)
self.ugv_goal_pub = self.create_publisher(PoseStamped, '/ugv/goal_pose', 10)
self.collision_warning_pub = self.create_publisher(Bool, '/coordination/collision_warning', 10)
self.status_pub = self.create_publisher(String, '/coordination/status', 10)
self.timer = self.create_timer(0.1, self.coordination_loop)
self.get_logger().info(f'Multi-Vehicle Coordinator Started - Mode: {self.coord_mode}')
def uav_pose_callback(self, msg):
self.uav_pose = msg
def ugv_pose_callback(self, msg):
self.ugv_pose = msg
def coordination_loop(self):
if self.uav_pose is None or self.ugv_pose is None:
return
uav_pos = np.array([
self.uav_pose.pose.position.x,
self.uav_pose.pose.position.y,
self.uav_pose.pose.position.z
])
ugv_pos = np.array([
self.ugv_pose.pose.position.x,
self.ugv_pose.pose.position.y,
0.0
])
horizontal_distance = np.linalg.norm(uav_pos[:2] - ugv_pos[:2])
collision_warning = Bool()
collision_warning.data = horizontal_distance < self.min_separation
self.collision_warning_pub.publish(collision_warning)
if collision_warning.data:
self.get_logger().warn(f'Vehicles too close: {horizontal_distance:.2f}m')
status = String()
status.data = f'separation:{horizontal_distance:.2f}|mode:{self.coord_mode}'
self.status_pub.publish(status)
if self.coord_mode == 'leader_follower':
self.leader_follower_control(uav_pos, ugv_pos)
elif self.coord_mode == 'formation':
self.formation_control(uav_pos, ugv_pos)
def leader_follower_control(self, uav_pos, ugv_pos):
offset = np.array([0.0, 0.0, 5.0])
target_uav_pos = ugv_pos + offset
goal = PoseStamped()
goal.header.stamp = self.get_clock().now().to_msg()
goal.header.frame_id = 'odom'
goal.pose.position.x = float(target_uav_pos[0])
goal.pose.position.y = float(target_uav_pos[1])
goal.pose.position.z = float(target_uav_pos[2])
goal.pose.orientation.w = 1.0
def formation_control(self, uav_pos, ugv_pos):
pass
def main(args=None):
rclpy.init(args=args)
node = MultiVehicleCoordinator()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""Vision-Based Navigation Node."""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import PoseStamped, TwistStamped
from mavros_msgs.srv import CommandBool, SetMode
from mavros_msgs.msg import State
from cv_bridge import CvBridge
import numpy as np
class VisionNavNode(Node):
def __init__(self):
super().__init__('vision_nav_node')
self.bridge = CvBridge()
self.current_image = None
self.current_local_pose = None
self.current_vo_pose = None
self.is_armed = False
self.current_mode = ""
self.waypoints = [
{'x': 0.0, 'y': 0.0, 'z': 5.0},
{'x': 10.0, 'y': 0.0, 'z': 5.0},
{'x': 10.0, 'y': 10.0, 'z': 5.0},
{'x': 0.0, 'y': 10.0, 'z': 5.0},
{'x': 0.0, 'y': 0.0, 'z': 5.0},
{'x': 0.0, 'y': 0.0, 'z': 0.0},
]
self.current_waypoint_idx = 0
self.waypoint_tolerance = 0.5
self.image_sub = self.create_subscription(
Image, '/uav/camera/forward/image_raw', self.image_callback, 10)
self.local_pose_sub = self.create_subscription(
PoseStamped, '/uav/mavros/local_position/pose', self.local_pose_callback, 10)
self.vo_pose_sub = self.create_subscription(
PoseStamped, '/uav/visual_odometry/pose', self.vo_pose_callback, 10)
self.state_sub = self.create_subscription(
State, '/uav/mavros/state', self.state_callback, 10)
self.setpoint_local_pub = self.create_publisher(
PoseStamped, '/uav/mavros/setpoint_position/local', 10)
self.setpoint_vel_pub = self.create_publisher(
TwistStamped, '/uav/mavros/setpoint_velocity/cmd_vel', 10)
self.arming_client = self.create_client(CommandBool, '/uav/mavros/cmd/arming')
self.set_mode_client = self.create_client(SetMode, '/uav/mavros/set_mode')
self.control_timer = self.create_timer(0.05, self.control_loop)
self.setpoint_timer = self.create_timer(0.1, self.publish_setpoint)
self.get_logger().info('Vision Navigation Node Started - GPS-DENIED mode')
def image_callback(self, msg):
try:
self.current_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
except Exception as e:
self.get_logger().error(f'Image conversion failed: {e}')
def local_pose_callback(self, msg):
self.current_local_pose = msg
def vo_pose_callback(self, msg):
self.current_vo_pose = msg
def state_callback(self, msg):
self.is_armed = msg.armed
self.current_mode = msg.mode
def publish_setpoint(self):
if self.current_waypoint_idx >= len(self.waypoints):
return
waypoint = self.waypoints[self.current_waypoint_idx]
setpoint = PoseStamped()
setpoint.header.stamp = self.get_clock().now().to_msg()
setpoint.header.frame_id = 'map'
setpoint.pose.position.x = waypoint['x']
setpoint.pose.position.y = waypoint['y']
setpoint.pose.position.z = waypoint['z']
setpoint.pose.orientation.w = 1.0
self.setpoint_local_pub.publish(setpoint)
def control_loop(self):
if self.current_local_pose is None:
return
if self.current_waypoint_idx < len(self.waypoints):
waypoint = self.waypoints[self.current_waypoint_idx]
dx = waypoint['x'] - self.current_local_pose.pose.position.x
dy = waypoint['y'] - self.current_local_pose.pose.position.y
dz = waypoint['z'] - self.current_local_pose.pose.position.z
distance = np.sqrt(dx**2 + dy**2 + dz**2)
if distance < self.waypoint_tolerance:
self.get_logger().info(
f'Reached waypoint {self.current_waypoint_idx}: '
f'[{waypoint["x"]}, {waypoint["y"]}, {waypoint["z"]}] (LOCAL coordinates)')
self.current_waypoint_idx += 1
if self.current_waypoint_idx >= len(self.waypoints):
self.get_logger().info('Mission complete!')
def main(args=None):
rclpy.init(args=args)
node = VisionNavNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,7 @@
#!/usr/bin/env python3
"""Visual Odometry Node - ROS 2 wrapper."""
from src.vision.visual_odometry import VisualOdometryNode, main
if __name__ == '__main__':
main()

1
src/safety/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Safety module for geofencing and failsafe handling."""

View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""Failsafe Handler - Emergency procedures for vision loss and other failures."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, TwistStamped
from std_msgs.msg import String, Bool
from sensor_msgs.msg import BatteryState
from enum import Enum
class FailsafeState(Enum):
NORMAL = 0
VISION_LOSS = 1
LOW_BATTERY = 2
GEOFENCE_BREACH = 3
COMM_LOSS = 4
EMERGENCY = 5
class FailsafeHandler(Node):
def __init__(self):
super().__init__('failsafe_handler')
self.declare_parameter('vision_loss_timeout', 5.0)
self.declare_parameter('low_battery_threshold', 20.0)
self.declare_parameter('critical_battery_threshold', 10.0)
self.declare_parameter('action_on_vision_loss', 'HOLD')
self.declare_parameter('action_on_low_battery', 'RTL')
self.declare_parameter('action_on_critical_battery', 'LAND')
self.vision_timeout = self.get_parameter('vision_loss_timeout').value
self.low_battery = self.get_parameter('low_battery_threshold').value
self.critical_battery = self.get_parameter('critical_battery_threshold').value
self.vision_loss_action = self.get_parameter('action_on_vision_loss').value
self.low_battery_action = self.get_parameter('action_on_low_battery').value
self.critical_battery_action = self.get_parameter('action_on_critical_battery').value
self.state = FailsafeState.NORMAL
self.last_vo_time = None
self.battery_percent = 100.0
self.geofence_ok = True
self.vision_triggered = False
self.battery_triggered = False
self.vo_pose_sub = self.create_subscription(
PoseStamped, '/uav/visual_odometry/pose', self.vo_callback, 10)
self.battery_sub = self.create_subscription(
BatteryState, '/uav/battery', self.battery_callback, 10)
self.geofence_sub = self.create_subscription(
Bool, '/geofence/status', self.geofence_callback, 10)
self.geofence_action_sub = self.create_subscription(
String, '/geofence/action', self.geofence_action_callback, 10)
self.uav_cmd_pub = self.create_publisher(String, '/uav/controller/command', 10)
self.ugv_cmd_pub = self.create_publisher(String, '/ugv/controller/command', 10)
self.failsafe_status_pub = self.create_publisher(String, '/failsafe/status', 10)
self.failsafe_active_pub = self.create_publisher(Bool, '/failsafe/active', 10)
self.check_timer = self.create_timer(0.5, self.check_failsafes)
self.get_logger().info('Failsafe Handler Started')
def vo_callback(self, msg):
self.last_vo_time = self.get_clock().now()
if self.state == FailsafeState.VISION_LOSS:
self.state = FailsafeState.NORMAL
self.vision_triggered = False
self.get_logger().info('Vision restored - returning to normal')
def battery_callback(self, msg):
self.battery_percent = msg.percentage * 100.0
def geofence_callback(self, msg):
self.geofence_ok = msg.data
def geofence_action_callback(self, msg):
self.state = FailsafeState.GEOFENCE_BREACH
self.execute_action(msg.data)
def check_failsafes(self):
if self.last_vo_time is not None:
elapsed = (self.get_clock().now() - self.last_vo_time).nanoseconds / 1e9
if elapsed > self.vision_timeout and not self.vision_triggered:
self.state = FailsafeState.VISION_LOSS
self.vision_triggered = True
self.get_logger().error(f'VISION LOSS - No VO for {elapsed:.1f}s')
self.execute_action(self.vision_loss_action)
if self.battery_percent <= self.critical_battery and not self.battery_triggered:
self.state = FailsafeState.LOW_BATTERY
self.battery_triggered = True
self.get_logger().error(f'CRITICAL BATTERY: {self.battery_percent:.1f}%')
self.execute_action(self.critical_battery_action)
elif self.battery_percent <= self.low_battery and not self.battery_triggered:
self.state = FailsafeState.LOW_BATTERY
self.battery_triggered = True
self.get_logger().warn(f'LOW BATTERY: {self.battery_percent:.1f}%')
self.execute_action(self.low_battery_action)
self.publish_status()
def execute_action(self, action):
cmd_msg = String()
if action == 'HOLD':
cmd_msg.data = 'hold'
elif action == 'RTL':
cmd_msg.data = 'rtl'
elif action == 'LAND':
cmd_msg.data = 'land'
elif action == 'STOP':
cmd_msg.data = 'stop'
else:
cmd_msg.data = 'hold'
self.uav_cmd_pub.publish(cmd_msg)
ugv_cmd = String()
ugv_cmd.data = 'stop'
self.ugv_cmd_pub.publish(ugv_cmd)
self.get_logger().warn(f'Failsafe action executed: {action}')
def publish_status(self):
status_msg = String()
status_msg.data = f'{self.state.name}|battery:{self.battery_percent:.1f}|geofence:{self.geofence_ok}'
self.failsafe_status_pub.publish(status_msg)
active_msg = Bool()
active_msg.data = self.state != FailsafeState.NORMAL
self.failsafe_active_pub.publish(active_msg)
def main(args=None):
rclpy.init(args=args)
node = FailsafeHandler()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,162 @@
#!/usr/bin/env python3
"""Geofence Monitor - GPS ONLY used for safety boundaries."""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import NavSatFix
from geometry_msgs.msg import Point
from std_msgs.msg import Bool, String
from visualization_msgs.msg import Marker
import numpy as np
from shapely.geometry import Point as ShapelyPoint, Polygon
class GeofenceMonitor(Node):
def __init__(self):
super().__init__('geofence_monitor')
self.declare_parameter('fence_enabled', True)
self.declare_parameter('fence_type', 'polygon')
self.declare_parameter('warning_distance', 10.0)
self.declare_parameter('breach_action', 'RTL')
self.declare_parameter('min_altitude', 0.0)
self.declare_parameter('max_altitude', 50.0)
self.declare_parameter('consecutive_breaches_required', 3)
self.fence_enabled = self.get_parameter('fence_enabled').value
self.fence_type = self.get_parameter('fence_type').value
self.warning_distance = self.get_parameter('warning_distance').value
self.breach_action = self.get_parameter('breach_action').value
self.min_altitude = self.get_parameter('min_altitude').value
self.max_altitude = self.get_parameter('max_altitude').value
self.breaches_required = self.get_parameter('consecutive_breaches_required').value
self.fence_points = [
(47.397742, 8.545594),
(47.398242, 8.545594),
(47.398242, 8.546094),
(47.397742, 8.546094),
]
self.fence_polygon = Polygon(self.fence_points)
self.current_position = None
self.inside_fence = True
self.breach_count = 0
self.gps_sub = self.create_subscription(
NavSatFix, '/uav/mavros/global_position/global', self.gps_callback, 10)
self.fence_status_pub = self.create_publisher(Bool, '/geofence/status', 10)
self.fence_action_pub = self.create_publisher(String, '/geofence/action', 10)
self.fence_viz_pub = self.create_publisher(Marker, '/geofence/visualization', 10)
self.warning_pub = self.create_publisher(String, '/geofence/warning', 10)
self.viz_timer = self.create_timer(1.0, self.publish_visualization)
self.get_logger().info('Geofence Monitor Started - GPS ONLY for safety boundaries')
self.get_logger().info(f'Fence type: {self.fence_type}, Action: {self.breach_action}')
def gps_callback(self, msg):
if not self.fence_enabled:
return
self.current_position = (msg.latitude, msg.longitude, msg.altitude)
point = ShapelyPoint(msg.latitude, msg.longitude)
inside_horizontal = self.fence_polygon.contains(point)
inside_vertical = self.min_altitude <= msg.altitude <= self.max_altitude
self.inside_fence = inside_horizontal and inside_vertical
status_msg = Bool()
status_msg.data = self.inside_fence
self.fence_status_pub.publish(status_msg)
if not self.inside_fence:
self.breach_count += 1
if not inside_horizontal:
distance = self.fence_polygon.exterior.distance(point) * 111000
self.get_logger().warn(f'GEOFENCE: Outside boundary by {distance:.1f}m')
if not inside_vertical:
if msg.altitude < self.min_altitude:
self.get_logger().warn(f'GEOFENCE: Below min altitude ({msg.altitude:.1f}m)')
else:
self.get_logger().warn(f'GEOFENCE: Above max altitude ({msg.altitude:.1f}m)')
if self.breach_count >= self.breaches_required:
self.trigger_breach_action()
else:
self.breach_count = 0
if inside_horizontal:
distance_to_edge = self.fence_polygon.exterior.distance(point) * 111000
if distance_to_edge < self.warning_distance:
warning_msg = String()
warning_msg.data = f'Approaching boundary: {distance_to_edge:.1f}m remaining'
self.warning_pub.publish(warning_msg)
def trigger_breach_action(self):
action_msg = String()
action_msg.data = self.breach_action
self.fence_action_pub.publish(action_msg)
self.get_logger().error(f'GEOFENCE BREACH ACTION: {self.breach_action}')
self.breach_count = 0
def publish_visualization(self):
if not self.fence_enabled:
return
marker = Marker()
marker.header.stamp = self.get_clock().now().to_msg()
marker.header.frame_id = 'map'
marker.ns = 'geofence'
marker.id = 0
marker.type = Marker.LINE_STRIP
marker.action = Marker.ADD
marker.scale.x = 2.0
if self.inside_fence:
marker.color.r = 0.0
marker.color.g = 1.0
marker.color.b = 0.0
else:
marker.color.r = 1.0
marker.color.g = 0.0
marker.color.b = 0.0
marker.color.a = 0.8
for lat, lon in self.fence_points:
p = Point()
p.x = (lon - self.fence_points[0][1]) * 111000
p.y = (lat - self.fence_points[0][0]) * 111000
p.z = 0.0
marker.points.append(p)
p = Point()
p.x = 0.0
p.y = 0.0
p.z = 0.0
marker.points.append(p)
self.fence_viz_pub.publish(marker)
def main(args=None):
rclpy.init(args=args)
node = GeofenceMonitor()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

1
src/utils/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Utility functions and helpers."""

111
src/utils/helpers.py Normal file
View File

@@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""Utility helper functions."""
import numpy as np
import yaml
from pathlib import Path
def load_yaml_config(filepath):
with open(filepath, 'r') as f:
return yaml.safe_load(f)
def save_yaml_config(data, filepath):
with open(filepath, 'w') as f:
yaml.dump(data, f, default_flow_style=False)
def clamp(value, min_val, max_val):
return max(min_val, min(value, max_val))
def lerp(a, b, t):
return a + (b - a) * clamp(t, 0.0, 1.0)
def smooth_step(edge0, edge1, x):
t = clamp((x - edge0) / (edge1 - edge0), 0.0, 1.0)
return t * t * (3.0 - 2.0 * t)
def distance_2d(p1, p2):
return np.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2)
def distance_3d(p1, p2):
return np.linalg.norm(np.array(p2) - np.array(p1))
def moving_average(values, window_size):
if len(values) < window_size:
return np.mean(values)
return np.mean(values[-window_size:])
class RateLimiter:
def __init__(self, max_rate):
self.max_rate = max_rate
self.last_value = 0.0
def apply(self, value, dt):
max_change = self.max_rate * dt
change = clamp(value - self.last_value, -max_change, max_change)
self.last_value += change
return self.last_value
class LowPassFilter:
def __init__(self, alpha=0.1):
self.alpha = alpha
self.value = None
def apply(self, new_value):
if self.value is None:
self.value = new_value
else:
self.value = self.alpha * new_value + (1 - self.alpha) * self.value
return self.value
def reset(self):
self.value = None
class PIDController:
def __init__(self, kp, ki, kd, output_limits=None):
self.kp = kp
self.ki = ki
self.kd = kd
self.output_limits = output_limits
self.integral = 0.0
self.last_error = 0.0
self.last_time = None
def compute(self, error, current_time):
if self.last_time is None:
dt = 0.0
else:
dt = current_time - self.last_time
self.integral += error * dt
if dt > 0:
derivative = (error - self.last_error) / dt
else:
derivative = 0.0
output = self.kp * error + self.ki * self.integral + self.kd * derivative
if self.output_limits:
output = clamp(output, self.output_limits[0], self.output_limits[1])
self.last_error = error
self.last_time = current_time
return output
def reset(self):
self.integral = 0.0
self.last_error = 0.0
self.last_time = None

84
src/utils/transforms.py Normal file
View File

@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""Coordinate transforms for local navigation."""
import numpy as np
from scipy.spatial.transform import Rotation
def quaternion_to_euler(quat):
r = Rotation.from_quat([quat[0], quat[1], quat[2], quat[3]])
return r.as_euler('xyz')
def euler_to_quaternion(roll, pitch, yaw):
r = Rotation.from_euler('xyz', [roll, pitch, yaw])
return r.as_quat()
def rotation_matrix_from_quaternion(quat):
r = Rotation.from_quat([quat[0], quat[1], quat[2], quat[3]])
return r.as_matrix()
def transform_point(point, translation, rotation_quat):
R = rotation_matrix_from_quaternion(rotation_quat)
return R @ point + translation
def inverse_transform_point(point, translation, rotation_quat):
R = rotation_matrix_from_quaternion(rotation_quat)
return R.T @ (point - translation)
def body_to_world(body_vel, yaw):
cos_yaw = np.cos(yaw)
sin_yaw = np.sin(yaw)
world_vel = np.zeros(3)
world_vel[0] = body_vel[0] * cos_yaw - body_vel[1] * sin_yaw
world_vel[1] = body_vel[0] * sin_yaw + body_vel[1] * cos_yaw
world_vel[2] = body_vel[2]
return world_vel
def world_to_body(world_vel, yaw):
cos_yaw = np.cos(yaw)
sin_yaw = np.sin(yaw)
body_vel = np.zeros(3)
body_vel[0] = world_vel[0] * cos_yaw + world_vel[1] * sin_yaw
body_vel[1] = -world_vel[0] * sin_yaw + world_vel[1] * cos_yaw
body_vel[2] = world_vel[2]
return body_vel
def normalize_angle(angle):
while angle > np.pi:
angle -= 2 * np.pi
while angle < -np.pi:
angle += 2 * np.pi
return angle
def angle_difference(angle1, angle2):
diff = angle1 - angle2
return normalize_angle(diff)
def create_transformation_matrix(translation, rotation_quat):
T = np.eye(4)
T[:3, :3] = rotation_matrix_from_quaternion(rotation_quat)
T[:3, 3] = translation
return T
def invert_transformation_matrix(T):
R = T[:3, :3]
t = T[:3, 3]
T_inv = np.eye(4)
T_inv[:3, :3] = R.T
T_inv[:3, 3] = -R.T @ t
return T_inv

1
src/vision/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Vision processing module for GPS-denied navigation."""

View File

@@ -0,0 +1,227 @@
#!/usr/bin/env python3
"""
Camera Processor Node
Handles camera feed processing for GPS-denied navigation
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, CameraInfo
from std_msgs.msg import Header
from cv_bridge import CvBridge
import cv2
import numpy as np
class CameraProcessor(Node):
"""
Processes camera feeds for visual odometry and obstacle detection.
Outputs processed images for downstream nodes.
"""
def __init__(self):
super().__init__('camera_processor')
self.bridge = CvBridge()
# Camera parameters
self.camera_matrix = None
self.dist_coeffs = None
self.image_size = None
# Image processing parameters
self.declare_parameter('undistort', True)
self.declare_parameter('grayscale_output', True)
self.declare_parameter('histogram_equalization', True)
self.declare_parameter('resize_factor', 1.0)
self.undistort = self.get_parameter('undistort').value
self.grayscale_output = self.get_parameter('grayscale_output').value
self.histogram_eq = self.get_parameter('histogram_equalization').value
self.resize_factor = self.get_parameter('resize_factor').value
# Undistort maps (computed once)
self.map1 = None
self.map2 = None
# Subscribers - Forward camera
self.forward_image_sub = self.create_subscription(
Image,
'/uav/camera/forward/image_raw',
self.forward_image_callback,
10
)
self.forward_info_sub = self.create_subscription(
CameraInfo,
'/uav/camera/forward/camera_info',
self.camera_info_callback,
10
)
# Subscribers - Downward camera
self.downward_image_sub = self.create_subscription(
Image,
'/uav/camera/downward/image_raw',
self.downward_image_callback,
10
)
# Publishers - Processed images
self.forward_processed_pub = self.create_publisher(
Image,
'/uav/camera/forward/image_processed',
10
)
self.downward_processed_pub = self.create_publisher(
Image,
'/uav/camera/downward/image_processed',
10
)
# Debug visualization
self.debug_pub = self.create_publisher(
Image,
'/uav/camera/debug',
10
)
self.get_logger().info('Camera Processor Node Started')
def camera_info_callback(self, msg: CameraInfo):
"""Extract and store camera calibration parameters."""
if self.camera_matrix is None:
self.camera_matrix = np.array(msg.k).reshape(3, 3)
self.dist_coeffs = np.array(msg.d)
self.image_size = (msg.width, msg.height)
# Compute undistortion maps
if self.undistort and self.dist_coeffs is not None:
new_camera_matrix, _ = cv2.getOptimalNewCameraMatrix(
self.camera_matrix,
self.dist_coeffs,
self.image_size,
alpha=0
)
self.map1, self.map2 = cv2.initUndistortRectifyMap(
self.camera_matrix,
self.dist_coeffs,
None,
new_camera_matrix,
self.image_size,
cv2.CV_16SC2
)
self.get_logger().info(
f'Camera calibration received: {self.image_size[0]}x{self.image_size[1]}'
)
def process_image(self, image: np.ndarray) -> np.ndarray:
"""
Apply image processing pipeline.
Args:
image: Input BGR image
Returns:
Processed image
"""
processed = image.copy()
# 1. Undistort if calibration available
if self.undistort and self.map1 is not None:
processed = cv2.remap(processed, self.map1, self.map2, cv2.INTER_LINEAR)
# 2. Resize if needed
if self.resize_factor != 1.0:
new_size = (
int(processed.shape[1] * self.resize_factor),
int(processed.shape[0] * self.resize_factor)
)
processed = cv2.resize(processed, new_size, interpolation=cv2.INTER_AREA)
# 3. Convert to grayscale if requested
if self.grayscale_output:
if len(processed.shape) == 3:
processed = cv2.cvtColor(processed, cv2.COLOR_BGR2GRAY)
# 4. Histogram equalization for better feature detection
if self.histogram_eq:
if len(processed.shape) == 2:
# CLAHE for better results than standard histogram equalization
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
processed = clahe.apply(processed)
else:
# For color images, apply to L channel in LAB
lab = cv2.cvtColor(processed, cv2.COLOR_BGR2LAB)
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
lab[:, :, 0] = clahe.apply(lab[:, :, 0])
processed = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
return processed
def forward_image_callback(self, msg: Image):
"""Process forward camera images for visual odometry."""
try:
# Convert ROS Image to OpenCV
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
# Process image
processed = self.process_image(cv_image)
# Convert back to ROS Image
if len(processed.shape) == 2:
encoding = 'mono8'
else:
encoding = 'bgr8'
processed_msg = self.bridge.cv2_to_imgmsg(processed, encoding)
processed_msg.header = msg.header
# Publish processed image
self.forward_processed_pub.publish(processed_msg)
except Exception as e:
self.get_logger().error(f'Forward image processing error: {e}')
def downward_image_callback(self, msg: Image):
"""Process downward camera images for optical flow."""
try:
# Convert ROS Image to OpenCV
cv_image = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
# Process image
processed = self.process_image(cv_image)
# Convert back to ROS Image
if len(processed.shape) == 2:
encoding = 'mono8'
else:
encoding = 'bgr8'
processed_msg = self.bridge.cv2_to_imgmsg(processed, encoding)
processed_msg.header = msg.header
# Publish processed image
self.downward_processed_pub.publish(processed_msg)
except Exception as e:
self.get_logger().error(f'Downward image processing error: {e}')
def main(args=None):
rclpy.init(args=args)
node = CameraProcessor()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,132 @@
#!/usr/bin/env python3
"""Object Detector - Visual landmark and obstacle detection."""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import PoseArray, Pose
from std_msgs.msg import Float32MultiArray
from cv_bridge import CvBridge
import cv2
import numpy as np
class ObjectDetector(Node):
def __init__(self):
super().__init__('object_detector')
self.bridge = CvBridge()
self.declare_parameter('detection_method', 'ArUco')
self.declare_parameter('marker_size', 0.15)
self.declare_parameter('camera_matrix', [500.0, 0.0, 320.0, 0.0, 500.0, 240.0, 0.0, 0.0, 1.0])
self.detection_method = self.get_parameter('detection_method').value
self.marker_size = self.get_parameter('marker_size').value
camera_matrix_flat = self.get_parameter('camera_matrix').value
self.camera_matrix = np.array(camera_matrix_flat).reshape(3, 3)
self.dist_coeffs = np.zeros(5)
if self.detection_method == 'ArUco':
self.aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
self.aruco_params = cv2.aruco.DetectorParameters()
self.aruco_detector = cv2.aruco.ArucoDetector(self.aruco_dict, self.aruco_params)
self.image_sub = self.create_subscription(
Image, '/uav/camera/forward/image_raw', self.image_callback, 10)
self.landmarks_pub = self.create_publisher(PoseArray, '/uav/landmarks/poses', 10)
self.marker_ids_pub = self.create_publisher(Float32MultiArray, '/uav/landmarks/ids', 10)
self.debug_image_pub = self.create_publisher(Image, '/uav/landmarks/debug', 10)
self.get_logger().info(f'Object Detector Started - {self.detection_method}')
def image_callback(self, msg):
try:
frame = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
if self.detection_method == 'ArUco':
self.detect_aruco(frame, msg.header)
else:
self.detect_orb_features(frame, msg.header)
except Exception as e:
self.get_logger().error(f'Detection error: {e}')
def detect_aruco(self, frame, header):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
corners, ids, rejected = self.aruco_detector.detectMarkers(gray)
pose_array = PoseArray()
pose_array.header = header
pose_array.header.frame_id = 'camera_link'
id_array = Float32MultiArray()
if ids is not None:
rvecs, tvecs, _ = cv2.aruco.estimatePoseSingleMarkers(
corners, self.marker_size, self.camera_matrix, self.dist_coeffs)
for i, marker_id in enumerate(ids.flatten()):
pose = Pose()
pose.position.x = float(tvecs[i][0][0])
pose.position.y = float(tvecs[i][0][1])
pose.position.z = float(tvecs[i][0][2])
rotation_matrix, _ = cv2.Rodrigues(rvecs[i])
from scipy.spatial.transform import Rotation
r = Rotation.from_matrix(rotation_matrix)
quat = r.as_quat()
pose.orientation.x = quat[0]
pose.orientation.y = quat[1]
pose.orientation.z = quat[2]
pose.orientation.w = quat[3]
pose_array.poses.append(pose)
id_array.data.append(float(marker_id))
debug_frame = cv2.aruco.drawDetectedMarkers(frame.copy(), corners, ids)
for i in range(len(rvecs)):
cv2.drawFrameAxes(debug_frame, self.camera_matrix, self.dist_coeffs,
rvecs[i], tvecs[i], self.marker_size * 0.5)
debug_msg = self.bridge.cv2_to_imgmsg(debug_frame, 'bgr8')
debug_msg.header = header
self.debug_image_pub.publish(debug_msg)
self.landmarks_pub.publish(pose_array)
self.marker_ids_pub.publish(id_array)
def detect_orb_features(self, frame, header):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
orb = cv2.ORB_create(nfeatures=500)
keypoints, descriptors = orb.detectAndCompute(gray, None)
pose_array = PoseArray()
pose_array.header = header
for kp in keypoints[:50]:
pose = Pose()
pose.position.x = float(kp.pt[0])
pose.position.y = float(kp.pt[1])
pose.position.z = 0.0
pose_array.poses.append(pose)
self.landmarks_pub.publish(pose_array)
def main(args=None):
rclpy.init(args=args)
node = ObjectDetector()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

128
src/vision/optical_flow.py Normal file
View File

@@ -0,0 +1,128 @@
#!/usr/bin/env python3
"""Optical Flow - Velocity estimation from downward camera."""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, Range
from geometry_msgs.msg import TwistStamped
from cv_bridge import CvBridge
import cv2
import numpy as np
class OpticalFlowNode(Node):
def __init__(self):
super().__init__('optical_flow_node')
self.bridge = CvBridge()
self.prev_frame = None
self.prev_points = None
self.current_altitude = 1.0
self.declare_parameter('window_size', 15)
self.declare_parameter('max_level', 3)
self.declare_parameter('min_altitude', 0.3)
self.declare_parameter('max_altitude', 10.0)
self.declare_parameter('focal_length', 500.0)
self.window_size = self.get_parameter('window_size').value
self.max_level = self.get_parameter('max_level').value
self.min_altitude = self.get_parameter('min_altitude').value
self.max_altitude = self.get_parameter('max_altitude').value
self.focal_length = self.get_parameter('focal_length').value
self.lk_params = dict(
winSize=(self.window_size, self.window_size),
maxLevel=self.max_level,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
)
self.feature_params = dict(
maxCorners=100,
qualityLevel=0.3,
minDistance=7,
blockSize=7
)
self.velocity = np.zeros(2)
self.prev_time = None
self.image_sub = self.create_subscription(
Image, '/uav/camera/downward/image_raw', self.image_callback, 10)
self.altitude_sub = self.create_subscription(
Range, '/uav/rangefinder/range', self.altitude_callback, 10)
self.velocity_pub = self.create_publisher(
TwistStamped, '/uav/optical_flow/velocity', 10)
self.get_logger().info('Optical Flow Node Started')
def altitude_callback(self, msg):
if msg.range > self.min_altitude and msg.range < self.max_altitude:
self.current_altitude = msg.range
def image_callback(self, msg):
try:
frame = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
current_time = self.get_clock().now()
if self.prev_frame is not None and self.prev_points is not None:
new_points, status, error = cv2.calcOpticalFlowPyrLK(
self.prev_frame, gray, self.prev_points, None, **self.lk_params)
if new_points is not None:
good_new = new_points[status == 1]
good_old = self.prev_points[status == 1]
if len(good_new) > 10:
flow = good_new - good_old
avg_flow = np.mean(flow, axis=0)
if self.prev_time is not None:
dt = (current_time - self.prev_time).nanoseconds / 1e9
if dt > 0:
pixel_velocity = avg_flow / dt
self.velocity[0] = pixel_velocity[0] * self.current_altitude / self.focal_length
self.velocity[1] = pixel_velocity[1] * self.current_altitude / self.focal_length
self.publish_velocity()
self.prev_points = cv2.goodFeaturesToTrack(gray, **self.feature_params)
self.prev_frame = gray
self.prev_time = current_time
except Exception as e:
self.get_logger().error(f'Optical flow error: {e}')
def publish_velocity(self):
vel_msg = TwistStamped()
vel_msg.header.stamp = self.get_clock().now().to_msg()
vel_msg.header.frame_id = 'base_link'
vel_msg.twist.linear.x = float(self.velocity[0])
vel_msg.twist.linear.y = float(self.velocity[1])
vel_msg.twist.linear.z = 0.0
self.velocity_pub.publish(vel_msg)
def main(args=None):
rclpy.init(args=args)
node = OpticalFlowNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,173 @@
#!/usr/bin/env python3
"""Visual Odometry - Camera-based position estimation without GPS."""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, CameraInfo
from geometry_msgs.msg import PoseStamped, TwistStamped
from cv_bridge import CvBridge
import cv2
import numpy as np
from scipy.spatial.transform import Rotation
class VisualOdometryNode(Node):
def __init__(self):
super().__init__('visual_odometry_node')
self.bridge = CvBridge()
self.prev_frame = None
self.prev_keypoints = None
self.prev_descriptors = None
self.camera_matrix = None
self.dist_coeffs = None
self.current_pose = np.eye(4)
self.position = np.zeros(3)
self.orientation = np.eye(3)
self.prev_time = None
self.velocity = np.zeros(3)
self.detector_type = self.declare_parameter('detector_type', 'ORB').value
self.min_features = self.declare_parameter('min_features', 100).value
self.feature_quality = self.declare_parameter('feature_quality', 0.01).value
if self.detector_type == 'ORB':
self.detector = cv2.ORB_create(nfeatures=500)
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
elif self.detector_type == 'SIFT':
self.detector = cv2.SIFT_create(nfeatures=500)
self.matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
else:
self.detector = cv2.ORB_create(nfeatures=500)
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
self.image_sub = self.create_subscription(
Image, '/uav/camera/forward/image_raw', self.image_callback, 10)
self.camera_info_sub = self.create_subscription(
CameraInfo, '/uav/camera/forward/camera_info', self.camera_info_callback, 10)
self.pose_pub = self.create_publisher(PoseStamped, '/uav/visual_odometry/pose', 10)
self.velocity_pub = self.create_publisher(TwistStamped, '/uav/visual_odometry/velocity', 10)
self.timer = self.create_timer(0.033, self.publish_pose)
self.get_logger().info(f'Visual Odometry Node Started - {self.detector_type} detector')
def camera_info_callback(self, msg):
if self.camera_matrix is None:
self.camera_matrix = np.array(msg.k).reshape(3, 3)
self.dist_coeffs = np.array(msg.d)
self.get_logger().info('Camera calibration received')
def image_callback(self, msg):
try:
frame = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
keypoints, descriptors = self.detector.detectAndCompute(gray, None)
current_time = self.get_clock().now()
if self.prev_frame is not None and len(keypoints) >= self.min_features:
matches = self.match_features(self.prev_descriptors, descriptors)
if len(matches) >= self.min_features:
self.estimate_motion(self.prev_keypoints, keypoints, matches, current_time)
self.prev_frame = gray
self.prev_keypoints = keypoints
self.prev_descriptors = descriptors
self.prev_time = current_time
except Exception as e:
self.get_logger().error(f'Visual odometry error: {e}')
def match_features(self, desc1, desc2):
if desc1 is None or desc2 is None:
return []
matches = self.matcher.knnMatch(desc1, desc2, k=2)
good_matches = []
for match_pair in matches:
if len(match_pair) == 2:
m, n = match_pair
if m.distance < 0.7 * n.distance:
good_matches.append(m)
return good_matches
def estimate_motion(self, prev_kp, curr_kp, matches, current_time):
if self.camera_matrix is None or len(matches) < 5:
return
pts1 = np.float32([prev_kp[m.queryIdx].pt for m in matches])
pts2 = np.float32([curr_kp[m.trainIdx].pt for m in matches])
E, mask = cv2.findEssentialMat(
pts1, pts2, self.camera_matrix,
method=cv2.RANSAC, prob=0.999, threshold=1.0)
if E is None:
return
_, R, t, mask = cv2.recoverPose(E, pts1, pts2, self.camera_matrix, mask=mask)
scale = 1.0
translation = scale * t.flatten()
if self.prev_time is not None:
dt = (current_time - self.prev_time).nanoseconds / 1e9
if dt > 0:
self.velocity = translation / dt
self.position += self.orientation @ translation
self.orientation = R @ self.orientation
def publish_pose(self):
pose_msg = PoseStamped()
pose_msg.header.stamp = self.get_clock().now().to_msg()
pose_msg.header.frame_id = 'odom'
pose_msg.pose.position.x = float(self.position[0])
pose_msg.pose.position.y = float(self.position[1])
pose_msg.pose.position.z = float(self.position[2])
rotation = Rotation.from_matrix(self.orientation)
quat = rotation.as_quat()
pose_msg.pose.orientation.x = quat[0]
pose_msg.pose.orientation.y = quat[1]
pose_msg.pose.orientation.z = quat[2]
pose_msg.pose.orientation.w = quat[3]
self.pose_pub.publish(pose_msg)
vel_msg = TwistStamped()
vel_msg.header.stamp = self.get_clock().now().to_msg()
vel_msg.header.frame_id = 'odom'
vel_msg.twist.linear.x = float(self.velocity[0])
vel_msg.twist.linear.y = float(self.velocity[1])
vel_msg.twist.linear.z = float(self.velocity[2])
self.velocity_pub.publish(vel_msg)
def main(args=None):
rclpy.init(args=args)
node = VisualOdometryNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""Visual Servoing - Vision-based control for precision positioning."""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import PoseStamped, TwistStamped, Pose
from std_msgs.msg import Bool
from cv_bridge import CvBridge
import cv2
import numpy as np
class VisualServoing(Node):
def __init__(self):
super().__init__('visual_servoing')
self.bridge = CvBridge()
self.declare_parameter('target_marker_id', 0)
self.declare_parameter('desired_distance', 1.0)
self.declare_parameter('kp_linear', 0.5)
self.declare_parameter('kp_angular', 0.3)
self.declare_parameter('max_velocity', 1.0)
self.target_marker_id = self.get_parameter('target_marker_id').value
self.desired_distance = self.get_parameter('desired_distance').value
self.kp_linear = self.get_parameter('kp_linear').value
self.kp_angular = self.get_parameter('kp_angular').value
self.max_velocity = self.get_parameter('max_velocity').value
self.enabled = False
self.target_pose = None
self.image_center = (320, 240)
self.camera_matrix = np.array([
[500.0, 0.0, 320.0],
[0.0, 500.0, 240.0],
[0.0, 0.0, 1.0]
])
self.dist_coeffs = np.zeros(5)
self.aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
self.aruco_params = cv2.aruco.DetectorParameters()
self.aruco_detector = cv2.aruco.ArucoDetector(self.aruco_dict, self.aruco_params)
self.image_sub = self.create_subscription(
Image, '/uav/camera/forward/image_raw', self.image_callback, 10)
self.enable_sub = self.create_subscription(
Bool, '/visual_servoing/enable', self.enable_callback, 10)
self.target_sub = self.create_subscription(
Pose, '/visual_servoing/target', self.target_callback, 10)
self.velocity_pub = self.create_publisher(
TwistStamped, '/uav/visual_servoing/cmd_vel', 10)
self.status_pub = self.create_publisher(
Bool, '/visual_servoing/target_acquired', 10)
self.get_logger().info('Visual Servoing Node Started')
def enable_callback(self, msg):
self.enabled = msg.data
self.get_logger().info(f'Visual servoing {"enabled" if self.enabled else "disabled"}')
def target_callback(self, msg):
self.target_pose = msg
def image_callback(self, msg):
if not self.enabled:
return
try:
frame = self.bridge.imgmsg_to_cv2(msg, 'bgr8')
self.image_center = (frame.shape[1] // 2, frame.shape[0] // 2)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
corners, ids, _ = self.aruco_detector.detectMarkers(gray)
target_acquired = Bool()
target_acquired.data = False
if ids is not None and self.target_marker_id in ids:
idx = np.where(ids == self.target_marker_id)[0][0]
target_corners = corners[idx]
rvecs, tvecs, _ = cv2.aruco.estimatePoseSingleMarkers(
[target_corners], 0.15, self.camera_matrix, self.dist_coeffs)
target_pos = tvecs[0][0]
self.compute_control(target_pos, target_corners)
target_acquired.data = True
self.status_pub.publish(target_acquired)
except Exception as e:
self.get_logger().error(f'Visual servoing error: {e}')
def compute_control(self, target_pos, corners):
marker_center = np.mean(corners[0], axis=0)
error_x = self.image_center[0] - marker_center[0]
error_y = self.image_center[1] - marker_center[1]
error_z = target_pos[2] - self.desired_distance
vel_msg = TwistStamped()
vel_msg.header.stamp = self.get_clock().now().to_msg()
vel_msg.header.frame_id = 'base_link'
vel_msg.twist.linear.x = np.clip(self.kp_linear * error_z, -self.max_velocity, self.max_velocity)
vel_msg.twist.linear.y = np.clip(self.kp_linear * error_x / 100.0, -self.max_velocity, self.max_velocity)
vel_msg.twist.linear.z = np.clip(self.kp_linear * error_y / 100.0, -self.max_velocity, self.max_velocity)
vel_msg.twist.angular.z = np.clip(self.kp_angular * error_x / 100.0, -1.0, 1.0)
self.velocity_pub.publish(vel_msg)
def main(args=None):
rclpy.init(args=args)
node = VisualServoing()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()