Code reorganization and Drone Logic Update

This commit is contained in:
2026-01-05 02:38:46 +00:00
parent c5b208c91a
commit 27a70c4983
32 changed files with 1018 additions and 812 deletions

0
src/__init__.py Normal file
View File

420
src/camera_viewer.py Normal file
View File

@@ -0,0 +1,420 @@
#!/usr/bin/env python3
"""
Drone Camera Viewer - Real-time camera feed display.
Displays the drone's camera feed from Gazebo simulation in a separate window.
Works with ROS 2 image topics.
Usage:
python camera_viewer.py # Default topic /drone/camera
python camera_viewer.py --topic /camera/image_raw
python camera_viewer.py --fps 60 # Higher framerate
python camera_viewer.py --record output.avi # Record to file
Topics:
- /drone/camera (Gazebo simulation)
- /camera/image_raw (generic)
- /ap/camera (ArduPilot if available)
"""
import argparse
import sys
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import Image
# Try to import OpenCV
try:
import cv2
import numpy as np
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
print("Warning: OpenCV not installed. Install with: pip install opencv-python")
# Try to import cv_bridge (optional, for more robust conversion)
try:
from cv_bridge import CvBridge
CV_BRIDGE_AVAILABLE = True
except ImportError:
CV_BRIDGE_AVAILABLE = False
class CameraViewer(Node):
"""ROS 2 node that displays camera feed in a window."""
def __init__(
self,
topic: str = '/drone/camera',
fps: int = 30,
window_name: str = 'Drone Camera',
record_file: Optional[str] = None,
show_info: bool = True
):
super().__init__('camera_viewer')
self._topic = topic
self._target_fps = fps
self._window_name = window_name
self._show_info = show_info
self._record_file = record_file
# State
self._latest_frame: Optional[np.ndarray] = None
self._frame_count = 0
self._fps_actual = 0.0
self._last_fps_time = time.time()
self._fps_frame_count = 0
self._running = True
# Video writer for recording
self._video_writer: Optional[cv2.VideoWriter] = None
# CV Bridge for image conversion
if CV_BRIDGE_AVAILABLE:
self._bridge = CvBridge()
else:
self._bridge = None
# QoS for camera topics (best effort for performance)
camera_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1
)
# Subscribe to camera topic
self._camera_sub = self.create_subscription(
Image,
topic,
self._camera_callback,
camera_qos
)
self.get_logger().info('=' * 50)
self.get_logger().info('Camera Viewer Starting...')
self.get_logger().info(f' Topic: {topic}')
self.get_logger().info(f' Target FPS: {fps}')
if record_file:
self.get_logger().info(f' Recording to: {record_file}')
self.get_logger().info('=' * 50)
self.get_logger().info('Press Q or ESC in the window to quit')
# Create display timer at target FPS
display_period = 1.0 / fps
self._display_timer = self.create_timer(display_period, self._display_frame)
def _camera_callback(self, msg: Image):
"""Process incoming camera image."""
try:
# Convert ROS Image to OpenCV format
if self._bridge and CV_BRIDGE_AVAILABLE:
# Use cv_bridge for robust conversion
self._latest_frame = self._bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
else:
# Manual conversion
self._latest_frame = self._ros_image_to_cv2(msg)
self._frame_count += 1
self._fps_frame_count += 1
except Exception as e:
self.get_logger().warning(f'Failed to convert image: {e}')
def _ros_image_to_cv2(self, msg: Image) -> np.ndarray:
"""Convert ROS Image message to OpenCV format."""
# Get image dimensions
height = msg.height
width = msg.width
encoding = msg.encoding
# Convert based on encoding
if encoding in ['rgb8', 'bgr8']:
# 3-channel color image
channels = 3
dtype = np.uint8
elif encoding in ['rgba8', 'bgra8']:
# 4-channel color image
channels = 4
dtype = np.uint8
elif encoding in ['mono8', '8UC1']:
# Grayscale
channels = 1
dtype = np.uint8
elif encoding in ['mono16', '16UC1']:
channels = 1
dtype = np.uint16
elif encoding == '32FC1':
channels = 1
dtype = np.float32
else:
# Try to handle as 3-channel uint8
channels = 3
dtype = np.uint8
# Reshape data to image
if channels == 1:
frame = np.frombuffer(msg.data, dtype=dtype).reshape((height, width))
# Convert grayscale to BGR for display
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
else:
frame = np.frombuffer(msg.data, dtype=dtype).reshape((height, width, channels))
# Convert to BGR if needed
if encoding == 'rgb8':
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
elif encoding == 'rgba8':
frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2BGR)
elif encoding == 'bgra8':
frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
return frame
def _display_frame(self):
"""Display the latest frame."""
if not self._running:
return
# Calculate actual FPS
current_time = time.time()
elapsed = current_time - self._last_fps_time
if elapsed >= 1.0:
self._fps_actual = self._fps_frame_count / elapsed
self._fps_frame_count = 0
self._last_fps_time = current_time
# Check for window close or key press
key = cv2.waitKey(1) & 0xFF
if key in [ord('q'), ord('Q'), 27]: # Q or ESC
self._running = False
self.get_logger().info('Shutting down...')
rclpy.shutdown()
return
# Display frame if available
if self._latest_frame is not None:
display_frame = self._latest_frame.copy()
# Add info overlay
if self._show_info:
self._add_info_overlay(display_frame)
# Show frame
cv2.imshow(self._window_name, display_frame)
# Record if enabled
if self._record_file and self._video_writer is None:
# Initialize video writer with frame dimensions
h, w = display_frame.shape[:2]
fourcc = cv2.VideoWriter_fourcc(*'XVID')
self._video_writer = cv2.VideoWriter(
self._record_file, fourcc, self._target_fps, (w, h)
)
if self._video_writer:
self._video_writer.write(display_frame)
else:
# No frame yet - show waiting message
waiting_frame = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.putText(
waiting_frame,
'Waiting for camera feed...',
(120, 240),
cv2.FONT_HERSHEY_SIMPLEX,
1.0,
(255, 255, 255),
2
)
cv2.putText(
waiting_frame,
f'Topic: {self._topic}',
(100, 280),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(128, 128, 128),
1
)
cv2.imshow(self._window_name, waiting_frame)
def _add_info_overlay(self, frame: np.ndarray):
"""Add information overlay to frame."""
h, w = frame.shape[:2]
# Semi-transparent background for text
overlay = frame.copy()
cv2.rectangle(overlay, (5, 5), (200, 85), (0, 0, 0), -1)
cv2.addWeighted(overlay, 0.5, frame, 0.5, 0, frame)
# FPS display
cv2.putText(
frame,
f'FPS: {self._fps_actual:.1f}',
(10, 25),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 255, 0),
2
)
# Frame count
cv2.putText(
frame,
f'Frame: {self._frame_count}',
(10, 50),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 255, 0),
2
)
# Resolution
cv2.putText(
frame,
f'Size: {w}x{h}',
(10, 75),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 255, 0),
2
)
# Recording indicator
if self._record_file:
cv2.circle(frame, (w - 20, 20), 10, (0, 0, 255), -1)
cv2.putText(
frame,
'REC',
(w - 60, 25),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 255),
2
)
def shutdown(self):
"""Clean up resources."""
self._running = False
if self._video_writer:
self._video_writer.release()
cv2.destroyAllWindows()
def find_camera_topics() -> list:
"""Find available camera topics."""
import subprocess
try:
result = subprocess.run(
['ros2', 'topic', 'list'],
capture_output=True,
text=True,
timeout=5.0
)
topics = result.stdout.strip().split('\n')
# Filter for likely camera topics
camera_keywords = ['camera', 'image', 'rgb', 'depth']
camera_topics = [
t for t in topics
if any(kw in t.lower() for kw in camera_keywords)
]
return camera_topics
except Exception:
return []
def main():
if not CV2_AVAILABLE:
print("ERROR: OpenCV is required for camera viewer")
print("Install with: pip install opencv-python")
sys.exit(1)
parser = argparse.ArgumentParser(
description='Drone Camera Viewer - Display camera feed in real-time',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python camera_viewer.py # Default topic
python camera_viewer.py --topic /camera/image_raw
python camera_viewer.py --fps 60 # Higher framerate
python camera_viewer.py --record flight.avi # Record video
python camera_viewer.py --list # List camera topics
Common topics:
/drone/camera - Gazebo simulation
/camera/image_raw - Generic camera
/camera/color/image_raw - RGB camera
/camera/depth/image_raw - Depth camera
"""
)
parser.add_argument(
'--topic', '-t', type=str, default='/drone/camera',
help='Camera image topic (default: /drone/camera)'
)
parser.add_argument(
'--fps', type=int, default=30,
help='Target display framerate (default: 30)'
)
parser.add_argument(
'--record', '-r', type=str, default=None,
help='Record video to file (e.g., output.avi)'
)
parser.add_argument(
'--no-info', action='store_true',
help='Hide info overlay'
)
parser.add_argument(
'--list', '-l', action='store_true',
help='List available camera topics and exit'
)
parser.add_argument(
'--window-name', '-w', type=str, default='Drone Camera',
help='Window title'
)
args = parser.parse_args()
# List topics and exit
if args.list:
print("Searching for camera topics...")
topics = find_camera_topics()
if topics:
print("\nAvailable camera topics:")
for t in topics:
print(f" {t}")
else:
print("\nNo camera topics found. Is the simulation running?")
return
# Initialize ROS 2
rclpy.init()
viewer = None
try:
viewer = CameraViewer(
topic=args.topic,
fps=args.fps,
window_name=args.window_name,
record_file=args.record,
show_info=not args.no_info
)
rclpy.spin(viewer)
except KeyboardInterrupt:
print('\nShutting down...')
finally:
if viewer:
viewer.shutdown()
viewer.destroy_node()
if rclpy.ok():
rclpy.shutdown()
cv2.destroyAllWindows()
if __name__ == '__main__':
main()

500
src/drone_controller.py Normal file
View File

@@ -0,0 +1,500 @@
#!/usr/bin/env python3
"""
DroneController - GPS-denied landing with 3-phase state machine.
Phases:
1. SEARCH - Find QR code on rover using camera
2. COMMAND - Send commands to rover
3. LAND - Land on the rover
"""
import json
import math
import base64
from enum import Enum, auto
from typing import Dict, Any, Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from geometry_msgs.msg import Twist, PoseStamped, TwistStamped
from sensor_msgs.msg import Imu, BatteryState, NavSatFix, Image
from std_msgs.msg import String
try:
from config import CONTROLLER, DRONE, LANDING
CONFIG_LOADED = True
except ImportError:
CONFIG_LOADED = False
CONTROLLER = {"Kp_z": 0.5, "Kd_z": 0.3, "Kp_xy": 0.3, "Kd_xy": 0.2, "rate": 50}
DRONE = {"max_thrust": 1.0, "max_pitch": 0.5, "max_roll": 0.5}
LANDING = {"height_threshold": 0.1, "success_velocity": 0.1}
try:
import cv2
import numpy as np
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
class Phase(Enum):
SEARCH = auto()
COMMAND = auto()
LAND = auto()
COMPLETE = auto()
def quaternion_to_euler(x: float, y: float, z: float, w: float) -> tuple:
sinr_cosp = 2.0 * (w * x + y * z)
cosr_cosp = 1.0 - 2.0 * (x * x + y * y)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2.0 * (w * y - z * x)
if abs(sinp) >= 1:
pitch = math.copysign(math.pi / 2, sinp)
else:
pitch = math.asin(sinp)
siny_cosp = 2.0 * (w * z + x * y)
cosy_cosp = 1.0 - 2.0 * (y * y + z * z)
yaw = math.atan2(siny_cosp, cosy_cosp)
return roll, pitch, yaw
class DroneController(Node):
def __init__(self, use_ardupilot_topics: bool = True):
super().__init__('drone_controller')
self._control_rate = CONTROLLER.get("rate", 50.0)
self._Kp_z = CONTROLLER.get("Kp_z", 0.5)
self._Kd_z = CONTROLLER.get("Kd_z", 0.3)
self._Kp_xy = CONTROLLER.get("Kp_xy", 0.3)
self._Kd_xy = CONTROLLER.get("Kd_xy", 0.2)
self._max_thrust = DRONE.get("max_thrust", 1.0)
self._max_pitch = DRONE.get("max_pitch", 0.5)
self._max_roll = DRONE.get("max_roll", 0.5)
self._landing_height = LANDING.get("height_threshold", 0.1)
self._landing_velocity = LANDING.get("success_velocity", 0.1)
self._use_ardupilot = use_ardupilot_topics
self._phase = Phase.SEARCH
self._phase_start_time = self.get_clock().now()
self._qr_detected = False
self._qr_data: Optional[str] = None
self._qr_position: Optional[Dict[str, float]] = None
self._search_pattern_angle = 0.0
self._commands_sent = False
self._command_acknowledged = False
self._landing_complete = False
self._latest_telemetry: Optional[Dict[str, Any]] = None
self._rover_telemetry: Optional[Dict[str, Any]] = None
self._latest_camera_image: Optional[bytes] = None
self._telemetry_received = False
self._ap_pose: Optional[PoseStamped] = None
self._ap_twist: Optional[TwistStamped] = None
self._ap_imu: Optional[Imu] = None
self._ap_battery: Optional[BatteryState] = None
self.get_logger().info('=' * 50)
self.get_logger().info('Drone Controller - 3 Phase GPS-Denied Landing')
self.get_logger().info(f' Phase 1: SEARCH | Phase 2: COMMAND | Phase 3: LAND')
self.get_logger().info(f' Mode: {"ArduPilot DDS" if use_ardupilot_topics else "Legacy JSON"}')
self.get_logger().info('=' * 50)
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1
)
if use_ardupilot_topics:
self._setup_ardupilot_subscriptions(sensor_qos)
else:
self._setup_legacy_subscriptions()
self._rover_telemetry_sub = self.create_subscription(
String, '/rover/telemetry', self._rover_telemetry_callback, 10)
self._camera_sub = self.create_subscription(
Image, '/drone/camera', self._camera_callback, sensor_qos)
self._cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
self._rover_cmd_pub = self.create_publisher(String, '/rover/command', 10)
self._control_timer = self.create_timer(1.0 / self._control_rate, self._control_loop)
self.get_logger().info(f'Current Phase: {self._phase.name}')
def _setup_ardupilot_subscriptions(self, qos: QoSProfile):
self._ap_pose_sub = self.create_subscription(
PoseStamped, '/ap/pose/filtered', self._ap_pose_callback, qos)
self._ap_twist_sub = self.create_subscription(
TwistStamped, '/ap/twist/filtered', self._ap_twist_callback, qos)
self._ap_imu_sub = self.create_subscription(
Imu, '/ap/imu/filtered', self._ap_imu_callback, qos)
self._ap_battery_sub = self.create_subscription(
BatteryState, '/ap/battery', self._ap_battery_callback, qos)
def _setup_legacy_subscriptions(self):
self._telemetry_sub = self.create_subscription(
String, '/drone/telemetry', self._telemetry_callback, 10)
def _ap_pose_callback(self, msg: PoseStamped):
self._ap_pose = msg
if not self._telemetry_received:
self._telemetry_received = True
self._warmup_count = 0
self.get_logger().info('First ArduPilot pose received!')
self._build_telemetry_from_ardupilot()
def _ap_twist_callback(self, msg: TwistStamped):
self._ap_twist = msg
self._build_telemetry_from_ardupilot()
def _ap_imu_callback(self, msg: Imu):
self._ap_imu = msg
self._build_telemetry_from_ardupilot()
def _ap_battery_callback(self, msg: BatteryState):
self._ap_battery = msg
self._build_telemetry_from_ardupilot()
def _telemetry_callback(self, msg: String) -> None:
try:
self._latest_telemetry = json.loads(msg.data)
if not self._telemetry_received:
self._telemetry_received = True
self._warmup_count = 0
self.get_logger().info('First telemetry received!')
except json.JSONDecodeError as e:
self.get_logger().warning(f'Failed to parse telemetry: {e}')
def _rover_telemetry_callback(self, msg: String) -> None:
try:
self._rover_telemetry = json.loads(msg.data)
except json.JSONDecodeError:
pass
def _camera_callback(self, msg: Image) -> None:
try:
self._latest_camera_image = bytes(msg.data)
self._camera_width = msg.width
self._camera_height = msg.height
self._camera_encoding = msg.encoding
except Exception as e:
self.get_logger().warning(f'Camera callback error: {e}')
def _build_telemetry_from_ardupilot(self):
telemetry = {}
if self._ap_pose:
pos = self._ap_pose.pose.position
ori = self._ap_pose.pose.orientation
roll, pitch, yaw = quaternion_to_euler(ori.x, ori.y, ori.z, ori.w)
telemetry['position'] = {'x': pos.x, 'y': pos.y, 'z': pos.z}
telemetry['altimeter'] = {
'altitude': pos.z,
'vertical_velocity': self._ap_twist.twist.linear.z if self._ap_twist else 0.0
}
telemetry['imu'] = {
'orientation': {'roll': roll, 'pitch': pitch, 'yaw': yaw},
'angular_velocity': {'x': 0, 'y': 0, 'z': 0}
}
if self._ap_twist:
twist = self._ap_twist.twist
telemetry['velocity'] = {
'x': twist.linear.x, 'y': twist.linear.y, 'z': twist.linear.z
}
if 'altimeter' in telemetry:
telemetry['altimeter']['vertical_velocity'] = twist.linear.z
if self._ap_imu:
if 'imu' not in telemetry:
telemetry['imu'] = {}
telemetry['imu']['angular_velocity'] = {
'x': self._ap_imu.angular_velocity.x,
'y': self._ap_imu.angular_velocity.y,
'z': self._ap_imu.angular_velocity.z
}
if self._ap_battery:
telemetry['battery'] = {
'voltage': self._ap_battery.voltage,
'remaining': self._ap_battery.percentage * 100
}
if self._qr_position:
telemetry['landing_pad'] = self._qr_position
elif self._rover_telemetry and self._ap_pose:
rover_pos = self._rover_telemetry.get('position', {})
rx, ry = rover_pos.get('x', 0), rover_pos.get('y', 0)
dx = self._ap_pose.pose.position.x
dy = self._ap_pose.pose.position.y
dz = self._ap_pose.pose.position.z
rel_x, rel_y = rx - dx, ry - dy
distance = math.sqrt(rel_x**2 + rel_y**2 + dz**2)
telemetry['landing_pad'] = {
'relative_x': rel_x, 'relative_y': rel_y,
'distance': distance, 'confidence': 1.0 if distance < 10.0 else 0.0
}
self._latest_telemetry = telemetry
def _control_loop(self) -> None:
if self._latest_telemetry is None:
return
if self._phase == Phase.COMPLETE:
return
if not hasattr(self, '_warmup_count'):
self._warmup_count = 0
self._warmup_count += 1
if self._warmup_count < 50:
return
if self._phase == Phase.SEARCH:
thrust, pitch, roll, yaw = self._execute_search_phase()
elif self._phase == Phase.COMMAND:
thrust, pitch, roll, yaw = self._execute_command_phase()
elif self._phase == Phase.LAND:
thrust, pitch, roll, yaw = self._execute_land_phase()
else:
thrust, pitch, roll, yaw = 0.0, 0.0, 0.0, 0.0
thrust = max(min(thrust, self._max_thrust), -self._max_thrust)
pitch = max(min(pitch, self._max_pitch), -self._max_pitch)
roll = max(min(roll, self._max_roll), -self._max_roll)
yaw = max(min(yaw, 0.5), -0.5)
self._publish_command(thrust, pitch, roll, yaw)
def _execute_search_phase(self) -> tuple:
qr_result = self.detect_qr_code()
if qr_result is not None:
self._qr_detected = True
self._qr_data = qr_result.get('data')
self._qr_position = qr_result.get('position')
self.get_logger().info(f'QR CODE DETECTED: {self._qr_data}')
self._transition_to_phase(Phase.COMMAND)
return (0.0, 0.0, 0.0, 0.0)
return self.calculate_search_maneuver(self._latest_telemetry)
def detect_qr_code(self) -> Optional[Dict[str, Any]]:
if not CV2_AVAILABLE or self._latest_camera_image is None:
return None
try:
if self._camera_encoding == 'rgb8':
img = np.frombuffer(self._latest_camera_image, dtype=np.uint8)
img = img.reshape((self._camera_height, self._camera_width, 3))
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
elif self._camera_encoding == 'bgr8':
img = np.frombuffer(self._latest_camera_image, dtype=np.uint8)
img = img.reshape((self._camera_height, self._camera_width, 3))
else:
return None
detector = cv2.QRCodeDetector()
data, points, _ = detector.detectAndDecode(img)
if data and points is not None:
cx = np.mean(points[0][:, 0])
cy = np.mean(points[0][:, 1])
rel_x = (cx - self._camera_width / 2) / (self._camera_width / 2)
rel_y = (cy - self._camera_height / 2) / (self._camera_height / 2)
qr_size = np.linalg.norm(points[0][0] - points[0][2])
altitude = self._latest_telemetry.get('altimeter', {}).get('altitude', 5.0)
return {
'data': data,
'position': {
'relative_x': rel_x * altitude * 0.5,
'relative_y': rel_y * altitude * 0.5,
'distance': altitude,
'confidence': min(qr_size / 100, 1.0)
}
}
except Exception as e:
self.get_logger().debug(f'QR detection error: {e}')
return None
def calculate_search_maneuver(self, telemetry: Dict[str, Any]) -> tuple:
altimeter = telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', 5.0)
vertical_vel = altimeter.get('vertical_velocity', 0.0)
target_altitude = 5.0
altitude_error = target_altitude - altitude
thrust = self._Kp_z * altitude_error - self._Kd_z * vertical_vel
self._search_pattern_angle += 0.01
yaw = 0.1
velocity = telemetry.get('velocity', {})
pitch = -self._Kd_xy * velocity.get('x', 0)
roll = -self._Kd_xy * velocity.get('y', 0)
return (thrust, pitch, roll, yaw)
def _execute_command_phase(self) -> tuple:
if not self._commands_sent:
command = self.generate_rover_command(self._qr_data)
self._send_rover_command(command)
self._commands_sent = True
self._command_time = self.get_clock().now()
elapsed = (self.get_clock().now() - self._command_time).nanoseconds / 1e9
if self._command_acknowledged or elapsed > 5.0:
self.get_logger().info('Command phase complete, transitioning to LAND')
self._transition_to_phase(Phase.LAND)
return self.calculate_hover_maneuver(self._latest_telemetry)
def generate_rover_command(self, qr_data: Optional[str]) -> Dict[str, Any]:
return {
'type': 'prepare_landing',
'qr_data': qr_data,
'drone_position': self._latest_telemetry.get('position', {})
}
def _send_rover_command(self, command: Dict[str, Any]) -> None:
msg = String()
msg.data = json.dumps(command)
self._rover_cmd_pub.publish(msg)
self.get_logger().info(f'Sent rover command: {command.get("type")}')
def calculate_hover_maneuver(self, telemetry: Dict[str, Any]) -> tuple:
altimeter = telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', 5.0)
vertical_vel = altimeter.get('vertical_velocity', 0.0)
altitude_error = 0
thrust = self._Kp_z * altitude_error - self._Kd_z * vertical_vel
velocity = telemetry.get('velocity', {})
pitch = -self._Kd_xy * velocity.get('x', 0)
roll = -self._Kd_xy * velocity.get('y', 0)
return (thrust, pitch, roll, 0.0)
def _execute_land_phase(self) -> tuple:
if self._check_landing_complete():
self._landing_complete = True
self.get_logger().info('LANDING COMPLETE!')
self._transition_to_phase(Phase.COMPLETE)
return (0.0, 0.0, 0.0, 0.0)
return self.calculate_landing_maneuver(
self._latest_telemetry,
self._rover_telemetry
)
def calculate_landing_maneuver(
self,
telemetry: Dict[str, Any],
rover_telemetry: Optional[Dict[str, Any]] = None
) -> tuple:
altimeter = telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', 5.0)
vertical_vel = altimeter.get('vertical_velocity', 0.0)
velocity = telemetry.get('velocity', {'x': 0, 'y': 0, 'z': 0})
vel_x = velocity.get('x', 0.0)
vel_y = velocity.get('y', 0.0)
landing_pad = telemetry.get('landing_pad', None)
if altitude > 1.0:
target_descent_rate = -0.5
else:
target_descent_rate = -0.2
descent_error = target_descent_rate - vertical_vel
thrust = self._Kp_z * descent_error
if landing_pad is not None:
target_x = landing_pad.get('relative_x', 0.0)
target_y = landing_pad.get('relative_y', 0.0)
pitch = self._Kp_xy * target_x - self._Kd_xy * vel_x
roll = self._Kp_xy * target_y - self._Kd_xy * vel_y
else:
pitch = -self._Kd_xy * vel_x
roll = -self._Kd_xy * vel_y
yaw = 0.0
return (thrust, pitch, roll, yaw)
def _transition_to_phase(self, new_phase: Phase) -> None:
old_phase = self._phase
self._phase = new_phase
self._phase_start_time = self.get_clock().now()
self.get_logger().info(f'Phase: {old_phase.name} -> {new_phase.name}')
def _check_landing_complete(self) -> bool:
if self._latest_telemetry is None:
return False
try:
altimeter = self._latest_telemetry.get('altimeter', {})
altitude = altimeter.get('altitude', float('inf'))
vertical_velocity = abs(altimeter.get('vertical_velocity', float('inf')))
return altitude < self._landing_height and vertical_velocity < self._landing_velocity
except (KeyError, TypeError):
return False
def _publish_command(self, thrust: float, pitch: float, roll: float, yaw: float) -> None:
msg = Twist()
msg.linear.z = thrust
msg.linear.x = pitch
msg.linear.y = roll
msg.angular.z = yaw
self._cmd_vel_pub.publish(msg)
def get_current_phase(self) -> Phase:
return self._phase
def main(args=None):
import sys
use_ardupilot = '--ardupilot' in sys.argv or '-a' in sys.argv
filtered_args = [a for a in (args or sys.argv) if a not in ['--ardupilot', '-a']]
rclpy.init(args=filtered_args)
controller = None
try:
controller = DroneController(use_ardupilot_topics=use_ardupilot)
rclpy.spin(controller)
except KeyboardInterrupt:
print('\nShutting down...')
finally:
if controller:
controller.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()

268
src/gazebo_bridge.py Normal file
View File

@@ -0,0 +1,268 @@
#!/usr/bin/env python3
import base64
import json
import math
from typing import Optional, Dict, Any
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist, Pose
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Image
from std_msgs.msg import String
class GazeboBridge(Node):
CAMERA_FOV = 60.0
CAMERA_RANGE = 10.0
CAMERA_WIDTH = 320
CAMERA_HEIGHT = 240
def __init__(self):
super().__init__('gazebo_bridge')
self.get_logger().info('=' * 60)
self.get_logger().info('Gazebo Bridge Starting (GPS-Denied + Camera)...')
self.get_logger().info('=' * 60)
self._landed = False
self._step_count = 0
self._rover_pos = [0.0, 0.0, 0.15]
self._last_camera_image = None
self._drone_pos = [0.0, 0.0, 5.0]
self._drone_orn = [0.0, 0.0, 0.0, 1.0]
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
depth=10
)
self._cmd_vel_sub = self.create_subscription(
Twist, '/cmd_vel', self._cmd_vel_callback, 10
)
self.get_logger().info(' Subscribed to: /cmd_vel')
self._odom_sub = self.create_subscription(
Odometry, '/model/drone/odometry', self._odom_callback, sensor_qos
)
self.get_logger().info(' Subscribed to: /model/drone/odometry')
self._camera_sub = self.create_subscription(
Image, '/drone/camera', self._camera_callback, sensor_qos
)
self.get_logger().info(' Subscribed to: /drone/camera')
self._rover_sub = self.create_subscription(
String, '/rover/telemetry', self._rover_callback, 10
)
self.get_logger().info(' Subscribed to: /rover/telemetry')
self._gz_cmd_vel_pub = self.create_publisher(
Twist, '/drone/cmd_vel', 10
)
self.get_logger().info(' Publishing to: /drone/cmd_vel (Gazebo)')
self._telemetry_pub = self.create_publisher(
String, '/drone/telemetry', 10
)
self.get_logger().info(' Publishing to: /drone/telemetry')
self.get_logger().info('Gazebo Bridge Ready!')
def _cmd_vel_callback(self, msg: Twist) -> None:
gz_msg = Twist()
gz_msg.linear.x = msg.linear.x
gz_msg.linear.y = msg.linear.y
gz_msg.linear.z = msg.linear.z
gz_msg.angular.z = msg.angular.z
self._gz_cmd_vel_pub.publish(gz_msg)
def _rover_callback(self, msg: String) -> None:
"""Receive rover position (for telemetry only - rover is static in Gazebo)."""
try:
data = json.loads(msg.data)
pos = data.get('position', {})
self._rover_pos = [
pos.get('x', 0.0),
pos.get('y', 0.0),
pos.get('z', 0.15)
]
except json.JSONDecodeError:
pass
def _camera_callback(self, msg: Image) -> None:
"""Process camera images and encode as base64."""
try:
if msg.encoding == 'rgb8':
image = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)
elif msg.encoding == 'bgr8':
image = np.frombuffer(msg.data, dtype=np.uint8).reshape(
msg.height, msg.width, 3
)[:, :, ::-1]
else:
return
# Encode as base64 JPEG
try:
from PIL import Image as PILImage
import io
pil_img = PILImage.fromarray(image)
buffer = io.BytesIO()
pil_img.save(buffer, format='JPEG', quality=70)
self._last_camera_image = base64.b64encode(buffer.getvalue()).decode('utf-8')
except ImportError:
self._last_camera_image = base64.b64encode(image.tobytes()).decode('utf-8')
except Exception as e:
self.get_logger().warning(f'Camera processing error: {e}')
def _get_landing_pad_detection(self) -> Optional[Dict[str, float]]:
dx = self._rover_pos[0] - self._drone_pos[0]
dy = self._rover_pos[1] - self._drone_pos[1]
dz = self._rover_pos[2] - self._drone_pos[2]
horizontal_dist = math.sqrt(dx**2 + dy**2)
vertical_dist = -dz
if vertical_dist <= 0 or vertical_dist > self.CAMERA_RANGE:
return None
fov_rad = math.radians(self.CAMERA_FOV / 2)
max_horizontal = vertical_dist * math.tan(fov_rad)
if horizontal_dist > max_horizontal:
return None
roll, pitch, yaw = self._quaternion_to_euler(
self._drone_orn[0], self._drone_orn[1],
self._drone_orn[2], self._drone_orn[3]
)
cos_yaw = math.cos(-yaw)
sin_yaw = math.sin(-yaw)
relative_x = dx * cos_yaw - dy * sin_yaw
relative_y = dx * sin_yaw + dy * cos_yaw
angle = math.atan2(horizontal_dist, vertical_dist)
confidence = max(0.0, 1.0 - (angle / fov_rad))
confidence *= max(0.0, 1.0 - (vertical_dist / self.CAMERA_RANGE))
return {
"relative_x": round(relative_x, 4),
"relative_y": round(relative_y, 4),
"distance": round(vertical_dist, 4),
"confidence": round(confidence, 4)
}
def _odom_callback(self, msg: Odometry) -> None:
self._step_count += 1
pos = msg.pose.pose.position
vel = msg.twist.twist.linear
ang_vel = msg.twist.twist.angular
quat = msg.pose.pose.orientation
self._drone_pos = [pos.x, pos.y, pos.z]
self._drone_orn = [quat.x, quat.y, quat.z, quat.w]
roll, pitch, yaw = self._quaternion_to_euler(quat.x, quat.y, quat.z, quat.w)
landing_height = 0.5
self._landed = (
abs(pos.x - self._rover_pos[0]) < 0.5 and
abs(pos.y - self._rover_pos[1]) < 0.5 and
pos.z < landing_height and
abs(vel.z) < 0.2
)
pad_detection = self._get_landing_pad_detection()
telemetry = {
"imu": {
"orientation": {
"roll": round(roll, 4),
"pitch": round(pitch, 4),
"yaw": round(yaw, 4)
},
"angular_velocity": {
"x": round(ang_vel.x, 4),
"y": round(ang_vel.y, 4),
"z": round(ang_vel.z, 4)
},
"linear_acceleration": {
"x": 0.0,
"y": 0.0,
"z": 9.81
}
},
"altimeter": {
"altitude": round(pos.z, 4),
"vertical_velocity": round(vel.z, 4)
},
"velocity": {
"x": round(vel.x, 4),
"y": round(vel.y, 4),
"z": round(vel.z, 4)
},
"landing_pad": pad_detection,
"camera": {
"width": self.CAMERA_WIDTH,
"height": self.CAMERA_HEIGHT,
"fov": self.CAMERA_FOV,
"image": self._last_camera_image
},
"landed": self._landed,
"timestamp": round(self._step_count * 0.02, 4)
}
telemetry_msg = String()
telemetry_msg.data = json.dumps(telemetry)
self._telemetry_pub.publish(telemetry_msg)
def _quaternion_to_euler(self, x: float, y: float, z: float, w: float) -> tuple:
sinr_cosp = 2 * (w * x + y * z)
cosr_cosp = 1 - 2 * (x * x + y * y)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2 * (w * y - z * x)
if abs(sinp) >= 1:
pitch = math.copysign(math.pi / 2, sinp)
else:
pitch = math.asin(sinp)
siny_cosp = 2 * (w * z + x * y)
cosy_cosp = 1 - 2 * (y * y + z * z)
yaw = math.atan2(siny_cosp, cosy_cosp)
return roll, pitch, yaw
def main(args=None):
print("\n" + "=" * 60)
print(" GAZEBO BRIDGE (GPS-DENIED + CAMERA)")
print("=" * 60 + "\n")
rclpy.init(args=args)
bridge_node = None
try:
bridge_node = GazeboBridge()
rclpy.spin(bridge_node)
except KeyboardInterrupt:
print('\nShutting down...')
finally:
if bridge_node is not None:
bridge_node.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()

424
src/mavlink_bridge.py Normal file
View File

@@ -0,0 +1,424 @@
#!/usr/bin/env python3
"""
MAVLink Command Interface for ArduPilot.
This module provides a Python interface for sending MAVLink commands to ArduPilot
when using the official ROS 2 DDS integration. Telemetry is received via native
ROS 2 topics (/ap/*), but commands are still sent via MAVLink.
Usage:
from mavlink_bridge import MAVLinkCommander
cmd = MAVLinkCommander()
cmd.connect()
cmd.arm()
cmd.takeoff(5.0)
cmd.land()
"""
import time
import threading
from typing import Optional, Callable
from enum import Enum
try:
from pymavlink import mavutil
PYMAVLINK_AVAILABLE = True
except ImportError:
PYMAVLINK_AVAILABLE = False
print("Warning: pymavlink not installed. Install with: pip install pymavlink")
class FlightMode(Enum):
"""ArduCopter flight modes."""
STABILIZE = 0
ACRO = 1
ALT_HOLD = 2
AUTO = 3
GUIDED = 4
LOITER = 5
RTL = 6
CIRCLE = 7
LAND = 9
DRIFT = 11
SPORT = 13
FLIP = 14
AUTOTUNE = 15
POSHOLD = 16
BRAKE = 17
THROW = 18
AVOID_ADSB = 19
GUIDED_NOGPS = 20
SMART_RTL = 21
FLOWHOLD = 22
FOLLOW = 23
ZIGZAG = 24
SYSTEMID = 25
AUTOROTATE = 26
class MAVLinkCommander:
"""
MAVLink command interface for ArduPilot.
Provides methods for arming, disarming, mode changes, and flight commands.
"""
def __init__(self, connection_string: str = "udpin:127.0.0.1:14550"):
"""
Initialize MAVLink commander.
Args:
connection_string: MAVLink connection string (default for MAVProxy output)
"""
if not PYMAVLINK_AVAILABLE:
raise ImportError("pymavlink is required: pip install pymavlink")
self._connection_string = connection_string
self._mav: Optional[mavutil.mavlink_connection] = None
self._connected = False
self._armed = False
self._flight_mode = "UNKNOWN"
self._heartbeat_thread: Optional[threading.Thread] = None
self._running = False
# Callbacks
self._on_heartbeat: Optional[Callable] = None
self._on_state_change: Optional[Callable] = None
def connect(self, timeout: float = 30.0) -> bool:
"""
Connect to ArduPilot via MAVLink.
Args:
timeout: Connection timeout in seconds
Returns:
True if connected successfully
"""
print(f"Connecting to {self._connection_string}...")
try:
self._mav = mavutil.mavlink_connection(self._connection_string)
# Wait for heartbeat
heartbeat = self._mav.wait_heartbeat(timeout=timeout)
if heartbeat is None:
print("Connection failed: No heartbeat received")
return False
self._connected = True
self._running = True
# Start heartbeat thread
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_loop, daemon=True
)
self._heartbeat_thread.start()
print(f"Connected! System: {self._mav.target_system}, Component: {self._mav.target_component}")
return True
except Exception as e:
print(f"Connection failed: {e}")
return False
def disconnect(self):
"""Disconnect from ArduPilot."""
self._running = False
if self._heartbeat_thread:
self._heartbeat_thread.join(timeout=2.0)
if self._mav:
self._mav.close()
self._connected = False
print("Disconnected")
def _heartbeat_loop(self):
"""Background thread for receiving heartbeats and updating state."""
while self._running and self._mav:
try:
msg = self._mav.recv_match(type='HEARTBEAT', blocking=True, timeout=1.0)
if msg:
self._armed = msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED
self._flight_mode = mavutil.mode_string_v10(msg)
if self._on_heartbeat:
self._on_heartbeat(self._armed, self._flight_mode)
except Exception:
pass
@property
def connected(self) -> bool:
return self._connected
@property
def armed(self) -> bool:
return self._armed
@property
def flight_mode(self) -> str:
return self._flight_mode
def arm(self, force: bool = False) -> bool:
"""
Arm the vehicle.
Args:
force: Force arm (bypass preflight checks)
Returns:
True if arm command was sent
"""
if not self._connected:
print("Not connected")
return False
print("Arming...")
param = 21196.0 if force else 0.0 # Force arm magic number
self._mav.mav.command_long_send(
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0, # Confirmation
1, # Arm
param, # Force
0, 0, 0, 0, 0
)
# Wait for confirmation
time.sleep(0.5)
return True
def disarm(self, force: bool = False) -> bool:
"""
Disarm the vehicle.
Args:
force: Force disarm
Returns:
True if disarm command was sent
"""
if not self._connected:
print("Not connected")
return False
print("Disarming...")
param = 21196.0 if force else 0.0
self._mav.mav.command_long_send(
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0,
0, # Disarm
param,
0, 0, 0, 0, 0
)
time.sleep(0.5)
return True
def set_mode(self, mode: str) -> bool:
"""
Set flight mode.
Args:
mode: Mode name (GUIDED, LAND, LOITER, etc.)
Returns:
True if mode change command was sent
"""
if not self._connected:
print("Not connected")
return False
mode_upper = mode.upper()
mode_id = self._mav.mode_mapping().get(mode_upper)
if mode_id is None:
print(f"Unknown mode: {mode}")
return False
print(f"Setting mode to {mode_upper}...")
self._mav.mav.set_mode_send(
self._mav.target_system,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode_id
)
time.sleep(0.5)
return True
def takeoff(self, altitude: float = 5.0) -> bool:
"""
Command takeoff to specified altitude.
Args:
altitude: Target altitude in meters
Returns:
True if takeoff command was sent
"""
if not self._connected:
print("Not connected")
return False
print(f"Taking off to {altitude}m...")
self._mav.mav.command_long_send(
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
0,
0, 0, 0, 0, 0, 0,
altitude
)
return True
def land(self) -> bool:
"""
Command landing.
Returns:
True if land command was sent
"""
if not self._connected:
print("Not connected")
return False
print("Landing...")
self._mav.mav.command_long_send(
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_CMD_NAV_LAND,
0,
0, 0, 0, 0, 0, 0, 0
)
return True
def goto_position_ned(self, north: float, east: float, down: float) -> bool:
"""
Go to position in NED frame relative to home.
Args:
north: North position (m)
east: East position (m)
down: Down position (m, negative is up)
Returns:
True if command was sent
"""
if not self._connected:
print("Not connected")
return False
print(f"Going to NED: ({north}, {east}, {down})")
self._mav.mav.set_position_target_local_ned_send(
0, # time_boot_ms
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
0b0000111111111000, # Position only
north, east, down, # Position
0, 0, 0, # Velocity
0, 0, 0, # Acceleration
0, 0 # Yaw, yaw_rate
)
return True
def set_velocity_ned(self, vn: float, ve: float, vd: float, yaw_rate: float = 0.0) -> bool:
"""
Set velocity in NED frame.
Args:
vn: North velocity (m/s)
ve: East velocity (m/s)
vd: Down velocity (m/s, positive is down)
yaw_rate: Yaw rate (rad/s)
Returns:
True if command was sent
"""
if not self._connected:
return False
self._mav.mav.set_position_target_local_ned_send(
0, # time_boot_ms
self._mav.target_system,
self._mav.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
0b0000011111000111, # Velocity only
0, 0, 0, # Position (ignored)
vn, ve, vd, # Velocity
0, 0, 0, # Acceleration
0, yaw_rate
)
return True
def set_param(self, param_id: str, value: float) -> bool:
"""
Set a parameter value.
Args:
param_id: Parameter name
value: Parameter value
Returns:
True if command was sent
"""
if not self._connected:
print("Not connected")
return False
print(f"Setting {param_id} = {value}")
self._mav.mav.param_set_send(
self._mav.target_system,
self._mav.target_component,
param_id.encode('utf-8'),
value,
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
return True
def main():
"""Example usage of MAVLinkCommander."""
if not PYMAVLINK_AVAILABLE:
print("pymavlink not installed")
return
cmd = MAVLinkCommander()
if not cmd.connect(timeout=10.0):
print("Failed to connect")
return
print(f"Armed: {cmd.armed}")
print(f"Mode: {cmd.flight_mode}")
# Example commands (uncomment to use):
# cmd.set_mode("GUIDED")
# cmd.arm()
# cmd.takeoff(5.0)
# time.sleep(10)
# cmd.land()
input("Press Enter to disconnect...")
cmd.disconnect()
if __name__ == '__main__':
main()

250
src/rover_controller.py Normal file
View File

@@ -0,0 +1,250 @@
#!/usr/bin/env python3
"""
Rover Controller - Controls the moving landing pad.
Usage: python rover_controller.py --pattern circular --speed 0.5 --amplitude 2.0
"""
import argparse
import math
import random
from enum import Enum
from typing import Tuple
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, Point
from std_msgs.msg import String
import json
# Load configuration
try:
from config import ROVER
CONFIG_LOADED = True
except ImportError:
CONFIG_LOADED = False
ROVER = {"default_pattern": "stationary", "default_speed": 0.5, "default_amplitude": 2.0}
class MovementPattern(Enum):
STATIONARY = "stationary"
LINEAR = "linear"
CIRCULAR = "circular"
RANDOM = "random"
SQUARE = "square"
class RoverController(Node):
"""Controls the rover movement."""
def __init__(
self,
pattern: MovementPattern = None,
speed: float = None,
amplitude: float = None
):
super().__init__('rover_controller')
# Use config defaults if not specified
if pattern is None:
pattern = MovementPattern(ROVER.get("default_pattern", "stationary"))
if speed is None:
speed = ROVER.get("default_speed", 0.5)
if amplitude is None:
amplitude = ROVER.get("default_amplitude", 2.0)
self._pattern = pattern
self._speed = speed
self._amplitude = amplitude
self._time = 0.0
self._position = Point()
self._position.x = 0.0
self._position.y = 0.0
self._position.z = 0.15
self._target_x = 0.0
self._target_y = 0.0
self._random_timer = 0.0
self._square_segment = 0
self._control_rate = 50.0
self._max_x = 5.0
self._max_y = 5.0
self.get_logger().info('=' * 50)
self.get_logger().info('Rover Controller Starting...')
self.get_logger().info(f' Pattern: {pattern.value}')
self.get_logger().info(f' Speed: {speed} m/s')
self.get_logger().info(f' Amplitude: {amplitude} m')
self.get_logger().info('=' * 50)
self._cmd_vel_pub = self.create_publisher(Twist, '/rover/cmd_vel', 10)
self._position_pub = self.create_publisher(Point, '/rover/position', 10)
self._telemetry_pub = self.create_publisher(String, '/rover/telemetry', 10)
self.get_logger().info(' Publishing to: /rover/cmd_vel, /rover/position, /rover/telemetry')
control_period = 1.0 / self._control_rate
self._control_timer = self.create_timer(control_period, self._control_loop)
self.get_logger().info('Rover Controller Ready!')
def _control_loop(self) -> None:
dt = 1.0 / self._control_rate
self._time += dt
vel_x, vel_y = self._calculate_velocity()
self._position.x += vel_x * dt
self._position.y += vel_y * dt
self._position.x = max(-self._max_x, min(self._max_x, self._position.x))
self._position.y = max(-self._max_y, min(self._max_y, self._position.y))
cmd = Twist()
cmd.linear.x = vel_x
cmd.linear.y = vel_y
self._cmd_vel_pub.publish(cmd)
self._position_pub.publish(self._position)
telemetry = {
"position": {
"x": round(self._position.x, 4),
"y": round(self._position.y, 4),
"z": round(self._position.z, 4)
},
"velocity": {
"x": round(vel_x, 4),
"y": round(vel_y, 4),
"z": 0.0
},
"pattern": self._pattern.value,
"timestamp": round(self._time, 4)
}
telemetry_msg = String()
telemetry_msg.data = json.dumps(telemetry)
self._telemetry_pub.publish(telemetry_msg)
def _calculate_velocity(self) -> Tuple[float, float]:
if self._pattern == MovementPattern.STATIONARY:
return self._pattern_stationary()
elif self._pattern == MovementPattern.LINEAR:
return self._pattern_linear()
elif self._pattern == MovementPattern.CIRCULAR:
return self._pattern_circular()
elif self._pattern == MovementPattern.RANDOM:
return self._pattern_random()
elif self._pattern == MovementPattern.SQUARE:
return self._pattern_square()
return (0.0, 0.0)
def _pattern_stationary(self) -> Tuple[float, float]:
kp = 1.0
return (-kp * self._position.x, -kp * self._position.y)
def _pattern_linear(self) -> Tuple[float, float]:
omega = self._speed / self._amplitude
target_x = self._amplitude * math.sin(omega * self._time)
kp = 2.0
vel_x = kp * (target_x - self._position.x)
vel_x = max(-self._speed, min(self._speed, vel_x))
return (vel_x, 0.0)
def _pattern_circular(self) -> Tuple[float, float]:
omega = self._speed / self._amplitude
target_x = self._amplitude * math.cos(omega * self._time)
target_y = self._amplitude * math.sin(omega * self._time)
kp = 2.0
vel_x = kp * (target_x - self._position.x)
vel_y = kp * (target_y - self._position.y)
speed = math.sqrt(vel_x**2 + vel_y**2)
if speed > self._speed:
vel_x = vel_x / speed * self._speed
vel_y = vel_y / speed * self._speed
return (vel_x, vel_y)
def _pattern_random(self) -> Tuple[float, float]:
dt = 1.0 / self._control_rate
self._random_timer += dt
if self._random_timer >= 3.0:
self._random_timer = 0.0
self._target_x = random.uniform(-self._amplitude, self._amplitude)
self._target_y = random.uniform(-self._amplitude, self._amplitude)
kp = 1.0
vel_x = kp * (self._target_x - self._position.x)
vel_y = kp * (self._target_y - self._position.y)
speed = math.sqrt(vel_x**2 + vel_y**2)
if speed > self._speed:
vel_x = vel_x / speed * self._speed
vel_y = vel_y / speed * self._speed
return (vel_x, vel_y)
def _pattern_square(self) -> Tuple[float, float]:
corners = [
(self._amplitude, self._amplitude),
(-self._amplitude, self._amplitude),
(-self._amplitude, -self._amplitude),
(self._amplitude, -self._amplitude),
]
target = corners[self._square_segment % 4]
dx = target[0] - self._position.x
dy = target[1] - self._position.y
dist = math.sqrt(dx**2 + dy**2)
if dist < 0.1:
self._square_segment += 1
target = corners[self._square_segment % 4]
dx = target[0] - self._position.x
dy = target[1] - self._position.y
dist = math.sqrt(dx**2 + dy**2)
if dist > 0.01:
return (self._speed * dx / dist, self._speed * dy / dist)
return (0.0, 0.0)
def parse_args():
parser = argparse.ArgumentParser(
description='Rover Controller - Moving Landing Platform',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'--pattern', '-p', type=str,
choices=[p.value for p in MovementPattern],
default='stationary', help='Movement pattern (default: stationary)'
)
parser.add_argument(
'--speed', '-s', type=float, default=0.5,
help='Movement speed in m/s (default: 0.5)'
)
parser.add_argument(
'--amplitude', '-a', type=float, default=2.0,
help='Movement amplitude in meters (default: 2.0)'
)
args, _ = parser.parse_known_args()
return args
def main(args=None):
cli_args = parse_args()
print(f"\n{'='*60}")
print(f" ROVER CONTROLLER - {cli_args.pattern.upper()}")
print(f"{'='*60}\n")
rclpy.init(args=args)
controller = None
try:
pattern = MovementPattern(cli_args.pattern)
controller = RoverController(
pattern=pattern,
speed=cli_args.speed,
amplitude=cli_args.amplitude
)
rclpy.spin(controller)
except KeyboardInterrupt:
print('\nShutting down...')
finally:
if controller:
controller.destroy_node()
if rclpy.ok():
rclpy.shutdown()
if __name__ == '__main__':
main()