Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Madverse Music API | |
| AI Music Detection Service | |
| """ | |
| # Configure numba before any other imports to avoid caching issues | |
| import os | |
| os.environ.setdefault('NUMBA_DISABLE_JIT', '1') | |
| os.environ.setdefault('NUMBA_CACHE_DIR', '/app/.cache/numba') | |
| from fastapi import FastAPI, HTTPException, BackgroundTasks, Header, Depends | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from pydantic import BaseModel, HttpUrl | |
| import torch | |
| import soundfile as sf | |
| import scipy.signal | |
| import tempfile | |
| import requests | |
| from pathlib import Path | |
| import time | |
| from typing import Optional, Annotated, List, Union | |
| import uvicorn | |
| import asyncio | |
| from contextlib import asynccontextmanager | |
| import socket | |
| import numpy as np | |
| # Global model variable | |
| model = None | |
| async def lifespan(app: FastAPI): | |
| """Application lifespan management""" | |
| # Startup | |
| global model | |
| try: | |
| from sonics import HFAudioClassifier | |
| print("🔄 Loading Madverse Music AI model...") | |
| # Set cache directory to a writable location | |
| cache_dir = "/app/.cache" if os.path.exists("/app") else "./cache" | |
| os.makedirs(cache_dir, exist_ok=True) | |
| # Load model with explicit cache directory | |
| model = HFAudioClassifier.from_pretrained( | |
| "awsaf49/sonics-spectttra-alpha-120s", | |
| cache_dir=cache_dir | |
| ) | |
| model.eval() | |
| print("✅ Model loaded successfully!") | |
| except Exception as e: | |
| print(f"❌ Failed to load model: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| raise | |
| yield | |
| # Shutdown | |
| print("🔄 Shutting down...") | |
| # Initialize FastAPI app with lifespan | |
| app = FastAPI( | |
| title="Madverse Music API", | |
| description="AI-powered music detection API to identify AI-generated vs human-created music", | |
| version="1.0.0", | |
| docs_url="/", | |
| redoc_url="/docs", | |
| lifespan=lifespan | |
| ) | |
| # API Key Configuration | |
| API_KEY = os.getenv("MADVERSE_API_KEY", "madverse-music-api-key-2024") # Default key for demo | |
| async def verify_api_key(x_api_key: Annotated[Union[str, None], Header()] = None): | |
| """Verify API key from header""" | |
| if x_api_key is None: | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Missing API key. Please provide a valid X-API-Key header." | |
| ) | |
| if x_api_key != API_KEY: | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Invalid API key. Please provide a valid X-API-Key header." | |
| ) | |
| return x_api_key | |
| class MusicAnalysisRequest(BaseModel): | |
| urls: List[HttpUrl] | |
| def check_api_key_first(request: MusicAnalysisRequest, x_api_key: Annotated[Union[str, None], Header()] = None): | |
| """Check API key before processing request""" | |
| if x_api_key is None: | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Missing API key. Please provide a valid X-API-Key header." | |
| ) | |
| if x_api_key != API_KEY: | |
| raise HTTPException( | |
| status_code=401, | |
| detail="Invalid API key. Please provide a valid X-API-Key header." | |
| ) | |
| return request | |
| class FileAnalysisResult(BaseModel): | |
| url: str | |
| success: bool | |
| classification: Optional[str] = None # "Real" or "Fake" | |
| confidence: Optional[float] = None # 0.0 to 1.0 | |
| probability: Optional[float] = None # Raw sigmoid probability | |
| raw_score: Optional[float] = None # Raw model output | |
| duration: Optional[float] = None # Audio duration in seconds | |
| message: str | |
| processing_time: Optional[float] = None | |
| error: Optional[str] = None | |
| class MusicAnalysisResponse(BaseModel): | |
| success: bool | |
| total_files: int | |
| successful_analyses: int | |
| failed_analyses: int | |
| results: List[FileAnalysisResult] | |
| total_processing_time: float | |
| message: str | |
| class ErrorResponse(BaseModel): | |
| success: bool | |
| error: str | |
| message: str | |
| def cleanup_file(file_path: str): | |
| """Background task to cleanup temporary files""" | |
| try: | |
| if os.path.exists(file_path): | |
| os.unlink(file_path) | |
| except: | |
| pass | |
| def download_audio(url: str, max_size_mb: int = 100) -> str: | |
| """Download audio file from URL with size validation""" | |
| try: | |
| print(f"🔽 Downloading audio from: {url}") | |
| # Check if URL is accessible | |
| response = requests.head(str(url), timeout=10) | |
| print(f"📊 Head response status: {response.status_code}") | |
| # Check content size | |
| content_length = response.headers.get('Content-Length') | |
| if content_length: | |
| size_mb = int(content_length) / (1024 * 1024) | |
| print(f"📏 File size: {size_mb:.2f}MB") | |
| if int(content_length) > max_size_mb * 1024 * 1024: | |
| raise HTTPException( | |
| status_code=413, | |
| detail=f"File too large. Maximum size: {max_size_mb}MB" | |
| ) | |
| # Download file | |
| print("🔽 Starting download...") | |
| response = requests.get(str(url), timeout=30, stream=True) | |
| response.raise_for_status() | |
| print(f"✅ Download response status: {response.status_code}") | |
| # Create temporary file | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file: | |
| downloaded_size = 0 | |
| for chunk in response.iter_content(chunk_size=8192): | |
| downloaded_size += len(chunk) | |
| if downloaded_size > max_size_mb * 1024 * 1024: | |
| os.unlink(tmp_file.name) | |
| raise HTTPException( | |
| status_code=413, | |
| detail=f"File too large. Maximum size: {max_size_mb}MB" | |
| ) | |
| tmp_file.write(chunk) | |
| print(f"💾 Downloaded {downloaded_size} bytes to {tmp_file.name}") | |
| return tmp_file.name | |
| except requests.exceptions.RequestException as e: | |
| error_msg = f"Failed to download audio: {str(e)}" | |
| print(f"❌ Download error: {error_msg}") | |
| raise HTTPException( | |
| status_code=400, | |
| detail=error_msg | |
| ) | |
| except Exception as e: | |
| error_msg = f"Error downloading file: {str(e)}" | |
| print(f"❌ Unexpected download error: {error_msg}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail=error_msg | |
| ) | |
| def classify_audio(file_path: str) -> dict: | |
| """Classify audio file using the AI model""" | |
| try: | |
| print(f"🎵 Loading audio file: {file_path}") | |
| # Check if file exists | |
| if not os.path.exists(file_path): | |
| raise ValueError(f"Audio file not found: {file_path}") | |
| # Check file size | |
| file_size = os.path.getsize(file_path) | |
| print(f"📏 Audio file size: {file_size} bytes") | |
| if file_size == 0: | |
| raise ValueError("Audio file is empty") | |
| # Load audio with soundfile | |
| print("🔊 Loading audio with soundfile...") | |
| audio, sr = sf.read(file_path) | |
| print(f"🎼 Audio loaded: {len(audio)} samples at {sr}Hz, duration: {len(audio)/sr:.2f}s") | |
| if len(audio) == 0: | |
| raise ValueError("Audio file contains no audio data") | |
| # Convert to mono if stereo | |
| if audio.ndim > 1: | |
| print("🔀 Converting stereo to mono...") | |
| audio = np.mean(audio, axis=1) | |
| # Resample to 16kHz if needed (model requirement) | |
| target_sr = 16000 | |
| if sr != target_sr: | |
| print(f"🔄 Resampling from {sr}Hz to {target_sr}Hz...") | |
| # Calculate the number of samples after resampling | |
| num_samples = int(len(audio) * target_sr / sr) | |
| audio = scipy.signal.resample(audio, num_samples) | |
| sr = target_sr | |
| print(f"✅ Resampled: {len(audio)} samples at {sr}Hz") | |
| # Convert to tensor and add batch dimension | |
| print("🧮 Converting to tensor...") | |
| audio_tensor = torch.FloatTensor(audio).unsqueeze(0) | |
| print(f"📊 Tensor shape: {audio_tensor.shape}") | |
| # Get prediction | |
| print("🤖 Running model inference...") | |
| with torch.no_grad(): | |
| output = model(audio_tensor) | |
| print(f"📈 Model output: {output}") | |
| # Convert logit to probability using sigmoid | |
| prob = torch.sigmoid(output).item() | |
| print(f"📊 Sigmoid probability: {prob}") | |
| # Classify: prob < 0.5 = Real, prob >= 0.5 = Fake | |
| if prob < 0.5: | |
| classification = "Real" | |
| confidence = (1 - prob) * 2 # Convert to 0-1 scale | |
| else: | |
| classification = "Fake" | |
| confidence = (prob - 0.5) * 2 # Convert to 0-1 scale | |
| result = { | |
| "classification": classification, | |
| "confidence": min(confidence, 1.0), # Cap at 1.0 | |
| "probability": prob, | |
| "raw_score": output.item(), | |
| "duration": len(audio) / sr | |
| } | |
| print(f"✅ Classification result: {result}") | |
| return result | |
| except Exception as e: | |
| error_msg = f"Error analyzing audio: {str(e)}" | |
| print(f"❌ Audio analysis error: {error_msg}") | |
| import traceback | |
| print(f"🔍 Traceback: {traceback.format_exc()}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail=error_msg | |
| ) | |
| async def process_single_url(url: str) -> FileAnalysisResult: | |
| """Process a single URL and return result""" | |
| start_time = time.time() | |
| temp_file = None | |
| try: | |
| print(f"🚀 Processing URL: {url}") | |
| # Download audio file | |
| temp_file = download_audio(url) | |
| print(f"✅ Download completed: {temp_file}") | |
| # Classify audio | |
| result = classify_audio(temp_file) | |
| print(f"✅ Classification completed: {result}") | |
| # Calculate processing time | |
| processing_time = time.time() - start_time | |
| # Prepare response | |
| emoji = "🎤" if result["classification"] == "Real" else "🤖" | |
| message = f'{emoji} Detected as {result["classification"].lower()} music' | |
| return FileAnalysisResult( | |
| url=str(url), | |
| success=True, | |
| classification=result["classification"], | |
| confidence=result["confidence"], | |
| probability=result["probability"], | |
| raw_score=result["raw_score"], | |
| duration=result["duration"], | |
| message=message, | |
| processing_time=processing_time | |
| ) | |
| except Exception as e: | |
| processing_time = time.time() - start_time | |
| error_msg = str(e) | |
| print(f"❌ Processing failed for {url}: {error_msg}") | |
| import traceback | |
| print(f"🔍 Full traceback: {traceback.format_exc()}") | |
| return FileAnalysisResult( | |
| url=str(url), | |
| success=False, | |
| message=f"❌ Failed to process: {error_msg}", | |
| processing_time=processing_time, | |
| error=error_msg | |
| ) | |
| finally: | |
| # Cleanup file in background | |
| if temp_file: | |
| try: | |
| print(f"🧹 Cleaning up temporary file: {temp_file}") | |
| os.unlink(temp_file) | |
| except Exception as cleanup_error: | |
| print(f"⚠️ Failed to cleanup {temp_file}: {cleanup_error}") | |
| async def analyze_music( | |
| request: MusicAnalysisRequest = Depends(check_api_key_first) | |
| ): | |
| """ | |
| Analyze music from URL(s) to detect if it's AI-generated or human-created | |
| - **urls**: Array of direct URLs to audio files (MP3, WAV, FLAC, M4A, OGG) | |
| - Returns classification results for each file | |
| - Processes files concurrently for better performance when multiple URLs provided | |
| """ | |
| start_time = time.time() | |
| if not model: | |
| raise HTTPException( | |
| status_code=503, | |
| detail="Model not loaded. Please try again later." | |
| ) | |
| if len(request.urls) > 50: # Limit processing | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Too many URLs. Maximum 50 files per request." | |
| ) | |
| if len(request.urls) == 0: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="At least one URL is required." | |
| ) | |
| try: | |
| # Process all URLs concurrently with limited concurrency | |
| semaphore = asyncio.Semaphore(5) # Limit to 5 concurrent downloads | |
| async def process_with_semaphore(url): | |
| async with semaphore: | |
| return await process_single_url(str(url)) | |
| # Create tasks for all URLs | |
| tasks = [process_with_semaphore(url) for url in request.urls] | |
| # Wait for all tasks to complete | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # Process results and handle any exceptions | |
| processed_results = [] | |
| successful_count = 0 | |
| failed_count = 0 | |
| for i, result in enumerate(results): | |
| if isinstance(result, Exception): | |
| # Handle exception case | |
| processed_results.append(FileAnalysisResult( | |
| url=str(request.urls[i]), | |
| success=False, | |
| message=f"❌ Processing failed: {str(result)}", | |
| error=str(result) | |
| )) | |
| failed_count += 1 | |
| else: | |
| processed_results.append(result) | |
| if result.success: | |
| successful_count += 1 | |
| else: | |
| failed_count += 1 | |
| # Calculate total processing time | |
| total_processing_time = time.time() - start_time | |
| # Prepare summary message | |
| total_files = len(request.urls) | |
| if total_files == 1: | |
| # Single file message | |
| if successful_count == 1: | |
| message = processed_results[0].message | |
| else: | |
| message = processed_results[0].message | |
| else: | |
| # Multiple files message | |
| if successful_count == total_files: | |
| message = f"✅ Successfully analyzed all {total_files} files" | |
| elif successful_count > 0: | |
| message = f"⚠️ Analyzed {successful_count}/{total_files} files successfully" | |
| else: | |
| message = f"❌ Failed to analyze any files" | |
| return MusicAnalysisResponse( | |
| success=successful_count > 0, | |
| total_files=total_files, | |
| successful_analyses=successful_count, | |
| failed_analyses=failed_count, | |
| results=processed_results, | |
| total_processing_time=total_processing_time, | |
| message=message | |
| ) | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Internal server error during processing: {str(e)}" | |
| ) | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return { | |
| "status": "healthy", | |
| "model_loaded": model is not None, | |
| "service": "Madverse Music API" | |
| } | |
| async def get_info(): | |
| """Get API information""" | |
| return { | |
| "name": "Madverse Music API", | |
| "version": "1.0.0", | |
| "description": "AI-powered music detection to identify AI-generated vs human-created music", | |
| "model": "SpecTTTra-α (120s)", | |
| "accuracy": { | |
| "f1_score": 0.97, | |
| "sensitivity": 0.96, | |
| "specificity": 0.99 | |
| }, | |
| "supported_formats": ["MP3", "WAV", "FLAC", "M4A", "OGG"], | |
| "max_file_size": "100MB", | |
| "max_duration": "120 seconds", | |
| "authentication": { | |
| "required": True, | |
| "type": "API Key", | |
| "header": "X-API-Key", | |
| "example": "X-API-Key: your-api-key-here" | |
| }, | |
| "usage": { | |
| "curl_example": "curl -X POST 'http://localhost:8000/analyze' -H 'X-API-Key: your-api-key' -H 'Content-Type: application/json' -d '{\"url\":\"https://example.com/song.mp3\"}'" | |
| } | |
| } | |
| def find_available_port(start_port: int = 8000, max_attempts: int = 10) -> int: | |
| """Find an available port starting from start_port""" | |
| import random | |
| import time | |
| # Add some randomization to avoid race conditions | |
| time.sleep(random.uniform(0.1, 0.5)) | |
| for port in range(start_port, start_port + max_attempts): | |
| try: | |
| # Try to bind to the port with proper error handling | |
| with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
| s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| s.bind(('0.0.0.0', port)) | |
| s.listen(1) | |
| print(f"✅ Port {port} is available") | |
| return port | |
| except OSError as e: | |
| print(f"❌ Port {port} is busy: {e}") | |
| continue | |
| # If no port found, raise an exception | |
| raise RuntimeError(f"No available port found in range {start_port}-{start_port + max_attempts - 1}") | |
| if __name__ == "__main__": | |
| try: | |
| # Check if we're in a Hugging Face environment | |
| is_hf_space = os.getenv('SPACE_ID') is not None | |
| hf_port = os.getenv('PORT') # HF Spaces sets this | |
| if is_hf_space and hf_port: | |
| # Use HF Spaces assigned port | |
| port = int(hf_port) | |
| print(f"🤗 Running in Hugging Face Spaces on port {port}") | |
| elif is_hf_space: | |
| print("🤗 Running in Hugging Face Spaces environment") | |
| # Use standard HF Spaces port | |
| port = 7860 | |
| else: | |
| # Find an available port for local development | |
| port = find_available_port(8000, 10) | |
| print(f"🚀 Starting server on port {port}") | |
| # For HF Spaces, don't use the retry logic as it might cause issues | |
| if is_hf_space: | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |
| else: | |
| # Add retry logic for local development | |
| max_retries = 3 | |
| for attempt in range(max_retries): | |
| try: | |
| uvicorn.run(app, host="0.0.0.0", port=port) | |
| break # If successful, break out of retry loop | |
| except OSError as e: | |
| if "Address already in use" in str(e) and attempt < max_retries - 1: | |
| print(f"⚠️ Port {port} became busy, trying next port...") | |
| port = find_available_port(port + 1, 10) | |
| print(f"🔄 Retrying on port {port}") | |
| else: | |
| raise | |
| except RuntimeError as e: | |
| print(f"❌ {e}") | |
| print("💡 Suggestions:") | |
| print(" 1. Wait a moment and try again (another instance might be shutting down)") | |
| print(" 2. Manually specify a different port:") | |
| print(" uvicorn app:app --host 0.0.0.0 --port 8001") | |
| print(" 3. Check for running processes: ps aux | grep python") | |
| except KeyboardInterrupt: | |
| print("\n🛑 Server stopped by user") | |
| except Exception as e: | |
| print(f"❌ Failed to start server: {e}") | |
| import traceback | |
| traceback.print_exc() |