Gazebo Sim World Update

This commit is contained in:
2026-02-10 23:40:06 -05:00
parent bf4daef8b5
commit 0e427f3597
14 changed files with 1021 additions and 1335 deletions

View File

@@ -1 +0,0 @@
"""Control module for UAV and UGV controllers."""

View File

@@ -1,304 +1,263 @@
#!/usr/bin/env python3
"""UAV Controller - Flight control using local position only."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, TwistStamped, Twist
from mavros_msgs.srv import CommandBool, SetMode, CommandTOL
from mavros_msgs.msg import State
from std_msgs.msg import String, Bool
import time
import math
import argparse
import numpy as np
from enum import Enum
from time import sleep, perf_counter
from typing import Tuple
from pymavlink import mavutil
import pymavlink.dialects.v20.ardupilotmega as mavlink
from pymavlink.dialects.v20.ardupilotmega import (
MAVLink_set_position_target_local_ned_message,
MAVLink_set_position_target_global_int_message,
MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
MAV_FRAME_BODY_OFFSET_NED,
MAV_FRAME_LOCAL_NED,
MAV_CMD_DO_SET_MODE,
MAV_CMD_NAV_TAKEOFF,
MAV_CMD_NAV_LAND,
)
HOLD_ALT = 5.0
LOWEST_ALT = 3.0
GUIDED_MODE = 4
GUIDED_NOGPS_MODE = 20
class FlightState(Enum):
DISARMED = 0
ARMED = 1
TAKING_OFF = 2
HOVERING = 3
NAVIGATING = 4
LANDING = 5
EMERGENCY = 6
class Controller:
HOLD_ALT = HOLD_ALT
LOWEST_ALT = LOWEST_ALT
def __init__(self, connection_string: str):
print(f"[UAV] Connecting to {connection_string} ...")
self.conn = mavutil.mavlink_connection(connection_string)
self.conn.wait_heartbeat()
print("[UAV] Heartbeat from system %u component %u" %
(self.conn.target_system, self.conn.target_component))
self.conn.mav.request_data_stream_send(
self.conn.target_system,
self.conn.target_component,
mavutil.mavlink.MAV_DATA_STREAM_ALL,
50, 1)
self.current_yaw = 0.0
self.altitude = 0.0
self.position = {'x': 0.0, 'y': 0.0, 'z': 0.0}
self.armed = False
def _drain_messages(self):
while True:
msg = self.conn.recv_match(blocking=False)
if msg is None:
break
mtype = msg.get_type()
if mtype == 'ATTITUDE':
self.current_yaw = msg.yaw
elif mtype == 'LOCAL_POSITION_NED':
self.position = {'x': msg.x, 'y': msg.y, 'z': msg.z}
self.altitude = -msg.z
elif mtype == 'GLOBAL_POSITION_INT':
if self.altitude == 0:
self.altitude = msg.relative_alt / 1000.0
elif mtype == 'HEARTBEAT':
self.armed = bool(msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED)
elif mtype == 'STATUSTEXT':
text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore')
text = text.strip()
if text:
print(f"[SITL] {text}")
def update_state(self):
self._drain_messages()
def get_position(self) -> dict:
self.conn.mav.command_long_send(
self.conn.target_system,
self.conn.target_component,
mavlink.MAV_CMD_REQUEST_MESSAGE,
0, 33, 0, 0, 0, 0, 0, 0)
pos = self.conn.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=5)
if pos is None:
return {'lat': 0, 'lon': 0}
return {'lat': pos.lat, 'lon': pos.lon}
def get_local_position(self) -> dict:
self.update_state()
return self.position.copy()
def get_altitude(self) -> float:
self.update_state()
return self.altitude
def set_param(self, name: str, value: float):
self.conn.mav.param_set_send(
self.conn.target_system,
self.conn.target_component,
name.encode('utf-8'),
value,
mavutil.mavlink.MAV_PARAM_TYPE_REAL32)
sleep(0.5)
self._drain_messages()
def set_mode_guided(self):
self.conn.mav.command_long_send(
self.conn.target_system,
self.conn.target_component,
MAV_CMD_DO_SET_MODE,
0, 89, GUIDED_MODE, 0, 0, 0, 0, 0)
sleep(1)
self._drain_messages()
print("[UAV] Mode -> GUIDED")
def set_mode_guided_nogps(self):
self.conn.mav.command_long_send(
self.conn.target_system,
self.conn.target_component,
MAV_CMD_DO_SET_MODE,
0, 89, GUIDED_NOGPS_MODE, 0, 0, 0, 0, 0)
sleep(1)
self._drain_messages()
print("[UAV] Mode -> GUIDED_NOGPS")
def arm(self, retries: int = 10):
self.set_mode_guided()
for attempt in range(1, retries + 1):
print(f"[UAV] Arm attempt {attempt}/{retries}...")
self.conn.arducopter_arm()
t0 = time.time()
while time.time() - t0 < 5:
self._drain_messages()
if self.armed:
print("[UAV] Armed")
return True
sleep(0.2)
self._drain_messages()
print("[UAV] FAILED to arm after all retries")
return False
class UAVController(Node):
def __init__(self):
super().__init__('uav_controller')
self.declare_parameter('takeoff_altitude', 5.0)
self.declare_parameter('position_tolerance', 0.3)
self.declare_parameter('namespace', '/uav')
self.declare_parameter('auto_arm', True)
self.declare_parameter('auto_takeoff', True)
self.declare_parameter('startup_delay', 15.0) # Wait for EKF
self.takeoff_altitude = self.get_parameter('takeoff_altitude').value
self.position_tolerance = self.get_parameter('position_tolerance').value
ns = self.get_parameter('namespace').value
self.auto_arm = self.get_parameter('auto_arm').value
self.auto_takeoff = self.get_parameter('auto_takeoff').value
self.startup_delay = self.get_parameter('startup_delay').value
self.state = FlightState.DISARMED
self.mavros_state = None
self.current_pose = None
self.target_pose = None
self.home_position = None
self.startup_complete = False
self.ekf_ready = False
self.state_sub = self.create_subscription(
State, f'{ns}/mavros/state', self.state_callback, 10)
self.local_pose_sub = self.create_subscription(
PoseStamped, f'{ns}/mavros/local_position/pose', self.pose_callback, 10)
self.cmd_sub = self.create_subscription(
String, f'{ns}/controller/command', self.command_callback, 10)
self.setpoint_sub = self.create_subscription(
PoseStamped, f'{ns}/setpoint_position', self.setpoint_callback, 10)
self.vel_sub = self.create_subscription(
Twist, f'{ns}/cmd_vel_safe', self.velocity_callback, 10)
self.setpoint_pub = self.create_publisher(
PoseStamped, f'{ns}/mavros/setpoint_position/local', 10)
self.vel_pub = self.create_publisher(
TwistStamped, f'{ns}/mavros/setpoint_velocity/cmd_vel', 10)
self.status_pub = self.create_publisher(
String, f'{ns}/controller/status', 10)
self.arming_client = self.create_client(CommandBool, f'{ns}/mavros/cmd/arming')
self.set_mode_client = self.create_client(SetMode, f'{ns}/mavros/set_mode')
self.takeoff_client = self.create_client(CommandTOL, f'{ns}/mavros/cmd/takeoff')
self.land_client = self.create_client(CommandTOL, f'{ns}/mavros/cmd/land')
self.setpoint_timer = self.create_timer(0.05, self.publish_setpoint)
self.status_timer = self.create_timer(0.5, self.publish_status)
# Delayed auto-start (wait for EKF initialization)
if self.auto_arm or self.auto_takeoff:
self.get_logger().info(f'Auto-mission enabled. Starting in {self.startup_delay}s...')
self.startup_timer = self.create_timer(self.startup_delay, self.auto_startup)
self.get_logger().info('UAV Controller Started - GPS-denied mode')
def auto_startup(self):
"""Auto arm and takeoff after startup delay."""
# Only run once
if self.startup_complete:
return
self.startup_complete = True
self.startup_timer.cancel()
self.get_logger().info('Auto-startup: Beginning autonomous sequence...')
# Check if MAVROS is connected
if self.mavros_state is None:
self.get_logger().warn('MAVROS not connected yet, retrying in 5s...')
self.startup_timer = self.create_timer(5.0, self.auto_startup)
self.startup_complete = False
return
# Set GUIDED mode first
self.get_logger().info('Auto-startup: Setting GUIDED mode...')
self.set_mode('GUIDED')
# Wait a moment then arm
self.create_timer(2.0, self._auto_arm_callback, callback_group=None)
def _auto_arm_callback(self):
"""Callback to arm after mode is set."""
if self.auto_arm:
self.get_logger().info('Auto-startup: Arming...')
self.arm()
# Wait then takeoff
if self.auto_takeoff:
self.create_timer(3.0, self._auto_takeoff_callback, callback_group=None)
def _auto_takeoff_callback(self):
"""Callback to takeoff after arming."""
if self.mavros_state and self.mavros_state.armed:
self.get_logger().info('Auto-startup: Taking off...')
self.takeoff()
else:
self.get_logger().warn('Auto-startup: Not armed, cannot takeoff')
def state_callback(self, msg):
self.mavros_state = msg
if msg.armed and self.state == FlightState.DISARMED:
self.state = FlightState.ARMED
def pose_callback(self, msg):
self.current_pose = msg
if self.home_position is None and self.mavros_state and self.mavros_state.armed:
self.home_position = np.array([
msg.pose.position.x,
msg.pose.position.y,
msg.pose.position.z
])
self.get_logger().info(f'Home position set: {self.home_position}')
def setpoint_callback(self, msg):
self.target_pose = msg
def velocity_callback(self, msg):
if self.state not in [FlightState.NAVIGATING, FlightState.HOVERING]:
return
vel_msg = TwistStamped()
vel_msg.header.stamp = self.get_clock().now().to_msg()
vel_msg.header.frame_id = 'base_link'
vel_msg.twist = msg
self.vel_pub.publish(vel_msg)
def command_callback(self, msg):
cmd = msg.data.lower()
if cmd == 'arm':
self.arm()
elif cmd == 'disarm':
self.disarm()
elif cmd == 'takeoff':
self.takeoff()
elif cmd == 'land':
self.land()
elif cmd == 'rtl':
self.return_to_launch()
elif cmd == 'hold':
self.hold_position()
elif cmd == 'guided':
self.set_mode('GUIDED')
def arm(self):
if not self.arming_client.wait_for_service(timeout_sec=5.0):
self.get_logger().error('Arming service not available')
return False
req = CommandBool.Request()
req.value = True
future = self.arming_client.call_async(req)
future.add_done_callback(self.arm_response)
return True
def arm_response(self, future):
try:
result = future.result()
if result.success:
self.state = FlightState.ARMED
self.get_logger().info('Vehicle armed')
else:
self.get_logger().error('Arming failed')
except Exception as e:
self.get_logger().error(f'Arming error: {e}')
def disarm(self):
req = CommandBool.Request()
req.value = False
future = self.arming_client.call_async(req)
self.state = FlightState.DISARMED
def set_mode(self, mode):
if not self.set_mode_client.wait_for_service(timeout_sec=5.0):
return False
req = SetMode.Request()
req.custom_mode = mode
future = self.set_mode_client.call_async(req)
return True
def takeoff(self):
self.set_mode('GUIDED')
self.arm()
if self.current_pose is not None:
self.target_pose = PoseStamped()
self.target_pose.header.frame_id = 'odom'
self.target_pose.pose.position.x = self.current_pose.pose.position.x
self.target_pose.pose.position.y = self.current_pose.pose.position.y
self.target_pose.pose.position.z = self.takeoff_altitude
self.target_pose.pose.orientation.w = 1.0
self.state = FlightState.TAKING_OFF
self.get_logger().info(f'Taking off to {self.takeoff_altitude}m')
self.conn.arducopter_disarm()
print("[UAV] Disarmed")
def takeoff(self, altitude: float = HOLD_ALT):
self.conn.mav.command_long_send(
self.conn.target_system,
self.conn.target_component,
MAV_CMD_NAV_TAKEOFF,
0, 0, 0, 0, 0, 0, 0, altitude)
print(f"[UAV] Takeoff -> {altitude}m")
def land(self):
if self.current_pose is not None:
self.target_pose = PoseStamped()
self.target_pose.header.frame_id = 'odom'
self.target_pose.pose.position.x = self.current_pose.pose.position.x
self.target_pose.pose.position.y = self.current_pose.pose.position.y
self.target_pose.pose.position.z = 0.0
self.target_pose.pose.orientation.w = 1.0
self.state = FlightState.LANDING
self.get_logger().info('Landing')
def return_to_launch(self):
if self.home_position is not None:
self.target_pose = PoseStamped()
self.target_pose.header.frame_id = 'odom'
self.target_pose.pose.position.x = float(self.home_position[0])
self.target_pose.pose.position.y = float(self.home_position[1])
self.target_pose.pose.position.z = self.takeoff_altitude
self.target_pose.pose.orientation.w = 1.0
self.state = FlightState.NAVIGATING
self.get_logger().info('Returning to local home position')
def hold_position(self):
if self.current_pose is not None:
self.target_pose = PoseStamped()
self.target_pose.header = self.current_pose.header
self.target_pose.pose = self.current_pose.pose
self.state = FlightState.HOVERING
def publish_setpoint(self):
if self.target_pose is None:
return
self.target_pose.header.stamp = self.get_clock().now().to_msg()
self.setpoint_pub.publish(self.target_pose)
if self.current_pose is not None:
error = np.array([
self.target_pose.pose.position.x - self.current_pose.pose.position.x,
self.target_pose.pose.position.y - self.current_pose.pose.position.y,
self.target_pose.pose.position.z - self.current_pose.pose.position.z
])
distance = np.linalg.norm(error)
if self.state == FlightState.TAKING_OFF and distance < self.position_tolerance:
self.state = FlightState.HOVERING
self.get_logger().info('Takeoff complete')
elif self.state == FlightState.LANDING and self.current_pose.pose.position.z < 0.2:
self.state = FlightState.ARMED
self.get_logger().info('Landing complete')
def publish_status(self):
status_msg = String()
armed = 'ARMED' if (self.mavros_state and self.mavros_state.armed) else 'DISARMED'
mode = self.mavros_state.mode if self.mavros_state else 'UNKNOWN'
status_msg.data = f'{self.state.name}|{armed}|{mode}'
self.status_pub.publish(status_msg)
self.conn.set_mode_rtl()
print("[UAV] Landing (RTL)")
def land_at(self, lat: int, lon: int):
self.conn.mav.command_long_send(
self.conn.target_system,
self.conn.target_component,
MAV_CMD_NAV_LAND,
0, 0, 0, 0, 0,
lat / 1e7 if abs(lat) > 1e6 else lat,
lon / 1e7 if abs(lon) > 1e6 else lon,
0)
print(f"[UAV] Landing at ({lat}, {lon})")
def main(args=None):
rclpy.init(args=args)
node = UAVController()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
def move_pos_rel(self, x: float, y: float, z: float):
move_msg = MAVLink_set_position_target_local_ned_message(
int(perf_counter() * 1000),
self.conn.target_system,
self.conn.target_component,
MAV_FRAME_BODY_OFFSET_NED,
3576, x, y, z,
0, 0, 0, 0, 0, 0, 0, 0)
self.conn.mav.send(move_msg)
def move_pos_rel_yaw(self, x: float, y: float, z: float, yaw: float):
move_msg = MAVLink_set_position_target_local_ned_message(
int(perf_counter() * 1000),
self.conn.target_system,
self.conn.target_component,
MAV_FRAME_BODY_OFFSET_NED,
3576, x, y, z,
0, 0, 0, 0, 0, 0, yaw, 0)
self.conn.mav.send(move_msg)
if __name__ == '__main__':
main()
def move_vel_rel_alt(self, vx: float, vy: float, z: float):
move_msg = MAVLink_set_position_target_local_ned_message(
int(perf_counter() * 1000),
self.conn.target_system,
self.conn.target_component,
MAV_FRAME_BODY_OFFSET_NED,
3555, 0, 0, z,
vx, vy, 0, 0, 0, 0, 0, 0)
self.conn.mav.send(move_msg)
def move_local_ned(self, x: float, y: float, z: float):
move_msg = MAVLink_set_position_target_local_ned_message(
int(perf_counter() * 1000),
self.conn.target_system,
self.conn.target_component,
MAV_FRAME_LOCAL_NED,
3576, x, y, z,
0, 0, 0, 0, 0, 0, 0, 0)
self.conn.mav.send(move_msg)
def go_to(self, lat: int, lon: int, alt: float, yaw: float = 0):
move_msg = MAVLink_set_position_target_global_int_message(
int(perf_counter() * 1000),
self.conn.target_system,
self.conn.target_component,
MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
504, lat, lon, alt,
0, 0, 0, 0, 0, 0, yaw, 0)
self.conn.mav.send(move_msg)
def distance(self, p1: Tuple, p2: Tuple) -> float:
return float(np.linalg.norm(np.asarray(p1) - np.asarray(p2)))
def wait_altitude(self, target_alt: float, tolerance: float = 0.5,
timeout: float = 30.0):
t0 = time.time()
while time.time() - t0 < timeout:
self.update_state()
if abs(self.altitude - target_alt) < tolerance:
print(f"\n[UAV] Altitude reached: {self.altitude:.1f}m")
return True
print(f"\r[UAV] Climbing: {self.altitude:.1f}m / {target_alt:.1f}m ",
end='', flush=True)
sleep(0.2)
print(f"\n[UAV] Altitude timeout at {self.altitude:.1f}m")
return False
def wait_for_gps(self, timeout: float = 120.0):
print("[UAV] Waiting for GPS lock & EKF ...")
t0 = time.time()
while time.time() - t0 < timeout:
self._drain_messages()
msg = self.conn.recv_match(type='GPS_RAW_INT', blocking=True, timeout=2)
if msg and msg.fix_type >= 3:
print(f"\n[UAV] GPS fix: {msg.fix_type} sats: {msg.satellites_visible}")
print("[UAV] Waiting for EKF to settle ...")
for _ in range(15):
self._drain_messages()
sleep(1)
return True
elapsed = int(time.time() - t0)
print(f"\r[UAV] Waiting for GPS ... {elapsed}s ", end='', flush=True)
print("\n[UAV] GPS timeout")
return False
def set_max_velocity(self, speed):
self.conn.mav.command_long_send(
self.conn.target_system,
self.conn.target_component,
mavlink.MAV_CMD_DO_CHANGE_SPEED,
0, speed, -1, 0, 0, 0, 0, 0)

View File

@@ -1,198 +1,94 @@
#!/usr/bin/env python3
"""UGV Controller - Ground vehicle control using local position."""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, PoseStamped
from nav_msgs.msg import Odometry
from std_msgs.msg import String, Bool
import math
import numpy as np
from enum import Enum
from time import sleep, perf_counter
from typing import Tuple, Optional
try:
from pymavlink import mavutil
import pymavlink.dialects.v20.ardupilotmega as mavlink
from pymavlink.dialects.v20.ardupilotmega import (
MAVLink_set_position_target_local_ned_message,
MAV_FRAME_BODY_OFFSET_NED,
MAV_FRAME_LOCAL_NED,
)
HAS_MAVLINK = True
except ImportError:
HAS_MAVLINK = False
MAX_LINEAR_VEL = 1.0
MAX_ANGULAR_VEL = 1.5
POSITION_TOL = 0.3
class UGVState(Enum):
IDLE = 0
MOVING = 1
ROTATING = 2
STOPPED = 3
class UGVController:
def __init__(self, connection_string: Optional[str] = None,
static_pos: Tuple[float, float] = (10.0, 5.0)):
self.conn = None
self.backend = 'passive'
self.position = {'x': static_pos[0], 'y': static_pos[1], 'z': 0.0}
self.yaw = 0.0
if connection_string and HAS_MAVLINK:
try:
print(f"[UGV] Connecting via pymavlink: {connection_string}")
self.conn = mavutil.mavlink_connection(connection_string)
self.conn.wait_heartbeat(timeout=10)
print("[UGV] Heartbeat received (system %u component %u)" %
(self.conn.target_system, self.conn.target_component))
self.backend = 'mavlink'
except Exception as e:
print(f"[UGV] MAVLink connection failed: {e}")
self.conn = None
if self.backend == 'passive':
print(f"[UGV] Passive mode - static at ({static_pos[0]}, {static_pos[1]})")
def update_state(self):
if self.backend == 'mavlink' and self.conn:
while True:
msg = self.conn.recv_match(blocking=False)
if msg is None:
break
mtype = msg.get_type()
if mtype == 'LOCAL_POSITION_NED':
self.position = {'x': msg.x, 'y': msg.y, 'z': msg.z}
elif mtype == 'ATTITUDE':
self.yaw = msg.yaw
def get_position(self) -> dict:
self.update_state()
return self.position.copy()
def move_to(self, x: float, y: float):
if self.backend == 'mavlink' and self.conn:
move_msg = MAVLink_set_position_target_local_ned_message(
int(perf_counter() * 1000),
self.conn.target_system,
self.conn.target_component,
MAV_FRAME_LOCAL_NED,
3576, x, y, 0,
0, 0, 0, 0, 0, 0, 0, 0)
self.conn.mav.send(move_msg)
print(f"[UGV] Moving to ({x:.1f}, {y:.1f})")
def send_velocity(self, vx: float, vy: float, yaw_rate: float = 0.0):
if self.backend == 'mavlink' and self.conn:
move_msg = MAVLink_set_position_target_local_ned_message(
int(perf_counter() * 1000),
self.conn.target_system,
self.conn.target_component,
MAV_FRAME_BODY_OFFSET_NED,
3527, 0, 0, 0,
vx, vy, 0, 0, 0, 0, 0, yaw_rate)
self.conn.mav.send(move_msg)
class UGVController(Node):
def __init__(self):
super().__init__('ugv_controller')
self.declare_parameter('max_linear_velocity', 1.0)
self.declare_parameter('max_angular_velocity', 1.5)
self.declare_parameter('kp_linear', 0.8)
self.declare_parameter('kp_angular', 1.5)
self.declare_parameter('position_tolerance', 0.3)
self.declare_parameter('angle_tolerance', 0.1)
self.max_linear_vel = self.get_parameter('max_linear_velocity').value
self.max_angular_vel = self.get_parameter('max_angular_velocity').value
self.kp_linear = self.get_parameter('kp_linear').value
self.kp_angular = self.get_parameter('kp_angular').value
self.position_tolerance = self.get_parameter('position_tolerance').value
self.angle_tolerance = self.get_parameter('angle_tolerance').value
self.state = UGVState.IDLE
self.current_odom = None
self.target_pose = None
self.home_position = None
self.odom_sub = self.create_subscription(
Odometry, '/ugv/odom', self.odom_callback, 10)
self.goal_sub = self.create_subscription(
PoseStamped, '/ugv/goal_pose', self.goal_callback, 10)
self.cmd_sub = self.create_subscription(
String, '/ugv/controller/command', self.command_callback, 10)
self.vel_sub = self.create_subscription(
Twist, '/ugv/cmd_vel_safe', self.velocity_callback, 10)
self.cmd_vel_pub = self.create_publisher(Twist, '/ugv/cmd_vel', 10)
self.status_pub = self.create_publisher(String, '/ugv/controller/status', 10)
self.goal_reached_pub = self.create_publisher(Bool, '/ugv/goal_reached', 10)
self.control_timer = self.create_timer(0.05, self.control_loop)
self.status_timer = self.create_timer(0.5, self.publish_status)
self.get_logger().info('UGV Controller Started - GPS-denied mode')
def odom_callback(self, msg):
self.current_odom = msg
if self.home_position is None:
self.home_position = np.array([
msg.pose.pose.position.x,
msg.pose.pose.position.y
])
self.get_logger().info(f'Home position set: {self.home_position}')
def goal_callback(self, msg):
self.target_pose = msg
self.state = UGVState.MOVING
self.get_logger().info(
f'Goal received: [{msg.pose.position.x:.2f}, {msg.pose.position.y:.2f}]')
def command_callback(self, msg):
cmd = msg.data.lower()
if cmd == 'stop':
self.stop()
self.state = UGVState.STOPPED
elif cmd == 'resume':
self.state = UGVState.MOVING
elif cmd == 'rtl':
self.return_to_launch()
def velocity_callback(self, msg):
if self.state == UGVState.MOVING:
self.cmd_vel_pub.publish(msg)
def control_loop(self):
if self.state != UGVState.MOVING:
return
if self.current_odom is None or self.target_pose is None:
return
current_pos = np.array([
self.current_odom.pose.pose.position.x,
self.current_odom.pose.pose.position.y
])
target_pos = np.array([
self.target_pose.pose.position.x,
self.target_pose.pose.position.y
])
error = target_pos - current_pos
distance = np.linalg.norm(error)
if distance < self.position_tolerance:
self.stop()
self.state = UGVState.IDLE
reached_msg = Bool()
reached_msg.data = True
self.goal_reached_pub.publish(reached_msg)
self.get_logger().info('Goal reached!')
return
target_angle = np.arctan2(error[1], error[0])
quat = self.current_odom.pose.pose.orientation
current_yaw = np.arctan2(
2.0 * (quat.w * quat.z + quat.x * quat.y),
1.0 - 2.0 * (quat.y * quat.y + quat.z * quat.z)
)
angle_error = self.normalize_angle(target_angle - current_yaw)
cmd = Twist()
if abs(angle_error) > self.angle_tolerance:
cmd.angular.z = np.clip(
self.kp_angular * angle_error,
-self.max_angular_vel,
self.max_angular_vel
)
if abs(angle_error) < np.pi / 4:
speed = self.kp_linear * distance * (1.0 - abs(angle_error) / np.pi)
cmd.linear.x = np.clip(speed, 0.0, self.max_linear_vel)
else:
cmd.linear.x = np.clip(
self.kp_linear * distance,
0.0,
self.max_linear_vel
)
self.cmd_vel_pub.publish(cmd)
def stop(self):
cmd = Twist()
self.cmd_vel_pub.publish(cmd)
def return_to_launch(self):
if self.home_position is not None:
self.target_pose = PoseStamped()
self.target_pose.header.frame_id = 'odom'
self.target_pose.pose.position.x = float(self.home_position[0])
self.target_pose.pose.position.y = float(self.home_position[1])
self.target_pose.pose.orientation.w = 1.0
self.state = UGVState.MOVING
self.get_logger().info('Returning to local home')
def normalize_angle(self, angle):
while angle > np.pi:
angle -= 2 * np.pi
while angle < -np.pi:
angle += 2 * np.pi
return angle
def publish_status(self):
status_msg = String()
status_msg.data = self.state.name
self.status_pub.publish(status_msg)
self.send_velocity(0, 0, 0)
def main(args=None):
rclpy.init(args=args)
node = UGVController()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
def distance_to(self, x: float, y: float) -> float:
dx = x - self.position['x']
dy = y - self.position['y']
return math.sqrt(dx**2 + dy**2)