187 lines
6.9 KiB
Python
187 lines
6.9 KiB
Python
import os
|
|
import time
|
|
import struct
|
|
import math
|
|
import numpy as np
|
|
import librosa
|
|
import librosa.display
|
|
import matplotlib
|
|
# Set backend to Agg (Anti-Grain Geometry) to render without a GUI (essential for servers)
|
|
matplotlib.use('Agg')
|
|
import matplotlib.pyplot as plt
|
|
from PIL import Image
|
|
|
|
# --- Constants ---
|
|
MAX_MB = 40
|
|
SIG_SHIFT = b'B2I!'
|
|
SIG_STEGO = b'B2S!'
|
|
HEADER_FMT = '>4sQB'
|
|
HEADER_LEN = struct.calcsize(HEADER_FMT)
|
|
Image.MAX_IMAGE_PIXELS = 500 * 1024 * 1024
|
|
|
|
class AudioImageProcessor:
|
|
def __init__(self, upload_folder):
|
|
self.upload_folder = upload_folder
|
|
os.makedirs(upload_folder, exist_ok=True)
|
|
|
|
def _get_bytes(self, path):
|
|
"""Helper to safely read bytes"""
|
|
if os.path.getsize(path) > (MAX_MB * 1024 * 1024):
|
|
raise ValueError("File too large (Max 40MB)")
|
|
with open(path, 'rb') as f:
|
|
return f.read()
|
|
|
|
def _create_header(self, signature, file_size, filepath):
|
|
_, ext = os.path.splitext(filepath)
|
|
ext_bytes = ext.encode('utf-8')
|
|
return struct.pack(HEADER_FMT, signature, file_size, len(ext_bytes)) + ext_bytes
|
|
|
|
# --- Feature 1: Spectrogram Art ---
|
|
def generate_spectrogram(self, audio_path, min_pixels=0):
|
|
"""Generates a visual spectrogram from audio."""
|
|
try:
|
|
import torch
|
|
import torchaudio
|
|
has_torch = True
|
|
except ImportError:
|
|
has_torch = False
|
|
|
|
if has_torch and torch.cuda.is_available():
|
|
try:
|
|
# GPU Accelerated Path
|
|
device = "cuda"
|
|
waveform, sr = torchaudio.load(audio_path)
|
|
waveform = waveform.to(device)
|
|
|
|
# Create transformation
|
|
# Mimic librosa defaults roughly: n_fft=2048, hop_length=512
|
|
n_fft = 2048
|
|
win_length = n_fft
|
|
hop_length = 512
|
|
n_mels = 128
|
|
|
|
mel_spectrogram = torchaudio.transforms.MelSpectrogram(
|
|
sample_rate=sr,
|
|
n_fft=n_fft,
|
|
win_length=win_length,
|
|
hop_length=hop_length,
|
|
n_mels=n_mels,
|
|
f_max=8000
|
|
).to(device)
|
|
|
|
S = mel_spectrogram(waveform)
|
|
S_dB = torchaudio.transforms.AmplitudeToDB()(S)
|
|
|
|
# Back to CPU for plotting
|
|
S_dB = S_dB.cpu().numpy()[0] # Take first channel
|
|
# Librosa display expects numpy
|
|
except Exception as e:
|
|
# Fallback to CPU/Librosa if any error occurs
|
|
print(f"GPU processing failed, falling back to CPU: {e}")
|
|
return self._generate_spectrogram_cpu(audio_path, min_pixels)
|
|
else:
|
|
return self._generate_spectrogram_cpu(audio_path, min_pixels)
|
|
|
|
# Plotting (Common)
|
|
return self._plot_spectrogram(S_dB, sr, min_pixels)
|
|
|
|
def _generate_spectrogram_cpu(self, audio_path, min_pixels=0):
|
|
y, sr = librosa.load(audio_path)
|
|
S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, fmax=8000)
|
|
S_dB = librosa.power_to_db(S, ref=np.max)
|
|
return self._plot_spectrogram(S_dB, sr, min_pixels)
|
|
|
|
def _plot_spectrogram(self, S_dB, sr, min_pixels=0):
|
|
# Calculate DPI dynamically to ensure we have enough pixels for steganography
|
|
dpi = 300
|
|
if min_pixels > 0:
|
|
# Figure is 12x6 inches. Area = 72 sq inches.
|
|
# Total Pixels = 72 * dpi^2
|
|
required_dpi = math.ceil((min_pixels / 72) ** 0.5)
|
|
# Add a small buffer
|
|
dpi = max(dpi, int(required_dpi * 1.05))
|
|
|
|
# Use exact dimensions without margins
|
|
width_in = 12
|
|
height_in = 6
|
|
fig = plt.figure(figsize=(width_in, height_in))
|
|
|
|
# Add axes covering the entire figure [left, bottom, width, height]
|
|
ax = plt.axes([0, 0, 1, 1], frameon=False)
|
|
ax.set_axis_off()
|
|
|
|
# 'magma' is a nice default
|
|
librosa.display.specshow(S_dB, sr=sr, fmax=8000, cmap='magma', ax=ax)
|
|
|
|
output_path = os.path.join(self.upload_folder, f"art_{int(time.time())}.png")
|
|
|
|
# specific DPI, no bbox_inches='tight' (which shrinks the image)
|
|
plt.savefig(output_path, dpi=dpi)
|
|
plt.close()
|
|
return output_path
|
|
|
|
|
|
|
|
# --- Feature 3: Steganography (Embed in Host) ---
|
|
def encode_stego(self, data_path, host_path):
|
|
# 1. Prepare Data
|
|
file_data = self._get_bytes(data_path)
|
|
header = self._create_header(SIG_STEGO, len(file_data), data_path)
|
|
payload_bits = np.unpackbits(np.frombuffer(header + file_data, dtype=np.uint8))
|
|
|
|
# 2. Prepare Host
|
|
host = Image.open(host_path).convert('RGB')
|
|
host_arr = np.array(host)
|
|
flat_host = host_arr.flatten()
|
|
|
|
if len(payload_bits) > len(flat_host):
|
|
raise ValueError(f"Host image too small. Need {len(payload_bits)/3/1e6:.2f} MP.")
|
|
|
|
# 3. Embed (LSB)
|
|
padded_bits = np.pad(payload_bits, (0, len(flat_host) - len(payload_bits)), 'constant')
|
|
embedded_flat = (flat_host & 0xFE) + padded_bits
|
|
|
|
embedded_img = Image.fromarray(embedded_flat.reshape(host_arr.shape), 'RGB')
|
|
|
|
output_path = os.path.join(self.upload_folder, f"stego_{os.path.basename(data_path)}.png")
|
|
embedded_img.save(output_path, "PNG")
|
|
return output_path
|
|
|
|
# --- Feature 4: Universal Decoder ---
|
|
def decode_image(self, image_path):
|
|
img = Image.open(image_path).convert('RGB')
|
|
flat_bytes = np.array(img).flatten()
|
|
|
|
# Strategy A: Check for Shift Signature (Raw Bytes)
|
|
try:
|
|
sig = struct.unpack('>4s', flat_bytes[:4])[0]
|
|
if sig == SIG_SHIFT:
|
|
return self._extract(flat_bytes, image_path, is_bits=False)
|
|
except: pass
|
|
|
|
# Strategy B: Check for Stego Signature (LSB)
|
|
try:
|
|
sample_bytes = np.packbits(flat_bytes[:300] & 1)
|
|
sig = struct.unpack('>4s', sample_bytes[:4])[0]
|
|
if sig == SIG_STEGO:
|
|
all_bytes = np.packbits(flat_bytes & 1)
|
|
return self._extract(all_bytes, image_path, is_bits=True)
|
|
except: pass
|
|
|
|
raise ValueError("No encoded data found in this image.")
|
|
|
|
def _extract(self, byte_arr, original_path, is_bits):
|
|
sig, size, ext_len = struct.unpack(HEADER_FMT, byte_arr[:HEADER_LEN])
|
|
ext = byte_arr[HEADER_LEN:HEADER_LEN+ext_len].tobytes().decode('utf-8')
|
|
|
|
data = byte_arr[HEADER_LEN+ext_len : HEADER_LEN+ext_len+size]
|
|
|
|
tag = "decoded"
|
|
out_name = f"{os.path.splitext(os.path.basename(original_path))[0]}_{tag}{ext}"
|
|
out_path = os.path.join(self.upload_folder, out_name)
|
|
|
|
with open(out_path, 'wb') as f:
|
|
f.write(data.tobytes())
|
|
|
|
return out_path
|