Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Music Generation Studio - HuggingFace Spaces Deployment | |
| Main application file for Gradio interface | |
| """ | |
| import os | |
| import sys | |
| import gradio as gr | |
| import logging | |
| from pathlib import Path | |
| import shutil | |
| import subprocess | |
# Import spaces for ZeroGPU support
try:
    import spaces
    HAS_SPACES = True
except ImportError:
    HAS_SPACES = False

    # Local-development stand-in so `spaces.GPU` can still be used as a decorator.
    class spaces:
        """Minimal substitute for the `spaces` package when it is not installed."""

        @staticmethod
        def GPU(func=None, **kwargs):
            """Return the decorated function unchanged.

            Supports both the bare form (``@spaces.GPU``) and the
            parametrized form (``@spaces.GPU(duration=...)``); any options
            are ignored because there is no GPU scheduling locally.
            """
            if callable(func):
                return func
            # Called with options only: hand back a pass-through decorator.
            return lambda wrapped: wrapped
# Run the DiffRhythm2 source setup script when it ships next to this file
setup_script = Path(__file__).parent / "setup_diffrhythm2_src.sh"
if setup_script.exists():
    setup_command = ["bash", str(setup_script)]
    # Best effort: a failing setup script is reported but does not stop startup.
    try:
        subprocess.run(setup_command, check=True)
    except Exception as e:
        print(f"Warning: Failed to run setup script: {e}")
# Configure environment for HuggingFace Spaces (espeak-ng paths, etc.)
# NOTE(review): hf_config is never referenced by name below, so it appears to be
# imported for its side effects only - keep it ahead of the service imports.
import hf_config

# Make the bundled backend package importable (services, config, utils)
SPACE_DIR = Path(__file__).parent
sys.path.insert(0, str(SPACE_DIR / 'backend'))

# Logging must be configured before the service imports so that an import
# failure can be reported through `logger`.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Backend services; log and re-raise if any of them cannot be imported
try:
    from services.diffrhythm_service import DiffRhythmService
    from services.lyricmind_service import LyricMindService
    from services.timeline_service import TimelineService
    from services.export_service import ExportService
    from config.settings import Config
    from utils.prompt_analyzer import PromptAnalyzer
except ImportError as e:
    logger.error(f"Import error: {e}")
    raise
# Application configuration
config = Config()

# Working directories used by the app (created relative to the current directory)
for _required_dir in ("outputs", "outputs/music", "outputs/mixed", "models", "logs"):
    os.makedirs(_required_dir, exist_ok=True)

# Lightweight services are created once and live at module level
timeline_service = TimelineService()
export_service = ExportService()

# Heavy AI services start unset and are created on first use by the
# get_*_service() accessors below
diffrhythm_service = None
lyricmind_service = None
def get_diffrhythm_service():
    """Return the shared DiffRhythm service, creating it on first call.

    The instance is stored in the module-level ``diffrhythm_service`` so
    later calls reuse it instead of loading the model again.
    """
    global diffrhythm_service
    if diffrhythm_service is not None:
        return diffrhythm_service

    logger.info("Loading DiffRhythm2 model...")
    diffrhythm_service = DiffRhythmService(model_path=config.DIFFRHYTHM_MODEL_PATH)
    logger.info("DiffRhythm2 model loaded")
    return diffrhythm_service
def get_lyricmind_service():
    """Return the shared LyricMind service, creating it on first call.

    The instance is stored in the module-level ``lyricmind_service`` so
    later calls reuse it instead of loading the model again.
    """
    global lyricmind_service
    if lyricmind_service is not None:
        return lyricmind_service

    logger.info("Loading LyricMind model...")
    lyricmind_service = LyricMindService(model_path=config.LYRICMIND_MODEL_PATH)
    logger.info("LyricMind model loaded")
    return lyricmind_service
def generate_lyrics(prompt: str, progress=gr.Progress()):
    """Generate lyrics for a music prompt.

    The prompt is analyzed first and the analysis is forwarded to the
    LyricMind service together with the fixed clip duration.

    Args:
        prompt: Free-text description of the song.
        progress: Gradio progress tracker used for status updates.

    Returns:
        The generated lyrics, or a user-facing error message if the prompt
        is blank or generation fails.
    """
    try:
        if not (prompt and prompt.strip()):
            return "β Please enter a prompt"

        # Every clip has the same fixed length
        duration = 32

        progress(0, desc="π Analyzing prompt...")
        logger.info(f"Generating lyrics for: {prompt}")

        # Derive genre and mood from the prompt
        analysis = PromptAnalyzer.analyze(prompt)
        detected_genres = analysis.get('genres')
        genre = detected_genres[0] if detected_genres else 'general'
        mood = analysis.get('mood', 'unknown')
        logger.info(f"Analysis - Genre: {genre}, Mood: {mood}")

        progress(0.3, desc=f"βοΈ Generating {genre} lyrics...")
        lyrics = get_lyricmind_service().generate(
            prompt=prompt,
            duration=duration,
            prompt_analysis=analysis,
        )

        progress(1.0, desc="β Lyrics generated!")
        return lyrics
    except Exception as e:
        logger.error(f"Error generating lyrics: {e}", exc_info=True)
        return f"β Error: {str(e)}"
def generate_music(prompt: str, lyrics: str, lyrics_mode: str, position: str, context_length: int, timeline_state: dict, progress=gr.Progress()):
    """Generate music clip and add to timeline.

    Args:
        prompt: Text description of the music to generate.
        lyrics: Contents of the lyrics textbox (may be empty).
        lyrics_mode: "Instrumental", "User Lyrics" or "Auto Lyrics".
        position: Value passed to ``ClipPosition`` to place the new clip.
        context_length: Seconds of existing timeline considered for style
            consistency; ignored while the timeline has no clips.
        timeline_state: Dict with a 'clips' list used to rebuild the timeline
            before generating.
        progress: Gradio progress tracker used for status updates.

    Returns:
        A 4-tuple ``(status message, timeline HTML, audio path or None,
        timeline state)``. Every exit path returns all four values because
        the click handler is wired to four output components.
    """
    try:
        # Restore timeline from state
        if timeline_state and 'clips' in timeline_state:
            from models.schemas import TimelineClip
            timeline_service.clips = [
                TimelineClip(**clip_data) for clip_data in timeline_state['clips']
            ]
            logger.info(f"[STATE] Restored {len(timeline_service.clips)} clips from state")

        if not prompt or not prompt.strip():
            return "β Please enter a music prompt", get_timeline_display(), None, timeline_state

        # Fixed duration for all clips
        duration = 32

        # Estimate time (CPU on HF Spaces)
        est_time = int(duration * 4)  # Conservative estimate for CPU

        progress(0, desc=f"π Analyzing prompt... (Est. {est_time}s)")
        logger.info(f"Generating music: {prompt}, mode={lyrics_mode}, duration={duration}s")

        # Analyze prompt
        analysis = PromptAnalyzer.analyze(prompt)
        genre = analysis.get('genres', ['general'])[0] if analysis.get('genres') else 'general'
        bpm = analysis.get('bpm', 120)
        mood = analysis.get('mood', 'neutral')
        logger.info(f"Analysis - Genre: {genre}, BPM: {bpm}, Mood: {mood}")

        # Apply style consistency from previous clips within context window
        # Auto-disable context if this is the first clip
        clips = timeline_service.get_all_clips()
        effective_context_length = 0 if len(clips) == 0 else context_length

        if effective_context_length > 0 and clips:
            # Calculate which clips fall within the context window
            total_duration = timeline_service.get_total_duration()
            context_start = max(0, total_duration - effective_context_length)
            context_clips = [c for c in clips if c['start_time'] >= context_start]

            if context_clips:
                logger.info(f"Using {len(context_clips)} clips for style consistency (context: {effective_context_length}s)")
                # Enhance prompt with style consistency guidance
                prompt = f"{prompt} (maintaining consistent {genre} style at {bpm} BPM with {mood} mood)"
            else:
                logger.info("No clips in context window")
        elif len(clips) == 0:
            logger.info("First clip - style consistency disabled")
        else:
            logger.info("Style consistency disabled (context length: 0)")

        # Determine lyrics based on mode
        lyrics_to_use = None
        lyrics_were_generated = False

        if lyrics_mode == "Instrumental":
            logger.info("Generating instrumental (no vocals)")
            progress(0.1, desc=f"πΉ Preparing instrumental generation... ({est_time}s)")
        elif lyrics_mode == "User Lyrics":
            if not lyrics or not lyrics.strip():
                # Fix: this path used to return only three values, which does
                # not match the four outputs bound to the handler.
                return "β Please enter lyrics or switch mode", get_timeline_display(), None, timeline_state
            lyrics_to_use = lyrics.strip()
            logger.info(f"Using user-provided lyrics (length: {len(lyrics_to_use)} chars)")
            logger.info(f"First 100 chars: {lyrics_to_use[:100]}")
            progress(0.1, desc=f"π€ Preparing vocal generation... ({est_time}s)")
        elif lyrics_mode == "Auto Lyrics":
            if lyrics and lyrics.strip():
                lyrics_to_use = lyrics.strip()
                logger.info("Using existing lyrics from textbox")
                progress(0.1, desc=f"π€ Using provided lyrics... ({est_time}s)")
            else:
                progress(0.1, desc="βοΈ Generating lyrics...")
                logger.info("Auto-generating lyrics...")
                lyric_service = get_lyricmind_service()
                lyrics_to_use = lyric_service.generate(
                    prompt=prompt,
                    duration=duration,
                    prompt_analysis=analysis
                )
                lyrics_were_generated = True
                logger.info(f"Generated {len(lyrics_to_use)} characters of lyrics")
                progress(0.25, desc=f"π΅ Lyrics ready, generating music... ({est_time}s)")

        # Generate music
        progress(0.3, desc=f"πΌ Generating {genre} at {bpm} BPM... ({est_time}s)")
        service = get_diffrhythm_service()
        final_path = service.generate(
            prompt=prompt,
            duration=duration,
            lyrics=lyrics_to_use
        )

        # Add to timeline
        progress(0.9, desc="π Adding to timeline...")
        # Clip id is the generated file's name up to the first dot
        clip_id = os.path.basename(final_path).split('.')[0]

        logger.info(f"[GENERATE] About to add clip: {clip_id}, position: {position}")
        logger.info(f"[GENERATE] Timeline service ID: {id(timeline_service)}")
        logger.info(f"[GENERATE] Clips before add: {len(timeline_service.clips)}")

        from models.schemas import ClipPosition
        clip_info = timeline_service.add_clip(
            clip_id=clip_id,
            file_path=final_path,
            duration=float(duration),
            position=ClipPosition(position)
        )

        logger.info(f"Music added to timeline at position {clip_info['timeline_position']}")
        logger.info(f"[GENERATE] Clips after add: {len(timeline_service.clips)}")

        # Build status message
        progress(1.0, desc="β Complete!")
        status_msg = f"β Music generated successfully!\n"
        status_msg += f"πΈ Genre: {genre} | π₯ BPM: {bpm} | π Mood: {mood}\n"
        status_msg += f"π€ Mode: {lyrics_mode} | π Position: {position}\n"

        # Fix: rely on whether lyrics were actually generated, so a
        # whitespace-only textbox no longer hides the note.
        if lyrics_mode == "Auto Lyrics" and lyrics_to_use and lyrics_were_generated:
            status_msg += "βοΈ (Lyrics auto-generated)"

        # Save timeline to state
        new_state = {
            'clips': [{
                'clip_id': c.clip_id,
                'file_path': c.file_path,
                'duration': c.duration,
                'timeline_position': c.timeline_position,
                'start_time': c.start_time,
                'music_path': c.music_path
            } for c in timeline_service.clips]
        }
        logger.info(f"[STATE] Saved {len(new_state['clips'])} clips to state")

        return status_msg, get_timeline_display(), final_path, new_state

    except Exception as e:
        logger.error(f"Error generating music: {e}", exc_info=True)
        return f"β Error: {str(e)}", get_timeline_display(), None, timeline_state
def get_timeline_display():
    """Get timeline clips as HTML visualization with waveform-style display.

    Returns:
        An HTML string: a placeholder message when the timeline has no
        clips, otherwise one colored block per clip laid out on a track
        scaled to 800px, followed by start/end time labels.
    """
    clips = timeline_service.get_all_clips()
    if not clips:
        return "<div style='text-align:center; padding:40px; color:#888;'>π Timeline is empty. Generate clips to get started!</div>"

    total_duration = timeline_service.get_total_duration()

    # Scale so the whole timeline spans 800px; use 10px/s if the total is zero
    pixels_per_second = 800 / total_duration if total_duration > 0 else 10

    palette = ['#8b5cf6', '#ec4899', '#06b6d4', '#10b981', '#f59e0b', '#ef4444']

    # Header and track container
    sections = [f"""
    <div style="font-family: Arial, sans-serif; background: #1a1a1a; padding: 20px; border-radius: 8px; color: white;">
    <div style="margin-bottom: 15px; font-size: 14px; color: #aaa;">
    <strong>π Timeline:</strong> {len(clips)} clips | Total: {format_duration(total_duration)}
    </div>
    <div style="background: #2a2a2a; border-radius: 6px; padding: 15px; position: relative; min-height: 80px;">
    <div style="position: absolute; top: 10px; left: 15px; right: 15px; height: 60px; background: #333; border-radius: 4px; overflow: hidden;">
    """]

    # One block per clip
    for index, clip in enumerate(clips):
        start_px = clip['start_time'] * pixels_per_second
        width_px = clip['duration'] * pixels_per_second
        color = palette[index % len(palette)]

        # Decorative waveform: at most 50 bars whose heights cycle with the bar number
        bar_count = min(int(width_px / 4), 50)
        bars = ''.join(
            f'<div style="display:inline-block; width:2px; height:{20 + (bar * 7 % 30)}px; background:rgba(255,255,255,0.3); margin:0 1px; vertical-align:bottom;"></div>'
            for bar in range(bar_count)
        )

        sections.append(f"""
        <div style="position: absolute; left: {start_px}px; width: {width_px}px; height: 60px;
        background: linear-gradient(135deg, {color} 0%, {color}dd 100%);
        border-radius: 4px; border: 1px solid rgba(255,255,255,0.2);
        display: flex; align-items: center; justify-content: center;
        overflow: hidden; box-shadow: 0 2px 4px rgba(0,0,0,0.3);">
        <div style="position: absolute; bottom: 5px; left: 0; right: 0; height: 40px; display: flex; align-items: flex-end; justify-content: space-evenly; padding: 0 5px;">
        {bars}
        </div>
        <div style="position: relative; z-index: 1; font-size: 11px; font-weight: bold;
        text-shadow: 0 1px 2px rgba(0,0,0,0.5); text-align: center; padding: 0 5px;">
        Clip {index+1}<br>{format_duration(clip['duration'])}
        </div>
        </div>
        """)

    # Footer with the start and end time labels
    sections.append(f"""
    </div>
    <div style="margin-top: 75px; font-size: 11px; color: #888;">
    <div style="display: flex; justify-content: space-between;">
    <span>0:00</span>
    <span>{format_duration(total_duration)}</span>
    </div>
    </div>
    </div>
    </div>
    """)

    return ''.join(sections)
def remove_clip(clip_number: int, timeline_state: dict):
    """Remove a clip from timeline.

    Args:
        clip_number: 1-based position of the clip to remove. ``None`` (an
            empty number field) is reported as an invalid clip number
            instead of surfacing a raw exception message.
        timeline_state: Dict with a 'clips' list used to rebuild the timeline
            before removing.

    Returns:
        A 3-tuple ``(status message, timeline HTML, timeline state)``. The
        state is a fresh snapshot on success and the incoming state otherwise.
    """
    try:
        # Restore timeline from state
        if timeline_state and 'clips' in timeline_state:
            from models.schemas import TimelineClip
            timeline_service.clips = [
                TimelineClip(**clip_data) for clip_data in timeline_state['clips']
            ]

        clips = timeline_service.get_all_clips()

        if not clips:
            return "π Timeline is empty", get_timeline_display(), timeline_state

        # Reject a missing value up front; comparing None with an int raises TypeError
        if clip_number is None or clip_number < 1 or clip_number > len(clips):
            return f"β Invalid clip number. Choose 1-{len(clips)}", get_timeline_display(), timeline_state

        # The number field may deliver a float; list indices must be ints
        clip_number = int(clip_number)
        clip_id = clips[clip_number - 1]['clip_id']
        timeline_service.remove_clip(clip_id)

        # Save updated state
        new_state = {
            'clips': [{
                'clip_id': c.clip_id,
                'file_path': c.file_path,
                'duration': c.duration,
                'timeline_position': c.timeline_position,
                'start_time': c.start_time,
                'music_path': c.music_path
            } for c in timeline_service.clips]
        }

        return f"β Clip {clip_number} removed", get_timeline_display(), new_state

    except Exception as e:
        logger.error(f"Error removing clip: {e}", exc_info=True)
        return f"β Error: {str(e)}", get_timeline_display(), timeline_state
def clear_timeline(timeline_state: dict):
    """Remove every clip from the timeline.

    Args:
        timeline_state: Current timeline state; returned unchanged if
            clearing fails.

    Returns:
        A 3-tuple ``(status message, timeline HTML, timeline state)`` where
        the state is an empty clip list on success.
    """
    try:
        timeline_service.clear()
        emptied_state = {'clips': []}
        return "β Timeline cleared", get_timeline_display(), emptied_state
    except Exception as e:
        logger.error(f"Error clearing timeline: {e}", exc_info=True)
        return f"β Error: {str(e)}", get_timeline_display(), timeline_state
def export_timeline(filename: str, export_format: str, timeline_state: dict, progress=gr.Progress()):
    """Export timeline to audio file.

    Args:
        filename: Base name for the exported file, taken from a user textbox.
            It is trimmed and reduced to its final path component; a blank or
            unusable name falls back to "output".
        export_format: Format string passed to the export service.
        timeline_state: Dict with a 'clips' list used to rebuild the timeline
            before exporting.
        progress: Gradio progress tracker used for status updates.

    Returns:
        A 3-tuple ``(status message, exported file path or None, timeline
        state)``; the state is always returned unchanged.
    """
    try:
        # Restore timeline from state
        if timeline_state and 'clips' in timeline_state:
            from models.schemas import TimelineClip
            timeline_service.clips = [
                TimelineClip(**clip_data) for clip_data in timeline_state['clips']
            ]
            logger.info(f"[STATE] Restored {len(timeline_service.clips)} clips for export")

        clips = timeline_service.get_all_clips()

        if not clips:
            return "β No clips to export", None, timeline_state

        # The name is user input: drop surrounding whitespace and any directory
        # components so it cannot point outside the export location.
        filename = os.path.basename((filename or "").strip())
        if filename in ("", ".", ".."):
            filename = "output"

        progress(0, desc="π Merging clips...")
        logger.info(f"Exporting timeline: {filename}.{export_format}")

        # The export service reads clips from the timeline service it is given
        export_service.timeline_service = timeline_service

        progress(0.5, desc="πΎ Encoding audio...")

        output_path = export_service.merge_clips(
            filename=filename,
            export_format=export_format
        )

        if output_path:
            progress(1.0, desc="β Export complete!")
            return f"β Exported: {os.path.basename(output_path)}", output_path, timeline_state
        else:
            return "β Export failed", None, timeline_state

    except Exception as e:
        logger.error(f"Error exporting: {e}", exc_info=True)
        return f"β Error: {str(e)}", None, timeline_state
def get_timeline_playback(timeline_state: dict):
    """Merge the timeline into a single preview file for playback.

    Args:
        timeline_state: Dict with a 'clips' list used to rebuild the timeline
            before merging.

    Returns:
        Path of the merged "timeline_preview" WAV as reported by the export
        service, or ``None`` if the timeline has no clips or merging fails.
    """
    try:
        # Rebuild the timeline from the stored state
        if timeline_state and 'clips' in timeline_state:
            timeline_service.clips = []
            for stored_clip in timeline_state['clips']:
                from models.schemas import TimelineClip
                timeline_service.clips.append(TimelineClip(**stored_clip))
            logger.info(f"[STATE] Restored {len(timeline_service.clips)} clips for playback")

        if not timeline_service.get_all_clips():
            return None

        # Merging is delegated to the export service
        export_service.timeline_service = timeline_service
        merged_path = export_service.merge_clips(
            filename="timeline_preview",
            export_format="wav",
        )

        logger.info(f"Timeline playback ready: {merged_path}")
        return merged_path
    except Exception as e:
        logger.error(f"Error creating playback: {e}", exc_info=True)
        return None
def preview_mastering_preset(preset_name: str, timeline_state: dict):
    """Render a mastering preset preview from the most recent clip.

    The preset is applied to a separate file in the system temp directory,
    so the clip itself is not modified.

    Args:
        preset_name: Dropdown value of the form "<Name> - <description>".
        timeline_state: Dict with a 'clips' list used to rebuild the timeline.

    Returns:
        A 2-tuple ``(preview file path or None, status message)``.
    """
    try:
        # Rebuild the timeline from the stored state
        if timeline_state and 'clips' in timeline_state:
            timeline_service.clips = []
            for stored_clip in timeline_state['clips']:
                from models.schemas import TimelineClip
                timeline_service.clips.append(TimelineClip(**stored_clip))

        clips = timeline_service.get_all_clips()
        if not clips:
            return None, "β No clips in timeline to preview"

        # The preview always uses the last clip on the timeline
        latest_clip = clips[-1]
        clip_path = latest_clip['file_path']
        if not os.path.exists(clip_path):
            return None, f"β Clip file not found: {clip_path}"

        # "Clean Master - ..." -> label "Clean Master" -> key "clean_master"
        preset_label = preset_name.split(" - ")[0]
        preset_key = preset_label.lower().replace(" ", "_")

        # Preview is written next to other temp files, named after the clip id
        import tempfile
        preview_path = os.path.join(tempfile.gettempdir(), f"preview_{latest_clip['clip_id']}.wav")

        from services.mastering_service import MasteringService
        MasteringService().apply_preset(
            audio_path=clip_path,
            preset_name=preset_key,
            output_path=preview_path,
        )

        logger.info(f"Created mastering preview: {preview_path}")
        return preview_path, f"β Preview ready: {preset_label} applied to latest clip"
    except Exception as e:
        logger.error(f"Error creating preview: {e}", exc_info=True)
        return None, f"β Preview error: {str(e)}"
def apply_mastering_preset(preset_name: str, timeline_state: dict):
    """Apply mastering preset to all clips in timeline.

    Each clip's audio file is overwritten in place. Clips whose file is
    missing are skipped, and the status message reports how many clips were
    actually processed rather than how many are on the timeline.

    Args:
        preset_name: Dropdown value of the form "<Name> - <description>".
        timeline_state: Dict with a 'clips' list used to rebuild the timeline.

    Returns:
        A 2-tuple ``(status message, timeline state)``; the state is returned
        unchanged.
    """
    try:
        logger.info("[STATE DEBUG] apply_mastering_preset called")
        logger.info(f"[STATE DEBUG] timeline_state type: {type(timeline_state)}")
        logger.info(f"[STATE DEBUG] timeline_state value: {timeline_state}")

        # Restore timeline from state
        if timeline_state and 'clips' in timeline_state:
            from models.schemas import TimelineClip
            timeline_service.clips = [
                TimelineClip(**clip_data) for clip_data in timeline_state['clips']
            ]
            logger.info(f"[STATE] Restored {len(timeline_service.clips)} clips for mastering")
        else:
            logger.warning("[STATE DEBUG] State restoration failed - timeline_state is None or missing 'clips' key")

        clips = timeline_service.get_all_clips()
        logger.info(f"[MASTERING DEBUG] Retrieved {len(clips)} clips from timeline")

        if not clips:
            logger.warning("[MASTERING DEBUG] No clips found in timeline")
            return "β No clips in timeline", timeline_state

        # Log clip details for debugging
        for i, clip in enumerate(clips):
            logger.info(f"[MASTERING DEBUG] Clip {i+1}: {clip}")

        # "Clean Master - ..." -> label "Clean Master" -> key "clean_master"
        preset_label = preset_name.split(" - ")[0]
        preset_key = preset_label.lower().replace(" ", "_")

        logger.info(f"Applying preset '{preset_key}' to {len(clips)} clip(s)")

        # Import mastering service
        from services.mastering_service import MasteringService
        mastering_service = MasteringService()

        # Apply preset to all clips, counting only those actually processed
        applied_count = 0
        for clip in clips:
            clip_path = clip['file_path']

            if not os.path.exists(clip_path):
                logger.warning(f"Audio file not found: {clip_path}")
                continue

            mastering_service.apply_preset(
                audio_path=clip_path,
                preset_name=preset_key,
                output_path=clip_path  # Overwrite original
            )
            applied_count += 1
            logger.info(f"Applied preset to: {clip['clip_id']}")

        status = f"β Applied '{preset_label}' to {applied_count} clip(s)"
        skipped_count = len(clips) - applied_count
        if skipped_count:
            status += f" ({skipped_count} skipped: audio file not found)"
        return status, timeline_state

    except Exception as e:
        logger.error(f"Error applying preset: {e}", exc_info=True)
        return f"β Error: {str(e)}", timeline_state
def preview_custom_eq(low_shelf, low_mid, mid, high_mid, high_shelf, timeline_state: dict):
    """Render a custom-EQ preview from the most recent clip.

    The EQ is applied to a separate file in the system temp directory, so
    the clip itself is not modified.

    Args:
        low_shelf: Gain for the 100 Hz low-shelf band.
        low_mid: Gain for the 500 Hz peak band.
        mid: Gain for the 2000 Hz peak band.
        high_mid: Gain for the 5000 Hz peak band.
        high_shelf: Gain for the 10000 Hz high-shelf band.
        timeline_state: Dict with a 'clips' list used to rebuild the timeline.

    Returns:
        A 2-tuple ``(preview file path or None, status message)``.
    """
    try:
        # Rebuild the timeline from the stored state
        if timeline_state and 'clips' in timeline_state:
            timeline_service.clips = []
            for stored_clip in timeline_state['clips']:
                from models.schemas import TimelineClip
                timeline_service.clips.append(TimelineClip(**stored_clip))

        clips = timeline_service.get_all_clips()
        if not clips:
            return None, "β No clips in timeline to preview"

        # The preview always uses the last clip on the timeline
        latest_clip = clips[-1]
        clip_path = latest_clip['file_path']
        if not os.path.exists(clip_path):
            return None, f"β Clip file not found: {clip_path}"

        # Preview is written next to other temp files, named after the clip id
        import tempfile
        preview_path = os.path.join(tempfile.gettempdir(), f"eq_preview_{latest_clip['clip_id']}.wav")

        from services.mastering_service import MasteringService
        mastering_service = MasteringService()

        # (filter type, center frequency in Hz, gain, Q) for each of the five bands
        band_specs = (
            ('lowshelf', 100, low_shelf, 0.7),
            ('peak', 500, low_mid, 1.0),
            ('peak', 2000, mid, 1.0),
            ('peak', 5000, high_mid, 1.0),
            ('highshelf', 10000, high_shelf, 0.7),
        )
        eq_bands = [
            {'type': band_type, 'frequency': frequency, 'gain': gain, 'q': q}
            for band_type, frequency, gain, q in band_specs
        ]

        mastering_service.apply_custom_eq(
            audio_path=clip_path,
            eq_bands=eq_bands,
            output_path=preview_path,
        )

        logger.info(f"Created EQ preview: {preview_path}")
        return preview_path, "β Preview ready: Custom EQ applied to latest clip"
    except Exception as e:
        logger.error(f"Error creating EQ preview: {e}", exc_info=True)
        return None, f"β Preview error: {str(e)}"
def apply_custom_eq(low_shelf, low_mid, mid, high_mid, high_shelf, timeline_state: dict):
    """Apply custom EQ to all clips in timeline.

    Each clip's audio file is overwritten in place. Clips whose file is
    missing are skipped, and the status message reports how many clips were
    actually processed rather than how many are on the timeline.

    Args:
        low_shelf: Gain for the 100 Hz low-shelf band.
        low_mid: Gain for the 500 Hz peak band.
        mid: Gain for the 2000 Hz peak band.
        high_mid: Gain for the 5000 Hz peak band.
        high_shelf: Gain for the 10000 Hz high-shelf band.
        timeline_state: Dict with a 'clips' list used to rebuild the timeline.

    Returns:
        A 2-tuple ``(status message, timeline state)``; the state is returned
        unchanged.
    """
    try:
        logger.info("[STATE DEBUG] apply_custom_eq called")
        logger.info(f"[STATE DEBUG] timeline_state type: {type(timeline_state)}")
        logger.info(f"[STATE DEBUG] timeline_state value: {timeline_state}")

        # Restore timeline from state
        if timeline_state and 'clips' in timeline_state:
            from models.schemas import TimelineClip
            timeline_service.clips = [
                TimelineClip(**clip_data) for clip_data in timeline_state['clips']
            ]
            logger.info(f"[STATE] Restored {len(timeline_service.clips)} clips for EQ")
        else:
            logger.warning("[STATE DEBUG] State restoration failed - timeline_state is None or missing 'clips' key")

        clips = timeline_service.get_all_clips()
        logger.info(f"[EQ DEBUG] Retrieved {len(clips)} clips from timeline")

        if not clips:
            logger.warning("[EQ DEBUG] No clips found in timeline")
            return "β No clips in timeline", timeline_state

        # Log clip details for debugging
        for i, clip in enumerate(clips):
            logger.info(f"[EQ DEBUG] Clip {i+1}: {clip}")

        logger.info(f"Applying custom EQ to {len(clips)} clip(s)")

        # Import mastering service
        from services.mastering_service import MasteringService
        mastering_service = MasteringService()

        # Apply custom EQ - format eq_bands as expected by the service
        eq_bands = [
            {'type': 'lowshelf', 'frequency': 100, 'gain': low_shelf, 'q': 0.7},
            {'type': 'peak', 'frequency': 500, 'gain': low_mid, 'q': 1.0},
            {'type': 'peak', 'frequency': 2000, 'gain': mid, 'q': 1.0},
            {'type': 'peak', 'frequency': 5000, 'gain': high_mid, 'q': 1.0},
            {'type': 'highshelf', 'frequency': 10000, 'gain': high_shelf, 'q': 0.7}
        ]

        # Apply to all clips, counting only those actually processed
        applied_count = 0
        for clip in clips:
            clip_path = clip['file_path']

            if not os.path.exists(clip_path):
                logger.warning(f"Audio file not found: {clip_path}")
                continue

            mastering_service.apply_custom_eq(
                audio_path=clip_path,
                eq_bands=eq_bands,
                output_path=clip_path  # Overwrite original
            )
            applied_count += 1
            logger.info(f"Applied EQ to: {clip['clip_id']}")

        status = f"β Applied custom EQ to {applied_count} clip(s)"
        skipped_count = len(clips) - applied_count
        if skipped_count:
            status += f" ({skipped_count} skipped: audio file not found)"
        return status, timeline_state

    except Exception as e:
        logger.error(f"Error applying EQ: {e}", exc_info=True)
        return f"β Error: {str(e)}", timeline_state
def format_duration(seconds: float) -> str:
    """Render a number of seconds as ``M:SS``.

    Minutes are not zero-padded; seconds are padded to two digits and any
    fractional part is truncated.
    """
    whole_minutes, remainder = divmod(seconds, 60)
    return f"{int(whole_minutes)}:{int(remainder):02d}"
| # Create Gradio interface | |
| with gr.Blocks( | |
| title="π΅ Music Generation Studio", | |
| theme=gr.themes.Soft(primary_hue="purple", secondary_hue="pink") | |
| ) as app: | |
| gr.Markdown( | |
| """ | |
| # π΅ Music Generation Studio | |
| Create AI-powered music with DiffRhythm2 and LyricMind AI | |
| π‘ **Tip**: Start with 10-20 second clips for faster generation with ZeroGPU | |
| """ | |
| ) | |
| # Timeline state - persists across GPU context switches | |
| timeline_state = gr.State(value={'clips': []}) | |
| # Generation Section | |
| gr.Markdown("### πΌ Music Generation") | |
| prompt_input = gr.Textbox( | |
| label="π― Music Prompt", | |
| placeholder="energetic rock song with electric guitar at 140 BPM", | |
| lines=3, | |
| info="Describe the music style, instruments, tempo, and mood" | |
| ) | |
| lyrics_mode = gr.Radio( | |
| choices=["Instrumental", "User Lyrics", "Auto Lyrics"], | |
| value="Instrumental", | |
| label="π€ Vocal Mode", | |
| info="Instrumental: no vocals | User: provide lyrics | Auto: AI-generated" | |
| ) | |
| with gr.Row(): | |
| auto_gen_btn = gr.Button("βοΈ Generate Lyrics", size="sm") | |
| lyrics_input = gr.Textbox( | |
| label="π Lyrics", | |
| placeholder="Enter lyrics or click 'Generate Lyrics'...", | |
| lines=6 | |
| ) | |
| with gr.Row(): | |
| context_length_input = gr.Slider( | |
| minimum=0, | |
| maximum=240, | |
| value=0, | |
| step=10, | |
| label="π¨ Style Context (seconds)", | |
| info="How far back to analyze for style consistency (0 = disabled, auto-disabled for first clip)", | |
| interactive=True | |
| ) | |
| position_input = gr.Radio( | |
| choices=["intro", "previous", "next", "outro"], | |
| value="next", | |
| label="π Position", | |
| info="Where to add clip on timeline" | |
| ) | |
| gr.Markdown("*All clips are generated at 32 seconds*") | |
| with gr.Row(): | |
| generate_btn = gr.Button( | |
| "β¨ Generate Music Clip", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| gen_status = gr.Textbox(label="π Status", lines=2, interactive=False) | |
| audio_output = gr.Audio( | |
| label="π§ Preview", | |
| type="filepath", | |
| waveform_options=gr.WaveformOptions( | |
| waveform_color="#9333ea", | |
| waveform_progress_color="#c084fc" | |
| ) | |
| ) | |
| # Timeline Section | |
| gr.Markdown("---") | |
| gr.Markdown("### π Timeline") | |
| timeline_display = gr.HTML( | |
| value=get_timeline_display() | |
| ) | |
| # Playback controls | |
| timeline_playback = gr.Audio( | |
| label="π΅ Timeline Playback", | |
| type="filepath", | |
| interactive=False, | |
| autoplay=False, | |
| waveform_options=gr.WaveformOptions( | |
| waveform_color="#06b6d4", | |
| waveform_progress_color="#22d3ee", | |
| show_controls=True | |
| ) | |
| ) | |
| with gr.Row(): | |
| play_timeline_btn = gr.Button("βΆοΈ Load Timeline for Playback", variant="secondary", scale=2) | |
| clip_number_input = gr.Number( | |
| label="Clip #", | |
| precision=0, | |
| minimum=1, | |
| scale=1 | |
| ) | |
| remove_btn = gr.Button("ποΈ Remove Clip", size="sm", scale=1) | |
| clear_btn = gr.Button("ποΈ Clear All", variant="stop", scale=1) | |
| timeline_status = gr.Textbox(label="Timeline Status", lines=1, interactive=False) | |
| # Advanced Controls | |
| with gr.Accordion("βοΈ Advanced Audio Mastering", open=False): | |
| gr.Markdown("### Professional Mastering & EQ") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("**Mastering Presets**") | |
| preset_select = gr.Dropdown( | |
| choices=[ | |
| "Clean Master - Transparent mastering", | |
| "Subtle Warmth - Gentle low-end enhancement", | |
| "Modern Pop - Radio-ready pop sound", | |
| "Radio Ready - Maximum loudness", | |
| "Punchy Commercial - Aggressive punch", | |
| "Rock Master - Guitar-focused mastering", | |
| "Metal Aggressive - Heavy metal mastering", | |
| "Indie Rock - Lo-fi indie character", | |
| "EDM Club - Electronic dance music", | |
| "House Groovy - House music vibe", | |
| "Techno Dark - Dark techno atmosphere", | |
| "Dubstep Heavy - Heavy bass dubstep", | |
| "HipHop Modern - Modern hip-hop mix", | |
| "Trap 808 - Trap with 808 bass", | |
| "RnB Smooth - Smooth R&B sound", | |
| "Acoustic Natural - Natural acoustic tone", | |
| "Folk Warm - Warm folk sound", | |
| "Jazz Vintage - Vintage jazz character", | |
| "Orchestral Wide - Wide orchestral space", | |
| "Classical Concert - Concert hall sound", | |
| "Ambient Spacious - Spacious atmospheric" | |
| ], | |
| value="Clean Master - Transparent mastering", | |
| label="Select Preset" | |
| ) | |
| preset_description = gr.Textbox( | |
| label="Description", | |
| value="Transparent mastering with gentle compression", | |
| lines=2, | |
| interactive=False | |
| ) | |
| with gr.Row(): | |
| preview_preset_btn = gr.Button("π Preview Preset", variant="secondary") | |
| apply_preset_btn = gr.Button("β¨ Apply to Timeline", variant="primary") | |
| preset_preview_audio = gr.Audio( | |
| label="π΅ Preset Preview (Latest Clip)", | |
| type="filepath", | |
| interactive=False, | |
| waveform_options=gr.WaveformOptions( | |
| waveform_color="#9333ea", | |
| waveform_progress_color="#c084fc" | |
| ) | |
| ) | |
| preset_status = gr.Textbox(label="Status", lines=1, interactive=False) | |
| with gr.Column(scale=1): | |
| gr.Markdown("**Custom EQ**") | |
| gr.Markdown("*5-band parametric EQ. Adjust gain for each frequency band (-12 to +12 dB).*") | |
| # DAW-style vertical sliders in columns | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("<center>**Low**<br>100 Hz</center>") | |
| low_shelf_gain = gr.Slider( | |
| -12, 12, 0, step=0.5, | |
| label="Low (100 Hz)" | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("<center>**Low-Mid**<br>500 Hz</center>") | |
| low_mid_gain = gr.Slider( | |
| -12, 12, 0, step=0.5, | |
| label="Low-Mid (500 Hz)" | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("<center>**Mid**<br>2000 Hz</center>") | |
| mid_gain = gr.Slider( | |
| -12, 12, 0, step=0.5, | |
| label="Mid (2000 Hz)" | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("<center>**High-Mid**<br>5000 Hz</center>") | |
| high_mid_gain = gr.Slider( | |
| -12, 12, 0, step=0.5, | |
| label="High-Mid (5000 Hz)" | |
| ) | |
| with gr.Column(scale=1): | |
| gr.Markdown("<center>**High**<br>10k Hz</center>") | |
| high_shelf_gain = gr.Slider( | |
| -12, 12, 0, step=0.5, | |
| label="High (10k Hz)" | |
| ) | |
| with gr.Row(): | |
| preview_eq_btn = gr.Button("π Preview EQ", variant="secondary") | |
| apply_custom_eq_btn = gr.Button("πΉ Apply to Timeline", variant="primary") | |
| eq_preview_audio = gr.Audio( | |
| label="π΅ EQ Preview (Latest Clip)", | |
| type="filepath", | |
| interactive=False, | |
| waveform_options=gr.WaveformOptions( | |
| waveform_color="#ec4899", | |
| waveform_progress_color="#f9a8d4" | |
| ) | |
| ) | |
| eq_status = gr.Textbox(label="Status", lines=1, interactive=False) | |
| # Export Section | |
| gr.Markdown("---") | |
| gr.Markdown("### πΎ Export") | |
| with gr.Row(): | |
| export_filename = gr.Textbox( | |
| label="Filename", | |
| value="my_song", | |
| scale=2 | |
| ) | |
| export_format = gr.Dropdown( | |
| choices=["wav", "mp3"], | |
| value="wav", | |
| label="Format", | |
| scale=1 | |
| ) | |
| export_btn = gr.Button("πΎ Export", variant="primary", scale=1) | |
| export_status = gr.Textbox(label="Status", lines=1, interactive=False) | |
| export_audio = gr.Audio( | |
| label="π₯ Download", | |
| type="filepath", | |
| waveform_options=gr.WaveformOptions( | |
| waveform_color="#10b981", | |
| waveform_progress_color="#34d399" | |
| ) | |
| ) | |
| # Event handlers | |
| auto_gen_btn.click( | |
| fn=generate_lyrics, | |
| inputs=[prompt_input], | |
| outputs=lyrics_input | |
| ) | |
| generate_btn.click( | |
| fn=generate_music, | |
| inputs=[prompt_input, lyrics_input, lyrics_mode, position_input, context_length_input, timeline_state], | |
| outputs=[gen_status, timeline_display, audio_output, timeline_state] | |
| ) | |
| remove_btn.click( | |
| fn=remove_clip, | |
| inputs=[clip_number_input, timeline_state], | |
| outputs=[timeline_status, timeline_display, timeline_state] | |
| ) | |
| clear_btn.click( | |
| fn=clear_timeline, | |
| inputs=[timeline_state], | |
| outputs=[timeline_status, timeline_display, timeline_state] | |
| ) | |
| play_timeline_btn.click( | |
| fn=get_timeline_playback, | |
| inputs=[timeline_state], | |
| outputs=[timeline_playback] | |
| ) | |
| export_btn.click( | |
| fn=export_timeline, | |
| inputs=[export_filename, export_format, timeline_state], | |
| outputs=[export_status, export_audio, timeline_state] | |
| ) | |
| # Mastering event handlers | |
| preview_preset_btn.click( | |
| fn=preview_mastering_preset, | |
| inputs=[preset_select, timeline_state], | |
| outputs=[preset_preview_audio, preset_status] | |
| ) | |
| apply_preset_btn.click( | |
| fn=apply_mastering_preset, | |
| inputs=[preset_select, timeline_state], | |
| outputs=[preset_status, timeline_state] | |
| ).then( | |
| fn=get_timeline_playback, | |
| inputs=[timeline_state], | |
| outputs=[timeline_playback] | |
| ) | |
| preview_eq_btn.click( | |
| fn=preview_custom_eq, | |
| inputs=[low_shelf_gain, low_mid_gain, mid_gain, high_mid_gain, high_shelf_gain, timeline_state], | |
| outputs=[eq_preview_audio, eq_status] | |
| ) | |
| apply_custom_eq_btn.click( | |
| fn=apply_custom_eq, | |
| inputs=[low_shelf_gain, low_mid_gain, mid_gain, high_mid_gain, high_shelf_gain, timeline_state], | |
| outputs=[eq_status, timeline_state] | |
| ).then( | |
| fn=get_timeline_playback, | |
| inputs=[timeline_state], | |
| outputs=[timeline_playback] | |
| ) | |
| # Help section | |
| with gr.Accordion("βΉοΈ Help & Tips", open=False): | |
| gr.Markdown( | |
| """ | |
| ## π Quick Start | |
| 1. **Enter a prompt**: "upbeat pop song with synth at 128 BPM" | |
| 2. **Choose mode**: Instrumental (fastest) or with vocals | |
| 3. **Set duration**: Start with 10-20s for quick results | |
| 4. **Generate**: Click the button and wait ~2-4 minutes | |
| 5. **Export**: Download your complete song | |
| ## β‘ Performance Tips | |
| - **Shorter clips = faster**: 10-20s clips generate in ~1-2 minutes | |
| - **Instrumental mode**: ~30% faster than with vocals | |
| - **HF Spaces uses CPU**: Expect 2-4 minutes per 30s clip | |
| - **Build incrementally**: Generate short clips, then combine | |
| ## π― Prompt Tips | |
| - **Be specific**: "energetic rock with distorted guitar" > "rock song" | |
| - **Include BPM**: "at 140 BPM" helps set tempo | |
| - **Mention instruments**: "with piano and drums" | |
| - **Describe mood**: "melancholic", "upbeat", "aggressive" | |
| ## π€ Vocal Modes | |
| - **Instrumental**: Pure music, no vocals (fastest) | |
| - **User Lyrics**: Provide your own lyrics | |
| - **Auto Lyrics**: AI generates lyrics based on prompt | |
| ## π Timeline | |
| - Clips are arranged sequentially | |
| - Remove or clear clips as needed | |
| - Export combines all clips into one file | |
| --- | |
| β±οΈ **Average Generation Time**: 2-4 minutes per 30-second clip on CPU | |
| π΅ **Models**: DiffRhythm2 + MuQ-MuLan + LyricMind AI | |
| """ | |
| ) | |
# Script entry point: runs only when this file is executed directly,
# never when it is imported as a module.
if __name__ == "__main__":
    logger.info("π΅ Starting Music Generation Studio on HuggingFace Spaces...")

    # Enable the request queue before launching: the default per-event
    # concurrency limit is 1 and the queue size is capped at 5.
    app.queue(default_concurrency_limit=1, max_size=5)

    # Start the Gradio server with its default launch settings.
    app.launch()