#!/usr/bin/env python3 """ Drone Simulation Host - PyBullet physics simulation (GPS-Denied). Includes downward camera for drone vision. Communicates via UDP on port 5555 for commands, 5556 for telemetry. """ import base64 import json import math import socket import time from typing import Dict, Any, Optional, Tuple import numpy as np import pybullet as p import pybullet_data # Load configuration try: from config import DRONE, ROVER, CAMERA, PHYSICS, NETWORK CONFIG_LOADED = True except ImportError: CONFIG_LOADED = False DRONE = {"mass": 1.0, "size": (0.3, 0.3, 0.1), "color": (0.8, 0.1, 0.1, 1.0), "start_position": (0.0, 0.0, 5.0), "thrust_scale": 15.0, "pitch_torque_scale": 2.0, "roll_torque_scale": 2.0, "yaw_torque_scale": 1.0} ROVER = {"size": (1.0, 1.0, 0.3), "color": (0.2, 0.6, 0.2, 1.0), "start_position": (0.0, 0.0, 0.15)} CAMERA = {"width": 320, "height": 240, "fov": 60.0, "near_clip": 0.1, "far_clip": 20.0} PHYSICS = {"gravity": -9.81, "timestep": 1/240, "telemetry_rate": 24} NETWORK = {"host": "0.0.0.0", "command_port": 5555, "telemetry_port": 5556} class DroneSimulator: """PyBullet-based drone simulation with camera.""" def __init__(self): print("=" * 60) print("Drone Simulation Host (GPS-Denied + Camera)") if CONFIG_LOADED: print(" Configuration loaded from config.py") print("=" * 60) # Load from config self._udp_host = NETWORK.get("host", "0.0.0.0") self._udp_port = NETWORK.get("command_port", 5555) self._telemetry_port = NETWORK.get("telemetry_port", 5556) self._gravity = PHYSICS.get("gravity", -9.81) self._timestep = PHYSICS.get("timestep", 1/240) self._telemetry_interval = int(240 / PHYSICS.get("telemetry_rate", 24)) self._drone_mass = DRONE.get("mass", 1.0) self._drone_size = DRONE.get("size", (0.3, 0.3, 0.1)) self._drone_start = DRONE.get("start_position", (0.0, 0.0, 5.0)) self._drone_color = DRONE.get("color", (0.8, 0.1, 0.1, 1.0)) self._thrust_scale = DRONE.get("thrust_scale", 15.0) self._pitch_torque_scale = DRONE.get("pitch_torque_scale", 2.0) self._roll_torque_scale = DRONE.get("roll_torque_scale", 2.0) self._yaw_torque_scale = DRONE.get("yaw_torque_scale", 1.0) self._hover_thrust = self._drone_mass * abs(self._gravity) self._rover_size = ROVER.get("size", (1.0, 1.0, 0.3)) self._rover_start = list(ROVER.get("start_position", (0.0, 0.0, 0.15))) self._rover_color = ROVER.get("color", (0.2, 0.6, 0.2, 1.0)) self._camera_width = CAMERA.get("width", 320) self._camera_height = CAMERA.get("height", 240) self._camera_fov = CAMERA.get("fov", 60.0) self._camera_near = CAMERA.get("near_clip", 0.1) self._camera_far = CAMERA.get("far_clip", 20.0) self._camera_interval = 5 self._running = True self._step_count = 0 self._camera_frame_count = 0 self._last_command: Dict[str, float] = { 'thrust': 0.0, 'pitch': 0.0, 'roll': 0.0, 'yaw': 0.0 } self._rover_pos = list(self._rover_start) self._last_camera_image = None self._init_physics() self._init_objects() self._init_network() print("Simulation Ready! (GPS-Denied Mode)") print(" Sensors: IMU, Altimeter, Camera") print(f" Camera: {self._camera_width}x{self._camera_height} @ {self._camera_fov}° FOV") print(f" Listening on UDP port {self._udp_port}") print("=" * 60) def _init_physics(self) -> None: print("Initializing PyBullet physics engine...") self._physics_client = p.connect(p.GUI) p.setGravity(0, 0, self._gravity) p.setTimeStep(self._timestep) p.resetDebugVisualizerCamera( cameraDistance=8.0, cameraYaw=45, cameraPitch=-30, cameraTargetPosition=[0, 0, 1] ) p.setAdditionalSearchPath(pybullet_data.getDataPath()) print(" Physics engine initialized (240 Hz)") def _init_objects(self) -> None: print("Creating simulation objects...") self._ground_id = p.loadURDF("plane.urdf") print(" Ground plane loaded") rover_collision = p.createCollisionShape( p.GEOM_BOX, halfExtents=[s/2 for s in self._rover_size] ) rover_visual = p.createVisualShape( p.GEOM_BOX, halfExtents=[s/2 for s in self._rover_size], rgbaColor=list(self._rover_color) ) self._rover_id = p.createMultiBody( baseMass=0, baseCollisionShapeIndex=rover_collision, baseVisualShapeIndex=rover_visual, basePosition=self._rover_start ) print(f" Rover created at position {self._rover_start}") self._create_landing_pad_marker() drone_collision = p.createCollisionShape( p.GEOM_BOX, halfExtents=[s/2 for s in self._drone_size] ) drone_visual = p.createVisualShape( p.GEOM_BOX, halfExtents=[s/2 for s in self._drone_size], rgbaColor=list(self._drone_color) ) self._drone_id = p.createMultiBody( baseMass=self._drone_mass, baseCollisionShapeIndex=drone_collision, baseVisualShapeIndex=drone_visual, basePosition=self._drone_start ) p.changeDynamics( self._drone_id, -1, linearDamping=0.1, angularDamping=0.5 ) print(f" Drone created at position {self._drone_start}") def _create_landing_pad_marker(self) -> None: marker_height = self._rover_start[2] + self._rover_size[2] / 2 + 0.01 h_size = 0.3 line_color = [1, 1, 1] line_width = 3 p.addUserDebugLine( [-h_size, 0, marker_height], [h_size, 0, marker_height], lineColorRGB=line_color, lineWidth=line_width ) p.addUserDebugLine( [-h_size, -h_size, marker_height], [-h_size, h_size, marker_height], lineColorRGB=line_color, lineWidth=line_width ) p.addUserDebugLine( [h_size, -h_size, marker_height], [h_size, h_size, marker_height], lineColorRGB=line_color, lineWidth=line_width ) def _init_network(self) -> None: print("Initializing network...") self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._socket.settimeout(0.001) self._socket.bind((self._udp_host, self._udp_port)) self._controller_addr: Optional[Tuple[str, int]] = None print(f" UDP socket bound to {self._udp_host}:{self._udp_port}") def run(self) -> None: print("\nStarting simulation loop...") print("Press Ctrl+C to exit\n") try: while self._running: loop_start = time.time() if not p.isConnected(): print("GUI window closed, exiting...") break self._receive_commands() self._apply_drone_controls() p.stepSimulation() self._step_count += 1 if self._step_count % self._telemetry_interval == 0: self._send_telemetry() elapsed = time.time() - loop_start sleep_time = self._timestep - elapsed if sleep_time > 0: time.sleep(sleep_time) except KeyboardInterrupt: print("\nKeyboard interrupt received") finally: self.shutdown() def _receive_commands(self) -> None: try: data, addr = self._socket.recvfrom(65535) self._controller_addr = (addr[0], self._telemetry_port) try: command = json.loads(data.decode('utf-8')) # Drone control commands self._last_command = { 'thrust': float(command.get('thrust', 0.0)), 'pitch': float(command.get('pitch', 0.0)), 'roll': float(command.get('roll', 0.0)), 'yaw': float(command.get('yaw', 0.0)) } # Rover position update (if provided) if 'rover' in command: rover_data = command['rover'] new_x = float(rover_data.get('x', self._rover_pos[0])) new_y = float(rover_data.get('y', self._rover_pos[1])) new_z = float(rover_data.get('z', self._rover_pos[2])) self._rover_pos = [new_x, new_y, new_z] p.resetBasePositionAndOrientation( self._rover_id, self._rover_pos, [0, 0, 0, 1] ) except (json.JSONDecodeError, ValueError) as e: print(f"Invalid command received: {e}") except socket.timeout: pass except socket.error as e: print(f"Socket error: {e}") def _apply_drone_controls(self) -> None: pos, orn = p.getBasePositionAndOrientation(self._drone_id) rot_matrix = p.getMatrixFromQuaternion(orn) local_up = [rot_matrix[2], rot_matrix[5], rot_matrix[8]] thrust_command = self._last_command['thrust'] total_thrust = self._hover_thrust + (thrust_command * self._thrust_scale) total_thrust = max(0, total_thrust) thrust_force = [ local_up[0] * total_thrust, local_up[1] * total_thrust, local_up[2] * total_thrust ] p.applyExternalForce( self._drone_id, -1, forceObj=thrust_force, posObj=pos, flags=p.WORLD_FRAME ) pitch_torque = self._last_command['pitch'] * self._pitch_torque_scale roll_torque = self._last_command['roll'] * self._roll_torque_scale yaw_torque = self._last_command['yaw'] * self._yaw_torque_scale p.applyExternalTorque( self._drone_id, -1, torqueObj=[pitch_torque, roll_torque, yaw_torque], flags=p.LINK_FRAME ) def _capture_camera_image(self) -> Optional[str]: """Capture image from drone's downward-facing camera. Returns base64 encoded JPEG.""" pos, orn = p.getBasePositionAndOrientation(self._drone_id) euler = p.getEulerFromQuaternion(orn) target_pos = [pos[0], pos[1], 0] view_matrix = p.computeViewMatrix( cameraEyePosition=pos, cameraTargetPosition=target_pos, cameraUpVector=[math.cos(euler[2]), math.sin(euler[2]), 0] ) aspect = self._camera_width / self._camera_height projection_matrix = p.computeProjectionMatrixFOV( fov=self._camera_fov, aspect=aspect, nearVal=self._camera_near, farVal=self._camera_far ) _, _, rgb, _, _ = p.getCameraImage( width=self._camera_width, height=self._camera_height, viewMatrix=view_matrix, projectionMatrix=projection_matrix, renderer=p.ER_BULLET_HARDWARE_OPENGL ) image = np.array(rgb, dtype=np.uint8).reshape( self._camera_height, self._camera_width, 4 )[:, :, :3] # Encode as base64 for transmission try: from PIL import Image import io pil_img = Image.fromarray(image) buffer = io.BytesIO() pil_img.save(buffer, format='JPEG', quality=70) return base64.b64encode(buffer.getvalue()).decode('utf-8') except ImportError: # Return raw RGB values as base64 if PIL not available return base64.b64encode(image.tobytes()).decode('utf-8') def _get_landing_pad_detection(self) -> Optional[Dict[str, float]]: """Simulate downward camera detecting landing pad.""" drone_pos, drone_orn = p.getBasePositionAndOrientation(self._drone_id) rover_pos, _ = p.getBasePositionAndOrientation(self._rover_id) dx = rover_pos[0] - drone_pos[0] dy = rover_pos[1] - drone_pos[1] dz = rover_pos[2] - drone_pos[2] horizontal_dist = math.sqrt(dx**2 + dy**2) vertical_dist = -dz if vertical_dist <= 0 or vertical_dist > 10.0: return None fov_rad = math.radians(self._camera_fov / 2) max_horizontal = vertical_dist * math.tan(fov_rad) if horizontal_dist > max_horizontal: return None rot_matrix = p.getMatrixFromQuaternion(drone_orn) body_x = [rot_matrix[0], rot_matrix[3], rot_matrix[6]] body_y = [rot_matrix[1], rot_matrix[4], rot_matrix[7]] relative_x = dx * body_x[0] + dy * body_x[1] + dz * body_x[2] relative_y = dx * body_y[0] + dy * body_y[1] + dz * body_y[2] angle = math.atan2(horizontal_dist, vertical_dist) confidence = max(0.0, 1.0 - (angle / fov_rad)) confidence *= max(0.0, 1.0 - (vertical_dist / 10.0)) return { "relative_x": round(relative_x, 4), "relative_y": round(relative_y, 4), "distance": round(vertical_dist, 4), "confidence": round(confidence, 4) } def _send_telemetry(self) -> None: if self._controller_addr is None: return pos, orn = p.getBasePositionAndOrientation(self._drone_id) vel, ang_vel = p.getBaseVelocity(self._drone_id) euler = p.getEulerFromQuaternion(orn) landed_on_rover = self._check_landing() pad_detection = self._get_landing_pad_detection() # Capture camera image periodically self._camera_frame_count += 1 camera_image = None if self._camera_frame_count >= self._camera_interval: self._camera_frame_count = 0 camera_image = self._capture_camera_image() telemetry = { "imu": { "orientation": { "roll": round(euler[0], 4), "pitch": round(euler[1], 4), "yaw": round(euler[2], 4) }, "angular_velocity": { "x": round(ang_vel[0], 4), "y": round(ang_vel[1], 4), "z": round(ang_vel[2], 4) }, "linear_acceleration": { "x": 0.0, "y": 0.0, "z": round(-self._gravity, 4) } }, "altimeter": { "altitude": round(pos[2], 4), "vertical_velocity": round(vel[2], 4) }, "velocity": { "x": round(vel[0], 4), "y": round(vel[1], 4), "z": round(vel[2], 4) }, "landing_pad": pad_detection, "camera": { "width": self._camera_width, "height": self._camera_height, "fov": self._camera_fov, "image": camera_image # Base64 encoded JPEG (or None) }, "landed": landed_on_rover, "timestamp": round(self._step_count * self._timestep, 4) } try: json_data = json.dumps(telemetry).encode('utf-8') self._socket.sendto(json_data, self._controller_addr) except socket.error as e: print(f"Failed to send telemetry: {e}") def _check_landing(self) -> bool: contact_points = p.getContactPoints( bodyA=self._drone_id, bodyB=self._rover_id ) return len(contact_points) > 0 def shutdown(self) -> None: print("\nShutting down simulation...") self._running = False try: self._socket.close() print(" Socket closed") except socket.error: pass if p.isConnected(): p.disconnect() print(" PyBullet disconnected") print("Shutdown complete") def main(): print("\n" + "=" * 60) print(" DRONE LANDING SIMULATION (GPS-DENIED + CAMERA)") print("=" * 60 + "\n") try: simulator = DroneSimulator() simulator.run() except Exception as e: print(f"Fatal error: {e}") raise if __name__ == '__main__': main()