""" Stereo Matching Methods Comparison Demo (Hugging Face Spaces with ZeroGPU) This demo compares different stereo matching algorithms using Gradio's ImageSlider. Optimized for Hugging Face Spaces with ZeroGPU support. Currently supports: - FoundationStereo (Low-cost and High-quality variants) - CREStereo (ETH3D pre-trained model) """ import os import sys import logging import gc import tempfile from pathlib import Path from typing import Optional, Tuple, Union, Dict, List import numpy as np import cv2 import gradio as gr import imageio import argparse import random # Import spaces BEFORE torch to ensure proper ZeroGPU initialization import spaces import torch import torch.nn.functional as F # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Get current directory current_dir = os.path.dirname(os.path.abspath(__file__)) # Add subdemo directories to path foundation_stereo_dir = os.path.join(current_dir, "FoundationStereo_demo") crestereo_dir = os.path.join(current_dir, "CREStereo_demo") sys.path.insert(0, foundation_stereo_dir) sys.path.insert(0, crestereo_dir) # Global variables for model caching _cached_models = {} _available_methods = {} class StereoMethodBase: """Base class for stereo matching methods""" def __init__(self, name: str, display_name: str): self.name = name self.display_name = display_name self._model = None self._device = None def load_model(self): """Load the model for this method""" raise NotImplementedError def process_stereo_pair(self, left_img: np.ndarray, right_img: np.ndarray, progress_callback=None) -> Tuple[np.ndarray, str]: """Process stereo pair and return disparity visualization and status""" raise NotImplementedError def cleanup(self): """Clean up model and free memory""" if self._model is not None: del self._model self._model = None self._device = None torch.cuda.empty_cache() gc.collect() class FoundationStereoMethod(StereoMethodBase): """FoundationStereo implementation""" def __init__(self, variant: str = "11-33-40"): display_name = f"FoundationStereo ({variant})" super().__init__(f"foundation_stereo_{variant}", display_name) self.variant = variant def load_model(self): """Load FoundationStereo model""" try: # Import FoundationStereo modules from FoundationStereo_demo.app_local import get_cached_model, get_available_models # Get available models available_models = get_available_models() # Find the appropriate model selection model_selection = None for model_name in available_models.keys(): if self.variant in model_name: model_selection = model_name break if model_selection is None: # Fallback to first available model model_selection = list(available_models.keys())[0] if available_models else None if model_selection is None: raise ValueError("No FoundationStereo models available") self._model, self._device = get_cached_model(model_selection) logging.info(f"✅ FoundationStereo {self.variant} loaded successfully") return True except Exception as e: logging.error(f"Failed to load FoundationStereo {self.variant}: {e}") return False def process_stereo_pair(self, left_img: np.ndarray, right_img: np.ndarray, progress_callback=None) -> Tuple[np.ndarray, str]: """Process stereo pair using FoundationStereo""" try: from FoundationStereo_demo.app_local import process_stereo_pair # Save images temporarily with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as left_tmp: cv2.imwrite(left_tmp.name, cv2.cvtColor(left_img, cv2.COLOR_RGB2BGR)) left_path = left_tmp.name with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as right_tmp: cv2.imwrite(right_tmp.name, cv2.cvtColor(right_img, cv2.COLOR_RGB2BGR)) right_path = right_tmp.name try: # Find the model selection from FoundationStereo_demo.app_local import get_available_models available_models = get_available_models() model_selection = None for model_name in available_models.keys(): if self.variant in model_name: model_selection = model_name break if model_selection is None: model_selection = list(available_models.keys())[0] # Process the stereo pair result_img, status = process_stereo_pair(model_selection, left_path, right_path) if result_img is not None: return result_img, f"✅ {self.display_name}: {status}" else: return None, f"❌ {self.display_name}: Processing failed" finally: # Clean up temporary files if os.path.exists(left_path): os.unlink(left_path) if os.path.exists(right_path): os.unlink(right_path) except Exception as e: logging.error(f"FoundationStereo processing failed: {e}") return None, f"❌ {self.display_name}: {str(e)}" class CREStereoMethod(StereoMethodBase): """CREStereo implementation""" def __init__(self): super().__init__("crestereo", "CREStereo (ETH3D)") def load_model(self): """Load CREStereo model""" try: from CREStereo_demo.app_local import get_cached_model, get_available_models # Get available models available_models = get_available_models() if not available_models: raise ValueError("No CREStereo models available") # Use the first available model model_selection = list(available_models.keys())[0] self._model, self._device = get_cached_model(model_selection) logging.info("✅ CREStereo loaded successfully") return True except Exception as e: logging.error(f"Failed to load CREStereo: {e}") return False def process_stereo_pair(self, left_img: np.ndarray, right_img: np.ndarray, progress_callback=None) -> Tuple[np.ndarray, str]: """Process stereo pair using CREStereo""" try: from CREStereo_demo.app_local import process_stereo_pair # Save images temporarily with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as left_tmp: cv2.imwrite(left_tmp.name, cv2.cvtColor(left_img, cv2.COLOR_RGB2BGR)) left_path = left_tmp.name with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as right_tmp: cv2.imwrite(right_tmp.name, cv2.cvtColor(right_img, cv2.COLOR_RGB2BGR)) right_path = right_tmp.name try: # Find the model selection from CREStereo_demo.app_local import get_available_models available_models = get_available_models() model_selection = list(available_models.keys())[0] # Process the stereo pair result_img, status = process_stereo_pair(model_selection, left_path, right_path) if result_img is not None: return result_img, f"✅ {self.display_name}: {status}" else: return None, f"❌ {self.display_name}: Processing failed" finally: # Clean up temporary files if os.path.exists(left_path): os.unlink(left_path) if os.path.exists(right_path): os.unlink(right_path) except Exception as e: logging.error(f"CREStereo processing failed: {e}") return None, f"❌ {self.display_name}: {str(e)}" def initialize_methods() -> Dict[str, StereoMethodBase]: """Initialize available stereo matching methods""" methods = {} # Initialize FoundationStereo variants for variant in ["11-33-40", "23-51-11"]: method = FoundationStereoMethod(variant) methods[method.name] = method # Initialize CREStereo crestereo_method = CREStereoMethod() methods[crestereo_method.name] = crestereo_method return methods def load_example_images() -> List[Tuple[str, str, str]]: """Load example stereo pairs""" examples = [] assets_dir = os.path.join(current_dir, "assets") if os.path.exists(assets_dir): for example_dir in os.listdir(assets_dir): example_path = os.path.join(assets_dir, example_dir) if os.path.isdir(example_path): left_path = os.path.join(example_path, "left.png") right_path = os.path.join(example_path, "right.png") if os.path.exists(left_path) and os.path.exists(right_path): examples.append((left_path, right_path, example_dir)) return examples @spaces.GPU(duration=120) # 2 minutes for comparison processing def compare_methods(left_image: np.ndarray, right_image: np.ndarray, method1_name: str, method2_name: str, progress: gr.Progress = gr.Progress()) -> Tuple[Optional[np.ndarray], str]: """Compare two stereo matching methods""" if left_image is None or right_image is None: return None, "❌ Please upload both left and right images." if method1_name == method2_name: return None, "❌ Please select two different methods for comparison." # Get methods methods = initialize_methods() method1 = methods.get(method1_name) method2 = methods.get(method2_name) if method1 is None or method2 is None: return None, "❌ Selected methods not available." progress(0.1, desc=f"Loading {method1.display_name}...") # Load method 1 if not method1.load_model(): return None, f"❌ Failed to load {method1.display_name}" progress(0.2, desc=f"Processing with {method1.display_name}...") # Process with method 1 result1, status1 = method1.process_stereo_pair(left_image, right_image) progress(0.5, desc=f"Loading {method2.display_name}...") # Load method 2 if not method2.load_model(): method1.cleanup() return None, f"❌ Failed to load {method2.display_name}" progress(0.7, desc=f"Processing with {method2.display_name}...") # Process with method 2 result2, status2 = method2.process_stereo_pair(left_image, right_image) progress(0.9, desc="Creating comparison...") if result1 is None or result2 is None: method1.cleanup() method2.cleanup() return None, "❌ One or both methods failed to process the images." # Create side-by-side comparison comparison_img = create_comparison_image(result1, result2, method1.display_name, method2.display_name) # Clean up method1.cleanup() method2.cleanup() progress(1.0, desc="Complete!") status = f"""🔍 **Comparison Results** **{method1.display_name}:** {status1} **{method2.display_name}:** {status2} 💡 **Tip:** Use the slider in the comparison image to switch between results.""" return comparison_img, status def create_comparison_image(img1: np.ndarray, img2: np.ndarray, label1: str, label2: str) -> np.ndarray: """Create a side-by-side comparison image with labels""" h, w = img1.shape[:2] # Create comparison canvas comparison = np.zeros((h + 60, w * 2 + 20, 3), dtype=np.uint8) comparison.fill(255) # White background # Place images comparison[50:50+h, 10:10+w] = img1 comparison[50:50+h, w+20:w*2+20] = img2 # Add labels font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 0.8 font_thickness = 2 # Method 1 label text_size1 = cv2.getTextSize(label1, font, font_scale, font_thickness)[0] text_x1 = 10 + (w - text_size1[0]) // 2 cv2.putText(comparison, label1, (text_x1, 30), font, font_scale, (0, 0, 0), font_thickness) # Method 2 label text_size2 = cv2.getTextSize(label2, font, font_scale, font_thickness)[0] text_x2 = w + 20 + (w - text_size2[0]) // 2 cv2.putText(comparison, label2, (text_x2, 30), font, font_scale, (0, 0, 0), font_thickness) return comparison @spaces.GPU(duration=90) # 1.5 minutes for single method processing def single_method_inference(left_image: np.ndarray, right_image: np.ndarray, method_name: str, progress: gr.Progress = gr.Progress()) -> Tuple[Optional[np.ndarray], str]: """Run inference with a single method""" if left_image is None or right_image is None: return None, "❌ Please upload both left and right images." methods = initialize_methods() method = methods.get(method_name) if method is None: return None, "❌ Selected method not available." progress(0.2, desc=f"Loading {method.display_name}...") if not method.load_model(): return None, f"❌ Failed to load {method.display_name}" progress(0.5, desc=f"Processing with {method.display_name}...") result, status = method.process_stereo_pair(left_image, right_image) method.cleanup() progress(1.0, desc="Complete!") return result, status @spaces.GPU(duration=120) # 2 minutes for slider comparison def create_slider_comparison(left_img, right_img, method1, method2, progress=gr.Progress()): """Create comparison for image slider""" if left_img is None or right_img is None: return None, "❌ Please upload both images." if method1 == method2: return None, "❌ Please select different methods." methods = initialize_methods() m1 = methods.get(method1) m2 = methods.get(method2) if m1 is None or m2 is None: return None, "❌ Methods not available." progress(0.1, desc=f"Processing with {m1.display_name}...") # Process with method 1 if not m1.load_model(): return None, f"❌ Failed to load {m1.display_name}" result1, status1 = m1.process_stereo_pair(left_img, right_img) progress(0.5, desc=f"Processing with {m2.display_name}...") # Process with method 2 if not m2.load_model(): m1.cleanup() return None, f"❌ Failed to load {m2.display_name}" result2, status2 = m2.process_stereo_pair(left_img, right_img) # Clean up m1.cleanup() m2.cleanup() progress(1.0, desc="Complete!") if result1 is None or result2 is None: return None, "❌ Processing failed." # Add method names to the top of the images for slider comparison def add_method_label(img: np.ndarray, method_name: str) -> np.ndarray: """Add method name label to the top of the image""" h, w = img.shape[:2] # Create new image with space for label labeled_img = np.zeros((h + 40, w, 3), dtype=np.uint8) labeled_img.fill(255) # White background for label area # Place original image below the label area labeled_img[40:, :] = img # Add method name label font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 0.7 font_thickness = 2 # Calculate text size and position text_size = cv2.getTextSize(method_name, font, font_scale, font_thickness)[0] text_x = (w - text_size[0]) // 2 text_y = 28 # Position in the label area cv2.putText(labeled_img, method_name, (text_x, text_y), font, font_scale, (0, 0, 0), font_thickness) return labeled_img # Add labels to both images labeled_result1 = add_method_label(result1, m1.display_name) labeled_result2 = add_method_label(result2, m2.display_name) status = f"""🎚️ **Interactive Comparison Ready** **{m1.display_name}:** {status1.split(':')[-1].strip() if ':' in status1 else status1} **{m2.display_name}:** {status2.split(':')[-1].strip() if ':' in status2 else status2} 💡 **Tip:** Drag the slider to compare the two methods interactively!""" return (labeled_result1, labeled_result2), status def create_app() -> gr.Blocks: """Create the Gradio application""" # Load examples examples = load_example_images() # Get available methods methods = initialize_methods() method_choices = [(method.display_name, method.name) for method in methods.values()] with gr.Blocks( title="Stereo Matching Methods Comparison", theme=gr.themes.Soft(), css="footer {visibility: hidden}" ) as app: gr.Markdown(""" # 🏆 Stereo Matching Methods Comparison Compare different stereo matching algorithms side-by-side using advanced deep learning models. **Available Methods:** - 🎯 **FoundationStereo** (Low-cost & High-quality variants) - Zero-shot stereo matching - ⚡ **CREStereo** - Practical stereo matching with high efficiency ⚠️ **Important**: Upload **rectified** stereo image pairs for best results. 🚀 **Powered by ZeroGPU**: Automatic GPU allocation for fast processing! """) # Instructions section with gr.Accordion("📋 How to Use", open=False): gr.Markdown(""" ### 🖼️ Input Requirements 1. **Rectified stereo pairs**: Images should be epipolar-aligned (horizontal epipolar lines) 2. **Same resolution**: Left and right images must have identical dimensions 3. **Good quality**: Clear, well-lit images work best ### 🔍 Comparison Modes 1. **Method Comparison**: Compare two different methods side-by-side 2. **Single Method**: Test individual methods 3. **Interactive Slider**: Use ImageSlider for easy comparison ### 📊 Example Images Try the provided example stereo pairs to see the differences between methods. ### 🚀 ZeroGPU Integration - Automatic GPU allocation when processing starts - Optimized memory management - Fast model loading and cleanup """) with gr.Tabs(): # Tab 1: Method Comparison with gr.Tab("🔍 Method Comparison"): gr.Markdown("### Compare Two Stereo Matching Methods") with gr.Row(): with gr.Column(): left_img_comp = gr.Image(label="Left Image", type="numpy") right_img_comp = gr.Image(label="Right Image", type="numpy") with gr.Column(): method1_dropdown = gr.Dropdown( choices=method_choices, label="Method 1", value=method_choices[0][1] if method_choices else None ) method2_dropdown = gr.Dropdown( choices=method_choices, label="Method 2", value=method_choices[1][1] if len(method_choices) > 1 else None ) compare_btn = gr.Button("🚀 Compare Methods", variant="primary", size="lg") comparison_result = gr.Image(label="Comparison Result") comparison_status = gr.Markdown() compare_btn.click( fn=compare_methods, inputs=[left_img_comp, right_img_comp, method1_dropdown, method2_dropdown], outputs=[comparison_result, comparison_status], show_progress=True ) # Examples for method comparison if examples: example_inputs = [] for left_path, right_path, name in examples[:3]: # Load images as numpy arrays left_img = cv2.imread(left_path) right_img = cv2.imread(right_path) if left_img is not None: left_img = cv2.cvtColor(left_img, cv2.COLOR_BGR2RGB) if right_img is not None: right_img = cv2.cvtColor(right_img, cv2.COLOR_BGR2RGB) example_inputs.append([left_img, right_img]) gr.Examples( examples=example_inputs, inputs=[left_img_comp, right_img_comp], label="📸 Example Stereo Pairs", examples_per_page=3 ) # Tab 2: Interactive Slider Comparison with gr.Tab("🎚️ Interactive Comparison"): gr.Markdown("### Interactive Method Comparison with Slider") with gr.Row(): with gr.Column(): left_img_slider = gr.Image(label="Left Image", type="numpy") right_img_slider = gr.Image(label="Right Image", type="numpy") with gr.Column(): method1_slider = gr.Dropdown( choices=method_choices, label="Method A", value=method_choices[0][1] if method_choices else None ) method2_slider = gr.Dropdown( choices=method_choices, label="Method B", value=method_choices[1][1] if len(method_choices) > 1 else None ) slider_compare_btn = gr.Button("🎚️ Generate Slider Comparison", variant="primary", size="lg") # Image slider for comparison comparison_slider = gr.ImageSlider( label="Method Comparison (Drag slider to compare)", show_label=True ) slider_status = gr.Markdown() slider_compare_btn.click( fn=create_slider_comparison, inputs=[left_img_slider, right_img_slider, method1_slider, method2_slider], outputs=[comparison_slider, slider_status], show_progress=True ) # Examples for interactive slider if examples: example_inputs_slider = [] for left_path, right_path, name in examples[:3]: # Load images as numpy arrays left_img = cv2.imread(left_path) right_img = cv2.imread(right_path) if left_img is not None: left_img = cv2.cvtColor(left_img, cv2.COLOR_BGR2RGB) if right_img is not None: right_img = cv2.cvtColor(right_img, cv2.COLOR_BGR2RGB) example_inputs_slider.append([left_img, right_img]) gr.Examples( examples=example_inputs_slider, inputs=[left_img_slider, right_img_slider], label="📸 Example Stereo Pairs", examples_per_page=3 ) # Tab 3: Single Method Testing with gr.Tab("🎯 Single Method"): gr.Markdown("### Test Individual Methods") with gr.Row(): with gr.Column(): left_img_single = gr.Image(label="Left Image", type="numpy") right_img_single = gr.Image(label="Right Image", type="numpy") with gr.Column(): method_single = gr.Dropdown( choices=method_choices, label="Select Method", value=method_choices[0][1] if method_choices else None ) single_btn = gr.Button("🚀 Process", variant="primary", size="lg") single_result = gr.Image(label="Disparity Result") single_status = gr.Markdown() single_btn.click( fn=single_method_inference, inputs=[left_img_single, right_img_single, method_single], outputs=[single_result, single_status], show_progress=True ) # Examples for single method if examples: example_inputs_single = [] for left_path, right_path, name in examples[:3]: # Load images as numpy arrays left_img = cv2.imread(left_path) right_img = cv2.imread(right_path) if left_img is not None: left_img = cv2.cvtColor(left_img, cv2.COLOR_BGR2RGB) if right_img is not None: right_img = cv2.cvtColor(right_img, cv2.COLOR_BGR2RGB) example_inputs_single.append([left_img, right_img]) gr.Examples( examples=example_inputs_single, inputs=[left_img_single, right_img_single], label="📸 Example Stereo Pairs", examples_per_page=3 ) # Information section with gr.Accordion("ℹ️ Method Information", open=False): gr.Markdown(""" ### 🎯 FoundationStereo - **Type**: Zero-shot stereo matching using foundation models - **Variants**: Low-cost (11-33-40) and High-quality (23-51-11) - **Strengths**: Generalizes well to different domains without training - **Paper**: [FoundationStereo: Zero-Shot Stereo Matching via Foundation Model](https://arxiv.org/abs/2501.09898) ### ⚡ CREStereo - **Type**: Practical stereo matching with iterative refinement - **Model**: ETH3D pre-trained weights - **Strengths**: Fast inference with good accuracy - **Paper**: [Practical Stereo Matching via Cascaded Recurrent Network with Adaptive Correlation](https://arxiv.org/abs/2203.11483) ### 🎚️ Interactive Comparison Tips - Use the **ImageSlider** to quickly compare methods - Drag the slider to see differences in detail preservation - Look for differences in depth boundaries and texture regions - Different methods may perform better on different scene types ### 🚀 ZeroGPU Features - **Automatic GPU allocation**: GPU resources allocated on-demand - **Optimized timeouts**: Different durations for different operations - **Memory management**: Automatic cleanup after processing - **Queue management**: Fair resource sharing among users """) # Footer gr.Markdown(""" --- ### 📝 Notes - **🚀 ZeroGPU Powered**: Automatic GPU allocation for optimal performance - **⏱️ Processing Times**: Method comparison ~2min, Single method ~1.5min - **🧠 Memory Management**: Automatic cleanup between comparisons - **📊 Best Results**: Use high-quality, well-rectified stereo pairs ### 🔗 References - [FoundationStereo Repository](https://github.com/NVlabs/FoundationStereo) - [CREStereo Repository](https://github.com/megvii-research/CREStereo) - [Gradio ImageSlider Documentation](https://gradio.app/docs/#imageslider) - [Hugging Face ZeroGPU](https://huggingface.co/zero-gpu-explorers) """) return app def main(): """Main function to launch the comparison app""" logging.info("🚀 Starting Stereo Matching Comparison App (ZeroGPU)...") # Check if we're in Hugging Face Spaces if 'SPACE_ID' in os.environ: logging.info("Running in Hugging Face Spaces environment") try: # Check if subdemo directories exist foundation_exists = os.path.exists(foundation_stereo_dir) crestereo_exists = os.path.exists(crestereo_dir) if not foundation_exists and not crestereo_exists: logging.error("No stereo matching demo directories found!") return logging.info(f"FoundationStereo demo: {'✅' if foundation_exists else '❌'}") logging.info(f"CREStereo demo: {'✅' if crestereo_exists else '❌'}") # Create and launch app logging.info("Creating comparison app...") app = create_app() logging.info("✅ Comparison app created successfully") # Launch with Spaces-optimized settings app.launch( share=False, # Spaces handles sharing show_error=True, favicon_path=None, ssr_mode=False, allowed_paths=["./"] ) except Exception as e: logging.error(f"Failed to launch app: {e}") raise if __name__ == "__main__": main()