Files
AudioImage/server/processor.py
2026-01-07 04:09:56 +00:00

158 lines
5.6 KiB
Python

import os
import time
import struct
import math
import numpy as np
import librosa
import librosa.display
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from PIL import Image
# Largest payload file (in MiB) that _get_bytes will read for embedding.
MAX_MB = 40
# Magic signature looked for in the first four raw pixel bytes by decode_image.
SIG_SHIFT = b'B2I!'
# Magic signature written by encode_stego and looked for in the pixel LSBs by decode_image.
SIG_STEGO = b'B2S!'
# Header layout (big-endian): 4-byte signature, unsigned 64-bit payload size,
# unsigned 8-bit length of the UTF-8 file extension that follows the header.
HEADER_FMT = '>4sQB'
# Size in bytes of the fixed-length part of the header.
HEADER_LEN = struct.calcsize(HEADER_FMT)
# Sets Pillow's pixel-count limit to 500 * 1024 * 1024 pixels at import time.
# NOTE(review): presumably raised so very large host/encoded images can be opened
# without tripping Pillow's decompression-bomb check -- confirm this is intended.
Image.MAX_IMAGE_PIXELS = 500 * 1024 * 1024
class AudioImageProcessor:
    """Render audio as mel-spectrogram images and hide/recover files in images.

    Every file this class produces is written into ``upload_folder``.
    """

    def __init__(self, upload_folder):
        """Store the output folder and create it if it does not exist yet."""
        self.upload_folder = upload_folder
        os.makedirs(upload_folder, exist_ok=True)

    def _get_bytes(self, path):
        """Return the full contents of *path* as bytes.

        Raises:
            ValueError: If the file is larger than ``MAX_MB`` mebibytes.
        """
        if os.path.getsize(path) > MAX_MB * 1024 * 1024:
            raise ValueError(f"File too large (Max {MAX_MB}MB)")
        with open(path, 'rb') as f:
            return f.read()

    def _create_header(self, signature, file_size, filepath):
        """Build the binary header that precedes an embedded payload.

        The header is ``HEADER_FMT`` (signature, payload size, extension
        length) followed by the UTF-8 encoded extension of *filepath*.

        Raises:
            ValueError: If the encoded extension does not fit the one-byte
                length field.
        """
        _, ext = os.path.splitext(filepath)
        ext_bytes = ext.encode('utf-8')
        # The length field is a single unsigned byte; fail with a clear
        # message instead of an opaque struct.error.
        if len(ext_bytes) > 255:
            raise ValueError("File extension too long to encode (max 255 bytes)")
        return struct.pack(HEADER_FMT, signature, file_size, len(ext_bytes)) + ext_bytes

    @staticmethod
    def _sanitize_extension(ext):
        """Return *ext* if it is safe to append to a filename, else ``'.bin'``.

        The extension is read back from image data that may be attacker
        controlled, so anything that is not empty or a dot followed by
        alphanumerics / ``_-+~`` (e.g. path separators) is replaced.
        """
        if ext == '':
            return ext
        if ext[0] == '.' and all(c.isalnum() or c in '_-+~' for c in ext[1:]):
            return ext
        return '.bin'

    def generate_spectrogram(self, audio_path, min_pixels=0):
        """Render a mel spectrogram of *audio_path* and return the PNG path.

        Uses torchaudio on CUDA when torch/torchaudio are importable and a
        CUDA device is available; otherwise, or if the GPU path fails for any
        reason, falls back to the librosa CPU implementation.

        Args:
            audio_path: Path of the audio file to render.
            min_pixels: Minimum number of pixels the output image must have
                (0 means use the default resolution).
        """
        try:
            import torch
            import torchaudio
        except ImportError:
            return self._generate_spectrogram_cpu(audio_path, min_pixels)

        if not torch.cuda.is_available():
            return self._generate_spectrogram_cpu(audio_path, min_pixels)

        try:
            device = "cuda"
            waveform, sr = torchaudio.load(audio_path)
            waveform = waveform.to(device)
            n_fft = 2048
            mel_spectrogram = torchaudio.transforms.MelSpectrogram(
                sample_rate=sr,
                n_fft=n_fft,
                win_length=n_fft,
                hop_length=512,
                n_mels=128,
                f_max=8000,
            ).to(device)
            S = mel_spectrogram(waveform)
            S_dB = torchaudio.transforms.AmplitudeToDB()(S)
            # Only the first channel is plotted.
            S_dB = S_dB.cpu().numpy()[0]
        except Exception as e:
            # Deliberate best-effort: any GPU/torchaudio failure falls back
            # to the CPU path rather than failing the request.
            print(f"GPU processing failed, falling back to CPU: {e}")
            return self._generate_spectrogram_cpu(audio_path, min_pixels)
        return self._plot_spectrogram(S_dB, sr, min_pixels)

    def _generate_spectrogram_cpu(self, audio_path, min_pixels=0):
        """Compute the mel spectrogram with librosa and render it to a PNG."""
        y, sr = librosa.load(audio_path)
        S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, fmax=8000)
        S_dB = librosa.power_to_db(S, ref=np.max)
        return self._plot_spectrogram(S_dB, sr, min_pixels)

    def _plot_spectrogram(self, S_dB, sr, min_pixels=0):
        """Save *S_dB* as a borderless 12x6 inch PNG and return its path.

        The DPI is 300 unless *min_pixels* requires more, in which case it is
        raised (with a 5% margin) so the image has at least that many pixels.
        """
        width_in, height_in = 12, 6
        dpi = 300
        if min_pixels > 0:
            # total pixels = (width_in * dpi) * (height_in * dpi)
            required_dpi = math.ceil((min_pixels / (width_in * height_in)) ** 0.5)
            dpi = max(dpi, int(required_dpi * 1.05))

        fig = plt.figure(figsize=(width_in, height_in))
        try:
            # Axes covering the whole figure, so the image has no margins.
            ax = fig.add_axes([0, 0, 1, 1], frameon=False)
            ax.set_axis_off()
            librosa.display.specshow(S_dB, sr=sr, fmax=8000, cmap='magma', ax=ax)
            # Nanosecond timestamp: two renders within the same second must
            # not overwrite each other's output.
            output_path = os.path.join(self.upload_folder, f"art_{time.time_ns()}.png")
            fig.savefig(output_path, dpi=dpi)
        finally:
            # Always release the figure, even if plotting or saving failed.
            plt.close(fig)
        return output_path

    def encode_stego(self, data_path, host_path):
        """Hide the file at *data_path* in the LSBs of the image at *host_path*.

        The header and file bytes are written one bit per colour value,
        starting at the first value of the RGB-converted host image. Values
        beyond the payload are left untouched. The result is saved as a PNG
        in the upload folder and its path is returned.

        Raises:
            ValueError: If the data file exceeds ``MAX_MB`` or the host image
                has too few colour values to hold the payload.
        """
        file_data = self._get_bytes(data_path)
        header = self._create_header(SIG_STEGO, len(file_data), data_path)
        payload_bits = np.unpackbits(np.frombuffer(header + file_data, dtype=np.uint8))

        with Image.open(host_path) as host:
            host_arr = np.array(host.convert('RGB'))
        # np.array() made a fresh contiguous copy, so this is a writable view.
        flat_host = host_arr.reshape(-1)

        n_bits = len(payload_bits)
        if n_bits > len(flat_host):
            raise ValueError(f"Host image too small. Need {n_bits/3/1e6:.2f} MP.")

        # Overwrite only the LSBs that carry payload bits.
        flat_host[:n_bits] = (flat_host[:n_bits] & 0xFE) | payload_bits

        # Mode is inferred from the uint8 (H, W, 3) array.
        embedded_img = Image.fromarray(host_arr)
        output_path = os.path.join(self.upload_folder, f"stego_{os.path.basename(data_path)}.png")
        embedded_img.save(output_path, "PNG")
        return output_path

    def decode_image(self, image_path):
        """Recover a file embedded in *image_path* and return the output path.

        Two encodings are recognised: ``SIG_SHIFT`` stored directly in the
        first pixel bytes, and ``SIG_STEGO`` stored in the pixel LSBs.

        Raises:
            ValueError: If neither signature is present, or the embedded
                header/payload is corrupt or truncated.
        """
        with Image.open(image_path) as img:
            flat_bytes = np.array(img.convert('RGB')).reshape(-1)

        if flat_bytes[:len(SIG_SHIFT)].tobytes() == SIG_SHIFT:
            return self._extract(flat_bytes, image_path, is_bits=False)

        # Only the first 8 * len(signature) LSBs are needed to test for stego.
        lsb_sig = np.packbits(flat_bytes[:len(SIG_STEGO) * 8] & 1).tobytes()
        if lsb_sig == SIG_STEGO:
            all_bytes = np.packbits(flat_bytes & 1)
            return self._extract(all_bytes, image_path, is_bits=True)

        raise ValueError("No encoded data found in this image.")

    def _extract(self, byte_arr, original_path, is_bits):
        """Parse header + payload from *byte_arr* and write the payload out.

        Args:
            byte_arr: uint8 NumPy array starting with the header.
            original_path: Path of the source image; its base name (without
                extension) prefixes the output file name.
            is_bits: Unused; kept for interface compatibility.

        Returns:
            Path of the written file inside the upload folder.

        Raises:
            ValueError: If the header or payload does not fit in *byte_arr*,
                or the stored extension is not valid UTF-8.
        """
        header = byte_arr[:HEADER_LEN].tobytes()
        if len(header) < HEADER_LEN:
            raise ValueError("Encoded data is corrupt or truncated.")
        _sig, size, ext_len = struct.unpack(HEADER_FMT, header)

        data_start = HEADER_LEN + ext_len
        data_end = data_start + size
        # Reject rather than silently write a truncated file.
        if data_end > len(byte_arr):
            raise ValueError("Encoded data is corrupt or truncated.")

        # UnicodeDecodeError is a ValueError subclass, so invalid UTF-8 is
        # reported to callers as a ValueError as well.
        ext = byte_arr[HEADER_LEN:data_start].tobytes().decode('utf-8')
        # The extension comes from the image itself: never let it introduce
        # path components into the output file name.
        ext = self._sanitize_extension(ext)
        data = byte_arr[data_start:data_end]

        tag = "decoded"
        stem = os.path.splitext(os.path.basename(original_path))[0]
        out_path = os.path.join(self.upload_folder, f"{stem}_{tag}{ext}")
        with open(out_path, 'wb') as f:
            f.write(data.tobytes())
        return out_path