Spaces:
Build error
Build error
| # Ultralytics YOLO 🚀, AGPL-3.0 license | |
| import gradio as gr | |
| import PIL.Image as Image | |
| import time | |
| import numpy as np | |
| import os | |
| import json | |
| from urllib.request import urlretrieve | |
| import zipfile | |
| import cv2 | |
| from datetime import datetime | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import logging | |
| import warnings | |
| from ultralytics import ASSETS, YOLO | |
| # Suppress warnings for cleaner output | |
| warnings.filterwarnings("ignore") | |
| logging.basicConfig(level=logging.INFO) | |
# Model registry: name -> {"size", "url", "description"}.
# The five base YOLOv8 checkpoints carry no URL (url is None); every other
# entry gets a URL derived from its own name via the template below.
# NOTE(review): every URL points at the v0.0.0 release tag, and several names
# (e.g. yolov11*, *-roboflow, *-mobile) may not exist as published assets --
# verify against the real release listing before relying on them.
_MODEL_ASSET_URL = "https://github.com/ultralytics/assets/releases/download/v0.0.0/{name}.pt"

# (name, size, description) rows, in the order they appear in the UI.
_BASE_MODEL_ROWS = (
    # YOLOv8 Models
    ("yolov8n", "nano", "Fastest, smallest model"),
    ("yolov8s", "small", "Small model, good balance"),
    ("yolov8m", "medium", "Medium model, better accuracy"),
    ("yolov8l", "large", "Large model, high accuracy"),
    ("yolov8x", "extra large", "Largest model, best accuracy"),
)

_DOWNLOADABLE_MODEL_ROWS = (
    # YOLOv9 Models
    ("yolov9t", "tiny", "YOLOv9 Tiny model"),
    ("yolov9s", "small", "YOLOv9 Small model"),
    ("yolov9m", "medium", "YOLOv9 Medium model"),
    ("yolov9c", "custom", "YOLOv9 Custom model"),
    ("yolov9e", "extra large", "YOLOv9 Enhanced model"),
    # YOLOv10 Models
    ("yolov10n", "nano", "YOLOv10 Nano model"),
    ("yolov10s", "small", "YOLOv10 Small model"),
    ("yolov10m", "medium", "YOLOv10 Medium model"),
    ("yolov10b", "large", "YOLOv10 Large model"),
    ("yolov10l", "extra large", "YOLOv10 Extra Large model"),
    ("yolov10x", "huge", "YOLOv10 Huge model"),
    # YOLOv11 Models (Latest)
    ("yolov11n", "nano", "YOLOv11 Nano model - Latest"),
    ("yolov11s", "small", "YOLOv11 Small model - Latest"),
    ("yolov11m", "medium", "YOLOv11 Medium model - Latest"),
    ("yolov11l", "large", "YOLOv11 Large model - Latest"),
    ("yolov11x", "extra large", "YOLOv11 Extra Large model - Latest"),
    # Specialized Models
    ("yolov8n-seg", "nano", "YOLOv8 Nano Segmentation"),
    ("yolov8s-seg", "small", "YOLOv8 Small Segmentation"),
    ("yolov8m-seg", "medium", "YOLOv8 Medium Segmentation"),
    ("yolov8l-seg", "large", "YOLOv8 Large Segmentation"),
    ("yolov8x-seg", "extra large", "YOLOv8 Extra Large Segmentation"),
    # Pose Estimation Models
    ("yolov8n-pose", "nano", "YOLOv8 Nano Pose Estimation"),
    ("yolov8s-pose", "small", "YOLOv8 Small Pose Estimation"),
    ("yolov8m-pose", "medium", "YOLOv8 Medium Pose Estimation"),
    ("yolov8l-pose", "large", "YOLOv8 Large Pose Estimation"),
    ("yolov8x-pose", "extra large", "YOLOv8 Extra Large Pose Estimation"),
    ("yolov8x-pose-p6", "extra large", "YOLOv8 Pose P6 (Higher Resolution)"),
    # Classification Models
    ("yolov8n-cls", "nano", "YOLOv8 Nano Classification"),
    ("yolov8s-cls", "small", "YOLOv8 Small Classification"),
    ("yolov8m-cls", "medium", "YOLOv8 Medium Classification"),
    ("yolov8l-cls", "large", "YOLOv8 Large Classification"),
    ("yolov8x-cls", "extra large", "YOLOv8 Extra Large Classification"),
    # Custom Trained Models
    ("yolov8n-world", "nano", "YOLOv8 World (Zero-shot)"),
    ("yolov8s-world", "small", "YOLOv8 World Small (Zero-shot)"),
    ("yolov8m-world", "medium", "YOLOv8 World Medium (Zero-shot)"),
    # SAM (Segment Anything Model)
    ("sam_b", "base", "SAM Base Model"),
    ("sam_l", "large", "SAM Large Model"),
    # Fast Models for Real-time
    ("yolov8n-640", "nano", "YOLOv8 Nano 640px Resolution"),
    ("yolov8n-1280", "nano", "YOLOv8 Nano 1280px Resolution"),
    # Object365 Pre-trained Models
    ("yolov8n-o365", "nano", "YOLOv8 Object365 Pre-trained"),
    ("yolov8s-o365", "small", "YOLOv8 Object365 Small Pre-trained"),
    # Roboflow Models (Popular Community Models)
    ("yolov8n-roboflow", "nano", "YOLOv8 Roboflow Community"),
    ("yolov8s-roboflow", "small", "YOLOv8 Roboflow Small"),
    # Mobile-Optimized Models
    ("yolov8n-mobile", "nano", "YOLOv8 Mobile Optimized"),
    ("yolov8s-mobile", "small", "YOLOv8 Small Mobile"),
    # High-Accuracy Models
    ("yolov8x-augmented", "extra large", "YOLOv8 Extra Large Augmented"),
    ("yolov8x-ensemble", "extra large", "YOLOv8 Extra Large Ensemble"),
)

