Files
simulation/src/utils/recorder.py

493 lines
18 KiB
Python

#!/usr/bin/env python3
import os
import sys
import io
import time
import threading
import cv2
import numpy as np
import subprocess
import re
import tempfile
import requests
from pathlib import Path
from datetime import datetime
import yaml
import signal
# Pillow is an optional dependency: remember whether it could be imported
# instead of failing at module import time.
try:
    import PIL.Image
    HAS_PIL = True
except ImportError:
    HAS_PIL = False

# Base URL of the results API; can be overridden with the API_URL environment
# variable.
API_URL = os.environ.get("API_URL", "https://simlink.sirblob.co")
class RunRecorder:
    """Record one simulation run to disk and mirror its artifacts to the API.

    On construction the recorder registers a simulation with the API at
    ``API_URL`` (falling back to ID 0 / ``simulation_unknown`` when that
    fails), creates the run directory and opens ``log.txt``.  Afterwards it
    can tee stdout/stderr into the log, record tracker/camera frames to AVI
    files, capture the Gazebo window through ffmpeg, and upload every
    produced file.  When no simulation ID was obtained, uploads are skipped
    and results are only kept locally.
    """

    # Config files loaded from <project>/config and sent when creating the run.
    _CONFIG_FILES = ("search.yaml", "ugv.yaml", "uav.yaml")
    # Screen region grabbed when no Gazebo window is found: x, y, width, height.
    _FALLBACK_CAPTURE = (0, 0, 1920, 1080)
    # Matches a "1x1" window size without also matching e.g. "1921x1080".
    _TINY_WINDOW_RE = re.compile(r"(?<!\d)1x1(?!\d)")
    _WINDOW_ID_RE = re.compile(r"(0x[0-9a-fA-F]+)")

    def __init__(self, results_dir=None, fps=5):
        """Create the run directory, register the run and prepare recording.

        Args:
            results_dir: Directory that receives the ``simulation_<id>``
                folder; defaults to ``<project>/results``.
            fps: Frame rate used for the AVI writers, the capture loop and
                the ffmpeg screen grab.
        """
        project_dir = Path(__file__).resolve().parent.parent.parent
        self.fps = fps
        self.start_time = time.time()
        self.search_start = 0.0
        self.search_duration = 0.0

        config_data = {}
        for cfg in self._CONFIG_FILES:
            cfg_path = project_dir / "config" / cfg
            if not cfg_path.exists():
                continue
            try:
                with open(cfg_path, "r") as f:
                    data = yaml.safe_load(f)
            except Exception as e:
                print(f"[REC] Failed to load config {cfg}: {e}")
                continue
            if data:
                config_data[cfg.replace(".yaml", "")] = data

        # These sections are stripped from the payload sent to the API.
        if "ugv" in config_data and "topics" in config_data["ugv"]:
            del config_data["ugv"]["topics"]
        if "uav" in config_data and "connection" in config_data["uav"]:
            del config_data["uav"]["connection"]

        self.sim_id = 0
        self.sim_name = "simulation_unknown"
        try:
            resp = requests.post(
                f"{API_URL}/api/simulations/create",
                json=config_data,
                timeout=10,
            )
            if resp.status_code == 200:
                data = resp.json()
                self.sim_id = data.get("id", 0)
                self.sim_name = f"simulation_{self.sim_id}"
            else:
                print(f"[REC] API create returned HTTP {resp.status_code}")
        except Exception as e:
            # Best effort: recording continues locally without an API record.
            print(f"[REC] API create failed: {e}")

        if results_dir:
            self.run_dir = Path(results_dir) / self.sim_name
        else:
            self.run_dir = project_dir / "results" / self.sim_name
        self.run_dir.mkdir(parents=True, exist_ok=True)
        self.run_num = self.sim_id
        print(f"[REC] Recording local results to {self.run_dir.name} and API ({self.sim_name}, ID: {self.sim_id})")

        self._log_path = self.run_dir / "log.txt"
        # UTF-8 so that non-ASCII console output is not dropped from the log.
        self._log_file = open(self._log_path, "w", encoding="utf-8")
        self._original_stdout = sys.stdout
        self._original_stderr = sys.stderr
        self._tee_stdout = _TeeWriter(sys.stdout, self._log_file)
        self._tee_stderr = _TeeWriter(sys.stderr, self._log_file)

        self._tracker_writer = None
        self._camera_writer = None
        self._ugv_camera_writer = None
        self._gazebo_writer = None
        self._tracker_size = None
        self._camera_size = None
        self._ugv_camera_size = None
        self._gazebo_size = None
        self._tracker_frames = 0
        self._camera_frames = 0
        self._ugv_camera_frames = 0
        self._gazebo_frames = 0
        self._last_tracker_frame = None
        self._last_camera_frame = None
        self._last_ugv_camera_frame = None
        self._camera_snapshots = []

        self._recording = False
        self._tracker_ref = None
        self._camera_ref = None
        self._record_thread = None
        self._lock = threading.Lock()

        # Set by start_recording(); defined here so stop() is safe to call
        # even when recording was never started.
        self._gazebo_ffmpeg_proc = None
        self.gazebo_output_file = None

        threading.Thread(target=self._upload_hardware_info, daemon=True).start()

    # ------------------------------------------------------------------
    # API helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _probe_cpu():
        """Return the first ``model name`` value from /proc/cpuinfo, or ''."""
        with open("/proc/cpuinfo", "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                if "model name" in line:
                    return line.partition(":")[2].strip()
        return ""

    @staticmethod
    def _probe_ram():
        """Return ``MemTotal`` from /proc/meminfo formatted in GB, or ''."""
        with open("/proc/meminfo", "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                if "MemTotal" in line:
                    ram_kb = int(line.split()[1])
                    return f"{round(ram_kb / 1024 / 1024, 1)} GB"
        return ""

    @staticmethod
    def _probe_gpu():
        """Return the device description of every VGA entry listed by lspci."""
        listing = subprocess.run(
            ["lspci"],
            capture_output=True,
            encoding="utf-8",
            errors="replace",
            timeout=2,
        ).stdout
        names = []
        for line in listing.splitlines():
            if "vga" not in line.lower():
                continue
            # lspci prints "<slot> <class>: <device>"; keep the device part.
            device = line.partition(": ")[2].strip()
            if device:
                names.append(device)
        return "\n".join(names)

    def _upload_hardware_info(self):
        """Send CPU/GPU/RAM descriptions for this run to the API.

        Does nothing without a simulation ID.  Any value that cannot be
        determined keeps its ``Unknown ...`` placeholder.
        """
        if not self.sim_id:
            return
        payload = {
            "cpu_info": "Unknown CPU",
            "gpu_info": "Unknown GPU",
            "ram_info": "Unknown RAM",
        }
        probes = (
            ("cpu_info", self._probe_cpu),
            ("ram_info", self._probe_ram),
            ("gpu_info", self._probe_gpu),
        )
        for key, probe in probes:
            try:
                value = probe()
            except (OSError, ValueError, IndexError, subprocess.SubprocessError):
                # Missing /proc entries or tools simply leave the placeholder.
                continue
            if value:
                payload[key] = value
        try:
            requests.put(f"{API_URL}/api/simulations/{self.sim_id}/hardware", json=payload, timeout=5)
        except Exception as e:
            print(f"[REC] Hardware info sync failed: {e}")

    def _upload_file(self, path, filename):
        """Upload the file at ``path`` to the API under the name ``filename``.

        Does nothing without a simulation ID.  Failures are reported on
        stdout and never raised.
        """
        if not self.sim_id:
            return
        try:
            with open(path, "rb") as f:
                resp = requests.post(
                    f"{API_URL}/api/simulations/{self.sim_id}/upload",
                    files={"file": (filename, f)},
                    timeout=60,
                )
            if resp.status_code != 200:
                print(f"[REC] Upload of {filename} returned HTTP {resp.status_code}")
        except Exception as e:
            print(f"[REC] Upload failed for {filename}: {e}")

    # ------------------------------------------------------------------
    # Gazebo window discovery
    # ------------------------------------------------------------------

    @classmethod
    def _parse_gazebo_window_id(cls, tree_text):
        """Extract the Gazebo window ID from ``xwininfo -root -tree`` output.

        Returns the first hexadecimal ID found on the first line that
        mentions "Gazebo Sim" and is not a 1x1 window, or ``None``.
        """
        for line in tree_text.splitlines():
            if "Gazebo Sim" not in line or cls._TINY_WINDOW_RE.search(line):
                continue
            match = cls._WINDOW_ID_RE.search(line)
            return match.group(1) if match else None
        return None

    def _find_gazebo_window_ffmpeg(self):
        """Return the X11 window ID of the Gazebo window, or ``None``."""
        try:
            result = subprocess.run(
                ["xwininfo", "-root", "-tree"],
                capture_output=True,
                encoding="utf-8",
                errors="replace",
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            return None
        return self._parse_gazebo_window_id(result.stdout)

    @staticmethod
    def _parse_geometry(text):
        """Parse ``xdotool getwindowgeometry`` output into (x, y, w, h).

        Returns ``None`` when position or size is missing or malformed.
        """
        x = y = width = height = None
        try:
            for line in text.splitlines():
                if "Position:" in line:
                    pos = line.split("Position:")[1].strip().split()[0]
                    x, y = map(int, pos.split(","))
                elif "Geometry:" in line:
                    size = line.split("Geometry:")[1].strip()
                    width, height = map(int, size.split("x"))
        except (ValueError, IndexError):
            return None
        if None in (x, y, width, height):
            return None
        return x, y, width, height

    def _get_window_geometry(self, window_id):
        """Return ``(x, y, width, height)`` of a window via xdotool, or ``None``."""
        try:
            geo = subprocess.run(
                ["xdotool", "getwindowgeometry", window_id],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except (OSError, subprocess.SubprocessError):
            return None
        return self._parse_geometry(geo.stdout)

    @staticmethod
    def _normalize_display(display):
        """Return ``display`` with an explicit screen number (':0' -> ':0.0').

        A display that already names a screen (':0.1') is returned unchanged.
        """
        if "." in display.rsplit(":", 1)[-1]:
            return display
        return display + ".0"

    # ------------------------------------------------------------------
    # Phase tracking and logging
    # ------------------------------------------------------------------

    def set_phase(self, phase: str):
        """Update timing bookkeeping for a mission phase change.

        "takeoff" restarts the run clock and "search" stamps the search
        start.  The first other phase seen after a search started fixes the
        search duration.
        """
        if phase == "takeoff":
            self.start_time = time.time()
        elif phase == "search":
            self.search_start = time.time()
        elif self.search_start > 0 and self.search_duration == 0:
            self.search_duration = time.time() - self.search_start

    def log_position(self, uav_x=0, uav_y=0, uav_alt=0, uav_heading=0, ugv_x=0, ugv_y=0):
        """Accept a position sample; currently a no-op."""
        pass

    def start_logging(self):
        """Redirect stdout/stderr through the tee writers into log.txt."""
        sys.stdout = self._tee_stdout
        sys.stderr = self._tee_stderr

    def stop_logging(self):
        """Restore the stdout/stderr streams captured at construction."""
        sys.stdout = self._original_stdout
        sys.stderr = self._original_stderr

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def start_recording(self, tracker=None, camera=None):
        """Start the ffmpeg Gazebo capture and the frame-recording thread.

        Args:
            tracker: Object whose ``draw()`` returns the current tracker
                frame (or ``None``).
            camera: Object whose ``frames`` mapping provides the "downward"
                and "ugv_forward" frames.
        """
        self._tracker_ref = tracker
        self._camera_ref = camera
        self._recording = True

        window_id = self._find_gazebo_window_ffmpeg()
        self.gazebo_output_file = str(self.run_dir / "gazebo.mp4")
        display = self._normalize_display(os.environ.get("DISPLAY", ":0"))

        cmd = ["ffmpeg", "-y", "-f", "x11grab"]
        if window_id:
            # ffmpeg follows the window itself, so no geometry is needed.
            cmd.extend([
                "-window_id", str(window_id),
                "-r", str(self.fps),
                "-i", display,
            ])
        else:
            x, y, width, height = self._FALLBACK_CAPTURE
            cmd.extend([
                "-r", str(self.fps),
                "-s", f"{width}x{height}",
                "-i", f"{display}+{x},{y}",
            ])
        cmd.extend([
            "-c:v", "libx264",
            "-preset", "ultrafast",
            "-crf", "18",
            "-pix_fmt", "yuv420p",
            # yuv420p needs even dimensions; pad odd sizes up by one pixel.
            "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
            "-movflags", "+faststart",
            self.gazebo_output_file,
        ])
        try:
            self._gazebo_ffmpeg_proc = subprocess.Popen(
                cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )
            print(f"[REC] Started recording Gazebo window via ffmpeg to gazebo.mp4 (ID: {window_id})")
        except (OSError, ValueError) as e:
            print(f"[REC] Failed to start ffmpeg: {e}")
            self._gazebo_ffmpeg_proc = None

        self._record_thread = threading.Thread(target=self._record_loop, daemon=True)
        self._record_thread.start()

    def _record_loop(self):
        """Grab tracker and camera frames at ``self.fps`` until stopped.

        Runs on the recording thread.  A failing source never stops the
        loop; its first error is printed once and later ones are ignored.
        """
        interval = 1.0 / self.fps
        reported = set()

        def report(source, error):
            # Print only the first failure per source to avoid log spam.
            if source not in reported:
                reported.add(source)
                print(f"[REC] Frame capture failed for {source}: {error}")

        while self._recording:
            t0 = time.time()
            if self._tracker_ref is not None:
                try:
                    frame = self._tracker_ref.draw()
                    if frame is not None:
                        self._write_tracker_frame(frame)
                        self._last_tracker_frame = frame.copy()
                except Exception as e:
                    report("tracker", e)
            if self._camera_ref is not None:
                try:
                    frame = self._camera_ref.frames.get("downward")
                    if frame is not None:
                        self._write_camera_frame(frame)
                        self._last_camera_frame = frame.copy()
                except Exception as e:
                    report("downward", e)
                try:
                    ugv_frame = self._camera_ref.frames.get("ugv_forward")
                    if ugv_frame is not None:
                        self._write_ugv_camera_frame(ugv_frame)
                        self._last_ugv_camera_frame = ugv_frame.copy()
                except Exception as e:
                    report("ugv_forward", e)
            elapsed = time.time() - t0
            time.sleep(max(0, interval - elapsed))

    def _open_writer(self, filename, size):
        """Create an XVID ``cv2.VideoWriter`` for ``filename`` in the run dir."""
        fourcc = cv2.VideoWriter_fourcc(*"XVID")
        return cv2.VideoWriter(str(self.run_dir / filename), fourcc, self.fps, size)

    def _write_tracker_frame(self, frame):
        """Append a tracker frame to flight_path.avi, opening it on first use."""
        h, w = frame.shape[:2]
        if self._tracker_writer is None:
            self._tracker_size = (w, h)
            self._tracker_writer = self._open_writer("flight_path.avi", (w, h))
        self._tracker_writer.write(frame)
        self._tracker_frames += 1

    def _write_camera_frame(self, frame):
        """Append a downward-camera frame to camera.avi, opening it on first use."""
        h, w = frame.shape[:2]
        if self._camera_writer is None:
            self._camera_size = (w, h)
            self._camera_writer = self._open_writer("camera.avi", (w, h))
        self._camera_writer.write(frame)
        self._camera_frames += 1

    def _write_ugv_camera_frame(self, frame):
        """Append a UGV-camera frame to ugv_camera.avi, opening it on first use."""
        h, w = frame.shape[:2]
        if self._ugv_camera_writer is None:
            self._ugv_camera_size = (w, h)
            self._ugv_camera_writer = self._open_writer("ugv_camera.avi", (w, h))
        self._ugv_camera_writer.write(frame)
        self._ugv_camera_frames += 1

    # ------------------------------------------------------------------
    # Snapshots and summary
    # ------------------------------------------------------------------

    def snapshot_camera(self, label="snapshot"):
        """Save the current "downward" camera frame as a numbered PNG and upload it."""
        if self._camera_ref is None:
            return
        frame = self._camera_ref.frames.get("downward")
        if frame is None:
            return
        idx = len(self._camera_snapshots)
        filename = f"camera_{idx:03d}_{label}.png"
        path = self.run_dir / filename
        cv2.imwrite(str(path), frame)
        self._camera_snapshots.append(filename)
        print(f"[REC] Snapshot: {filename}")
        threading.Thread(target=self._upload_file, args=(path, filename), daemon=True).start()

    def snapshot_tracker(self, label="tracker"):
        """Save the most recent tracker frame as ``tracker_<label>.png`` and upload it."""
        if self._last_tracker_frame is None:
            return
        filename = f"tracker_{label}.png"
        path = self.run_dir / filename
        cv2.imwrite(str(path), self._last_tracker_frame)
        print(f"[REC] Tracker snapshot: {filename}")
        threading.Thread(target=self._upload_file, args=(path, filename), daemon=True).start()

    def save_summary(self, search_mode="", altitude=0, markers=None, landed=False, ugv_dispatched=False, ugv_target=None, extra=None):
        """Write ``summary.yaml`` for the run and upload it in the background.

        Args:
            search_mode: Stored under "search_mode".
            altitude: Stored rounded to two decimals.
            markers: Optional mapping of marker ID to a dict with an
                optional "uav_position" dict ("x", "y") and "distance".
            landed: Stored under "landed".
            ugv_dispatched: Stored under "ugv_dispatched".
            ugv_target: Stored under "ugv_target" when truthy.
            extra: Optional dict merged into the summary last, so its keys
                override the ones above.
        """
        duration = time.time() - self.start_time
        # Values are cast to float so numpy scalars serialise as plain YAML
        # numbers instead of python-object tags.
        summary = {
            "search_mode": search_mode,
            "altitude": round(float(altitude), 2),
            "duration_seconds": round(duration, 1),
            "search_duration_seconds": round(self.search_duration, 1),
            "landed": landed,
            "ugv_dispatched": ugv_dispatched,
            "markers_found": 0,
            "markers": {},
            "timestamp": datetime.now().isoformat(),
        }
        if ugv_target:
            summary["ugv_target"] = ugv_target
        if markers:
            summary["markers_found"] = len(markers)
            for mid, info in markers.items():
                pos = info.get("uav_position", {})
                summary["markers"][int(mid)] = {
                    "x": round(float(pos.get("x", 0)), 2),
                    "y": round(float(pos.get("y", 0)), 2),
                    "distance": round(float(info.get("distance", 0)), 2),
                }
        if extra:
            summary.update(extra)

        path = self.run_dir / "summary.yaml"
        try:
            with open(path, "w") as f:
                yaml.dump(summary, f, default_flow_style=False, sort_keys=False)
            print(f"[REC] Summary saved: {path}")
            threading.Thread(target=self._upload_file, args=(path, "summary.yaml"), daemon=True).start()
        except Exception as e:
            print(f"[REC] Failed to save summary: {e}")

    # ------------------------------------------------------------------
    # Shutdown
    # ------------------------------------------------------------------

    def _stop_gazebo_capture(self):
        """Stop ffmpeg (SIGINT, then kill on timeout) and upload gazebo.mp4."""
        proc = self._gazebo_ffmpeg_proc
        if proc is None:
            return
        try:
            # SIGINT lets ffmpeg finalise the MP4 container before exiting.
            proc.send_signal(signal.SIGINT)
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            print("[REC] ffmpeg did not exit after SIGINT; killing it")
            proc.kill()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                pass
        except OSError as e:
            print(f"[REC] Failed to stop ffmpeg: {e}")
        self._gazebo_ffmpeg_proc = None
        self._upload_file(self.gazebo_output_file, "gazebo.mp4")

    def stop(self):
        """Stop all recording, flush every artifact and upload the results.

        Stops the capture thread and ffmpeg, writes the final still images,
        releases the video writers, closes the log, reports the run times to
        the API and prints a summary banner to the original stdout.
        """
        self._recording = False
        if self._record_thread:
            self._record_thread.join(timeout=3.0)

        self._stop_gazebo_capture()

        final_stills = (
            ("flight_path.png", self._last_tracker_frame),
            ("camera_final.png", self._last_camera_frame),
            ("ugv_camera_final.png", self._last_ugv_camera_frame),
        )
        for filename, frame in final_stills:
            if frame is None:
                continue
            path = self.run_dir / filename
            cv2.imwrite(str(path), frame)
            if filename == "flight_path.png":
                print(f"[REC] Flight path saved: {path}")
            self._upload_file(path, filename)

        videos = (
            (self._tracker_writer, "flight_path.avi"),
            (self._camera_writer, "camera.avi"),
            (self._ugv_camera_writer, "ugv_camera.avi"),
        )
        for writer, filename in videos:
            if writer:
                # Release first so the container is complete before upload.
                writer.release()
                self._upload_file(self.run_dir / filename, filename)

        self.stop_logging()
        self._log_file.close()
        self._upload_file(self._log_path, "log.txt")

        duration = time.time() - self.start_time
        mins = int(duration // 60)
        secs = int(duration % 60)
        if self.sim_id:
            try:
                requests.put(
                    f"{API_URL}/api/simulations/{self.sim_id}/time",
                    json={"search_time": self.search_duration, "total_time": duration},
                    timeout=5,
                )
            except Exception as e:
                print(f"[REC] PUT time failed: {e}")

        self._original_stdout.write(
            f"\n[REC] ═══════════════════════════════════════\n"
            f"[REC] Cloud Upload Complete (ID: {self.sim_id})\n"
            f"[REC] Duration: {mins}m {secs}s\n"
            f"[REC] Tracker: {self._tracker_frames} frames\n"
            f"[REC] UAV Camera: {self._camera_frames} frames\n"
            f"[REC] UGV Camera: {self._ugv_camera_frames} frames\n"
            f"[REC] Gazebo: via ffmpeg (.mp4)\n"
            f"[REC] ═══════════════════════════════════════\n"
        )
class _TeeWriter:
    """File-like wrapper that mirrors everything written to a stream into a log.

    The wrapped stream is authoritative: its return values and attributes
    are what callers see.  Writing to the log is best effort, so a closed or
    failing log file never interrupts console output.
    """

    def __init__(self, stream, log_file):
        """Wrap ``stream`` and copy every write to ``log_file``."""
        self._stream = stream
        self._log = log_file

    def write(self, data):
        """Write ``data`` to the stream and the log; return the count written."""
        written = self._stream.write(data)
        try:
            self._log.write(data)
            self._log.flush()
        except (ValueError, IOError):
            # ValueError: log already closed or data not encodable.
            pass
        # Follow the file protocol even if the wrapped stream returns None.
        return written if written is not None else len(data)

    def flush(self):
        """Flush the stream and, best effort, the log."""
        self._stream.flush()
        try:
            self._log.flush()
        except (ValueError, IOError):
            pass

    def fileno(self):
        """Return the file descriptor of the wrapped stream."""
        return self._stream.fileno()

    def isatty(self):
        """Report whether the wrapped stream is a terminal."""
        return self._stream.isatty()

    def __getattr__(self, name):
        """Delegate other public attributes (encoding, buffer, ...) to the stream."""
        # Private names are not forwarded; this also prevents infinite
        # recursion if ``_stream`` itself has not been set yet.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._stream, name)