From 0e427f3597beb63a303fed43cfd78d9992dab9c1 Mon Sep 17 00:00:00 2001 From: SirBlobby Date: Tue, 10 Feb 2026 23:40:06 -0500 Subject: [PATCH] Gazebo Sim World Update --- .gitignore | 3 + activate_venv.sh | 79 ++-- config/ardupilot_gps_denied.parm | 15 +- scripts/run_autonomous.sh | 164 +++---- setup.sh | 7 +- src/__init__.py | 4 - src/autonomous_controller.py | 708 ------------------------------- src/control/__init__.py | 1 - src/control/uav_controller.py | 541 +++++++++++------------ src/control/ugv_controller.py | 274 ++++-------- src/main.py | 216 ++++++++++ src/utils/__init__.py | 1 - worlds/uav_ugv_search.sdf | 320 ++++++++++++++ wsl_env.sh | 23 +- 14 files changed, 1021 insertions(+), 1335 deletions(-) delete mode 100755 src/autonomous_controller.py create mode 100644 src/main.py create mode 100644 worlds/uav_ugv_search.sdf diff --git a/.gitignore b/.gitignore index f963bd8..e64f8df 100644 --- a/.gitignore +++ b/.gitignore @@ -69,3 +69,6 @@ htmlcov/ # Jupyter .ipynb_checkpoints/ + +wsl_env.sh +activate_venv.sh diff --git a/activate_venv.sh b/activate_venv.sh index fd754d2..7b3f2b2 100755 --- a/activate_venv.sh +++ b/activate_venv.sh @@ -1,61 +1,56 @@ #!/bin/bash -# Activate virtual environment and setup for UAV-UGV Simulation +# UAV-UGV Simulation - Environment Activation +# Usage: source activate_venv.sh SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -# Detect ROS distro -ROS_DISTRO="" -for distro in humble jazzy iron galactic; do - if [ -d "/opt/ros/$distro" ]; then - ROS_DISTRO="$distro" - break - fi -done - -# Source ROS 2 -if [ -n "$ROS_DISTRO" ] && [ -f "/opt/ros/${ROS_DISTRO}/setup.bash" ]; then - source /opt/ros/${ROS_DISTRO}/setup.bash +# ROS 2 Setup +if [ -f "/opt/ros/jazzy/setup.bash" ]; then + source /opt/ros/jazzy/setup.bash + ROS_VER="jazzy" +elif [ -f "/opt/ros/humble/setup.bash" ]; then + source /opt/ros/humble/setup.bash + ROS_VER="humble" +else + echo "[WARN] ROS 2 not found in /opt/ros/" + ROS_VER="none" fi -# Source Gazebo setup (critical for shader/resource paths) -if [ -f /usr/share/gazebo/setup.bash ]; then - source /usr/share/gazebo/setup.bash -elif [ -f /usr/share/gazebo-11/setup.bash ]; then - source /usr/share/gazebo-11/setup.bash +# ArduPilot Environment +export ARDUPILOT_HOME="$HOME/ardupilot" +export PATH="$ARDUPILOT_HOME/Tools/autotest:$PATH" +export PATH="$HOME/.local/bin:$PATH" + +# Deactivate existing venv +if [ -n "$VIRTUAL_ENV" ]; then + deactivate 2>/dev/null || true fi -# Activate Python virtual environment +# Activate project venv if [ -f "$SCRIPT_DIR/venv/bin/activate" ]; then source "$SCRIPT_DIR/venv/bin/activate" fi -# Source ROS workspace if built -if [ -f "$SCRIPT_DIR/../install/setup.bash" ]; then - source "$SCRIPT_DIR/../install/setup.bash" -elif [ -f "$SCRIPT_DIR/install/setup.bash" ]; then - source "$SCRIPT_DIR/install/setup.bash" +# Gazebo paths (new Gazebo - Ignition/Harmonic) +export GZ_SIM_RESOURCE_PATH="$SCRIPT_DIR/models:$SCRIPT_DIR/worlds:${GZ_SIM_RESOURCE_PATH:-}" + +# ArduPilot Gazebo plugin +if [ -d "$HOME/ardupilot_gazebo/build" ]; then + export GZ_SIM_SYSTEM_PLUGIN_PATH="$HOME/ardupilot_gazebo/build:${GZ_SIM_SYSTEM_PLUGIN_PATH:-}" + export GZ_SIM_RESOURCE_PATH="$HOME/ardupilot_gazebo/models:$HOME/ardupilot_gazebo/worlds:$GZ_SIM_RESOURCE_PATH" fi -# Gazebo model and resource paths -export GAZEBO_MODEL_PATH="$SCRIPT_DIR/models:${GAZEBO_MODEL_PATH:-}" -export GAZEBO_RESOURCE_PATH="$SCRIPT_DIR/worlds:${GAZEBO_RESOURCE_PATH:-}" - -# ArduPilot Gazebo (if installed) -if [ -d "$HOME/ardupilot_gazebo" ]; then - export GAZEBO_MODEL_PATH="$HOME/ardupilot_gazebo/models:$GAZEBO_MODEL_PATH" - export GAZEBO_RESOURCE_PATH="$HOME/ardupilot_gazebo/worlds:$GAZEBO_RESOURCE_PATH" -fi - -# WSL environment (if applicable) +# WSL environment if grep -qEi "(microsoft|wsl)" /proc/version 2>/dev/null; then - if [ -f "$SCRIPT_DIR/wsl_env.sh" ]; then - source "$SCRIPT_DIR/wsl_env.sh" + if [ -d "/mnt/wslg" ]; then + export DISPLAY=:0 + else + export DISPLAY=$(cat /etc/resolv.conf 2>/dev/null | grep nameserver | awk '{print $2}'):0 fi + export LIBGL_ALWAYS_INDIRECT=0 + export MESA_GL_VERSION_OVERRIDE=3.3 fi -echo -e "\033[0;32mEnvironment activated (ROS 2 ${ROS_DISTRO:-not found})\033[0m" -echo "Run: bash scripts/run_simulation.sh" +echo -e "\033[0;32mEnvironment activated (ROS 2 $ROS_VER)\033[0m" echo "" -echo "Options:" -echo " bash scripts/run_simulation.sh # Default world" -echo " bash scripts/run_simulation.sh --software-render # For WSL/rendering issues" +echo "Run simulation: bash scripts/run_simulation.sh" diff --git a/config/ardupilot_gps_denied.parm b/config/ardupilot_gps_denied.parm index afb6f0e..eb75a2a 100644 --- a/config/ardupilot_gps_denied.parm +++ b/config/ardupilot_gps_denied.parm @@ -54,12 +54,19 @@ FLTMODE5 6 # RTL FLTMODE6 9 # Land # ==================== -# Arming Checks +# Scheduler (WSL fix) # ==================== -ARMING_CHECK 0 # Disable all checks (for simulation) +SCHED_LOOP_RATE 200 + +# ==================== +# Arming / Failsafes +# ==================== +ARMING_CHECK 0 +FS_THR_ENABLE 0 +FS_GCS_ENABLE 0 # ==================== # Logging # ==================== -LOG_BITMASK 176126 # Log all relevant data -LOG_DISARMED 0 # Don't log when disarmed +LOG_BITMASK 176126 +LOG_DISARMED 0 diff --git a/scripts/run_autonomous.sh b/scripts/run_autonomous.sh index ff3aa08..3fabf26 100755 --- a/scripts/run_autonomous.sh +++ b/scripts/run_autonomous.sh @@ -1,13 +1,10 @@ #!/bin/bash -# Autonomous UAV-UGV Simulation Runner -# Launches Gazebo + ArduPilot SITL + Autonomous Controller set -e SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" PROJECT_DIR="$(dirname "$SCRIPT_DIR")" -# Colors RED='\033[0;31m' GREEN='\033[0;32m' YELLOW='\033[1;33m' @@ -18,138 +15,163 @@ print_info() { echo -e "${CYAN}[INFO]${NC} $1"; } print_success() { echo -e "${GREEN}[SUCCESS]${NC} $1"; } print_error() { echo -e "${RED}[ERROR]${NC} $1"; } -# Parse arguments -SOFTWARE_RENDER=false -WORLD="iris_runway.sdf" -MISSION="hover" # hover, square, circle +SOFTWARE_RENDER=auto +WORLD="uav_ugv_search.sdf" +MISSION="hover" +ALTITUDE=5.0 +DURATION=30.0 while [[ $# -gt 0 ]]; do case $1 in --software-render) SOFTWARE_RENDER=true; shift ;; + --no-software-render) SOFTWARE_RENDER=false; shift ;; --world) WORLD="$2"; shift 2 ;; --mission) MISSION="$2"; shift 2 ;; + --altitude) ALTITUDE="$2"; shift 2 ;; + --duration) DURATION="$2"; shift 2 ;; -h|--help) echo "Usage: $0 [options]" - echo " --software-render Use software rendering (WSL/no GPU)" - echo " --world World file to load (default: iris_runway.sdf)" - echo " --mission Mission type: hover, square, circle (default: hover)" + echo " --software-render Force software rendering" + echo " --no-software-render Disable software rendering" + echo " --world World file (default: uav_ugv_search.sdf)" + echo " --mission hover, square, search (default: hover)" + echo " --altitude Takeoff altitude (default: 5.0)" + echo " --duration Hover duration (default: 30.0)" exit 0 ;; *) shift ;; esac done -# Cleanup function -cleanup() { - print_info "Cleaning up..." +cleanup_all() { + print_info "Cleaning up ..." pkill -f "gz sim" 2>/dev/null || true pkill -f "arducopter" 2>/dev/null || true pkill -f "sim_vehicle.py" 2>/dev/null || true pkill -f "mavproxy" 2>/dev/null || true - pkill -f "autonomous_controller.py" 2>/dev/null || true + pkill -f "main.py" 2>/dev/null || true } -trap cleanup EXIT -# Setup environment +cleanup_sitl() { + pkill -f "arducopter" 2>/dev/null || true + pkill -f "sim_vehicle.py" 2>/dev/null || true + pkill -f "mavproxy" 2>/dev/null || true + pkill -f "main.py" 2>/dev/null || true +} +trap cleanup_all EXIT + export PATH=$PATH:$HOME/ardupilot/Tools/autotest:$HOME/.local/bin -export GZ_SIM_RESOURCE_PATH=$HOME/ardupilot_gazebo/models:$HOME/ardupilot_gazebo/worlds +export GZ_SIM_RESOURCE_PATH="$PROJECT_DIR/models:$PROJECT_DIR/worlds:$HOME/ardupilot_gazebo/models:$HOME/ardupilot_gazebo/worlds" export GZ_SIM_SYSTEM_PLUGIN_PATH=$HOME/ardupilot_gazebo/build -if [ "$SOFTWARE_RENDER" = true ]; then - print_info "Using software rendering (Mesa)" - export LIBGL_ALWAYS_SOFTWARE=1 - export GALLIUM_DRIVER=llvmpipe +if [ "$SOFTWARE_RENDER" = "auto" ]; then + if grep -qi microsoft /proc/version 2>/dev/null; then + print_info "WSL detected -> software rendering" + SOFTWARE_RENDER=true + else + SOFTWARE_RENDER=false + fi fi -# Kill any existing processes -cleanup 2>/dev/null +if [ "$SOFTWARE_RENDER" = "true" ]; then + export LIBGL_ALWAYS_SOFTWARE=1 + export GALLIUM_DRIVER=llvmpipe + export MESA_GL_VERSION_OVERRIDE=3.3 +fi + +cleanup_all 2>/dev/null + +WORLD_FILE="" +if [ -f "$PROJECT_DIR/worlds/$WORLD" ]; then + WORLD_FILE="$PROJECT_DIR/worlds/$WORLD" +elif [ -f "$HOME/ardupilot_gazebo/worlds/$WORLD" ]; then + WORLD_FILE="$HOME/ardupilot_gazebo/worlds/$WORLD" +elif [ -f "$WORLD" ]; then + WORLD_FILE="$WORLD" +else + print_error "World file not found: $WORLD" + exit 1 +fi print_info "===================================" -print_info " Autonomous UAV-UGV Simulation" +print_info " UAV-UGV Simulation" print_info "===================================" -print_info "World: $WORLD" -print_info "Mission: $MISSION" +print_info "World: $WORLD_FILE" +print_info "Mission: $MISSION" +print_info "Altitude: ${ALTITUDE}m" echo "" -# Step 1: Start Gazebo -print_info "Starting Gazebo Harmonic..." -gz sim -v4 -r $HOME/ardupilot_gazebo/worlds/$WORLD & +print_info "[1/3] Starting Gazebo ..." +gz sim -v4 -r "$WORLD_FILE" & GZ_PID=$! -sleep 8 +sleep 10 -# Check if Gazebo started if ! kill -0 $GZ_PID 2>/dev/null; then print_error "Gazebo failed to start" exit 1 fi print_success "Gazebo running (PID: $GZ_PID)" -# Step 2: Start ArduPilot SITL -print_info "Starting ArduPilot SITL..." +print_info "[2/3] Starting ArduPilot SITL ..." cd ~/ardupilot -# Build SITL command -# We enable MAVProxy (--console --map) for visualization -# MAVProxy will sit between SITL and our controller -# SITL -> TCP:5760 -> MAVProxy -> UDP:14550 (GCS) -# -> UDP:14551 (Autonomous Controller) SITL_ARGS="-v ArduCopter -f gazebo-iris --model JSON -I0" -SITL_ARGS="$SITL_ARGS --console --map" +SITL_ARGS="$SITL_ARGS --no-mavproxy" -# Add output ports -# 14550: External GCS (QGroundControl) -# 14551: Autonomous Controller -SITL_ARGS="$SITL_ARGS --out 127.0.0.1:14550 --out 127.0.0.1:14551" - -# Check if custom param file exists PARAM_FILE="$PROJECT_DIR/config/ardupilot_gps_denied.parm" if [ -f "$PARAM_FILE" ]; then - print_info "Loading custom parameters..." + print_info "Loading params: $PARAM_FILE" SITL_ARGS="$SITL_ARGS --add-param-file $PARAM_FILE" fi -print_info "SITL will output status to this terminal" -print_info "Connect GCS to udp:127.0.0.1:14550 for monitoring" - -# Start SITL sim_vehicle.py $SITL_ARGS & SITL_PID=$! -# Wait for SITL to initialize -print_info "Waiting for SITL to initialize (~20 seconds)..." +print_info "Waiting for SITL (~20s) ..." sleep 20 -# Check if SITL started (check for arducopter process, not sim_vehicle.py) if ! pgrep -f "arducopter" > /dev/null 2>&1; then print_error "ArduPilot SITL failed to start" exit 1 fi -print_success "ArduPilot SITL running" +print_success "ArduPilot SITL running (TCP 5760)" -# Step 3: Start the autonomous controller -print_info "Starting Autonomous Controller..." -print_info "Mission: $MISSION" +print_info "[3/3] Starting main.py ..." +cd "$PROJECT_DIR" +sleep 3 -# Wait a bit more, then start controller with retry -sleep 5 +python3 src/main.py \ + --device sim \ + --connection "tcp:127.0.0.1:5760" \ + --mission "$MISSION" \ + --altitude "$ALTITUDE" \ + --duration "$DURATION" & +MAIN_PID=$! +print_success "main.py running (PID: $MAIN_PID)" -python3 "$PROJECT_DIR/src/autonomous_controller.py" --mission "$MISSION" --connection "udp:127.0.0.1:14551" & -CTRL_PID=$! - -print_success "Autonomous Controller started (PID: $CTRL_PID)" print_info "" print_info "===================================" print_info " Simulation Running" print_info "===================================" -print_info "The UAV will automatically:" -print_info " 1. Wait for EKF initialization (~30-60s)" -print_info " 2. Arm and enter GUIDED mode" -print_info " 3. Take off to 5 meters" -print_info " 4. Execute mission: $MISSION" +print_info " Gazebo -> ArduPilot SITL (TCP:5760)" +print_info " |" +print_info " main.py (pymavlink)" print_info "" -print_info "Press Ctrl+C to stop" +print_info " Mission: $MISSION Alt: ${ALTITUDE}m" +print_info " Press Ctrl+C to stop" print_info "===================================" -# Wait for controller to finish or user interrupt -wait $CTRL_PID +wait $MAIN_PID 2>/dev/null +cleanup_sitl + +print_info "" +print_info "==================================" +print_info " Mission Complete!" +print_info "==================================" +print_info "Gazebo GUI still open." +print_info "Press Ctrl+C to exit." +print_info "==================================" +print_info "" + +wait $GZ_PID 2>/dev/null diff --git a/setup.sh b/setup.sh index 5c94ebb..23cccef 100755 --- a/setup.sh +++ b/setup.sh @@ -455,12 +455,15 @@ if [ "$INSTALL_ARDUPILOT" = true ]; then # Install MAVProxy using pipx (required for Ubuntu 23.04+ PEP 668) print_info "Installing MAVProxy..." - pipx install MAVProxy --include-deps || { + if pipx install MAVProxy --include-deps; then + print_info "Injecting dependencies into MAVProxy venv..." + pipx inject mavproxy future pexpect + else # Fallback: try pip with --break-system-packages pip3 install --user --break-system-packages pymavlink mavproxy 2>/dev/null || \ pip3 install --user pymavlink mavproxy 2>/dev/null || \ print_warning "MAVProxy installation failed - install manually" - } + fi ((STEP++)) # ======================================================================== diff --git a/src/__init__.py b/src/__init__.py index 847e6e8..e69de29 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,4 +0,0 @@ -"""UAV-UGV Simulation Package - GPS-Denied Navigation with Geofencing.""" - -__version__ = "1.0.0" -__author__ = "Developer" diff --git a/src/autonomous_controller.py b/src/autonomous_controller.py deleted file mode 100755 index 080afc5..0000000 --- a/src/autonomous_controller.py +++ /dev/null @@ -1,708 +0,0 @@ -#!/usr/bin/env python3 -""" -Autonomous UAV Controller -========================= -GPS-denied navigation simulation with GPS-based geofencing for safety. - -Uses GUIDED_NOGPS mode with velocity/attitude commands while keeping -GPS enabled in EKF for geofence and LOCAL_POSITION_NED telemetry. - -Usage: - python src/autonomous_controller.py --mission hover - python src/autonomous_controller.py --mission square --size 5 - python src/autonomous_controller.py --mission circle --size 3 -""" - -import sys -import os -import time -import math -import argparse - -from pymavlink import mavutil - - -class AutonomousController: - """Autonomous UAV controller using pymavlink.""" - - # ArduCopter mode IDs - COPTER_MODES = { - 'STABILIZE': 0, - 'ACRO': 1, - 'ALT_HOLD': 2, - 'AUTO': 3, - 'GUIDED': 4, - 'LOITER': 5, - 'RTL': 6, - 'CIRCLE': 7, - 'LAND': 9, - 'GUIDED_NOGPS': 20, - } - - def __init__(self, connection_string='tcp:127.0.0.1:5760'): - """Initialize controller.""" - self.connection_string = connection_string - self.mav = None - self.armed = False - self.mode = "UNKNOWN" - self.altitude = 0 - self.position = {"x": 0, "y": 0, "z": 0} - - def connect(self, timeout=30): - """Connect to ArduPilot SITL.""" - print(f"[CTRL] Connecting to {self.connection_string}...") - - try: - self.mav = mavutil.mavlink_connection(self.connection_string) - - print("[CTRL] Waiting for heartbeat...") - msg = None - for attempt in range(3): - msg = self.mav.wait_heartbeat(timeout=timeout) - if msg and self.mav.target_system > 0: - break - print(f"[CTRL] Heartbeat attempt {attempt + 1}/3...") - - if not msg or self.mav.target_system == 0: - print("[ERROR] Failed to receive heartbeat") - return False - - print(f"[CTRL] Connected! System {self.mav.target_system}, Component {self.mav.target_component}") - - # Request ALL data streams at higher rate - self.mav.mav.request_data_stream_send( - self.mav.target_system, - self.mav.target_component, - mavutil.mavlink.MAV_DATA_STREAM_ALL, - 50, # 50 Hz for everything - 1 # Start - ) - - # Specifically request position stream at high rate - self.mav.mav.request_data_stream_send( - self.mav.target_system, - self.mav.target_component, - mavutil.mavlink.MAV_DATA_STREAM_POSITION, - 50, # 50 Hz - 1 - ) - - # Request extended status for altitude - self.mav.mav.request_data_stream_send( - self.mav.target_system, - self.mav.target_component, - mavutil.mavlink.MAV_DATA_STREAM_EXTENDED_STATUS, - 10, - 1 - ) - - return True - except Exception as e: - print(f"[ERROR] Connection failed: {e}") - return False - - def update_state(self): - """Update drone state from MAVLink messages.""" - while True: - msg = self.mav.recv_match(blocking=False) - if msg is None: - break - - msg_type = msg.get_type() - - if msg_type == "HEARTBEAT": - self.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0 - try: - self.mode = mavutil.mode_string_v10(msg) - except: - self.mode = str(msg.custom_mode) - - elif msg_type == "LOCAL_POSITION_NED": - self.position = {"x": msg.x, "y": msg.y, "z": msg.z} - self.altitude = -msg.z # NED: down is positive - - elif msg_type == "GLOBAL_POSITION_INT": - # Backup altitude source (relative altitude in mm) - if self.altitude == 0: - self.altitude = msg.relative_alt / 1000.0 - - elif msg_type == "VFR_HUD": - # Another backup altitude source - if self.altitude == 0: - self.altitude = msg.alt - - elif msg_type == "STATUSTEXT": - text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore') - text = text.strip() - if text: - print(f"[SITL] {text}") - - def set_param(self, param_name, value): - """Set a parameter value.""" - param_bytes = param_name.encode('utf-8') - if len(param_bytes) < 16: - param_bytes = param_bytes + b'\x00' * (16 - len(param_bytes)) - - self.mav.mav.param_set_send( - self.mav.target_system, - self.mav.target_component, - param_bytes, - float(value), - mavutil.mavlink.MAV_PARAM_TYPE_REAL32 - ) - time.sleep(0.5) - - def setup_gps_denied(self): - """Configure for simulated GPS-denied operation. - - GPS is enabled in EKF for: - 1. Geofence safety (enabled after GPS lock) - 2. LOCAL_POSITION_NED telemetry - - But we use GUIDED_NOGPS mode and send velocity/attitude commands, - simulating GPS-denied flight control. - """ - print("[CTRL] Configuring for GPS-denied simulation...") - - # Disable arming checks - self.set_param("ARMING_CHECK", 0) - - # Enable GPS in EKF (for geofence and position telemetry) - self.set_param("EK3_SRC1_POSXY", 3) # GPS - self.set_param("EK3_SRC1_VELXY", 3) # GPS - self.set_param("EK3_SRC1_POSZ", 1) # Baro - - # Disable GPS failsafe (we're simulating GPS-denied) - self.set_param("FS_GPS_ENABLE", 0) - - # DISABLE fence initially - will enable after GPS lock - self.set_param("FENCE_ENABLE", 0) - - print("[CTRL] Setup complete (fence will enable after GPS lock)") - time.sleep(1) - - def wait_for_ready(self, timeout=90): - """Wait for GPS lock and home position.""" - print("[CTRL] Waiting for GPS lock and home position...") - start = time.time() - gps_ok = False - home_ok = False - - while time.time() - start < timeout: - self.update_state() - - # Check multiple message types - msg = self.mav.recv_match(type=['STATUSTEXT', 'GPS_RAW_INT', 'HOME_POSITION'], - blocking=True, timeout=1) - if msg: - msg_type = msg.get_type() - - if msg_type == 'STATUSTEXT': - text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore') - text = text.strip() - if text: - print(f"[SITL] {text}") - # Check for origin set (means home is set) - if 'origin set' in text.lower() or 'Field Elevation' in text: - home_ok = True - print("[CTRL] Home position set!") - - elif msg_type == 'GPS_RAW_INT': - if msg.fix_type >= 3: # 3D fix - if not gps_ok: - print(f"[CTRL] GPS fix: {msg.fix_type} (satellites: {msg.satellites_visible})") - gps_ok = True - - elif msg_type == 'HOME_POSITION': - home_ok = True - print("[CTRL] Home position received!") - - # Check if ready - if gps_ok and home_ok: - print("[CTRL] GPS and home ready!") - - # Wait for EKF position estimate to be valid - print("[CTRL] Waiting for EKF position estimate...") - time.sleep(5) - - # Skip fence for now - causes "need position estimate" errors - # TODO: Re-enable fence after basic flight works - # print("[CTRL] Enabling geofence: Alt=10m, Radius=20m") - # self.set_param("FENCE_ENABLE", 1) - # self.set_param("FENCE_TYPE", 3) - # self.set_param("FENCE_ACTION", 2) - # self.set_param("FENCE_ALT_MAX", 10) - # self.set_param("FENCE_RADIUS", 20) - - return True - - elapsed = int(time.time() - start) - if elapsed % 10 == 0 and elapsed > 0: - status = f"GPS={'OK' if gps_ok else 'waiting'}, Home={'OK' if home_ok else 'waiting'}" - print(f"[CTRL] Waiting... ({elapsed}s) {status}") - - print("[CTRL] Ready timeout - continuing anyway (fence disabled)") - return True - - def set_mode(self, mode_name): - """Set flight mode.""" - mode_upper = mode_name.upper() - - if mode_upper not in self.COPTER_MODES: - print(f"[CTRL] Unknown mode: {mode_name}") - return False - - mode_id = self.COPTER_MODES[mode_upper] - print(f"[CTRL] Setting mode to {mode_upper}...") - - self.mav.mav.command_long_send( - self.mav.target_system, - self.mav.target_component, - mavutil.mavlink.MAV_CMD_DO_SET_MODE, - 0, - mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, - mode_id, - 0, 0, 0, 0, 0 - ) - - time.sleep(1) - for _ in range(10): - self.update_state() - if mode_upper in self.mode.upper(): - print(f"[CTRL] Mode: {self.mode}") - return True - time.sleep(0.2) - - print(f"[CTRL] Mode set (current: {self.mode})") - return True - - def arm(self): - """Force arm the drone.""" - print("[CTRL] Arming...") - - for attempt in range(5): - while self.mav.recv_match(blocking=False): - pass - - print(f"[CTRL] Arm attempt {attempt + 1}/5...") - self.mav.mav.command_long_send( - self.mav.target_system, - self.mav.target_component, - mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, - 0, - 1, # Arm - 21196, # Force arm - 0, 0, 0, 0, 0 - ) - - start = time.time() - while time.time() - start < 3.0: - msg = self.mav.recv_match(type=['COMMAND_ACK', 'HEARTBEAT', 'STATUSTEXT'], blocking=True, timeout=0.5) - if msg is None: - continue - - msg_type = msg.get_type() - - if msg_type == 'STATUSTEXT': - text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore') - if text.strip(): - print(f"[SITL] {text.strip()}") - - elif msg_type == 'HEARTBEAT': - if msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED: - print("[CTRL] Armed successfully!") - self.armed = True - return True - - elif msg_type == 'COMMAND_ACK': - if msg.command == mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM: - if msg.result == 0: - print("[CTRL] Arm accepted") - time.sleep(0.5) - self.update_state() - if self.armed: - return True - - time.sleep(1) - - print("[CTRL] Arm failed") - return False - - def send_attitude_thrust(self, roll, pitch, yaw, thrust): - """Send attitude and thrust command for GUIDED_NOGPS mode.""" - cy = math.cos(yaw * 0.5) - sy = math.sin(yaw * 0.5) - cp = math.cos(pitch * 0.5) - sp = math.sin(pitch * 0.5) - cr = math.cos(roll * 0.5) - sr = math.sin(roll * 0.5) - - q = [ - cr * cp * cy + sr * sp * sy, - sr * cp * cy - cr * sp * sy, - cr * sp * cy + sr * cp * sy, - cr * cp * sy - sr * sp * cy - ] - - type_mask = 0b00000111 # Use attitude + thrust - - self.mav.mav.set_attitude_target_send( - 0, - self.mav.target_system, - self.mav.target_component, - type_mask, - q, - 0, 0, 0, - thrust - ) - - def send_velocity_ned(self, vx, vy, vz): - """Send velocity command in NED frame.""" - type_mask = ( - (1 << 0) | (1 << 1) | (1 << 2) | # Ignore position - (1 << 6) | (1 << 7) | (1 << 8) | # Ignore acceleration - (1 << 10) | (1 << 11) # Ignore yaw - ) - - self.mav.mav.set_position_target_local_ned_send( - 0, - self.mav.target_system, - self.mav.target_component, - mavutil.mavlink.MAV_FRAME_LOCAL_NED, - type_mask, - 0, 0, 0, - vx, vy, vz, - 0, 0, 0, - 0, 0 - ) - - def takeoff(self, altitude=5.0): - """Take off using thrust commands.""" - print(f"[CTRL] Taking off to {altitude}m...") - - hover_thrust = 0.6 - max_thrust = 0.85 - thrust = 0.5 - - start = time.time() - timeout = 30 - last_alt = 0 - stuck_count = 0 - got_alt_reading = False - - while time.time() - start < timeout: - # Actively poll for position messages - for _ in range(5): - msg = self.mav.recv_match(type=['LOCAL_POSITION_NED', 'GLOBAL_POSITION_INT', 'VFR_HUD'], - blocking=True, timeout=0.02) - if msg: - msg_type = msg.get_type() - if msg_type == 'LOCAL_POSITION_NED': - self.altitude = -msg.z - self.position = {"x": msg.x, "y": msg.y, "z": msg.z} - got_alt_reading = True - elif msg_type == 'GLOBAL_POSITION_INT': - self.altitude = msg.relative_alt / 1000.0 - got_alt_reading = True - elif msg_type == 'VFR_HUD': - # VFR_HUD.alt is MSL, but climb rate is useful - if msg.climb > 0.1: - got_alt_reading = True - - self.update_state() - - if self.altitude >= altitude * 0.90: - self.send_attitude_thrust(0, 0, 0, hover_thrust) - print(f"\n[CTRL] Reached {self.altitude:.1f}m") - time.sleep(0.5) - return True - - # Ramp thrust more aggressively - elapsed = time.time() - start - if elapsed < 2.0: - # Initial ramp - thrust = 0.5 + (elapsed / 2.0) * 0.35 # 0.5 -> 0.85 in 2 seconds - elif self.altitude > 0.5: - # Flying - reduce to climb thrust - thrust = 0.75 - else: - # Max thrust if not climbing after 2s - thrust = max_thrust - - # Only check for stuck if we have altitude readings - if got_alt_reading and elapsed > 5.0: - if abs(self.altitude - last_alt) < 0.01: - stuck_count += 1 - if stuck_count > 50: # 5 seconds stuck - print(f"\n[WARN] Not climbing at thrust={thrust:.2f}, alt={self.altitude:.2f}m") - break - else: - stuck_count = 0 - last_alt = self.altitude - - self.send_attitude_thrust(0, 0, 0, thrust) - - alt_str = f"{self.altitude:.1f}" if got_alt_reading else "?" - print(f"\r[CTRL] Climbing: {alt_str}m / {altitude:.1f}m (thrust={thrust:.2f}, t={elapsed:.1f}s)", end="") - time.sleep(0.1) - - self.send_attitude_thrust(0, 0, 0, hover_thrust) - print(f"\n[CTRL] Takeoff ended at {self.altitude:.1f}m") - return self.altitude > 1.0 - - def fly_to(self, target_x, target_y, altitude, tolerance=1.0, timeout=60): - """Fly to position using attitude+thrust commands (works in GUIDED_NOGPS). - - Uses pitch for forward/backward movement, roll for left/right. - """ - print(f"[CTRL] Flying to ({target_x:.1f}, {target_y:.1f}, {altitude:.1f}m)") - - hover_thrust = 0.6 - max_tilt = 0.15 # ~8.5 degrees max tilt angle in radians - - start_time = time.time() - - while time.time() - start_time < timeout: - # Actively poll for position - for _ in range(3): - msg = self.mav.recv_match(type=['LOCAL_POSITION_NED', 'GLOBAL_POSITION_INT'], - blocking=True, timeout=0.03) - if msg: - if msg.get_type() == 'LOCAL_POSITION_NED': - self.altitude = -msg.z - self.position = {"x": msg.x, "y": msg.y, "z": msg.z} - elif msg.get_type() == 'GLOBAL_POSITION_INT': - self.altitude = msg.relative_alt / 1000.0 - - self.update_state() - - dx = target_x - self.position["x"] - dy = target_y - self.position["y"] - dz = altitude - self.altitude - - dist = math.sqrt(dx*dx + dy*dy) - - # Check if reached target - if dist < tolerance and abs(dz) < tolerance: - self.send_attitude_thrust(0, 0, 0, hover_thrust) - print(f"\n[CTRL] Reached ({self.position['x']:.1f}, {self.position['y']:.1f})") - return True - - # Calculate tilt angles for movement - # In NED frame: +X is forward (North), +Y is right (East) - # Pitch: negative = nose down = fly forward (+X) - # Roll: positive = right wing down = fly right (+Y) - - if dist > 0.1: - # Calculate direction to target - # Scale tilt based on distance (more tilt = faster movement) - tilt_scale = min(1.0, dist / 5.0) # Full tilt at 5+ meters away - - # Pitch for forward/back movement (negative pitch = forward) - pitch = -max_tilt * tilt_scale * (dx / dist) - - # Roll for left/right movement (positive roll = right) - roll = max_tilt * tilt_scale * (dy / dist) - else: - pitch = 0 - roll = 0 - - # Thrust for altitude - if dz > 0.5: - thrust = hover_thrust + 0.1 # Climb - elif dz < -0.5: - thrust = hover_thrust - 0.1 # Descend - else: - thrust = hover_thrust + dz * 0.1 # Small adjustment - - # Clamp thrust - thrust = max(0.4, min(0.8, thrust)) - - self.send_attitude_thrust(roll, pitch, 0, thrust) - - elapsed = time.time() - start_time - if int(elapsed * 10) % 10 == 0: # Print every second - print(f"\r[CTRL] Pos: ({self.position['x']:.1f}, {self.position['y']:.1f}) -> ({target_x:.1f}, {target_y:.1f}) Dist: {dist:.1f}m Alt: {self.altitude:.1f}m", end="", flush=True) - time.sleep(0.05) # 20Hz control rate - - self.send_attitude_thrust(0, 0, 0, hover_thrust) - print(f"\n[CTRL] Waypoint timeout") - return False - - def land(self): - """Land the drone.""" - print("[CTRL] Landing...") - self.set_mode("LAND") - - for _ in range(100): - self.update_state() - print(f"\r[CTRL] Landing... Alt: {self.altitude:.1f}m", end="") - if self.altitude < 0.3: - print("\n[CTRL] Landed!") - return True - time.sleep(0.2) - return False - - # ==================== MISSIONS ==================== - - def run_hover_mission(self, altitude=5.0, duration=30): - """Hover in place.""" - print(f"\n{'='*50}") - print(f" HOVER MISSION: {altitude}m for {duration}s") - print(f"{'='*50}\n") - - self.setup_gps_denied() - self.wait_for_ready() - - if not self.set_mode('GUIDED_NOGPS'): - self.set_mode('GUIDED') - - if not self.arm(): - return False - - if not self.takeoff(altitude): - self.land() - return False - - print(f"[CTRL] Hovering for {duration}s...") - start = time.time() - while time.time() - start < duration: - self.update_state() - self.send_attitude_thrust(0, 0, 0, 0.6) - remaining = duration - (time.time() - start) - print(f"\r[CTRL] Hovering... {remaining:.0f}s remaining, Alt: {self.altitude:.1f}m ", end="", flush=True) - time.sleep(0.1) - - print() - self.land() - print("\n[CTRL] Hover mission complete!") - return True - - def run_square_mission(self, altitude=5.0, size=5.0): - """Fly a square pattern.""" - print(f"\n{'='*50}") - print(f" SQUARE MISSION: {size}m x {size}m at {altitude}m") - print(f"{'='*50}\n") - - self.setup_gps_denied() - self.wait_for_ready() - - if not self.set_mode('GUIDED_NOGPS'): - self.set_mode('GUIDED') - - if not self.arm(): - return False - - if not self.takeoff(altitude): - self.land() - return False - - self.update_state() - start_x = self.position["x"] - start_y = self.position["y"] - - waypoints = [ - (start_x + size, start_y), - (start_x + size, start_y + size), - (start_x, start_y + size), - (start_x, start_y), - ] - - print(f"[CTRL] Flying square from ({start_x:.1f}, {start_y:.1f})") - - for i, (x, y) in enumerate(waypoints): - print(f"\n--- Waypoint {i+1}/4 ---") - self.fly_to(x, y, altitude) - time.sleep(1) - - self.land() - print("\n[CTRL] Square mission complete!") - return True - - def run_circle_mission(self, altitude=5.0, radius=5.0, points=8): - """Fly a circular pattern.""" - print(f"\n{'='*50}") - print(f" CIRCLE MISSION: radius={radius}m at {altitude}m") - print(f"{'='*50}\n") - - self.setup_gps_denied() - self.wait_for_ready() - - if not self.set_mode('GUIDED_NOGPS'): - self.set_mode('GUIDED') - - if not self.arm(): - return False - - if not self.takeoff(altitude): - self.land() - return False - - self.update_state() - center_x = self.position["x"] - center_y = self.position["y"] - - print(f"[CTRL] Flying circle around ({center_x:.1f}, {center_y:.1f})") - - for i in range(points + 1): - angle = 2 * math.pi * i / points - x = center_x + radius * math.cos(angle) - y = center_y + radius * math.sin(angle) - print(f"\n--- Point {i+1}/{points+1} ---") - self.fly_to(x, y, altitude) - time.sleep(0.5) - - self.land() - print("\n[CTRL] Circle mission complete!") - return True - - -def main(): - parser = argparse.ArgumentParser(description='Autonomous UAV Controller') - parser.add_argument('--connection', '-c', default='tcp:127.0.0.1:5760', - help='MAVLink connection string') - parser.add_argument('--mission', '-m', choices=['hover', 'square', 'circle'], - default='hover', help='Mission type') - parser.add_argument('--altitude', '-a', type=float, default=5.0, - help='Flight altitude (meters)') - parser.add_argument('--size', '-s', type=float, default=5.0, - help='Pattern size/radius (meters)') - parser.add_argument('--duration', '-d', type=float, default=30.0, - help='Hover duration (seconds)') - args = parser.parse_args() - - print("=" * 50) - print(" Autonomous UAV Controller") - print(" GPS-Denied Navigation Simulation") - print("=" * 50) - print(f" Connection: {args.connection}") - print(f" Mission: {args.mission}") - print(f" Altitude: {args.altitude}m") - print("=" * 50) - - controller = AutonomousController(args.connection) - - if not controller.connect(): - print("\n[ERROR] Could not connect to SITL") - sys.exit(1) - - try: - if args.mission == 'hover': - controller.run_hover_mission(args.altitude, args.duration) - elif args.mission == 'square': - controller.run_square_mission(args.altitude, args.size) - elif args.mission == 'circle': - controller.run_circle_mission(args.altitude, args.size) - - except KeyboardInterrupt: - print("\n\n[CTRL] Interrupted - Landing...") - controller.land() - except Exception as e: - print(f"\n[ERROR] {e}") - import traceback - traceback.print_exc() - controller.land() - - -if __name__ == '__main__': - main() diff --git a/src/control/__init__.py b/src/control/__init__.py index df7880f..e69de29 100644 --- a/src/control/__init__.py +++ b/src/control/__init__.py @@ -1 +0,0 @@ -"""Control module for UAV and UGV controllers.""" diff --git a/src/control/uav_controller.py b/src/control/uav_controller.py index f3df3b7..2173c2e 100644 --- a/src/control/uav_controller.py +++ b/src/control/uav_controller.py @@ -1,304 +1,263 @@ #!/usr/bin/env python3 -"""UAV Controller - Flight control using local position only.""" -import rclpy -from rclpy.node import Node -from geometry_msgs.msg import PoseStamped, TwistStamped, Twist -from mavros_msgs.srv import CommandBool, SetMode, CommandTOL -from mavros_msgs.msg import State -from std_msgs.msg import String, Bool +import time +import math +import argparse import numpy as np -from enum import Enum +from time import sleep, perf_counter +from typing import Tuple + +from pymavlink import mavutil +import pymavlink.dialects.v20.ardupilotmega as mavlink +from pymavlink.dialects.v20.ardupilotmega import ( + MAVLink_set_position_target_local_ned_message, + MAVLink_set_position_target_global_int_message, + MAV_FRAME_GLOBAL_RELATIVE_ALT_INT, + MAV_FRAME_BODY_OFFSET_NED, + MAV_FRAME_LOCAL_NED, + MAV_CMD_DO_SET_MODE, + MAV_CMD_NAV_TAKEOFF, + MAV_CMD_NAV_LAND, +) + +HOLD_ALT = 5.0 +LOWEST_ALT = 3.0 +GUIDED_MODE = 4 +GUIDED_NOGPS_MODE = 20 -class FlightState(Enum): - DISARMED = 0 - ARMED = 1 - TAKING_OFF = 2 - HOVERING = 3 - NAVIGATING = 4 - LANDING = 5 - EMERGENCY = 6 +class Controller: + HOLD_ALT = HOLD_ALT + LOWEST_ALT = LOWEST_ALT + + def __init__(self, connection_string: str): + print(f"[UAV] Connecting to {connection_string} ...") + self.conn = mavutil.mavlink_connection(connection_string) + self.conn.wait_heartbeat() + print("[UAV] Heartbeat from system %u component %u" % + (self.conn.target_system, self.conn.target_component)) + + self.conn.mav.request_data_stream_send( + self.conn.target_system, + self.conn.target_component, + mavutil.mavlink.MAV_DATA_STREAM_ALL, + 50, 1) + + self.current_yaw = 0.0 + self.altitude = 0.0 + self.position = {'x': 0.0, 'y': 0.0, 'z': 0.0} + self.armed = False + + def _drain_messages(self): + while True: + msg = self.conn.recv_match(blocking=False) + if msg is None: + break + mtype = msg.get_type() + if mtype == 'ATTITUDE': + self.current_yaw = msg.yaw + elif mtype == 'LOCAL_POSITION_NED': + self.position = {'x': msg.x, 'y': msg.y, 'z': msg.z} + self.altitude = -msg.z + elif mtype == 'GLOBAL_POSITION_INT': + if self.altitude == 0: + self.altitude = msg.relative_alt / 1000.0 + elif mtype == 'HEARTBEAT': + self.armed = bool(msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) + elif mtype == 'STATUSTEXT': + text = msg.text if isinstance(msg.text, str) else msg.text.decode('utf-8', errors='ignore') + text = text.strip() + if text: + print(f"[SITL] {text}") + + def update_state(self): + self._drain_messages() + + def get_position(self) -> dict: + self.conn.mav.command_long_send( + self.conn.target_system, + self.conn.target_component, + mavlink.MAV_CMD_REQUEST_MESSAGE, + 0, 33, 0, 0, 0, 0, 0, 0) + pos = self.conn.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=5) + if pos is None: + return {'lat': 0, 'lon': 0} + return {'lat': pos.lat, 'lon': pos.lon} + + def get_local_position(self) -> dict: + self.update_state() + return self.position.copy() + + def get_altitude(self) -> float: + self.update_state() + return self.altitude + + def set_param(self, name: str, value: float): + self.conn.mav.param_set_send( + self.conn.target_system, + self.conn.target_component, + name.encode('utf-8'), + value, + mavutil.mavlink.MAV_PARAM_TYPE_REAL32) + sleep(0.5) + self._drain_messages() + + def set_mode_guided(self): + self.conn.mav.command_long_send( + self.conn.target_system, + self.conn.target_component, + MAV_CMD_DO_SET_MODE, + 0, 89, GUIDED_MODE, 0, 0, 0, 0, 0) + sleep(1) + self._drain_messages() + print("[UAV] Mode -> GUIDED") + + def set_mode_guided_nogps(self): + self.conn.mav.command_long_send( + self.conn.target_system, + self.conn.target_component, + MAV_CMD_DO_SET_MODE, + 0, 89, GUIDED_NOGPS_MODE, 0, 0, 0, 0, 0) + sleep(1) + self._drain_messages() + print("[UAV] Mode -> GUIDED_NOGPS") + + def arm(self, retries: int = 10): + self.set_mode_guided() + + for attempt in range(1, retries + 1): + print(f"[UAV] Arm attempt {attempt}/{retries}...") + self.conn.arducopter_arm() + + t0 = time.time() + while time.time() - t0 < 5: + self._drain_messages() + if self.armed: + print("[UAV] Armed") + return True + sleep(0.2) + + self._drain_messages() + + print("[UAV] FAILED to arm after all retries") + return False -class UAVController(Node): - - def __init__(self): - super().__init__('uav_controller') - - self.declare_parameter('takeoff_altitude', 5.0) - self.declare_parameter('position_tolerance', 0.3) - self.declare_parameter('namespace', '/uav') - self.declare_parameter('auto_arm', True) - self.declare_parameter('auto_takeoff', True) - self.declare_parameter('startup_delay', 15.0) # Wait for EKF - - self.takeoff_altitude = self.get_parameter('takeoff_altitude').value - self.position_tolerance = self.get_parameter('position_tolerance').value - ns = self.get_parameter('namespace').value - self.auto_arm = self.get_parameter('auto_arm').value - self.auto_takeoff = self.get_parameter('auto_takeoff').value - self.startup_delay = self.get_parameter('startup_delay').value - - self.state = FlightState.DISARMED - self.mavros_state = None - self.current_pose = None - self.target_pose = None - self.home_position = None - self.startup_complete = False - self.ekf_ready = False - - self.state_sub = self.create_subscription( - State, f'{ns}/mavros/state', self.state_callback, 10) - - self.local_pose_sub = self.create_subscription( - PoseStamped, f'{ns}/mavros/local_position/pose', self.pose_callback, 10) - - self.cmd_sub = self.create_subscription( - String, f'{ns}/controller/command', self.command_callback, 10) - - self.setpoint_sub = self.create_subscription( - PoseStamped, f'{ns}/setpoint_position', self.setpoint_callback, 10) - - self.vel_sub = self.create_subscription( - Twist, f'{ns}/cmd_vel_safe', self.velocity_callback, 10) - - self.setpoint_pub = self.create_publisher( - PoseStamped, f'{ns}/mavros/setpoint_position/local', 10) - - self.vel_pub = self.create_publisher( - TwistStamped, f'{ns}/mavros/setpoint_velocity/cmd_vel', 10) - - self.status_pub = self.create_publisher( - String, f'{ns}/controller/status', 10) - - self.arming_client = self.create_client(CommandBool, f'{ns}/mavros/cmd/arming') - self.set_mode_client = self.create_client(SetMode, f'{ns}/mavros/set_mode') - self.takeoff_client = self.create_client(CommandTOL, f'{ns}/mavros/cmd/takeoff') - self.land_client = self.create_client(CommandTOL, f'{ns}/mavros/cmd/land') - - self.setpoint_timer = self.create_timer(0.05, self.publish_setpoint) - self.status_timer = self.create_timer(0.5, self.publish_status) - - # Delayed auto-start (wait for EKF initialization) - if self.auto_arm or self.auto_takeoff: - self.get_logger().info(f'Auto-mission enabled. Starting in {self.startup_delay}s...') - self.startup_timer = self.create_timer(self.startup_delay, self.auto_startup) - - self.get_logger().info('UAV Controller Started - GPS-denied mode') - - def auto_startup(self): - """Auto arm and takeoff after startup delay.""" - # Only run once - if self.startup_complete: - return - self.startup_complete = True - self.startup_timer.cancel() - - self.get_logger().info('Auto-startup: Beginning autonomous sequence...') - - # Check if MAVROS is connected - if self.mavros_state is None: - self.get_logger().warn('MAVROS not connected yet, retrying in 5s...') - self.startup_timer = self.create_timer(5.0, self.auto_startup) - self.startup_complete = False - return - - # Set GUIDED mode first - self.get_logger().info('Auto-startup: Setting GUIDED mode...') - self.set_mode('GUIDED') - - # Wait a moment then arm - self.create_timer(2.0, self._auto_arm_callback, callback_group=None) - - def _auto_arm_callback(self): - """Callback to arm after mode is set.""" - if self.auto_arm: - self.get_logger().info('Auto-startup: Arming...') - self.arm() - - # Wait then takeoff - if self.auto_takeoff: - self.create_timer(3.0, self._auto_takeoff_callback, callback_group=None) - - def _auto_takeoff_callback(self): - """Callback to takeoff after arming.""" - if self.mavros_state and self.mavros_state.armed: - self.get_logger().info('Auto-startup: Taking off...') - self.takeoff() - else: - self.get_logger().warn('Auto-startup: Not armed, cannot takeoff') - - def state_callback(self, msg): - self.mavros_state = msg - if msg.armed and self.state == FlightState.DISARMED: - self.state = FlightState.ARMED - - def pose_callback(self, msg): - self.current_pose = msg - if self.home_position is None and self.mavros_state and self.mavros_state.armed: - self.home_position = np.array([ - msg.pose.position.x, - msg.pose.position.y, - msg.pose.position.z - ]) - self.get_logger().info(f'Home position set: {self.home_position}') - - def setpoint_callback(self, msg): - self.target_pose = msg - - def velocity_callback(self, msg): - if self.state not in [FlightState.NAVIGATING, FlightState.HOVERING]: - return - - vel_msg = TwistStamped() - vel_msg.header.stamp = self.get_clock().now().to_msg() - vel_msg.header.frame_id = 'base_link' - vel_msg.twist = msg - self.vel_pub.publish(vel_msg) - - def command_callback(self, msg): - cmd = msg.data.lower() - - if cmd == 'arm': - self.arm() - elif cmd == 'disarm': - self.disarm() - elif cmd == 'takeoff': - self.takeoff() - elif cmd == 'land': - self.land() - elif cmd == 'rtl': - self.return_to_launch() - elif cmd == 'hold': - self.hold_position() - elif cmd == 'guided': - self.set_mode('GUIDED') - - def arm(self): - if not self.arming_client.wait_for_service(timeout_sec=5.0): - self.get_logger().error('Arming service not available') - return False - - req = CommandBool.Request() - req.value = True - future = self.arming_client.call_async(req) - future.add_done_callback(self.arm_response) - return True - - def arm_response(self, future): - try: - result = future.result() - if result.success: - self.state = FlightState.ARMED - self.get_logger().info('Vehicle armed') - else: - self.get_logger().error('Arming failed') - except Exception as e: - self.get_logger().error(f'Arming error: {e}') - def disarm(self): - req = CommandBool.Request() - req.value = False - future = self.arming_client.call_async(req) - self.state = FlightState.DISARMED - - def set_mode(self, mode): - if not self.set_mode_client.wait_for_service(timeout_sec=5.0): - return False - - req = SetMode.Request() - req.custom_mode = mode - future = self.set_mode_client.call_async(req) - return True - - def takeoff(self): - self.set_mode('GUIDED') - self.arm() - - if self.current_pose is not None: - self.target_pose = PoseStamped() - self.target_pose.header.frame_id = 'odom' - self.target_pose.pose.position.x = self.current_pose.pose.position.x - self.target_pose.pose.position.y = self.current_pose.pose.position.y - self.target_pose.pose.position.z = self.takeoff_altitude - self.target_pose.pose.orientation.w = 1.0 - - self.state = FlightState.TAKING_OFF - self.get_logger().info(f'Taking off to {self.takeoff_altitude}m') - + self.conn.arducopter_disarm() + print("[UAV] Disarmed") + + def takeoff(self, altitude: float = HOLD_ALT): + self.conn.mav.command_long_send( + self.conn.target_system, + self.conn.target_component, + MAV_CMD_NAV_TAKEOFF, + 0, 0, 0, 0, 0, 0, 0, altitude) + print(f"[UAV] Takeoff -> {altitude}m") + def land(self): - if self.current_pose is not None: - self.target_pose = PoseStamped() - self.target_pose.header.frame_id = 'odom' - self.target_pose.pose.position.x = self.current_pose.pose.position.x - self.target_pose.pose.position.y = self.current_pose.pose.position.y - self.target_pose.pose.position.z = 0.0 - self.target_pose.pose.orientation.w = 1.0 - - self.state = FlightState.LANDING - self.get_logger().info('Landing') - - def return_to_launch(self): - if self.home_position is not None: - self.target_pose = PoseStamped() - self.target_pose.header.frame_id = 'odom' - self.target_pose.pose.position.x = float(self.home_position[0]) - self.target_pose.pose.position.y = float(self.home_position[1]) - self.target_pose.pose.position.z = self.takeoff_altitude - self.target_pose.pose.orientation.w = 1.0 - - self.state = FlightState.NAVIGATING - self.get_logger().info('Returning to local home position') - - def hold_position(self): - if self.current_pose is not None: - self.target_pose = PoseStamped() - self.target_pose.header = self.current_pose.header - self.target_pose.pose = self.current_pose.pose - self.state = FlightState.HOVERING - - def publish_setpoint(self): - if self.target_pose is None: - return - - self.target_pose.header.stamp = self.get_clock().now().to_msg() - self.setpoint_pub.publish(self.target_pose) - - if self.current_pose is not None: - error = np.array([ - self.target_pose.pose.position.x - self.current_pose.pose.position.x, - self.target_pose.pose.position.y - self.current_pose.pose.position.y, - self.target_pose.pose.position.z - self.current_pose.pose.position.z - ]) - distance = np.linalg.norm(error) - - if self.state == FlightState.TAKING_OFF and distance < self.position_tolerance: - self.state = FlightState.HOVERING - self.get_logger().info('Takeoff complete') - elif self.state == FlightState.LANDING and self.current_pose.pose.position.z < 0.2: - self.state = FlightState.ARMED - self.get_logger().info('Landing complete') - - def publish_status(self): - status_msg = String() - armed = 'ARMED' if (self.mavros_state and self.mavros_state.armed) else 'DISARMED' - mode = self.mavros_state.mode if self.mavros_state else 'UNKNOWN' - status_msg.data = f'{self.state.name}|{armed}|{mode}' - self.status_pub.publish(status_msg) + self.conn.set_mode_rtl() + print("[UAV] Landing (RTL)") + def land_at(self, lat: int, lon: int): + self.conn.mav.command_long_send( + self.conn.target_system, + self.conn.target_component, + MAV_CMD_NAV_LAND, + 0, 0, 0, 0, 0, + lat / 1e7 if abs(lat) > 1e6 else lat, + lon / 1e7 if abs(lon) > 1e6 else lon, + 0) + print(f"[UAV] Landing at ({lat}, {lon})") -def main(args=None): - rclpy.init(args=args) - node = UAVController() - try: - rclpy.spin(node) - except KeyboardInterrupt: - pass - finally: - node.destroy_node() - rclpy.shutdown() + def move_pos_rel(self, x: float, y: float, z: float): + move_msg = MAVLink_set_position_target_local_ned_message( + int(perf_counter() * 1000), + self.conn.target_system, + self.conn.target_component, + MAV_FRAME_BODY_OFFSET_NED, + 3576, x, y, z, + 0, 0, 0, 0, 0, 0, 0, 0) + self.conn.mav.send(move_msg) + def move_pos_rel_yaw(self, x: float, y: float, z: float, yaw: float): + move_msg = MAVLink_set_position_target_local_ned_message( + int(perf_counter() * 1000), + self.conn.target_system, + self.conn.target_component, + MAV_FRAME_BODY_OFFSET_NED, + 3576, x, y, z, + 0, 0, 0, 0, 0, 0, yaw, 0) + self.conn.mav.send(move_msg) -if __name__ == '__main__': - main() + def move_vel_rel_alt(self, vx: float, vy: float, z: float): + move_msg = MAVLink_set_position_target_local_ned_message( + int(perf_counter() * 1000), + self.conn.target_system, + self.conn.target_component, + MAV_FRAME_BODY_OFFSET_NED, + 3555, 0, 0, z, + vx, vy, 0, 0, 0, 0, 0, 0) + self.conn.mav.send(move_msg) + + def move_local_ned(self, x: float, y: float, z: float): + move_msg = MAVLink_set_position_target_local_ned_message( + int(perf_counter() * 1000), + self.conn.target_system, + self.conn.target_component, + MAV_FRAME_LOCAL_NED, + 3576, x, y, z, + 0, 0, 0, 0, 0, 0, 0, 0) + self.conn.mav.send(move_msg) + + def go_to(self, lat: int, lon: int, alt: float, yaw: float = 0): + move_msg = MAVLink_set_position_target_global_int_message( + int(perf_counter() * 1000), + self.conn.target_system, + self.conn.target_component, + MAV_FRAME_GLOBAL_RELATIVE_ALT_INT, + 504, lat, lon, alt, + 0, 0, 0, 0, 0, 0, yaw, 0) + self.conn.mav.send(move_msg) + + def distance(self, p1: Tuple, p2: Tuple) -> float: + return float(np.linalg.norm(np.asarray(p1) - np.asarray(p2))) + + def wait_altitude(self, target_alt: float, tolerance: float = 0.5, + timeout: float = 30.0): + t0 = time.time() + while time.time() - t0 < timeout: + self.update_state() + if abs(self.altitude - target_alt) < tolerance: + print(f"\n[UAV] Altitude reached: {self.altitude:.1f}m") + return True + print(f"\r[UAV] Climbing: {self.altitude:.1f}m / {target_alt:.1f}m ", + end='', flush=True) + sleep(0.2) + print(f"\n[UAV] Altitude timeout at {self.altitude:.1f}m") + return False + + def wait_for_gps(self, timeout: float = 120.0): + print("[UAV] Waiting for GPS lock & EKF ...") + t0 = time.time() + while time.time() - t0 < timeout: + self._drain_messages() + msg = self.conn.recv_match(type='GPS_RAW_INT', blocking=True, timeout=2) + if msg and msg.fix_type >= 3: + print(f"\n[UAV] GPS fix: {msg.fix_type} sats: {msg.satellites_visible}") + print("[UAV] Waiting for EKF to settle ...") + for _ in range(15): + self._drain_messages() + sleep(1) + return True + elapsed = int(time.time() - t0) + print(f"\r[UAV] Waiting for GPS ... {elapsed}s ", end='', flush=True) + print("\n[UAV] GPS timeout") + return False + + def set_max_velocity(self, speed): + self.conn.mav.command_long_send( + self.conn.target_system, + self.conn.target_component, + mavlink.MAV_CMD_DO_CHANGE_SPEED, + 0, speed, -1, 0, 0, 0, 0, 0) diff --git a/src/control/ugv_controller.py b/src/control/ugv_controller.py index e0f20e2..f195c9d 100644 --- a/src/control/ugv_controller.py +++ b/src/control/ugv_controller.py @@ -1,198 +1,94 @@ #!/usr/bin/env python3 -"""UGV Controller - Ground vehicle control using local position.""" -import rclpy -from rclpy.node import Node -from geometry_msgs.msg import Twist, PoseStamped -from nav_msgs.msg import Odometry -from std_msgs.msg import String, Bool +import math import numpy as np -from enum import Enum +from time import sleep, perf_counter +from typing import Tuple, Optional + +try: + from pymavlink import mavutil + import pymavlink.dialects.v20.ardupilotmega as mavlink + from pymavlink.dialects.v20.ardupilotmega import ( + MAVLink_set_position_target_local_ned_message, + MAV_FRAME_BODY_OFFSET_NED, + MAV_FRAME_LOCAL_NED, + ) + HAS_MAVLINK = True +except ImportError: + HAS_MAVLINK = False + +MAX_LINEAR_VEL = 1.0 +MAX_ANGULAR_VEL = 1.5 +POSITION_TOL = 0.3 -class UGVState(Enum): - IDLE = 0 - MOVING = 1 - ROTATING = 2 - STOPPED = 3 +class UGVController: + def __init__(self, connection_string: Optional[str] = None, + static_pos: Tuple[float, float] = (10.0, 5.0)): + self.conn = None + self.backend = 'passive' + self.position = {'x': static_pos[0], 'y': static_pos[1], 'z': 0.0} + self.yaw = 0.0 + + if connection_string and HAS_MAVLINK: + try: + print(f"[UGV] Connecting via pymavlink: {connection_string}") + self.conn = mavutil.mavlink_connection(connection_string) + self.conn.wait_heartbeat(timeout=10) + print("[UGV] Heartbeat received (system %u component %u)" % + (self.conn.target_system, self.conn.target_component)) + self.backend = 'mavlink' + except Exception as e: + print(f"[UGV] MAVLink connection failed: {e}") + self.conn = None + + if self.backend == 'passive': + print(f"[UGV] Passive mode - static at ({static_pos[0]}, {static_pos[1]})") + + def update_state(self): + if self.backend == 'mavlink' and self.conn: + while True: + msg = self.conn.recv_match(blocking=False) + if msg is None: + break + mtype = msg.get_type() + if mtype == 'LOCAL_POSITION_NED': + self.position = {'x': msg.x, 'y': msg.y, 'z': msg.z} + elif mtype == 'ATTITUDE': + self.yaw = msg.yaw + + def get_position(self) -> dict: + self.update_state() + return self.position.copy() + + def move_to(self, x: float, y: float): + if self.backend == 'mavlink' and self.conn: + move_msg = MAVLink_set_position_target_local_ned_message( + int(perf_counter() * 1000), + self.conn.target_system, + self.conn.target_component, + MAV_FRAME_LOCAL_NED, + 3576, x, y, 0, + 0, 0, 0, 0, 0, 0, 0, 0) + self.conn.mav.send(move_msg) + print(f"[UGV] Moving to ({x:.1f}, {y:.1f})") + + def send_velocity(self, vx: float, vy: float, yaw_rate: float = 0.0): + if self.backend == 'mavlink' and self.conn: + move_msg = MAVLink_set_position_target_local_ned_message( + int(perf_counter() * 1000), + self.conn.target_system, + self.conn.target_component, + MAV_FRAME_BODY_OFFSET_NED, + 3527, 0, 0, 0, + vx, vy, 0, 0, 0, 0, 0, yaw_rate) + self.conn.mav.send(move_msg) -class UGVController(Node): - - def __init__(self): - super().__init__('ugv_controller') - - self.declare_parameter('max_linear_velocity', 1.0) - self.declare_parameter('max_angular_velocity', 1.5) - self.declare_parameter('kp_linear', 0.8) - self.declare_parameter('kp_angular', 1.5) - self.declare_parameter('position_tolerance', 0.3) - self.declare_parameter('angle_tolerance', 0.1) - - self.max_linear_vel = self.get_parameter('max_linear_velocity').value - self.max_angular_vel = self.get_parameter('max_angular_velocity').value - self.kp_linear = self.get_parameter('kp_linear').value - self.kp_angular = self.get_parameter('kp_angular').value - self.position_tolerance = self.get_parameter('position_tolerance').value - self.angle_tolerance = self.get_parameter('angle_tolerance').value - - self.state = UGVState.IDLE - self.current_odom = None - self.target_pose = None - self.home_position = None - - self.odom_sub = self.create_subscription( - Odometry, '/ugv/odom', self.odom_callback, 10) - - self.goal_sub = self.create_subscription( - PoseStamped, '/ugv/goal_pose', self.goal_callback, 10) - - self.cmd_sub = self.create_subscription( - String, '/ugv/controller/command', self.command_callback, 10) - - self.vel_sub = self.create_subscription( - Twist, '/ugv/cmd_vel_safe', self.velocity_callback, 10) - - self.cmd_vel_pub = self.create_publisher(Twist, '/ugv/cmd_vel', 10) - self.status_pub = self.create_publisher(String, '/ugv/controller/status', 10) - self.goal_reached_pub = self.create_publisher(Bool, '/ugv/goal_reached', 10) - - self.control_timer = self.create_timer(0.05, self.control_loop) - self.status_timer = self.create_timer(0.5, self.publish_status) - - self.get_logger().info('UGV Controller Started - GPS-denied mode') - - def odom_callback(self, msg): - self.current_odom = msg - if self.home_position is None: - self.home_position = np.array([ - msg.pose.pose.position.x, - msg.pose.pose.position.y - ]) - self.get_logger().info(f'Home position set: {self.home_position}') - - def goal_callback(self, msg): - self.target_pose = msg - self.state = UGVState.MOVING - self.get_logger().info( - f'Goal received: [{msg.pose.position.x:.2f}, {msg.pose.position.y:.2f}]') - - def command_callback(self, msg): - cmd = msg.data.lower() - - if cmd == 'stop': - self.stop() - self.state = UGVState.STOPPED - elif cmd == 'resume': - self.state = UGVState.MOVING - elif cmd == 'rtl': - self.return_to_launch() - - def velocity_callback(self, msg): - if self.state == UGVState.MOVING: - self.cmd_vel_pub.publish(msg) - - def control_loop(self): - if self.state != UGVState.MOVING: - return - - if self.current_odom is None or self.target_pose is None: - return - - current_pos = np.array([ - self.current_odom.pose.pose.position.x, - self.current_odom.pose.pose.position.y - ]) - - target_pos = np.array([ - self.target_pose.pose.position.x, - self.target_pose.pose.position.y - ]) - - error = target_pos - current_pos - distance = np.linalg.norm(error) - - if distance < self.position_tolerance: - self.stop() - self.state = UGVState.IDLE - - reached_msg = Bool() - reached_msg.data = True - self.goal_reached_pub.publish(reached_msg) - - self.get_logger().info('Goal reached!') - return - - target_angle = np.arctan2(error[1], error[0]) - - quat = self.current_odom.pose.pose.orientation - current_yaw = np.arctan2( - 2.0 * (quat.w * quat.z + quat.x * quat.y), - 1.0 - 2.0 * (quat.y * quat.y + quat.z * quat.z) - ) - - angle_error = self.normalize_angle(target_angle - current_yaw) - - cmd = Twist() - - if abs(angle_error) > self.angle_tolerance: - cmd.angular.z = np.clip( - self.kp_angular * angle_error, - -self.max_angular_vel, - self.max_angular_vel - ) - - if abs(angle_error) < np.pi / 4: - speed = self.kp_linear * distance * (1.0 - abs(angle_error) / np.pi) - cmd.linear.x = np.clip(speed, 0.0, self.max_linear_vel) - else: - cmd.linear.x = np.clip( - self.kp_linear * distance, - 0.0, - self.max_linear_vel - ) - - self.cmd_vel_pub.publish(cmd) - def stop(self): - cmd = Twist() - self.cmd_vel_pub.publish(cmd) - - def return_to_launch(self): - if self.home_position is not None: - self.target_pose = PoseStamped() - self.target_pose.header.frame_id = 'odom' - self.target_pose.pose.position.x = float(self.home_position[0]) - self.target_pose.pose.position.y = float(self.home_position[1]) - self.target_pose.pose.orientation.w = 1.0 - - self.state = UGVState.MOVING - self.get_logger().info('Returning to local home') - - def normalize_angle(self, angle): - while angle > np.pi: - angle -= 2 * np.pi - while angle < -np.pi: - angle += 2 * np.pi - return angle - - def publish_status(self): - status_msg = String() - status_msg.data = self.state.name - self.status_pub.publish(status_msg) + self.send_velocity(0, 0, 0) - -def main(args=None): - rclpy.init(args=args) - node = UGVController() - try: - rclpy.spin(node) - except KeyboardInterrupt: - pass - finally: - node.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() + def distance_to(self, x: float, y: float) -> float: + dx = x - self.position['x'] + dy = y - self.position['y'] + return math.sqrt(dx**2 + dy**2) diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..a9dee5d --- /dev/null +++ b/src/main.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 + +import sys +import os +import time +import argparse +from time import sleep + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from control.uav_controller import Controller, HOLD_ALT +from control.ugv_controller import UGVController +from utils.helpers import clamp, distance_2d, PIDController, LowPassFilter +from utils.transforms import normalize_angle, body_to_world, world_to_body + + +def mission_hover(ctrl: Controller, altitude: float = HOLD_ALT, + duration: float = 30.0): + print("\n" + "=" * 50) + print(f" HOVER MISSION: {altitude}m for {duration}s") + print("=" * 50 + "\n") + + ctrl.set_param('ARMING_CHECK', 0) + ctrl.set_param('SCHED_LOOP_RATE', 200) + ctrl.set_param('FS_THR_ENABLE', 0) + ctrl.set_param('FS_GCS_ENABLE', 0) + sleep(2) + + ctrl.wait_for_gps() + + if not ctrl.arm(): + print("[UAV] Cannot arm - aborting mission") + return + + ctrl.takeoff(altitude) + ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30) + + print(f"[UAV] Hovering for {duration}s ...") + t0 = time.time() + while time.time() - t0 < duration: + ctrl.update_state() + remaining = duration - (time.time() - t0) + print(f"\r[UAV] Hovering: {remaining:.0f}s remaining Alt: {ctrl.altitude:.1f}m ", + end='', flush=True) + sleep(0.5) + + print("\n[UAV] Hover complete") + ctrl.land() + + print("[UAV] Waiting for landing ...") + t0 = time.time() + while time.time() - t0 < 30: + ctrl.update_state() + if ctrl.altitude < 0.3: + break + sleep(0.5) + print("[UAV] Landed!") + + +def mission_square(ctrl: Controller, altitude: float = HOLD_ALT, + side: float = 5.0): + print("\n" + "=" * 50) + print(f" SQUARE MISSION: {side}m sides at {altitude}m") + print("=" * 50 + "\n") + + ctrl.set_param('ARMING_CHECK', 0) + ctrl.set_param('SCHED_LOOP_RATE', 200) + ctrl.set_param('FS_THR_ENABLE', 0) + ctrl.set_param('FS_GCS_ENABLE', 0) + sleep(2) + + ctrl.wait_for_gps() + + if not ctrl.arm(): + print("[UAV] Cannot arm - aborting mission") + return + + ctrl.takeoff(altitude) + ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30) + + waypoints = [ + (side, 0, 0), + (0, side, 0), + (-side, 0, 0), + (0, -side, 0), + ] + + for i, (x, y, z) in enumerate(waypoints): + print(f"[UAV] Waypoint {i+1}/4: move ({x}, {y}, {z})") + ctrl.move_pos_rel(x, y, z) + sleep(5) + + print("[UAV] Square complete") + ctrl.land() + + print("[UAV] Waiting for landing ...") + t0 = time.time() + while time.time() - t0 < 30: + ctrl.update_state() + if ctrl.altitude < 0.3: + break + sleep(0.5) + print("[UAV] Landed!") + + +def mission_search(ctrl: Controller, ugv: UGVController, + altitude: float = HOLD_ALT): + print("\n" + "=" * 50) + print(f" SEARCH MISSION at {altitude}m") + print("=" * 50 + "\n") + + ctrl.set_param('ARMING_CHECK', 0) + ctrl.set_param('SCHED_LOOP_RATE', 200) + ctrl.set_param('FS_THR_ENABLE', 0) + ctrl.set_param('FS_GCS_ENABLE', 0) + sleep(2) + + ctrl.wait_for_gps() + + if not ctrl.arm(): + print("[UAV] Cannot arm - aborting mission") + return + + ctrl.takeoff(altitude) + ctrl.wait_altitude(altitude, tolerance=1.0, timeout=30) + + ugv_pos = ugv.get_position() + print(f"[UAV] UGV target at ({ugv_pos['x']:.1f}, {ugv_pos['y']:.1f})") + + distance_step = 3.0 + increment = 2.0 + travel_x = True + direction = 1 + MAX_LEGS = 12 + + for leg in range(MAX_LEGS): + ctrl.update_state() + uav_pos = ctrl.get_local_position() + dist_to_ugv = distance_2d( + (uav_pos['x'], uav_pos['y']), + (ugv_pos['x'], ugv_pos['y']) + ) + print(f"[UAV] Spiral leg {leg+1}/{MAX_LEGS} dist_to_ugv: {dist_to_ugv:.1f}m") + + if dist_to_ugv < 2.0: + print("[UAV] UGV found! Landing nearby.") + ctrl.land() + t0 = time.time() + while time.time() - t0 < 30: + ctrl.update_state() + if ctrl.altitude < 0.3: + break + sleep(0.5) + print("[UAV] Landed on UGV!") + return + + if travel_x: + ctrl.move_pos_rel(distance_step * direction, 0, 0) + else: + ctrl.move_pos_rel(0, distance_step * direction, 0) + direction *= -1 + distance_step += increment + + travel_x = not travel_x + sleep(4) + + print("[UAV] Search complete - UGV not found, landing") + ctrl.land() + + t0 = time.time() + while time.time() - t0 < 30: + ctrl.update_state() + if ctrl.altitude < 0.3: + break + sleep(0.5) + print("[UAV] Landed!") + + +def main(): + parser = argparse.ArgumentParser(description='UAV-UGV Simulation') + parser.add_argument('--device', default='sim', choices=['sim', 'real']) + parser.add_argument('--connection', default=None) + parser.add_argument('--mission', default='hover', choices=['hover', 'square', 'search']) + parser.add_argument('--altitude', type=float, default=HOLD_ALT) + parser.add_argument('--duration', type=float, default=30.0) + parser.add_argument('--ugv-x', type=float, default=10.0) + parser.add_argument('--ugv-y', type=float, default=5.0) + parser.add_argument('--ugv-connection', default=None) + args = parser.parse_args() + + if args.connection: + conn_str = args.connection + elif args.device == 'real': + conn_str = '/dev/ttyAMA0' + else: + conn_str = 'tcp:127.0.0.1:5760' + + ugv = UGVController( + connection_string=args.ugv_connection, + static_pos=(args.ugv_x, args.ugv_y), + ) + + ctrl = Controller(conn_str) + + if args.mission == 'hover': + mission_hover(ctrl, altitude=args.altitude, duration=args.duration) + elif args.mission == 'square': + mission_square(ctrl, altitude=args.altitude) + elif args.mission == 'search': + mission_search(ctrl, ugv, altitude=args.altitude) + + print("[MAIN] Mission finished.") + + +if __name__ == '__main__': + main() diff --git a/src/utils/__init__.py b/src/utils/__init__.py index e40643e..e69de29 100644 --- a/src/utils/__init__.py +++ b/src/utils/__init__.py @@ -1 +0,0 @@ -"""Utility functions and helpers.""" diff --git a/worlds/uav_ugv_search.sdf b/worlds/uav_ugv_search.sdf new file mode 100644 index 0000000..4959792 --- /dev/null +++ b/worlds/uav_ugv_search.sdf @@ -0,0 +1,320 @@ + + + + + + + + 0.002 + 1.0 + + + + + + + ogre2 + + + + + + + + + + + + + 1.0 1.0 1.0 + 0.6 0.75 0.9 + + false + + + + + -35.363262 + 149.165237 + 584 + 0 + EARTH_WGS84 + + + + + false + 0 0 10 0 0 0 + 0.9 0.9 0.9 1 + 0.5 0.5 0.5 1 + + 1000 + 0.9 + 0.01 + 0.001 + + -0.5 0.1 -0.9 + + + + + true + + + + + 0 0 1 + 100 100 + + + + + + + 0 0 1 + 100 100 + + + + 0.5 0.55 0.45 1 + 0.5 0.55 0.45 1 + + + + + + + + 1 + + + + 0 + 5 0 0.05 0 0 0 + 10 0.02 0.02 + + 1 0 0 0.8 + 1 0 0 0.8 + 1 0 0 0.5 + + + + + 0 + 0 5 0.05 0 0 0 + 0.02 10 0.02 + + 0 1 0 0.8 + 0 1 0 0.8 + 0 1 0 0.5 + + + + + 0 + 0 0 5.05 0 0 0 + 0.02 0.02 10 + + 0 0 1 0.8 + 0 0 1 0.8 + 0 0 1 0.5 + + + + + 1 + 1 + + + + + + + + model://iris_with_gimbal + 0 0 0.195 0 0 90 + + + + + + true + 10 5 0 0 0 0 + + + + 0 0 0.15 0 0 0 + + 0.6 0.4 0.2 + + 0.2 0.2 0.7 1 + 0.2 0.2 0.8 1 + + + + 0.6 0.4 0.2 + + + + + + 0 0 0.26 0 0 0 + + 0.5 0.5 0.01 + + 1.0 0.5 0.0 1 + 1.0 0.5 0.0 1 + 0.5 0.25 0.0 1 + + + + + + + 0 0 0.27 0 0 0 + + 0.3 0.04 0.005 + + 1 1 1 1 + 1 1 1 1 + 1 1 1 0.8 + + + + + -0.12 0 0.27 0 0 0 + + 0.04 0.25 0.005 + + 1 1 1 1 + 1 1 1 1 + 1 1 1 0.8 + + + + + 0.12 0 0.27 0 0 0 + + 0.04 0.25 0.005 + + 1 1 1 1 + 1 1 1 1 + 1 1 1 0.8 + + + + + + + 0.2 0.22 0.06 1.5708 0 0 + 0.060.04 + 0.15 0.15 0.15 10.15 0.15 0.15 1 + + + + 0.2 -0.22 0.06 1.5708 0 0 + 0.060.04 + 0.15 0.15 0.15 10.15 0.15 0.15 1 + + + + -0.2 0.22 0.06 1.5708 0 0 + 0.060.04 + 0.15 0.15 0.15 10.15 0.15 0.15 1 + + + + -0.2 -0.22 0.06 1.5708 0 0 + 0.060.04 + 0.15 0.15 0.15 10.15 0.15 0.15 1 + + + + + + + + + + true + 0 0 0.005 0 0 0 + + + 0.40.01 + + 1 1 0 1 + 1 1 0 1 + 0.8 0.8 0 0.6 + + + + + + + + true + 5 0 0.005 0 0 0 + + + 0.5 0.5 0.01 + + 1 0.1 0.1 1 + 1 0.1 0.1 1 + 0.8 0 0 0.5 + + + + + + + + true + 10 0 0.005 0 0 0 + + + 0.5 0.5 0.01 + + 0.1 1 0.1 1 + 0.1 1 0.1 1 + 0 0.8 0 0.5 + + + + + + + + true + 10 10 0.005 0 0 0 + + + 0.5 0.5 0.01 + + 0.1 0.1 1 1 + 0.1 0.1 1 1 + 0 0 0.8 0.5 + + + + + + + diff --git a/wsl_env.sh b/wsl_env.sh index 0710f96..d111c3e 100755 --- a/wsl_env.sh +++ b/wsl_env.sh @@ -1,34 +1,13 @@ #!/bin/bash -# WSL Environment Setup for UAV-UGV Simulation -# This file is sourced automatically when running the simulation in WSL +# WSL Environment for UAV-UGV Simulation -# DISPLAY configuration if [ -d "/mnt/wslg" ]; then - # WSLg (Windows 11) - native Wayland/X support export DISPLAY=:0 - export WAYLAND_DISPLAY="${WAYLAND_DISPLAY:-wayland-0}" else - # X server (Windows 10) - connect to VcXsrv or similar export DISPLAY=$(cat /etc/resolv.conf 2>/dev/null | grep nameserver | awk '{print $2}'):0 fi -# OpenGL settings for better compatibility export LIBGL_ALWAYS_INDIRECT=0 - -# Mesa driver overrides (helps with version requirements) export MESA_GL_VERSION_OVERRIDE=3.3 export MESA_GLSL_VERSION_OVERRIDE=330 - -# Gazebo specific settings export OGRE_RTT_MODE=Copy - -# Source Gazebo setup for proper resource paths -if [ -f /usr/share/gazebo/setup.bash ]; then - source /usr/share/gazebo/setup.bash -elif [ -f /usr/share/gazebo-11/setup.bash ]; then - source /usr/share/gazebo-11/setup.bash -fi - -# Uncomment the following line to force software rendering -# This is more stable but slower - use if you see rendering errors -# export LIBGL_ALWAYS_SOFTWARE=1