MODEL_CONFIGS = {
    name: {"size": size, "url": None, "description": description}
    for name, size, description in _BASE_MODEL_ROWS
}
MODEL_CONFIGS.update(
    (name, {"size": size, "url": _MODEL_ASSET_URL.format(name=name), "description": description})
    for name, size, description in _DOWNLOADABLE_MODEL_ROWS
)
# Face-recognition storage layout: a root folder holding the JSON index plus
# a subfolder for the cropped face images.
FACE_RECOGNITION_DIR = "face_recognition"
FACES_DB_PATH = os.path.join(FACE_RECOGNITION_DIR, "faces_database.json")
FACES_DIR = os.path.join(FACE_RECOGNITION_DIR, "registered_faces")

# Create every folder the app writes into (face storage and model weights).
for _required_dir in (FACE_RECOGNITION_DIR, FACES_DIR, "models"):
    os.makedirs(_required_dir, exist_ok=True)

# Shared state describing the batch download, read and written by the
# download helpers below.
download_progress = {"current": 0, "total": 0, "status": "", "stop_flag": False}
download_lock = threading.Lock()
class FaceRecognitionSystem:
    """Haar-cascade face detection plus colour-histogram matching.

    Registered people live in ``self.faces_database`` (name -> dict with
    ``features``, ``image_path`` and ``registered_at``), which is persisted as
    JSON at ``FACES_DB_PATH``. Cropped face images are written to ``FACES_DIR``.
    """

    def __init__(self):
        self.face_detector = None  # created lazily by initialize_models()
        self.face_encoder = None  # placeholder; no method of this class uses it
        self.faces_database = self.load_faces_database()

    def load_faces_database(self):
        """Load faces database from JSON file.

        Returns an empty dict when the file is missing, unreadable, not valid
        JSON, or does not contain a JSON object.
        """
        try:
            with open(FACES_DB_PATH, 'r') as f:
                data = json.load(f)
        except FileNotFoundError:
            return {}
        except (OSError, ValueError) as exc:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            logging.warning(f"Could not read faces database {FACES_DB_PATH}: {exc}")
            return {}
        if not isinstance(data, dict):
            logging.warning(f"Ignoring faces database {FACES_DB_PATH}: expected a JSON object")
            return {}
        return data

    def save_faces_database(self):
        """Save faces database to JSON file."""
        with open(FACES_DB_PATH, 'w') as f:
            json.dump(self.faces_database, f, indent=2)

    def initialize_models(self):
        """Initialize the face detector.

        Returns True when the Haar cascade was loaded, False otherwise. The
        detector is only stored on success, so a failed load can be retried.
        """
        try:
            # Use OpenCV's built-in face detector
            detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        except Exception as e:
            print(f"Error initializing face models: {e}")
            return False
        # The constructor does not raise for a missing file; it yields an
        # empty classifier, so emptiness has to be checked explicitly.
        if detector.empty():
            print("Error initializing face models: Haar cascade could not be loaded")
            return False
        self.face_detector = detector
        return True

    def _ensure_detector(self):
        """Load the detector on first use; raise RuntimeError if it cannot be loaded."""
        if self.face_detector is None and not self.initialize_models():
            raise RuntimeError("Face detector could not be initialized.")

    @staticmethod
    def _to_bgr(image):
        """Return *image* as a BGR array; PIL images are converted, arrays pass through."""
        if isinstance(image, Image.Image):
            return cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        return image

    @staticmethod
    def _safe_filename_stem(person_name):
        """Map *person_name* to a string that is safe to use inside a file name."""
        stem = "".join(ch if ch.isalnum() or ch in "-_" else "_" for ch in person_name)
        return stem or "face"

    def detect_faces(self, image):
        """Detect faces in an image.

        Accepts a PIL image or a BGR array and returns the rectangles produced
        by ``detectMultiScale`` (each one is ``x, y, w, h``).
        """
        self._ensure_detector()
        gray = cv2.cvtColor(self._to_bgr(image), cv2.COLOR_BGR2GRAY)
        return self.face_detector.detectMultiScale(gray, 1.1, 4)

    def extract_face_features(self, image, face_coords):
        """Extract features from detected face.

        The face crop is resized to 100x100 and described by three normalized
        50-bin histograms (channels 0, 1, 2), concatenated into one vector.
        """
        x, y, w, h = face_coords
        face_img = cv2.resize(image[y:y+h, x:x+w], (100, 100))
        channel_hists = []
        for channel in range(3):
            hist = cv2.calcHist([face_img], [channel], None, [50], [0, 256])
            channel_hists.append(cv2.normalize(hist, hist).flatten())
        return np.concatenate(channel_hists)

    def register_face(self, image, person_name):
        """Register a new face from a single photo.

        Returns ``(success, message)``. Registration requires exactly one
        detected face; the crop is saved under ``FACES_DIR`` and the features
        are stored in the database under *person_name*.
        """
        image = self._to_bgr(image)
        faces = self.detect_faces(image)
        if len(faces) == 0:
            return False, "No face detected in the image. Please try with a clearer photo."
        if len(faces) > 1:
            return False, f"Multiple faces detected ({len(faces)}). Please use a photo with only one face."

        x, y, w, h = (int(v) for v in faces[0])
        features = self.extract_face_features(image, (x, y, w, h))

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # The name comes from user input; strip path separators and other
        # special characters so the file cannot land outside FACES_DIR.
        face_filename = f"{self._safe_filename_stem(person_name)}_{timestamp}.jpg"
        face_path = os.path.join(FACES_DIR, face_filename)
        if not cv2.imwrite(face_path, image[y:y+h, x:x+w]):
            return False, f"❌ Could not save face image for {person_name}."

        self.faces_database[person_name] = {
            "features": features.tolist(),
            "image_path": face_filename,
            "registered_at": timestamp
        }
        self.save_faces_database()
        return True, f"✅ Successfully registered {person_name}'s face!"

    def recognize_faces(self, image, threshold=0.7):
        """Recognize faces in an image.

        Returns ``(annotated_image, results_text)``. The annotated image has
        the same type as the input (PIL image in, PIL image out). Each detected
        face is matched to the registered person with the highest histogram
        correlation above *threshold*, or labelled "Unknown".
        """
        was_pil = isinstance(image, Image.Image)
        if not self.faces_database:
            unchanged = image.copy() if was_pil else image
            return unchanged, "No faces registered yet. Please register faces first."

        bgr = self._to_bgr(image)
        faces = self.detect_faces(bgr)
        if len(faces) == 0:
            return image.copy(), "No faces detected in the image."

        # OpenCV drawing calls need an array, so annotate a BGR copy and
        # convert back to PIL at the end when the caller passed a PIL image.
        canvas = bgr.copy()
        lines = ["Face Recognition Results:\n"]
        for face in faces:
            x, y, w, h = (int(v) for v in face)
            features = self.extract_face_features(bgr, (x, y, w, h))

            best_match = None
            best_similarity = 0
            for name, data in self.faces_database.items():
                similarity = np.corrcoef(features, np.array(data["features"]))[0, 1]
                if similarity > best_similarity and similarity > threshold:
                    best_similarity = similarity
                    best_match = name

            if best_match:
                label, color = best_match, (0, 255, 0)
                lines.append(f"✅ {best_match} (confidence: {best_similarity:.2f})\n")
            else:
                label, color = "Unknown", (0, 0, 255)
                lines.append("❌ Unknown face\n")
            cv2.rectangle(canvas, (x, y), (x + w, y + h), color, 2)
            cv2.putText(canvas, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        if was_pil:
            canvas = Image.fromarray(cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB))
        return canvas, "".join(lines)

    def get_registered_faces(self):
        """Get list of registered faces."""
        return list(self.faces_database.keys())

    def delete_face(self, person_name):
        """Delete a registered face.

        Removes the stored image (if it still exists) and the database entry.
        Returns ``(success, message)``.
        """
        if person_name not in self.faces_database:
            return False, f"❌ {person_name} not found in database."

        image_path = os.path.join(FACES_DIR, self.faces_database[person_name]["image_path"])
        if os.path.exists(image_path):
            os.remove(image_path)
        del self.faces_database[person_name]
        self.save_faces_database()
        return True, f"✅ Successfully deleted {person_name}'s face."
