241 lines
7.1 KiB
Python
241 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Record Drone Simulation with Flight
|
|
====================================
|
|
Runs the drone controller while recording the Gazebo simulation to video.
|
|
|
|
This script:
|
|
1. Starts screen recording (ffmpeg)
|
|
2. Runs the drone flight pattern
|
|
3. Stops recording and saves the video
|
|
|
|
Usage:
|
|
python scripts/record_flight.py --pattern square --duration 120
|
|
python scripts/record_flight.py --pattern circle --output my_flight
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import signal
|
|
import argparse
|
|
import subprocess
|
|
import threading
|
|
from datetime import datetime
|
|
|
|
# Add parent directory to path
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from src.drone_controller import DroneController
|
|
|
|
|
|
class SimulationRecorder:
|
|
"""Records the simulation display using ffmpeg."""
|
|
|
|
def __init__(self, output_dir="recordings", fps=30, quality="medium"):
|
|
self.output_dir = output_dir
|
|
self.fps = fps
|
|
self.quality = quality
|
|
self.process = None
|
|
self.recording = False
|
|
self.output_file = None
|
|
|
|
# Quality presets (CRF values for libx264)
|
|
self.quality_presets = {
|
|
"low": (28, "faster"),
|
|
"medium": (23, "medium"),
|
|
"high": (18, "slow")
|
|
}
|
|
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
def start(self, output_name=None):
|
|
"""Start recording."""
|
|
if self.recording:
|
|
print("[WARN] Already recording")
|
|
return False
|
|
|
|
if not output_name:
|
|
output_name = f"flight_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
|
|
|
self.output_file = os.path.join(self.output_dir, f"{output_name}.mp4")
|
|
|
|
crf, preset = self.quality_presets.get(self.quality, (23, "medium"))
|
|
display = os.environ.get("DISPLAY", ":0")
|
|
|
|
# FFmpeg command for screen recording
|
|
cmd = [
|
|
"ffmpeg", "-y",
|
|
"-f", "x11grab",
|
|
"-framerate", str(self.fps),
|
|
"-i", display,
|
|
"-c:v", "libx264",
|
|
"-preset", preset,
|
|
"-crf", str(crf),
|
|
"-pix_fmt", "yuv420p",
|
|
self.output_file
|
|
]
|
|
|
|
print(f"[INFO] Starting recording: {self.output_file}")
|
|
|
|
try:
|
|
self.process = subprocess.Popen(
|
|
cmd,
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL
|
|
)
|
|
self.recording = True
|
|
return True
|
|
except FileNotFoundError:
|
|
print("[ERROR] ffmpeg not found. Install with: sudo apt install ffmpeg")
|
|
return False
|
|
except Exception as e:
|
|
print(f"[ERROR] Failed to start recording: {e}")
|
|
return False
|
|
|
|
def stop(self):
|
|
"""Stop recording."""
|
|
if not self.recording or not self.process:
|
|
return None
|
|
|
|
print("[INFO] Stopping recording...")
|
|
|
|
# Send 'q' to ffmpeg to stop gracefully
|
|
try:
|
|
self.process.communicate(input=b'q', timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
self.process.kill()
|
|
|
|
self.recording = False
|
|
|
|
if os.path.exists(self.output_file):
|
|
size = os.path.getsize(self.output_file)
|
|
print(f"[OK] Recording saved: {self.output_file} ({size / 1024 / 1024:.1f} MB)")
|
|
return self.output_file
|
|
else:
|
|
print("[WARN] Recording file not found")
|
|
return None
|
|
|
|
|
|
def run_recorded_flight(pattern="square", altitude=5.0, size=5.0, output_name=None, quality="medium"):
|
|
"""Run a drone flight while recording."""
|
|
|
|
print("=" * 50)
|
|
print(" Recorded Drone Flight")
|
|
print("=" * 50)
|
|
print(f" Pattern: {pattern}")
|
|
print(f" Altitude: {altitude}m")
|
|
print(f" Size: {size}m")
|
|
print(f" Recording Quality: {quality}")
|
|
print("=" * 50)
|
|
print()
|
|
|
|
# Start recorder
|
|
recorder = SimulationRecorder(quality=quality)
|
|
|
|
# Initialize controller
|
|
controller = DroneController()
|
|
|
|
if not controller.connect():
|
|
print("[ERROR] Could not connect to SITL")
|
|
return False
|
|
|
|
# Start recording
|
|
recorder.start(output_name)
|
|
time.sleep(2) # Give recording time to start
|
|
|
|
try:
|
|
# Setup for GPS-denied flight
|
|
print("\n--- SETUP ---")
|
|
controller.setup_gps_denied()
|
|
|
|
if not controller.set_mode("GUIDED_NOGPS"):
|
|
raise Exception("Could not set GUIDED_NOGPS mode")
|
|
|
|
if not controller.arm():
|
|
raise Exception("Could not arm")
|
|
|
|
# Takeoff
|
|
print("\n--- TAKEOFF ---")
|
|
if not controller.takeoff(altitude):
|
|
print("[WARN] Takeoff may have failed, continuing anyway...")
|
|
|
|
time.sleep(2)
|
|
|
|
# Fly pattern
|
|
print("\n--- FLY PATTERN ---")
|
|
if pattern == "square":
|
|
controller.fly_square(size=size, altitude=altitude)
|
|
elif pattern == "circle":
|
|
controller.fly_circle(radius=size, altitude=altitude)
|
|
else:
|
|
# Hover
|
|
print("[INFO] Hovering...")
|
|
time.sleep(10)
|
|
|
|
# Land
|
|
print("\n--- LANDING ---")
|
|
controller.land()
|
|
|
|
# Wait for landing
|
|
print("[INFO] Waiting for landing...")
|
|
for i in range(100):
|
|
controller.update_state()
|
|
if controller.altitude < 0.5:
|
|
print(f"[OK] Landed at altitude {controller.altitude:.1f}m")
|
|
break
|
|
time.sleep(0.1)
|
|
|
|
time.sleep(2)
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n[INFO] Interrupted - landing...")
|
|
controller.land()
|
|
time.sleep(3)
|
|
except Exception as e:
|
|
print(f"\n[ERROR] {e}")
|
|
controller.land()
|
|
finally:
|
|
# Stop recording
|
|
time.sleep(1)
|
|
video_file = recorder.stop()
|
|
|
|
print()
|
|
print("=" * 50)
|
|
print(" Flight Complete")
|
|
print("=" * 50)
|
|
if video_file:
|
|
print(f" Video: {video_file}")
|
|
print()
|
|
|
|
return True
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Record Drone Flight Simulation")
|
|
parser.add_argument("--pattern", "-p", choices=["square", "circle", "hover"],
|
|
default="square", help="Flight pattern")
|
|
parser.add_argument("--altitude", "-a", type=float, default=5.0,
|
|
help="Flight altitude (meters)")
|
|
parser.add_argument("--size", "-s", type=float, default=5.0,
|
|
help="Pattern size (meters)")
|
|
parser.add_argument("--output", "-o", type=str, default=None,
|
|
help="Output video filename (without extension)")
|
|
parser.add_argument("--quality", "-q", choices=["low", "medium", "high"],
|
|
default="medium", help="Video quality")
|
|
|
|
args = parser.parse_args()
|
|
|
|
run_recorded_flight(
|
|
pattern=args.pattern,
|
|
altitude=args.altitude,
|
|
size=args.size,
|
|
output_name=args.output,
|
|
quality=args.quality
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|