#!/usr/bin/env python3
"""
Record Drone Simulation with Flight
====================================
Runs the drone controller while recording the Gazebo simulation to video.

This script:
1. Starts screen recording (ffmpeg)
2. Runs the drone flight pattern
3. Stops recording and saves the video

Usage:
    python scripts/record_flight.py --pattern square --quality high
    python scripts/record_flight.py --pattern circle --output my_flight
"""

import os
import sys
import time
import signal
import argparse
import subprocess
import threading
from datetime import datetime

# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

try:
    from src.drone_controller import DroneController
except ImportError as exc:
    # Keep the recorder usable on its own when the flight stack is missing;
    # the failure is reported as soon as a flight is actually requested.
    DroneController = None
    _CONTROLLER_IMPORT_ERROR = exc
else:
    _CONTROLLER_IMPORT_ERROR = None


class SimulationRecorder:
    """Records the simulation display using ffmpeg."""

    def __init__(self, output_dir="recordings", fps=30, quality="medium"):
        """Prepare a recorder and make sure the output directory exists.

        Args:
            output_dir: Directory the .mp4 files are written to.
            fps: Capture frame rate handed to ffmpeg.
            quality: Key into ``quality_presets``; unknown keys fall back
                to the medium settings when recording starts.
        """
        self.output_dir = output_dir
        self.fps = fps
        self.quality = quality
        self.process = None
        self.recording = False
        self.output_file = None

        # Quality presets: (CRF value, encoder preset) for libx264
        self.quality_presets = {
            "low": (28, "faster"),
            "medium": (23, "medium"),
            "high": (18, "slow"),
        }

        os.makedirs(output_dir, exist_ok=True)

    def start(self, output_name=None):
        """Start recording.

        Args:
            output_name: File name without extension; a timestamped name is
                generated when omitted.

        Returns:
            True if ffmpeg was launched, False if a recording is already
            running or ffmpeg could not be started.
        """
        if self.recording:
            print("[WARN] Already recording")
            return False

        if not output_name:
            output_name = f"flight_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        self.output_file = os.path.join(self.output_dir, f"{output_name}.mp4")

        crf, preset = self.quality_presets.get(self.quality, (23, "medium"))
        display = os.environ.get("DISPLAY", ":0")

        # FFmpeg command for screen recording
        cmd = [
            "ffmpeg",
            "-y",
            "-f", "x11grab",
            "-framerate", str(self.fps),
            "-i", display,
            "-c:v", "libx264",
            "-preset", preset,
            "-crf", str(crf),
            "-pix_fmt", "yuv420p",
            self.output_file,
        ]

        print(f"[INFO] Starting recording: {self.output_file}")

        try:
            # stdin stays a pipe so stop() can send 'q' for a clean shutdown.
            self.process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except FileNotFoundError:
            print("[ERROR] ffmpeg not found. Install with: sudo apt install ffmpeg")
            return False
        except (OSError, ValueError, subprocess.SubprocessError) as e:
            print(f"[ERROR] Failed to start recording: {e}")
            return False

        self.recording = True
        return True

    def stop(self):
        """Stop recording.

        Returns:
            Path of the saved video, or None when nothing was being recorded
            or the output file does not exist.
        """
        if not self.recording or not self.process:
            return None

        print("[INFO] Stopping recording...")

        proc = self.process
        try:
            # Send 'q' to ffmpeg to stop gracefully
            proc.communicate(input=b'q', timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            proc.communicate()
        finally:
            self.process = None
            self.recording = False

        if self.output_file and os.path.exists(self.output_file):
            size = os.path.getsize(self.output_file)
            print(f"[OK] Recording saved: {self.output_file} ({size / 1024 / 1024:.1f} MB)")
            return self.output_file

        print("[WARN] Recording file not found")
        return None


def run_recorded_flight(pattern="square", altitude=5.0, size=5.0,
                        output_name=None, quality="medium"):
    """Run a drone flight while recording.

    Args:
        pattern: "square" or "circle"; any other value hovers for 10 seconds.
        altitude: Takeoff and pattern altitude in meters.
        size: Square size or circle radius in meters.
        output_name: Video file name without extension (timestamped if None).
        quality: Recording quality key passed to SimulationRecorder.

    Returns:
        True when the flight sequence ran through landing without an error,
        False when the controller was unavailable, the connection failed,
        the flight raised, or it was interrupted.
    """
    print("=" * 50)
    print(" Recorded Drone Flight")
    print("=" * 50)
    print(f" Pattern: {pattern}")
    print(f" Altitude: {altitude}m")
    print(f" Size: {size}m")
    print(f" Recording Quality: {quality}")
    print("=" * 50)
    print()

    if DroneController is None:
        print(f"[ERROR] Could not import DroneController: {_CONTROLLER_IMPORT_ERROR}")
        return False

    # Start recorder
    recorder = SimulationRecorder(quality=quality)

    # Initialize controller
    controller = DroneController()

    if not controller.connect():
        print("[ERROR] Could not connect to SITL")
        return False

    # Start recording
    if recorder.start(output_name):
        time.sleep(2)  # Give recording time to start
    else:
        print("[WARN] Continuing without recording")

    flight_ok = False
    try:
        # Setup for GPS-denied flight
        print("\n--- SETUP ---")
        controller.setup_gps_denied()

        if not controller.set_mode("GUIDED_NOGPS"):
            raise Exception("Could not set GUIDED_NOGPS mode")

        if not controller.arm():
            raise Exception("Could not arm")

        # Takeoff
        print("\n--- TAKEOFF ---")
        if not controller.takeoff(altitude):
            print("[WARN] Takeoff may have failed, continuing anyway...")

        time.sleep(2)

        # Fly pattern
        print("\n--- FLY PATTERN ---")
        if pattern == "square":
            controller.fly_square(size=size, altitude=altitude)
        elif pattern == "circle":
            controller.fly_circle(radius=size, altitude=altitude)
        else:
            # Hover
            print("[INFO] Hovering...")
            time.sleep(10)

        # Land
        print("\n--- LANDING ---")
        controller.land()

        # Wait for landing: poll for up to ~10 s (100 polls, 0.1 s apart)
        print("[INFO] Waiting for landing...")
        for _ in range(100):
            controller.update_state()
            if controller.altitude < 0.5:
                print(f"[OK] Landed at altitude {controller.altitude:.1f}m")
                break
            time.sleep(0.1)
        else:
            print("[WARN] Landing not confirmed before timeout")

        time.sleep(2)
        flight_ok = True

    except KeyboardInterrupt:
        print("\n[INFO] Interrupted - landing...")
        controller.land()
        time.sleep(3)

    except Exception as e:
        print(f"\n[ERROR] {e}")
        controller.land()

    finally:
        # Stop recording
        time.sleep(1)
        video_file = recorder.stop()

    print()
    print("=" * 50)
    print(" Flight Complete")
    print("=" * 50)
    if video_file:
        print(f" Video: {video_file}")
    print()

    return flight_ok


def main(argv=None):
    """Parse command-line options and run a recorded flight.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 when the flight succeeded, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Record Drone Flight Simulation")
    parser.add_argument("--pattern", "-p", choices=["square", "circle", "hover"],
                        default="square", help="Flight pattern")
    parser.add_argument("--altitude", "-a", type=float, default=5.0,
                        help="Flight altitude (meters)")
    parser.add_argument("--size", "-s", type=float, default=5.0,
                        help="Pattern size (meters)")
    parser.add_argument("--output", "-o", type=str, default=None,
                        help="Output video filename (without extension)")
    parser.add_argument("--quality", "-q", choices=["low", "medium", "high"],
                        default="medium", help="Video quality")

    args = parser.parse_args(argv)

    succeeded = run_recorded_flight(
        pattern=args.pattern,
        altitude=args.altitude,
        size=args.size,
        output_name=args.output,
        quality=args.quality,
    )
    return 0 if succeeded else 1


if __name__ == "__main__":
    sys.exit(main())