| # Initialize face recognition system | |
| face_system = FaceRecognitionSystem() | |
def check_model_exists(model_name):
    """Report whether a model can be used without downloading it first.

    The five base YOLOv8 names always count as present; any other model is
    present only when its weights file exists under ``models/``.
    """
    always_present = ("yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x")
    return model_name in always_present or os.path.exists(f"models/{model_name}.pt")
def download_single_model(model_name):
    """Download a single model without progress tracking.

    Returns a status string starting with "✅" on success (including the
    built-in and already-downloaded cases) or "❌" on failure; it never raises.

    The download is streamed to ``models/<name>.pt.part`` and renamed into
    place only when complete, so an interrupted transfer cannot leave a
    truncated ``.pt`` file that would later be taken for a finished download.
    The 30 second timeout applies to this request only, not to every socket
    in the process.
    """
    import shutil
    import urllib.request

    if model_name in ("yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x"):
        return f"✅ {model_name} is built-in and ready!"
    model_config = MODEL_CONFIGS.get(model_name)
    if not model_config or not model_config["url"]:
        return f"❌ No download URL available for {model_name}"
    model_path = f"models/{model_name}.pt"
    if os.path.exists(model_path):
        return f"✅ {model_name} already downloaded and ready!"

    partial_path = f"{model_path}.part"
    try:
        with urllib.request.urlopen(model_config["url"], timeout=30) as response, \
                open(partial_path, "wb") as out_file:
            shutil.copyfileobj(response, out_file)
        os.replace(partial_path, model_path)
        return f"✅ {model_name} downloaded successfully!"
    except Exception as e:
        logging.error(f"Failed to download {model_name}: {str(e)}")
        # Best-effort cleanup of the incomplete file.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        return f"❌ Failed to download {model_name}: {str(e)}"
def download_model(model_name, progress=gr.Progress()):
    """Download model from URL if not available locally.

    Reports progress through *progress* and returns a status string starting
    with "✅" or "❌"; it never raises.

    The file is fetched to ``models/<name>.pt.part`` and renamed into place
    only after the transfer finished, so a failed download cannot leave a
    truncated weights file behind.
    """
    if model_name in ("yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x"):
        return f"✅ {model_name} is built-in and ready!"
    model_config = MODEL_CONFIGS.get(model_name)
    if not model_config or not model_config["url"]:
        return f"❌ No download URL available for {model_name}"
    model_path = f"models/{model_name}.pt"
    if os.path.exists(model_path):
        return f"✅ {model_name} already downloaded and ready!"

    partial_path = f"{model_path}.part"

    def report_progress(count, block_size, total_size):
        # total_size is not positive when the server does not announce a
        # length; the last block can also overshoot it. Keep the fraction in
        # [0, 1] and never divide by zero.
        if total_size > 0:
            fraction = min(count * block_size / total_size, 1.0)
        else:
            fraction = 0.0
        progress(0.1 + fraction * 0.8, desc=f"Downloading {model_name}...")

    try:
        progress(0.1, desc=f"Starting download of {model_name}...")
        urlretrieve(model_config["url"], partial_path, reporthook=report_progress)
        os.replace(partial_path, model_path)
        progress(1.0, desc=f"✅ {model_name} downloaded successfully!")
        return f"✅ {model_name} downloaded successfully and ready to use!"
    except Exception as e:
        logging.error(f"Failed to download {model_name}: {str(e)}")
        # Best-effort cleanup of the incomplete file.
        try:
            os.remove(partial_path)
        except OSError:
            pass
        return f"❌ Failed to download {model_name}: {str(e)}"
