Files
AudioImage/server/app.py
2026-01-07 04:34:24 +00:00

278 lines
10 KiB
Python

import os
import time
import json
from flask import Flask, request, send_file, jsonify, send_from_directory, Response
from flask_cors import CORS
from werkzeug.utils import secure_filename
from processor import AudioImageProcessor
app = Flask(__name__, static_folder='../build', static_url_path='')
CORS(app)
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'uploads')
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
processor = AudioImageProcessor(UPLOAD_FOLDER)
def save_upload(file_obj):
filename = secure_filename(file_obj.filename)
path = os.path.join(app.config['UPLOAD_FOLDER'], f"{int(time.time())}_{filename}")
file_obj.save(path)
return path
@app.route('/')
def index():
return send_from_directory(app.static_folder, 'index.html')
@app.errorhandler(404)
def not_found(e):
if request.path.startswith('/api/'):
return jsonify({"error": "Not found"}), 404
return send_from_directory(app.static_folder, 'index.html')
@app.route('/health', methods=['GET'])
def health():
return jsonify({"status": "ok", "max_mb": 40})
import threading
def cleanup_task():
expiration_seconds = 600
while True:
try:
now = time.time()
if os.path.exists(UPLOAD_FOLDER):
for filename in os.listdir(UPLOAD_FOLDER):
filepath = os.path.join(UPLOAD_FOLDER, filename)
if os.path.isfile(filepath):
if now - os.path.getctime(filepath) > expiration_seconds:
try:
os.remove(filepath)
print(f"Cleaned up: {filename}")
except Exception as e:
print(f"Error cleaning {filename}: {e}")
except Exception as e:
print(f"Cleanup Error: {e}")
time.sleep(60)
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true' or not os.environ.get('WERKZEUG_RUN_MAIN'):
t = threading.Thread(target=cleanup_task, daemon=True)
t.start()
@app.route('/api/generate-art', methods=['POST'])
def generate_art():
if 'audio' not in request.files:
return jsonify({"error": "No audio file provided"}), 400
audio_file = request.files['audio']
should_embed = request.form.get('embed', 'false').lower() == 'true'
audio_path = None
art_path = None
try:
audio_path = save_upload(audio_file)
min_pixels = 0
if should_embed:
file_size = os.path.getsize(audio_path)
min_pixels = int((file_size * 8 / 3) * 1.05)
art_path = processor.generate_spectrogram(audio_path, min_pixels=min_pixels)
final_path = art_path
if should_embed:
final_path = processor.encode_stego(audio_path, art_path)
if art_path != final_path:
try: os.remove(art_path)
except: pass
return send_file(final_path, mimetype='image/png')
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
if audio_path and os.path.exists(audio_path):
try: os.remove(audio_path)
except: pass
@app.route('/api/hide', methods=['POST'])
def hide_data():
if 'data' not in request.files or 'host' not in request.files:
return jsonify({"error": "Missing files"}), 400
data_file = request.files['data']
host_file = request.files['host']
data_path = None
host_path = None
try:
data_path = save_upload(data_file)
host_path = save_upload(host_file)
output_path = processor.encode_stego(data_path, host_path)
return send_file(output_path, mimetype='image/png')
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
if data_path and os.path.exists(data_path):
try: os.remove(data_path)
except: pass
if host_path and os.path.exists(host_path):
try: os.remove(host_path)
except: pass
import youtube_utils
@app.route('/api/hide-yt', methods=['POST'])
def hide_yt_data():
if 'url' not in request.form or 'host' not in request.files:
return jsonify({"error": "Missing URL or Host Image"}), 400
youtube_url = request.form['url']
host_file = request.files['host']
audio_path = None
host_path = None
try:
# Download Audio
audio_path = youtube_utils.download_audio(youtube_url, app.config['UPLOAD_FOLDER'])
# Save Host
host_path = save_upload(host_file)
# Encode
output_path = processor.encode_stego(audio_path, host_path)
return send_file(output_path, mimetype='image/png')
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
# Cleanup
if audio_path and os.path.exists(audio_path):
try: os.remove(audio_path)
except: pass
if host_path and os.path.exists(host_path):
try: os.remove(host_path)
except: pass
@app.route('/api/decode', methods=['POST'])
def decode():
if 'image' not in request.files:
return jsonify({"error": "No image provided"}), 400
img_path = None
try:
img_path = save_upload(request.files['image'])
restored_path = processor.decode_image(img_path)
filename = os.path.basename(restored_path)
return send_file(restored_path, as_attachment=True, download_name=filename)
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
if img_path and os.path.exists(img_path):
try: os.remove(img_path)
except: pass
@app.route('/api/visualize', methods=['POST'])
def visualize():
if 'audio' not in request.files:
return jsonify({"error": "No audio file provided"}), 400
audio_file = request.files['audio']
audio_path = None
try:
audio_path = save_upload(audio_file)
file_size = os.path.getsize(audio_path)
min_pixels = int((file_size * 8 / 3) * 1.05)
def generate_steps():
art_path = None
final_path = None
try:
import base64
yield f"data: {json.dumps({'step': 1, 'status': 'loading', 'message': 'Loading audio file...', 'progress': 10})}\n\n"
time.sleep(0.8)
yield f"data: {json.dumps({'step': 1, 'status': 'complete', 'message': f'Audio loaded: {audio_file.filename}', 'progress': 20, 'fileSize': file_size})}\n\n"
time.sleep(0.5)
yield f"data: {json.dumps({'step': 2, 'status': 'loading', 'message': 'Analyzing audio frequencies...', 'progress': 30})}\n\n"
time.sleep(1.0)
yield f"data: {json.dumps({'step': 2, 'status': 'complete', 'message': 'Frequency analysis complete', 'progress': 40})}\n\n"
time.sleep(0.5)
yield f"data: {json.dumps({'step': 3, 'status': 'loading', 'message': 'Generating spectrogram image...', 'progress': 50})}\n\n"
print(f"[VISUALIZE] Starting spectrogram generation for {audio_path}")
art_path = processor.generate_spectrogram(audio_path, min_pixels=min_pixels)
print(f"[VISUALIZE] Spectrogram generated at {art_path}")
with open(art_path, 'rb') as img_file:
spectrogram_b64 = base64.b64encode(img_file.read()).decode('utf-8')
print(f"[VISUALIZE] Spectrogram base64 length: {len(spectrogram_b64)}")
yield f"data: {json.dumps({'step': 3, 'status': 'complete', 'message': 'Spectrogram generated!', 'progress': 70, 'spectrogramImage': f'data:image/png;base64,{spectrogram_b64}'})}\n\n"
print("[VISUALIZE] Sent spectrogram image")
time.sleep(2.0)
yield f"data: {json.dumps({'step': 4, 'status': 'loading', 'message': 'Embedding audio into image (LSB steganography)...', 'progress': 80})}\n\n"
final_path = processor.encode_stego(audio_path, art_path)
with open(final_path, 'rb') as img_file:
final_b64 = base64.b64encode(img_file.read()).decode('utf-8')
yield f"data: {json.dumps({'step': 4, 'status': 'complete', 'message': 'Audio embedded successfully!', 'progress': 95, 'finalImage': f'data:image/png;base64,{final_b64}'})}\n\n"
time.sleep(2.0)
result_id = os.path.basename(final_path)
yield f"data: {json.dumps({'step': 5, 'status': 'complete', 'message': 'Process complete!', 'progress': 100, 'resultId': result_id})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'step': 0, 'status': 'error', 'message': str(e), 'progress': 0})}\n\n"
finally:
if art_path and art_path != final_path and os.path.exists(art_path):
try: os.remove(art_path)
except: pass
if audio_path and os.path.exists(audio_path):
try: os.remove(audio_path)
except: pass
response = Response(generate_steps(), mimetype='text/event-stream')
response.headers['Cache-Control'] = 'no-cache'
response.headers['X-Accel-Buffering'] = 'no'
response.headers['Connection'] = 'keep-alive'
return response
except Exception as e:
if audio_path and os.path.exists(audio_path):
try: os.remove(audio_path)
except: pass
return jsonify({"error": str(e)}), 500
@app.route('/api/result/<result_id>', methods=['GET'])
def get_result(result_id):
result_path = os.path.join(app.config['UPLOAD_FOLDER'], result_id)
if os.path.exists(result_path):
return send_file(result_path, mimetype='image/png', as_attachment=False)
return jsonify({"error": "Result not found"}), 404
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True, port=5000, threaded=True)