251 lines
8.2 KiB
Python
251 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Rover Controller - Controls the moving landing pad.
|
|
Usage: python rover_controller.py --pattern circular --speed 0.5 --amplitude 2.0
|
|
"""
|
|
|
|
import argparse
|
|
import math
|
|
import random
|
|
from enum import Enum
|
|
from typing import Tuple
|
|
|
|
import rclpy
|
|
from rclpy.node import Node
|
|
from geometry_msgs.msg import Twist, Point
|
|
from std_msgs.msg import String
|
|
import json
|
|
|
|
# Load configuration
|
|
try:
|
|
from config import ROVER
|
|
CONFIG_LOADED = True
|
|
except ImportError:
|
|
CONFIG_LOADED = False
|
|
ROVER = {"default_pattern": "stationary", "default_speed": 0.5, "default_amplitude": 2.0}
|
|
|
|
|
|
class MovementPattern(Enum):
|
|
STATIONARY = "stationary"
|
|
LINEAR = "linear"
|
|
CIRCULAR = "circular"
|
|
RANDOM = "random"
|
|
SQUARE = "square"
|
|
|
|
|
|
class RoverController(Node):
|
|
"""Controls the rover movement."""
|
|
|
|
def __init__(
|
|
self,
|
|
pattern: MovementPattern = None,
|
|
speed: float = None,
|
|
amplitude: float = None
|
|
):
|
|
super().__init__('rover_controller')
|
|
|
|
# Use config defaults if not specified
|
|
if pattern is None:
|
|
pattern = MovementPattern(ROVER.get("default_pattern", "stationary"))
|
|
if speed is None:
|
|
speed = ROVER.get("default_speed", 0.5)
|
|
if amplitude is None:
|
|
amplitude = ROVER.get("default_amplitude", 2.0)
|
|
|
|
self._pattern = pattern
|
|
self._speed = speed
|
|
self._amplitude = amplitude
|
|
self._time = 0.0
|
|
self._position = Point()
|
|
self._position.x = 0.0
|
|
self._position.y = 0.0
|
|
self._position.z = 0.15
|
|
self._target_x = 0.0
|
|
self._target_y = 0.0
|
|
self._random_timer = 0.0
|
|
self._square_segment = 0
|
|
self._control_rate = 50.0
|
|
self._max_x = 5.0
|
|
self._max_y = 5.0
|
|
|
|
self.get_logger().info('=' * 50)
|
|
self.get_logger().info('Rover Controller Starting...')
|
|
self.get_logger().info(f' Pattern: {pattern.value}')
|
|
self.get_logger().info(f' Speed: {speed} m/s')
|
|
self.get_logger().info(f' Amplitude: {amplitude} m')
|
|
self.get_logger().info('=' * 50)
|
|
|
|
self._cmd_vel_pub = self.create_publisher(Twist, '/rover/cmd_vel', 10)
|
|
self._position_pub = self.create_publisher(Point, '/rover/position', 10)
|
|
self._telemetry_pub = self.create_publisher(String, '/rover/telemetry', 10)
|
|
|
|
self.get_logger().info(' Publishing to: /rover/cmd_vel, /rover/position, /rover/telemetry')
|
|
|
|
control_period = 1.0 / self._control_rate
|
|
self._control_timer = self.create_timer(control_period, self._control_loop)
|
|
|
|
self.get_logger().info('Rover Controller Ready!')
|
|
|
|
def _control_loop(self) -> None:
|
|
dt = 1.0 / self._control_rate
|
|
self._time += dt
|
|
|
|
vel_x, vel_y = self._calculate_velocity()
|
|
|
|
self._position.x += vel_x * dt
|
|
self._position.y += vel_y * dt
|
|
self._position.x = max(-self._max_x, min(self._max_x, self._position.x))
|
|
self._position.y = max(-self._max_y, min(self._max_y, self._position.y))
|
|
|
|
cmd = Twist()
|
|
cmd.linear.x = vel_x
|
|
cmd.linear.y = vel_y
|
|
self._cmd_vel_pub.publish(cmd)
|
|
self._position_pub.publish(self._position)
|
|
|
|
telemetry = {
|
|
"position": {
|
|
"x": round(self._position.x, 4),
|
|
"y": round(self._position.y, 4),
|
|
"z": round(self._position.z, 4)
|
|
},
|
|
"velocity": {
|
|
"x": round(vel_x, 4),
|
|
"y": round(vel_y, 4),
|
|
"z": 0.0
|
|
},
|
|
"pattern": self._pattern.value,
|
|
"timestamp": round(self._time, 4)
|
|
}
|
|
telemetry_msg = String()
|
|
telemetry_msg.data = json.dumps(telemetry)
|
|
self._telemetry_pub.publish(telemetry_msg)
|
|
|
|
def _calculate_velocity(self) -> Tuple[float, float]:
|
|
if self._pattern == MovementPattern.STATIONARY:
|
|
return self._pattern_stationary()
|
|
elif self._pattern == MovementPattern.LINEAR:
|
|
return self._pattern_linear()
|
|
elif self._pattern == MovementPattern.CIRCULAR:
|
|
return self._pattern_circular()
|
|
elif self._pattern == MovementPattern.RANDOM:
|
|
return self._pattern_random()
|
|
elif self._pattern == MovementPattern.SQUARE:
|
|
return self._pattern_square()
|
|
return (0.0, 0.0)
|
|
|
|
def _pattern_stationary(self) -> Tuple[float, float]:
|
|
kp = 1.0
|
|
return (-kp * self._position.x, -kp * self._position.y)
|
|
|
|
def _pattern_linear(self) -> Tuple[float, float]:
|
|
omega = self._speed / self._amplitude
|
|
target_x = self._amplitude * math.sin(omega * self._time)
|
|
kp = 2.0
|
|
vel_x = kp * (target_x - self._position.x)
|
|
vel_x = max(-self._speed, min(self._speed, vel_x))
|
|
return (vel_x, 0.0)
|
|
|
|
def _pattern_circular(self) -> Tuple[float, float]:
|
|
omega = self._speed / self._amplitude
|
|
target_x = self._amplitude * math.cos(omega * self._time)
|
|
target_y = self._amplitude * math.sin(omega * self._time)
|
|
kp = 2.0
|
|
vel_x = kp * (target_x - self._position.x)
|
|
vel_y = kp * (target_y - self._position.y)
|
|
speed = math.sqrt(vel_x**2 + vel_y**2)
|
|
if speed > self._speed:
|
|
vel_x = vel_x / speed * self._speed
|
|
vel_y = vel_y / speed * self._speed
|
|
return (vel_x, vel_y)
|
|
|
|
def _pattern_random(self) -> Tuple[float, float]:
|
|
dt = 1.0 / self._control_rate
|
|
self._random_timer += dt
|
|
if self._random_timer >= 3.0:
|
|
self._random_timer = 0.0
|
|
self._target_x = random.uniform(-self._amplitude, self._amplitude)
|
|
self._target_y = random.uniform(-self._amplitude, self._amplitude)
|
|
kp = 1.0
|
|
vel_x = kp * (self._target_x - self._position.x)
|
|
vel_y = kp * (self._target_y - self._position.y)
|
|
speed = math.sqrt(vel_x**2 + vel_y**2)
|
|
if speed > self._speed:
|
|
vel_x = vel_x / speed * self._speed
|
|
vel_y = vel_y / speed * self._speed
|
|
return (vel_x, vel_y)
|
|
|
|
def _pattern_square(self) -> Tuple[float, float]:
|
|
corners = [
|
|
(self._amplitude, self._amplitude),
|
|
(-self._amplitude, self._amplitude),
|
|
(-self._amplitude, -self._amplitude),
|
|
(self._amplitude, -self._amplitude),
|
|
]
|
|
target = corners[self._square_segment % 4]
|
|
dx = target[0] - self._position.x
|
|
dy = target[1] - self._position.y
|
|
dist = math.sqrt(dx**2 + dy**2)
|
|
if dist < 0.1:
|
|
self._square_segment += 1
|
|
target = corners[self._square_segment % 4]
|
|
dx = target[0] - self._position.x
|
|
dy = target[1] - self._position.y
|
|
dist = math.sqrt(dx**2 + dy**2)
|
|
if dist > 0.01:
|
|
return (self._speed * dx / dist, self._speed * dy / dist)
|
|
return (0.0, 0.0)
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(
|
|
description='Rover Controller - Moving Landing Platform',
|
|
formatter_class=argparse.RawDescriptionHelpFormatter
|
|
)
|
|
parser.add_argument(
|
|
'--pattern', '-p', type=str,
|
|
choices=[p.value for p in MovementPattern],
|
|
default='stationary', help='Movement pattern (default: stationary)'
|
|
)
|
|
parser.add_argument(
|
|
'--speed', '-s', type=float, default=0.5,
|
|
help='Movement speed in m/s (default: 0.5)'
|
|
)
|
|
parser.add_argument(
|
|
'--amplitude', '-a', type=float, default=2.0,
|
|
help='Movement amplitude in meters (default: 2.0)'
|
|
)
|
|
args, _ = parser.parse_known_args()
|
|
return args
|
|
|
|
|
|
def main(args=None):
|
|
cli_args = parse_args()
|
|
|
|
print(f"\n{'='*60}")
|
|
print(f" ROVER CONTROLLER - {cli_args.pattern.upper()}")
|
|
print(f"{'='*60}\n")
|
|
|
|
rclpy.init(args=args)
|
|
controller = None
|
|
|
|
try:
|
|
pattern = MovementPattern(cli_args.pattern)
|
|
controller = RoverController(
|
|
pattern=pattern,
|
|
speed=cli_args.speed,
|
|
amplitude=cli_args.amplitude
|
|
)
|
|
rclpy.spin(controller)
|
|
except KeyboardInterrupt:
|
|
print('\nShutting down...')
|
|
finally:
|
|
if controller:
|
|
controller.destroy_node()
|
|
if rclpy.ok():
|
|
rclpy.shutdown()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|