def download_all_models(progress=gr.Progress()):
    """Download all models automatically with progress tracking.

    Downloads every model that has a URL and is not yet present, one after
    another, until done or until ``download_progress["stop_flag"]`` is set.

    ``download_progress["current"]`` counts models that have *finished*
    (successfully or not), so the progress view does not report completion
    while the last download is still running.

    Returns ``(summary_message, counts_text)``.
    """
    builtin_models = ("yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x")
    download_progress["stop_flag"] = False

    # Models that have a URL and are not available locally yet.
    models_to_download = [
        model_name
        for model_name, config in MODEL_CONFIGS.items()
        if model_name not in builtin_models
        and config["url"]
        and not check_model_exists(model_name)
    ]
    if not models_to_download:
        return "✅ All models are already downloaded!", "All models available"

    total_models = len(models_to_download)
    download_progress["total"] = total_models
    download_progress["current"] = 0
    progress(0.0, desc=f"Starting download of {total_models} models...")

    success_count = 0
    failed_models = []
    stopped = False
    for model_name in models_to_download:
        if download_progress["stop_flag"]:
            stopped = True
            break
        try:
            result = download_single_model(model_name)
        except Exception as e:
            failed_models.append(model_name)
            status = f"❌ Error downloading {model_name}: {str(e)}"
        else:
            if "✅" in result:
                success_count += 1
                status = f"✅ {model_name} downloaded successfully ({success_count}/{total_models})"
            else:
                failed_models.append(model_name)
                status = f"❌ Failed to download {model_name} ({len(failed_models)} failures)"
        finished = success_count + len(failed_models)
        download_progress["current"] = finished
        download_progress["status"] = status
        progress(finished / total_models, desc=status)

    # Final summary
    failed_count = len(failed_models)
    total_processed = success_count + failed_count
    if stopped:
        # Do not claim that everything was downloaded after a user stop.
        final_message = (
            f"⏹️ Download stopped by user after {total_processed}/{total_models} models "
            f"({success_count} succeeded, {failed_count} failed)"
        )
        download_progress["status"] = "⏹️ Download stopped by user"
        progress(1.0, desc="Download stopped by user")
    else:
        if failed_count == 0:
            final_message = f"🎉 Successfully downloaded all {success_count} models!"
        else:
            final_message = f"⚠️ Downloaded {success_count}/{total_processed} models. Failed: {', '.join(failed_models[:5])}"
            if len(failed_models) > 5:
                final_message += f" and {len(failed_models) - 5} more..."
        progress(1.0, desc="Download completed!")
    return final_message, f"Success: {success_count}, Failed: {failed_count}"
def download_all_models_threaded():
    """Download all models in parallel threads.

    Up to three downloads run at once. Workers only return their outcome;
    all counters and ``download_progress`` are updated by the calling thread
    as futures complete, so no counter is mutated concurrently and each model
    advances ``download_progress["current"]`` exactly once.

    Downloads that have not started when ``download_progress["stop_flag"]``
    is set are skipped.

    Returns ``(summary_message, counts_text)``.
    """
    builtin_models = ("yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x")
    download_progress["stop_flag"] = False

    # Models that have a URL and are not available locally yet.
    models_to_download = [
        model_name
        for model_name, config in MODEL_CONFIGS.items()
        if model_name not in builtin_models
        and config["url"]
        and not check_model_exists(model_name)
    ]
    if not models_to_download:
        return "✅ All models are already downloaded!", "All models available"

    total_models = len(models_to_download)
    download_progress["total"] = total_models
    download_progress["current"] = 0

    def download_worker(model_name):
        """Return (model_name, outcome, status); outcome is True, False or None (skipped)."""
        if download_progress["stop_flag"]:
            return model_name, None, f"⏹️ {model_name} skipped"
        try:
            result = download_single_model(model_name)
        except Exception as e:
            return model_name, False, f"❌ {model_name}: {str(e)}"
        if "✅" in result:
            return model_name, True, f"✅ {model_name}"
        return model_name, False, f"❌ {model_name}"

    success_count = 0
    skipped_count = 0
    failed_models = []
    # Use ThreadPoolExecutor for parallel downloads
    with ThreadPoolExecutor(max_workers=3) as executor:  # Limit to 3 concurrent downloads
        futures = [executor.submit(download_worker, model_name) for model_name in models_to_download]
        for future in as_completed(futures):
            model_name, outcome, status = future.result()
            if outcome is None:
                skipped_count += 1
            elif outcome:
                success_count += 1
            else:
                failed_models.append(model_name)
            download_progress["status"] = status
            download_progress["current"] += 1

    # Final summary
    failed_count = len(failed_models)
    if skipped_count:
        final_message = (
            f"⏹️ Download stopped by user: {success_count}/{total_models} models downloaded, "
            f"{failed_count} failed, {skipped_count} skipped"
        )
    elif failed_count == 0:
        final_message = f"🎉 Successfully downloaded all {success_count} models in parallel!"
    else:
        final_message = f"⚠️ Downloaded {success_count}/{total_models} models. Failed: {', '.join(failed_models[:3])}"
        if len(failed_models) > 3:
            final_message += f" and {len(failed_models) - 3} more..."
    return final_message, f"Success: {success_count}, Failed: {failed_count}"
