459 lines
15 KiB
Python
459 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Drone Simulation Host - PyBullet physics simulation (GPS-Denied).
|
|
Includes downward camera for drone vision.
|
|
Communicates via UDP on port 5555 for commands, 5556 for telemetry.
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import math
|
|
import socket
|
|
import time
|
|
from typing import Dict, Any, Optional, Tuple
|
|
|
|
import numpy as np
|
|
import pybullet as p
|
|
import pybullet_data
|
|
|
|
|
|
class DroneSimulator:
|
|
"""PyBullet-based drone simulation with camera."""
|
|
|
|
UDP_HOST = '0.0.0.0'
|
|
UDP_PORT = 5555
|
|
UDP_BUFFER_SIZE = 65535
|
|
SOCKET_TIMEOUT = 0.001
|
|
TELEMETRY_PORT = 5556
|
|
|
|
PHYSICS_TIMESTEP = 1.0 / 240.0
|
|
GRAVITY = -9.81
|
|
TELEMETRY_INTERVAL = 10
|
|
|
|
DRONE_MASS = 1.0
|
|
DRONE_SIZE = (0.3, 0.3, 0.1)
|
|
DRONE_START_POS = (0.0, 0.0, 5.0)
|
|
|
|
THRUST_SCALE = 15.0
|
|
PITCH_TORQUE_SCALE = 2.0
|
|
ROLL_TORQUE_SCALE = 2.0
|
|
YAW_TORQUE_SCALE = 1.0
|
|
HOVER_THRUST = DRONE_MASS * abs(GRAVITY)
|
|
|
|
ROVER_SIZE = (1.0, 1.0, 0.3)
|
|
ROVER_POS = (0.0, 0.0, 0.15)
|
|
|
|
# Camera parameters
|
|
CAMERA_WIDTH = 320
|
|
CAMERA_HEIGHT = 240
|
|
CAMERA_FOV = 60.0
|
|
CAMERA_NEAR = 0.1
|
|
CAMERA_FAR = 20.0
|
|
CAMERA_INTERVAL = 5 # Capture every N telemetry frames
|
|
|
|
def __init__(self):
|
|
print("=" * 60)
|
|
print("Drone Simulation Host (GPS-Denied + Camera)")
|
|
print("=" * 60)
|
|
|
|
self._running = True
|
|
self._step_count = 0
|
|
self._camera_frame_count = 0
|
|
self._last_command: Dict[str, float] = {
|
|
'thrust': 0.0, 'pitch': 0.0, 'roll': 0.0, 'yaw': 0.0
|
|
}
|
|
|
|
self._rover_pos = list(self.ROVER_POS)
|
|
self._last_camera_image = None
|
|
|
|
self._init_physics()
|
|
self._init_objects()
|
|
self._init_network()
|
|
|
|
print("Simulation Ready! (GPS-Denied Mode)")
|
|
print(" Sensors: IMU, Altimeter, Camera")
|
|
print(f" Camera: {self.CAMERA_WIDTH}x{self.CAMERA_HEIGHT} @ {self.CAMERA_FOV}° FOV")
|
|
print(f" Listening on UDP port {self.UDP_PORT}")
|
|
print("=" * 60)
|
|
|
|
def _init_physics(self) -> None:
|
|
print("Initializing PyBullet physics engine...")
|
|
self._physics_client = p.connect(p.GUI)
|
|
p.setGravity(0, 0, self.GRAVITY)
|
|
p.setTimeStep(self.PHYSICS_TIMESTEP)
|
|
p.resetDebugVisualizerCamera(
|
|
cameraDistance=8.0,
|
|
cameraYaw=45,
|
|
cameraPitch=-30,
|
|
cameraTargetPosition=[0, 0, 1]
|
|
)
|
|
p.setAdditionalSearchPath(pybullet_data.getDataPath())
|
|
print(" Physics engine initialized (240 Hz)")
|
|
|
|
def _init_objects(self) -> None:
|
|
print("Creating simulation objects...")
|
|
|
|
self._ground_id = p.loadURDF("plane.urdf")
|
|
print(" Ground plane loaded")
|
|
|
|
rover_collision = p.createCollisionShape(
|
|
p.GEOM_BOX,
|
|
halfExtents=[s/2 for s in self.ROVER_SIZE]
|
|
)
|
|
rover_visual = p.createVisualShape(
|
|
p.GEOM_BOX,
|
|
halfExtents=[s/2 for s in self.ROVER_SIZE],
|
|
rgbaColor=[0.2, 0.6, 0.2, 1.0]
|
|
)
|
|
self._rover_id = p.createMultiBody(
|
|
baseMass=0,
|
|
baseCollisionShapeIndex=rover_collision,
|
|
baseVisualShapeIndex=rover_visual,
|
|
basePosition=self.ROVER_POS
|
|
)
|
|
print(f" Rover created at position {self.ROVER_POS}")
|
|
|
|
self._create_landing_pad_marker()
|
|
|
|
drone_collision = p.createCollisionShape(
|
|
p.GEOM_BOX,
|
|
halfExtents=[s/2 for s in self.DRONE_SIZE]
|
|
)
|
|
drone_visual = p.createVisualShape(
|
|
p.GEOM_BOX,
|
|
halfExtents=[s/2 for s in self.DRONE_SIZE],
|
|
rgbaColor=[0.8, 0.2, 0.2, 1.0]
|
|
)
|
|
self._drone_id = p.createMultiBody(
|
|
baseMass=self.DRONE_MASS,
|
|
baseCollisionShapeIndex=drone_collision,
|
|
baseVisualShapeIndex=drone_visual,
|
|
basePosition=self.DRONE_START_POS
|
|
)
|
|
p.changeDynamics(
|
|
self._drone_id, -1,
|
|
linearDamping=0.1,
|
|
angularDamping=0.5
|
|
)
|
|
print(f" Drone created at position {self.DRONE_START_POS}")
|
|
|
|
def _create_landing_pad_marker(self) -> None:
|
|
marker_height = self.ROVER_POS[2] + self.ROVER_SIZE[2] / 2 + 0.01
|
|
h_size = 0.3
|
|
line_color = [1, 1, 1]
|
|
line_width = 3
|
|
|
|
p.addUserDebugLine(
|
|
[-h_size, 0, marker_height],
|
|
[h_size, 0, marker_height],
|
|
lineColorRGB=line_color,
|
|
lineWidth=line_width
|
|
)
|
|
p.addUserDebugLine(
|
|
[-h_size, -h_size, marker_height],
|
|
[-h_size, h_size, marker_height],
|
|
lineColorRGB=line_color,
|
|
lineWidth=line_width
|
|
)
|
|
p.addUserDebugLine(
|
|
[h_size, -h_size, marker_height],
|
|
[h_size, h_size, marker_height],
|
|
lineColorRGB=line_color,
|
|
lineWidth=line_width
|
|
)
|
|
|
|
def _init_network(self) -> None:
|
|
print("Initializing network...")
|
|
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
self._socket.settimeout(self.SOCKET_TIMEOUT)
|
|
self._socket.bind((self.UDP_HOST, self.UDP_PORT))
|
|
self._controller_addr: Optional[Tuple[str, int]] = None
|
|
print(f" UDP socket bound to {self.UDP_HOST}:{self.UDP_PORT}")
|
|
|
|
def run(self) -> None:
|
|
print("\nStarting simulation loop...")
|
|
print("Press Ctrl+C to exit\n")
|
|
|
|
try:
|
|
while self._running:
|
|
loop_start = time.time()
|
|
|
|
if not p.isConnected():
|
|
print("GUI window closed, exiting...")
|
|
break
|
|
|
|
self._receive_commands()
|
|
self._apply_drone_controls()
|
|
p.stepSimulation()
|
|
self._step_count += 1
|
|
|
|
if self._step_count % self.TELEMETRY_INTERVAL == 0:
|
|
self._send_telemetry()
|
|
|
|
elapsed = time.time() - loop_start
|
|
sleep_time = self.PHYSICS_TIMESTEP - elapsed
|
|
if sleep_time > 0:
|
|
time.sleep(sleep_time)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nKeyboard interrupt received")
|
|
finally:
|
|
self.shutdown()
|
|
|
|
def _receive_commands(self) -> None:
|
|
try:
|
|
data, addr = self._socket.recvfrom(self.UDP_BUFFER_SIZE)
|
|
self._controller_addr = (addr[0], self.TELEMETRY_PORT)
|
|
|
|
try:
|
|
command = json.loads(data.decode('utf-8'))
|
|
|
|
# Drone control commands
|
|
self._last_command = {
|
|
'thrust': float(command.get('thrust', 0.0)),
|
|
'pitch': float(command.get('pitch', 0.0)),
|
|
'roll': float(command.get('roll', 0.0)),
|
|
'yaw': float(command.get('yaw', 0.0))
|
|
}
|
|
|
|
# Rover position update (if provided)
|
|
if 'rover' in command:
|
|
rover_data = command['rover']
|
|
new_x = float(rover_data.get('x', self._rover_pos[0]))
|
|
new_y = float(rover_data.get('y', self._rover_pos[1]))
|
|
new_z = float(rover_data.get('z', self._rover_pos[2]))
|
|
self._rover_pos = [new_x, new_y, new_z]
|
|
p.resetBasePositionAndOrientation(
|
|
self._rover_id,
|
|
self._rover_pos,
|
|
[0, 0, 0, 1]
|
|
)
|
|
|
|
except (json.JSONDecodeError, ValueError) as e:
|
|
print(f"Invalid command received: {e}")
|
|
|
|
except socket.timeout:
|
|
pass
|
|
except socket.error as e:
|
|
print(f"Socket error: {e}")
|
|
|
|
def _apply_drone_controls(self) -> None:
|
|
pos, orn = p.getBasePositionAndOrientation(self._drone_id)
|
|
rot_matrix = p.getMatrixFromQuaternion(orn)
|
|
local_up = [rot_matrix[2], rot_matrix[5], rot_matrix[8]]
|
|
|
|
thrust_command = self._last_command['thrust']
|
|
total_thrust = self.HOVER_THRUST + (thrust_command * self.THRUST_SCALE)
|
|
total_thrust = max(0, total_thrust)
|
|
|
|
thrust_force = [
|
|
local_up[0] * total_thrust,
|
|
local_up[1] * total_thrust,
|
|
local_up[2] * total_thrust
|
|
]
|
|
|
|
p.applyExternalForce(
|
|
self._drone_id, -1,
|
|
forceObj=thrust_force,
|
|
posObj=pos,
|
|
flags=p.WORLD_FRAME
|
|
)
|
|
|
|
pitch_torque = self._last_command['pitch'] * self.PITCH_TORQUE_SCALE
|
|
roll_torque = self._last_command['roll'] * self.ROLL_TORQUE_SCALE
|
|
yaw_torque = self._last_command['yaw'] * self.YAW_TORQUE_SCALE
|
|
|
|
p.applyExternalTorque(
|
|
self._drone_id, -1,
|
|
torqueObj=[pitch_torque, roll_torque, yaw_torque],
|
|
flags=p.LINK_FRAME
|
|
)
|
|
|
|
def _capture_camera_image(self) -> Optional[str]:
|
|
"""Capture image from drone's downward-facing camera. Returns base64 encoded JPEG."""
|
|
pos, orn = p.getBasePositionAndOrientation(self._drone_id)
|
|
euler = p.getEulerFromQuaternion(orn)
|
|
|
|
target_pos = [pos[0], pos[1], 0]
|
|
|
|
view_matrix = p.computeViewMatrix(
|
|
cameraEyePosition=pos,
|
|
cameraTargetPosition=target_pos,
|
|
cameraUpVector=[math.cos(euler[2]), math.sin(euler[2]), 0]
|
|
)
|
|
|
|
aspect = self.CAMERA_WIDTH / self.CAMERA_HEIGHT
|
|
projection_matrix = p.computeProjectionMatrixFOV(
|
|
fov=self.CAMERA_FOV,
|
|
aspect=aspect,
|
|
nearVal=self.CAMERA_NEAR,
|
|
farVal=self.CAMERA_FAR
|
|
)
|
|
|
|
_, _, rgb, _, _ = p.getCameraImage(
|
|
width=self.CAMERA_WIDTH,
|
|
height=self.CAMERA_HEIGHT,
|
|
viewMatrix=view_matrix,
|
|
projectionMatrix=projection_matrix,
|
|
renderer=p.ER_BULLET_HARDWARE_OPENGL
|
|
)
|
|
|
|
image = np.array(rgb, dtype=np.uint8).reshape(
|
|
self.CAMERA_HEIGHT, self.CAMERA_WIDTH, 4
|
|
)[:, :, :3]
|
|
|
|
# Encode as base64 for transmission
|
|
try:
|
|
from PIL import Image
|
|
import io
|
|
pil_img = Image.fromarray(image)
|
|
buffer = io.BytesIO()
|
|
pil_img.save(buffer, format='JPEG', quality=70)
|
|
return base64.b64encode(buffer.getvalue()).decode('utf-8')
|
|
except ImportError:
|
|
# Return raw RGB values as base64 if PIL not available
|
|
return base64.b64encode(image.tobytes()).decode('utf-8')
|
|
|
|
def _get_landing_pad_detection(self) -> Optional[Dict[str, float]]:
|
|
"""Simulate downward camera detecting landing pad."""
|
|
drone_pos, drone_orn = p.getBasePositionAndOrientation(self._drone_id)
|
|
rover_pos, _ = p.getBasePositionAndOrientation(self._rover_id)
|
|
|
|
dx = rover_pos[0] - drone_pos[0]
|
|
dy = rover_pos[1] - drone_pos[1]
|
|
dz = rover_pos[2] - drone_pos[2]
|
|
|
|
horizontal_dist = math.sqrt(dx**2 + dy**2)
|
|
vertical_dist = -dz
|
|
|
|
if vertical_dist <= 0 or vertical_dist > 10.0:
|
|
return None
|
|
|
|
fov_rad = math.radians(self.CAMERA_FOV / 2)
|
|
max_horizontal = vertical_dist * math.tan(fov_rad)
|
|
|
|
if horizontal_dist > max_horizontal:
|
|
return None
|
|
|
|
rot_matrix = p.getMatrixFromQuaternion(drone_orn)
|
|
body_x = [rot_matrix[0], rot_matrix[3], rot_matrix[6]]
|
|
body_y = [rot_matrix[1], rot_matrix[4], rot_matrix[7]]
|
|
|
|
relative_x = dx * body_x[0] + dy * body_x[1] + dz * body_x[2]
|
|
relative_y = dx * body_y[0] + dy * body_y[1] + dz * body_y[2]
|
|
|
|
angle = math.atan2(horizontal_dist, vertical_dist)
|
|
confidence = max(0.0, 1.0 - (angle / fov_rad))
|
|
confidence *= max(0.0, 1.0 - (vertical_dist / 10.0))
|
|
|
|
return {
|
|
"relative_x": round(relative_x, 4),
|
|
"relative_y": round(relative_y, 4),
|
|
"distance": round(vertical_dist, 4),
|
|
"confidence": round(confidence, 4)
|
|
}
|
|
|
|
def _send_telemetry(self) -> None:
|
|
if self._controller_addr is None:
|
|
return
|
|
|
|
pos, orn = p.getBasePositionAndOrientation(self._drone_id)
|
|
vel, ang_vel = p.getBaseVelocity(self._drone_id)
|
|
euler = p.getEulerFromQuaternion(orn)
|
|
landed_on_rover = self._check_landing()
|
|
|
|
pad_detection = self._get_landing_pad_detection()
|
|
|
|
# Capture camera image periodically
|
|
self._camera_frame_count += 1
|
|
camera_image = None
|
|
|
|
if self._camera_frame_count >= self.CAMERA_INTERVAL:
|
|
self._camera_frame_count = 0
|
|
camera_image = self._capture_camera_image()
|
|
|
|
telemetry = {
|
|
"imu": {
|
|
"orientation": {
|
|
"roll": round(euler[0], 4),
|
|
"pitch": round(euler[1], 4),
|
|
"yaw": round(euler[2], 4)
|
|
},
|
|
"angular_velocity": {
|
|
"x": round(ang_vel[0], 4),
|
|
"y": round(ang_vel[1], 4),
|
|
"z": round(ang_vel[2], 4)
|
|
},
|
|
"linear_acceleration": {
|
|
"x": 0.0,
|
|
"y": 0.0,
|
|
"z": round(-self.GRAVITY, 4)
|
|
}
|
|
},
|
|
"altimeter": {
|
|
"altitude": round(pos[2], 4),
|
|
"vertical_velocity": round(vel[2], 4)
|
|
},
|
|
"velocity": {
|
|
"x": round(vel[0], 4),
|
|
"y": round(vel[1], 4),
|
|
"z": round(vel[2], 4)
|
|
},
|
|
"landing_pad": pad_detection,
|
|
"camera": {
|
|
"width": self.CAMERA_WIDTH,
|
|
"height": self.CAMERA_HEIGHT,
|
|
"fov": self.CAMERA_FOV,
|
|
"image": camera_image # Base64 encoded JPEG (or None)
|
|
},
|
|
"landed": landed_on_rover,
|
|
"timestamp": round(self._step_count * self.PHYSICS_TIMESTEP, 4)
|
|
}
|
|
|
|
try:
|
|
json_data = json.dumps(telemetry).encode('utf-8')
|
|
self._socket.sendto(json_data, self._controller_addr)
|
|
except socket.error as e:
|
|
print(f"Failed to send telemetry: {e}")
|
|
|
|
def _check_landing(self) -> bool:
|
|
contact_points = p.getContactPoints(
|
|
bodyA=self._drone_id,
|
|
bodyB=self._rover_id
|
|
)
|
|
return len(contact_points) > 0
|
|
|
|
def shutdown(self) -> None:
|
|
print("\nShutting down simulation...")
|
|
self._running = False
|
|
|
|
try:
|
|
self._socket.close()
|
|
print(" Socket closed")
|
|
except socket.error:
|
|
pass
|
|
|
|
if p.isConnected():
|
|
p.disconnect()
|
|
print(" PyBullet disconnected")
|
|
|
|
print("Shutdown complete")
|
|
|
|
|
|
def main():
|
|
print("\n" + "=" * 60)
|
|
print(" DRONE LANDING SIMULATION (GPS-DENIED + CAMERA)")
|
|
print("=" * 60 + "\n")
|
|
|
|
try:
|
|
simulator = DroneSimulator()
|
|
simulator.run()
|
|
except Exception as e:
|
|
print(f"Fatal error: {e}")
|
|
raise
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|