Inital Commit
This commit is contained in:
98
server/app.py
Normal file
98
server/app.py
Normal file
@@ -0,0 +1,98 @@
|
||||
import os
|
||||
import time
|
||||
from flask import Flask, request, send_file, jsonify
|
||||
from flask_cors import CORS
|
||||
from werkzeug.utils import secure_filename
|
||||
from processor import AudioImageProcessor
|
||||
|
||||
app = Flask(__name__)
|
||||
CORS(app) # Allow Svelte to communicate
|
||||
|
||||
# Configuration
|
||||
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads')
|
||||
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
|
||||
processor = AudioImageProcessor(UPLOAD_FOLDER)
|
||||
|
||||
def save_upload(file_obj):
|
||||
filename = secure_filename(file_obj.filename)
|
||||
path = os.path.join(app.config['UPLOAD_FOLDER'], f"{int(time.time())}_{filename}")
|
||||
file_obj.save(path)
|
||||
return path
|
||||
|
||||
@app.route('/health', methods=['GET'])
|
||||
def health():
|
||||
return jsonify({"status": "ok", "max_mb": 40})
|
||||
|
||||
# --- Endpoint 1: Create Art (Optional: Embed Audio in it) ---
|
||||
@app.route('/api/generate-art', methods=['POST'])
|
||||
def generate_art():
|
||||
if 'audio' not in request.files:
|
||||
return jsonify({"error": "No audio file provided"}), 400
|
||||
|
||||
audio_file = request.files['audio']
|
||||
should_embed = request.form.get('embed', 'false').lower() == 'true'
|
||||
|
||||
try:
|
||||
# 1. Save Audio
|
||||
audio_path = save_upload(audio_file)
|
||||
|
||||
# 2. Generate Art
|
||||
art_path = processor.generate_spectrogram(audio_path)
|
||||
|
||||
# 3. If Embed requested, run Steganography immediately using the art as host
|
||||
final_path = art_path
|
||||
if should_embed:
|
||||
final_path = processor.encode_stego(audio_path, art_path)
|
||||
|
||||
return send_file(final_path, mimetype='image/png')
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
# --- Endpoint 2: Format Shift (Audio -> Static) ---
|
||||
@app.route('/api/shift', methods=['POST'])
|
||||
def shift_format():
|
||||
if 'file' not in request.files:
|
||||
return jsonify({"error": "No file provided"}), 400
|
||||
|
||||
try:
|
||||
f_path = save_upload(request.files['file'])
|
||||
img_path = processor.encode_shift(f_path)
|
||||
return send_file(img_path, mimetype='image/png')
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
# --- Endpoint 3: Steganography (Audio + Custom Host) ---
|
||||
@app.route('/api/hide', methods=['POST'])
|
||||
def hide_data():
|
||||
if 'data' not in request.files or 'host' not in request.files:
|
||||
return jsonify({"error": "Requires 'data' and 'host' files"}), 400
|
||||
|
||||
try:
|
||||
data_path = save_upload(request.files['data'])
|
||||
host_path = save_upload(request.files['host'])
|
||||
|
||||
stego_path = processor.encode_stego(data_path, host_path)
|
||||
return send_file(stego_path, mimetype='image/png')
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
# --- Endpoint 4: Decode (Universal) ---
|
||||
@app.route('/api/decode', methods=['POST'])
|
||||
def decode():
|
||||
if 'image' not in request.files:
|
||||
return jsonify({"error": "No image provided"}), 400
|
||||
|
||||
try:
|
||||
img_path = save_upload(request.files['image'])
|
||||
restored_path = processor.decode_image(img_path)
|
||||
|
||||
# Determine mimetype based on extension for browser friendliness
|
||||
filename = os.path.basename(restored_path)
|
||||
return send_file(restored_path, as_attachment=True, download_name=filename)
|
||||
except Exception as e:
|
||||
return jsonify({"error": str(e)}), 500
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Threaded=True is important for processing images without blocking
|
||||
app.run(debug=True, port=5000, threaded=True)
|
||||
144
server/processor.py
Normal file
144
server/processor.py
Normal file
@@ -0,0 +1,144 @@
|
||||
import os
|
||||
import struct
|
||||
import math
|
||||
import numpy as np
|
||||
import librosa
|
||||
import librosa.display
|
||||
import matplotlib
|
||||
# Set backend to Agg (Anti-Grain Geometry) to render without a GUI (essential for servers)
|
||||
matplotlib.use('Agg')
|
||||
import matplotlib.pyplot as plt
|
||||
from PIL import Image
|
||||
|
||||
# --- Constants ---
|
||||
MAX_MB = 40
|
||||
SIG_SHIFT = b'B2I!'
|
||||
SIG_STEGO = b'B2S!'
|
||||
HEADER_FMT = '>4sQB'
|
||||
HEADER_LEN = struct.calcsize(HEADER_FMT)
|
||||
Image.MAX_IMAGE_PIXELS = 500 * 1024 * 1024
|
||||
|
||||
class AudioImageProcessor:
|
||||
def __init__(self, upload_folder):
|
||||
self.upload_folder = upload_folder
|
||||
os.makedirs(upload_folder, exist_ok=True)
|
||||
|
||||
def _get_bytes(self, path):
|
||||
"""Helper to safely read bytes"""
|
||||
if os.path.getsize(path) > (MAX_MB * 1024 * 1024):
|
||||
raise ValueError("File too large (Max 40MB)")
|
||||
with open(path, 'rb') as f:
|
||||
return f.read()
|
||||
|
||||
def _create_header(self, signature, file_size, filepath):
|
||||
_, ext = os.path.splitext(filepath)
|
||||
ext_bytes = ext.encode('utf-8')
|
||||
return struct.pack(HEADER_FMT, signature, file_size, len(ext_bytes)) + ext_bytes
|
||||
|
||||
# --- Feature 1: Spectrogram Art ---
|
||||
def generate_spectrogram(self, audio_path):
|
||||
"""Generates a visual spectrogram from audio."""
|
||||
y, sr = librosa.load(audio_path)
|
||||
S = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=256, fmax=8000)
|
||||
S_dB = librosa.power_to_db(S, ref=np.max)
|
||||
|
||||
plt.figure(figsize=(12, 6))
|
||||
plt.axis('off')
|
||||
plt.margins(0, 0)
|
||||
plt.gca().xaxis.set_major_locator(plt.NullLocator())
|
||||
plt.gca().yaxis.set_major_locator(plt.NullLocator())
|
||||
|
||||
# 'magma' is a nice default, but you could parameterize this
|
||||
librosa.display.specshow(S_dB, sr=sr, fmax=8000, cmap='magma')
|
||||
|
||||
output_path = os.path.join(self.upload_folder, f"art_{os.path.basename(audio_path)}.png")
|
||||
plt.savefig(output_path, bbox_inches='tight', pad_inches=0, dpi=300)
|
||||
plt.close()
|
||||
return output_path
|
||||
|
||||
# --- Feature 2: Format Shift (Raw Data to Image) ---
|
||||
def encode_shift(self, file_path):
|
||||
file_data = self._get_bytes(file_path)
|
||||
file_size = len(file_data)
|
||||
|
||||
header = self._create_header(SIG_SHIFT, file_size, file_path)
|
||||
payload = header + file_data
|
||||
|
||||
# Calculate size
|
||||
pixels = math.ceil(len(payload) / 3)
|
||||
side = math.ceil(math.sqrt(pixels))
|
||||
padding = (side * side * 3) - len(payload)
|
||||
|
||||
# Pad and Reshape
|
||||
arr = np.frombuffer(payload, dtype=np.uint8)
|
||||
if padding > 0:
|
||||
arr = np.pad(arr, (0, padding), 'constant')
|
||||
|
||||
img = Image.fromarray(arr.reshape((side, side, 3)), 'RGB')
|
||||
|
||||
output_path = os.path.join(self.upload_folder, f"shift_{os.path.basename(file_path)}.png")
|
||||
img.save(output_path, "PNG")
|
||||
return output_path
|
||||
|
||||
# --- Feature 3: Steganography (Embed in Host) ---
|
||||
def encode_stego(self, data_path, host_path):
|
||||
# 1. Prepare Data
|
||||
file_data = self._get_bytes(data_path)
|
||||
header = self._create_header(SIG_STEGO, len(file_data), data_path)
|
||||
payload_bits = np.unpackbits(np.frombuffer(header + file_data, dtype=np.uint8))
|
||||
|
||||
# 2. Prepare Host
|
||||
host = Image.open(host_path).convert('RGB')
|
||||
host_arr = np.array(host)
|
||||
flat_host = host_arr.flatten()
|
||||
|
||||
if len(payload_bits) > len(flat_host):
|
||||
raise ValueError(f"Host image too small. Need {len(payload_bits)/3/1e6:.2f} MP.")
|
||||
|
||||
# 3. Embed (LSB)
|
||||
padded_bits = np.pad(payload_bits, (0, len(flat_host) - len(payload_bits)), 'constant')
|
||||
embedded_flat = (flat_host & 0xFE) + padded_bits
|
||||
|
||||
embedded_img = Image.fromarray(embedded_flat.reshape(host_arr.shape), 'RGB')
|
||||
|
||||
output_path = os.path.join(self.upload_folder, f"stego_{os.path.basename(data_path)}.png")
|
||||
embedded_img.save(output_path, "PNG")
|
||||
return output_path
|
||||
|
||||
# --- Feature 4: Universal Decoder ---
|
||||
def decode_image(self, image_path):
|
||||
img = Image.open(image_path).convert('RGB')
|
||||
flat_bytes = np.array(img).flatten()
|
||||
|
||||
# Strategy A: Check for Shift Signature (Raw Bytes)
|
||||
try:
|
||||
sig = struct.unpack('>4s', flat_bytes[:4])[0]
|
||||
if sig == SIG_SHIFT:
|
||||
return self._extract(flat_bytes, image_path, is_bits=False)
|
||||
except: pass
|
||||
|
||||
# Strategy B: Check for Stego Signature (LSB)
|
||||
try:
|
||||
sample_bytes = np.packbits(flat_bytes[:300] & 1)
|
||||
sig = struct.unpack('>4s', sample_bytes[:4])[0]
|
||||
if sig == SIG_STEGO:
|
||||
all_bytes = np.packbits(flat_bytes & 1)
|
||||
return self._extract(all_bytes, image_path, is_bits=True)
|
||||
except: pass
|
||||
|
||||
raise ValueError("No encoded data found in this image.")
|
||||
|
||||
def _extract(self, byte_arr, original_path, is_bits):
|
||||
sig, size, ext_len = struct.unpack(HEADER_FMT, byte_arr[:HEADER_LEN])
|
||||
ext = byte_arr[HEADER_LEN:HEADER_LEN+ext_len].tobytes().decode('utf-8')
|
||||
|
||||
data = byte_arr[HEADER_LEN+ext_len : HEADER_LEN+ext_len+size]
|
||||
|
||||
tag = "decoded"
|
||||
out_name = f"{os.path.splitext(os.path.basename(original_path))[0]}_{tag}{ext}"
|
||||
out_path = os.path.join(self.upload_folder, out_name)
|
||||
|
||||
with open(out_path, 'wb') as f:
|
||||
f.write(data.tobytes())
|
||||
|
||||
return out_path
|
||||
6
server/requirements.txt
Normal file
6
server/requirements.txt
Normal file
@@ -0,0 +1,6 @@
|
||||
Flask
|
||||
Flask-Cors
|
||||
numpy
|
||||
Pillow
|
||||
librosa
|
||||
matplotlib
|
||||
Reference in New Issue
Block a user