def stop_download():
    """Raise the stop flag in the shared download state and confirm to the user."""
    download_progress.update(stop_flag=True)
    return "⏹️ Download stopped by user"
def get_download_progress():
    """Describe the batch download state as a short human-readable string."""
    total = download_progress["total"]
    if total == 0:
        return "No download in progress"

    current = download_progress["current"]
    latest_status = download_progress["status"]
    if current < total:
        return f"📥 Progress: {current}/{total} models\n{latest_status}"
    return "✅ Download completed!"
def get_available_models():
    """Map every configured model name to an availability label."""
    return {
        name: "✅ Available" if check_model_exists(name) else "⬇️ Download Required"
        for name in MODEL_CONFIGS
    }
def predict_image(img, conf_threshold, iou_threshold, model_name):
    """Predicts objects in an image using a YOLO model with adjustable confidence and IOU thresholds.

    Returns ``(annotated_image, status_message)``; the image is None when no
    input was given, the model is unavailable, or detection failed.

    Both thresholds are clamped to [0.1, 1.0]. Nano/tiny models (per the
    ``size`` field in MODEL_CONFIGS) run at 640 px, all others at 416 px.
    """
    if img is None:
        return None, "❌ Please upload an image first."
    if not check_model_exists(model_name):
        return None, f"❌ Model {model_name} is not available. Please download it first."
    try:
        # Validate input parameters
        conf_threshold = max(0.1, min(1.0, conf_threshold))
        iou_threshold = max(0.1, min(1.0, iou_threshold))

        if model_name in ("yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x"):
            weights = f"{model_name}.pt"
        else:
            weights = f"models/{model_name}.pt"
        model = YOLO(weights)

        # Decide by the configured size label rather than by looking for the
        # letters "n"/"t" anywhere in the name, which also matched names such
        # as "yolov8x-augmented".
        size_label = MODEL_CONFIGS.get(model_name, {}).get("size")
        imgsz = 640 if size_label in ("nano", "tiny") else 416

        results = model.predict(
            source=img,
            conf=conf_threshold,
            iou=iou_threshold,
            show_labels=True,
            show_conf=True,
            imgsz=imgsz,
            verbose=False  # Reduce output noise
        )
        if not results:
            return None, "❌ Error during detection: the model returned no results"

        # plot() yields a BGR array; reverse the channel axis for PIL (RGB).
        im_array = results[-1].plot()
        im = Image.fromarray(im_array[..., ::-1])
        return im, f"✅ Detection completed with {model_name}"
    except Exception as e:
        logging.error(f"Detection error: {str(e)}")
        return None, f"❌ Error during detection: {str(e)}"
# Most recently used streaming model, kept between frames so the weights are
# not reloaded from disk for every single frame. The lock serialises model
# loading and inference because one model instance is shared by all callers.
_STREAM_MODEL_LOCK = threading.Lock()
_STREAM_MODEL_STATE = {"name": None, "model": None}


def stream_video(frame, conf_threshold, iou_threshold, model_name):
    """Process video frame for real-time object detection.

    Returns the annotated frame as a PIL image. Returns None when *frame* is
    None, and the unmodified frame when the model is unavailable, produced no
    results, or raised an error.

    Both thresholds are clamped to [0.1, 1.0]. Nano/tiny models (per the
    ``size`` field in MODEL_CONFIGS) run at 320 px, all others at 416 px.
    """
    if frame is None:
        return None
    if not check_model_exists(model_name):
        return frame
    try:
        # Convert frame to PIL Image if it's numpy array
        if isinstance(frame, np.ndarray):
            frame = Image.fromarray(frame)

        # Validate thresholds
        conf_threshold = max(0.1, min(1.0, conf_threshold))
        iou_threshold = max(0.1, min(1.0, iou_threshold))

        # Decide by the configured size label rather than by looking for the
        # letters "n"/"t" anywhere in the name.
        size_label = MODEL_CONFIGS.get(model_name, {}).get("size")
        imgsz = 320 if size_label in ("nano", "tiny") else 416

        with _STREAM_MODEL_LOCK:
            if _STREAM_MODEL_STATE["name"] != model_name:
                if model_name in ("yolov8n", "yolov8s", "yolov8m", "yolov8l", "yolov8x"):
                    weights = f"{model_name}.pt"
                else:
                    weights = f"models/{model_name}.pt"
                # Assign the model first: if loading raises, the cached name
                # is left untouched and the next frame retries.
                _STREAM_MODEL_STATE["model"] = YOLO(weights)
                _STREAM_MODEL_STATE["name"] = model_name
            results = _STREAM_MODEL_STATE["model"].predict(
                source=frame,
                conf=conf_threshold,
                iou=iou_threshold,
                show_labels=True,
                show_conf=True,
                imgsz=imgsz,
                verbose=False
            )
        if not results:
            return frame

        # plot() yields a BGR array; reverse the channel axis for PIL (RGB).
        im_array = results[-1].plot()
        return Image.fromarray(im_array[..., ::-1])
    except Exception as e:
        logging.error(f"Stream error: {str(e)}")
        return frame
def refresh_model_status():
    """Recompute and return the availability label of every configured model."""
    current_status = get_available_models()
    return current_status
def create_model_info_display():
    """Build the Markdown overview of all models, grouped by category.

    Each category heading shows how many of its models are available; at most
    five models are listed per category, followed by a "... and N more" line.
    """
    status = get_available_models()
    available_label = "✅ Available"
    variant_suffixes = (
        "-seg", "-pose", "-cls", "-world", "-o365", "-roboflow",
        "-mobile", "-augmented", "-ensemble", "-640", "-1280",
    )

    def category_of(name):
        """Return the category heading a model name belongs under."""
        if name.startswith("yolov8") and not any(suffix in name for suffix in variant_suffixes):
            return "YOLOv8 (Built-in)"
        if name.startswith("yolov9"):
            return "YOLOv9"
        if name.startswith("yolov10"):
            return "YOLOv10"
        if name.startswith("yolov11"):
            return "YOLOv11 (Latest)"
        if "-seg" in name or name.startswith("sam"):
            return "Segmentation"
        if "-pose" in name:
            return "Pose Estimation"
        if "-cls" in name:
            return "Classification"
        return "Special Models"

    # Fixed heading order; each bucket collects (name, config, status label).
    categories = {
        heading: []
        for heading in (
            "YOLOv8 (Built-in)",
            "YOLOv9",
            "YOLOv10",
            "YOLOv11 (Latest)",
            "Segmentation",
            "Pose Estimation",
            "Classification",
            "Special Models",
        )
    }
    for model_name, config in MODEL_CONFIGS.items():
        categories[category_of(model_name)].append((model_name, config, status[model_name]))

    available_count = list(status.values()).count(available_label)
    pieces = [f"## Available Models ({available_count}/{len(MODEL_CONFIGS)})\n\n"]
    for category, members in categories.items():
        if not members:
            continue
        available_here = sum(1 for _, _, label in members if label == available_label)
        pieces.append(f"### 📁 {category} ({available_here}/{len(members)})\n\n")
        pieces.extend(
            f"**{label} {name}** - {cfg['description']}\n"
            for name, cfg, label in members[:5]  # Show first 5 models
        )
        hidden = len(members) - 5
        if hidden > 0:
            pieces.append(f"... and {hidden} more models\n")
        pieces.append("\n")
    return "".join(pieces)
def register_face_from_image(image, person_name):
    """Register a face from uploaded image.

    Always returns four values -- ``(None, message, dropdown, checkbox_group)``
    -- so the number of outputs is the same on success and on every failure
    path. The two components carry the current list of registered faces.
    """
    def respond(message):
        registered_faces = face_system.get_registered_faces()
        return (
            None,
            message,
            gr.Dropdown(choices=registered_faces),
            gr.CheckboxGroup(choices=registered_faces),
        )

    if image is None:
        return respond("❌ Please upload an image first.")
    if not person_name or person_name.strip() == "":
        return respond("❌ Please enter a name for the person.")
    _, message = face_system.register_face(image, person_name.strip())
    return respond(message)
def recognize_faces_in_image(image, threshold):
    """Run face recognition on an uploaded image and return (image, report)."""
    if image is not None:
        annotated, report = face_system.recognize_faces(image, threshold)
        return annotated, report
    return None, "❌ Please upload an image first."
def delete_registered_face(person_name):
    """Delete a registered face.

    Always returns three values -- ``(message, dropdown, checkbox_group)`` --
    so the number of outputs is the same on success and on every failure
    path. The two components carry the current list of registered faces.
    """
    if not person_name:
        message = "❌ Please select a person to delete."
    else:
        _, message = face_system.delete_face(person_name)
    registered_faces = face_system.get_registered_faces()
    return message, gr.Dropdown(choices=registered_faces), gr.CheckboxGroup(choices=registered_faces)
def get_faces_list_display():
    """Render the registered names as a numbered Markdown list."""
    names = face_system.get_registered_faces()
    if not names:
        return "No faces registered yet."
    rows = [f"{position}. **{name}**\n" for position, name in enumerate(names, 1)]
    return "## Registered Faces\n\n" + "".join(rows)
def get_model_categories():
    """Return the category -> model-name lists offered in the category dropdown."""
    def family(prefix, variants, suffix=""):
        """Expand one variant letter per model, e.g. ("yolov9", "ts") -> yolov9t, yolov9s."""
        return [f"{prefix}{variant}{suffix}" for variant in variants]

    return {
        "🚀 Latest Models": family("yolov11", "nsmlx"),
        "⚡ YOLOv10": family("yolov10", "nsmblx"),
        "🎯 YOLOv9": family("yolov9", "tsmce"),
        "🔧 Segmentation": family("yolov8", "nsmlx", "-seg") + ["sam_b", "sam_l"],
        "🏃 Pose Estimation": family("yolov8", "nsmlx", "-pose") + ["yolov8x-pose-p6"],
        "🏷️ Classification": family("yolov8", "nsmlx", "-cls"),
        "🌍 Special Models": family("yolov8", "nsm", "-world") + family("yolov8", "ns", "-o365"),
        "📱 Mobile Optimized": family("yolov8", "ns", "-mobile") + ["yolov8n-640", "yolov8n-1280"],
    }
def update_model_choices(category):
    """Refresh the model dropdown when the category selection changes.

    Args:
        category: The label currently selected in the category dropdown.

    Returns:
        A ``gr.Dropdown`` whose choices are the models listed for ``category``
        by ``get_model_categories()``, or an empty choice list when the
        category is not one of its keys.
    """
    models_for_category = get_model_categories().get(category, [])
    return gr.Dropdown(choices=models_for_category)
def start_download_all():
    """Kick off ``download_all_models`` on a background daemon thread.

    The handler returns immediately so the UI stays responsive.

    Returns:
        A status message telling the user the bulk download has started.
    """

    def run_downloads():
        # The thread discards this return value.
        return download_all_models()

    # Start the download in a separate thread
    worker = threading.Thread(target=run_downloads, daemon=True)
    worker.start()
    return "🚀 Started downloading all models... Check progress below."
# Gradio UI definition. Components are declared inside nested layout contexts
# and event listeners are attached to them.
# NOTE(review): the nesting below is reconstructed from the order of the
# layout statements; confirm it matches the intended page layout.
with gr.Blocks(title="Ultralytics YOLOv8 Live Stream", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Ultralytics YOLOv8 Object Detection 🚀")
    gr.Markdown("Upload images or use live webcam for real-time object detection with face recognition and 60+ custom models.")

    with gr.Row():
        # Main column: detection and face-recognition tabs.
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.TabItem("Image Upload"):
                    with gr.Row():
                        with gr.Column():
                            img_input = gr.Image(type="pil", label="Upload Image")
                            conf_slider_img = gr.Slider(minimum=0, maximum=1, value=0.25, label="Confidence threshold")
                            iou_slider_img = gr.Slider(minimum=0, maximum=1, value=0.45, label="IoU threshold")
                            model_radio_img = gr.Radio(
                                choices=list(MODEL_CONFIGS.keys()),
                                label="Model Name",
                                value="yolov8n",
                            )
                            predict_btn_img = gr.Button("Detect Objects", variant="primary")
                        with gr.Column():
                            img_output = gr.Image(type="pil", label="Detection Result")
                            img_status = gr.Textbox(label="Status", interactive=False)

                    predict_btn_img.click(
                        fn=predict_image,
                        inputs=[img_input, conf_slider_img, iou_slider_img, model_radio_img],
                        outputs=[img_output, img_status],
                    )

                    # Example images
                    gr.Examples(
                        examples=[
                            [ASSETS / "bus.jpg", 0.25, 0.45, "yolov8n"],
                            [ASSETS / "zidane.jpg", 0.25, 0.45, "yolov8n"],
                        ],
                        inputs=[img_input, conf_slider_img, iou_slider_img, model_radio_img],
                    )

                with gr.TabItem("Live Webcam"):
                    with gr.Row():
                        with gr.Column():
                            webcam_input = gr.Image(
                                sources=["webcam"],
                                streaming=True,
                                label="Live Webcam Feed",
                            )
                            conf_slider_webcam = gr.Slider(
                                minimum=0, maximum=1, value=0.25, label="Confidence threshold"
                            )
                            iou_slider_webcam = gr.Slider(
                                minimum=0, maximum=1, value=0.45, label="IoU threshold"
                            )
                            model_radio_webcam = gr.Radio(
                                choices=list(MODEL_CONFIGS.keys()),
                                label="Model Name",
                                value="yolov8n",
                            )
                            # NOTE(review): no event listener is attached to
                            # start_btn in this block.
                            start_btn = gr.Button("Start Detection", variant="primary")
                            stop_btn = gr.Button("Stop Detection", variant="stop")
                        with gr.Column():
                            webcam_output = gr.Image(label="Real-time Detection")

                    # Stream the video continuously
                    webcam_input.stream(
                        fn=stream_video,
                        inputs=[webcam_input, conf_slider_webcam, iou_slider_webcam, model_radio_webcam],
                        outputs=webcam_output,
                        time_limit=30,
                        stream_every=0.1,
                        concurrency_limit=4,
                    )

                    # Stop button clears the output
                    stop_btn.click(
                        fn=lambda: None,
                        inputs=[],
                        outputs=[webcam_output],
                    )

                with gr.TabItem("👤 Face Recognition"):
                    with gr.Tabs():
                        with gr.TabItem("Register Face"):
                            gr.Markdown("### Register a New Face")
                            gr.Markdown("Upload a clear photo with only one face to register a person.")
                            with gr.Row():
                                with gr.Column():
                                    reg_face_input = gr.Image(type="pil", label="Upload Face Photo")
                                    reg_name_input = gr.Textbox(label="Person Name", placeholder="Enter person's name")
                                    reg_btn = gr.Button("📝 Register Face", variant="primary")
                                with gr.Column():
                                    reg_status = gr.Textbox(label="Registration Status", interactive=False)
                            # reg_btn's click listener is attached further
                            # down, once delete_dropdown exists.

                        with gr.TabItem("Recognize Faces"):
                            gr.Markdown("### Recognize Faces in Image")
                            gr.Markdown("Upload an image to recognize registered faces.")
                            with gr.Row():
                                with gr.Column():
                                    rec_face_input = gr.Image(type="pil", label="Upload Image")
                                    rec_threshold = gr.Slider(
                                        minimum=0.1, maximum=1.0, value=0.7,
                                        label="Recognition Threshold",
                                        info="Higher values require more confidence",
                                    )
                                    rec_btn = gr.Button("🔍 Recognize Faces", variant="primary")
                                with gr.Column():
                                    rec_output = gr.Image(type="pil", label="Recognition Result")
                                    rec_results = gr.Textbox(label="Recognition Results", interactive=False, lines=5)

                            rec_btn.click(
                                fn=recognize_faces_in_image,
                                inputs=[rec_face_input, rec_threshold],
                                outputs=[rec_output, rec_results],
                            )

                        with gr.TabItem("Manage Faces"):
                            gr.Markdown("### Manage Registered Faces")
                            with gr.Row():
                                with gr.Column():
                                    delete_dropdown = gr.Dropdown(
                                        choices=face_system.get_registered_faces(),
                                        label="Select Person to Delete",
                                    )
                                    delete_btn = gr.Button("🗑️ Delete Face", variant="stop")
                                with gr.Column():
                                    delete_status = gr.Textbox(label="Deletion Status", interactive=False)

                            delete_btn.click(
                                fn=delete_registered_face,
                                inputs=[delete_dropdown],
                                outputs=[delete_status, delete_dropdown, gr.State()],
                            )

                            # Registration is wired here rather than in its
                            # own tab so the dropdown update returned by
                            # register_face_from_image reaches delete_dropdown.
                            # Routing it to a throwaway gr.State() left the
                            # dropdown stale after a new face was registered.
                            # NOTE(review): register_face_from_image returns
                            # only two values on failure, fewer than the four
                            # outputs declared here; align the two.
                            reg_btn.click(
                                fn=register_face_from_image,
                                inputs=[reg_face_input, reg_name_input],
                                outputs=[reg_face_input, reg_status, delete_dropdown, gr.State()],
                            )

                            # Display registered faces
                            faces_display = gr.Markdown(value=get_faces_list_display())
                            refresh_faces_btn = gr.Button("🔄 Refresh List")
                            refresh_faces_btn.click(
                                fn=get_faces_list_display,
                                outputs=[faces_display],
                            )

        # Side column: model manager, help text and statistics.
        with gr.Column(scale=1):
            gr.Markdown("## 📦 Model Manager (60+ Models)")

            with gr.Accordion("Available Models", open=True):
                model_info = gr.Markdown(value=create_model_info_display())
                refresh_btn = gr.Button("🔄 Refresh Status", size="sm")
                refresh_btn.click(fn=refresh_model_status, outputs=model_info)

            with gr.Accordion("🚀 Auto Download All Models", open=True):
                gr.Markdown("### Download All Models at Once")
                # f-string so the count is interpolated; without the prefix
                # the braces were shown literally.
                gr.Markdown(f"Automatically download all {len(MODEL_CONFIGS) - 5} downloadable models.")
                with gr.Row():
                    download_all_btn = gr.Button("⬇️ Download All Models", variant="primary", size="lg")
                    stop_download_btn = gr.Button("⏹️ Stop Download", variant="stop", size="lg")
                download_status = gr.Textbox(label="Download Status", interactive=False, lines=3)
                download_stats = gr.Textbox(label="Statistics", interactive=False)

                download_all_btn.click(
                    fn=start_download_all,
                    outputs=[download_status],
                )
                stop_download_btn.click(
                    fn=stop_download,
                    outputs=[download_status],
                )

                # Progress timer
                progress_timer = gr.Timer(2.0)  # Update every 2 seconds
                progress_timer.tick(
                    fn=get_download_progress,
                    outputs=[download_stats],
                )

            with gr.Accordion("Download Models by Category", open=False):
                category_dropdown = gr.Dropdown(
                    choices=list(get_model_categories().keys()),
                    label="Model Category",
                    info="Select a category to view models",
                )
                model_dropdown = gr.Dropdown(
                    choices=[],
                    label="Select Model to Download",
                    info="Choose a specific model to download",
                )
                category_dropdown.change(
                    fn=update_model_choices,
                    inputs=[category_dropdown],
                    outputs=[model_dropdown],
                )
                download_btn = gr.Button("⬇️ Download Model", variant="primary")
                single_download_status = gr.Textbox(label="Download Status", interactive=False)

                # Chain the status refresh after the download handler instead
                # of registering it as a second, parallel click listener.
                download_btn.click(
                    fn=download_model,
                    inputs=[model_dropdown],
                    outputs=[single_download_status],
                ).then(
                    fn=create_model_info_display,
                    outputs=[model_info],
                )

            gr.Markdown("## 👤 Face Recognition Info")
            with gr.Accordion("How to Use Face Recognition", open=False):
                gr.Markdown("""
                ### Steps:
                1. **Register Face**: Upload a clear photo with one face
                2. **Recognize**: Upload images to identify registered people
                3. **Manage**: View and delete registered faces

                ### Tips:
                - Use clear, well-lit photos
                - Ensure only one face per registration photo
                - Front-facing photos work best
                - Adjust recognition threshold as needed
                """)

            gr.Markdown("## 📊 Model Statistics")
            stats_display = gr.Markdown(f"""
            - **Total Models**: {len(MODEL_CONFIGS)} models available
            - **Built-in**: 5 YOLOv8 models (ready to use)
            - **Downloadable**: {len(MODEL_CONFIGS) - 5} additional models
            - **Categories**: 8 different model categories
            """)

    gr.Markdown("""
    ### Instructions:
    1. **Image Upload Tab**: Upload an image and adjust confidence/IOU thresholds, then click "Detect Objects"
    2. **Live Webcam Tab**: Allow camera access and see real-time object detection
    3. **Face Recognition Tab**: Register and recognize faces using single photos
    4. **Model Manager**: Choose from 60+ models and download additional ones
    5. **Auto Download**: Click "Download All Models" to get all models at once
    6. Adjust thresholds to fine-tune detection sensitivity
    7. Choose different models for speed vs accuracy trade-off

    ### Model Categories:
    - **Latest Models**: YOLOv11 (most recent)
    - **YOLOv10**: Improved accuracy and speed
    - **YOLOv9**: Advanced architecture
    - **Segmentation**: Instance segmentation models
    - **Pose Estimation**: Human pose detection
    - **Classification**: Image classification models
    - **Special Models**: World models, SAM, etc.
    - **Mobile Optimized**: For mobile/edge devices

    Built with [anycoder](https://huggingface.co/spaces/akhaliq/anycoder)
    """)
# Launch without share=True for Hugging Face Spaces
if __name__ == "__main__":
    # `show_tips` is deliberately not passed: Blocks.launch() no longer
    # accepts it in the Gradio releases this file needs for gr.Timer and
    # stream_every/time_limit, and passing it raises TypeError at startup.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        quiet=False,
        prevent_thread_lock=False,
    )