Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| from pathlib import Path | |
| import tempfile | |
| import threading | |
| import time | |
| from datetime import datetime, timedelta | |
| import subprocess | |
| import os | |
class WatermarkRemover:
    """Remove moving watermarks from videos using OpenCV.

    The pipeline samples frames to build a base watermark mask from per-pixel
    temporal variance, follows feature points of that mask with Lucas-Kanade
    optical flow, and inpaints the masked area of every frame. Results are
    written to a scratch directory under the system temp dir, which a daemon
    thread sweeps once per hour.
    """

    def __init__(self):
        """Create the scratch directory and start the background cleaner."""
        self.temp_dir = Path(tempfile.gettempdir()) / "watermark_removal"
        self.temp_dir.mkdir(exist_ok=True)
        self.start_cleanup_thread()

    def start_cleanup_thread(self):
        """Start a daemon thread that purges old scratch files every hour."""

        def cleanup_worker():
            while True:
                time.sleep(3600)
                self.cleanup_old_files()

        # daemon=True so the sweeper never keeps the process alive on exit.
        thread = threading.Thread(target=cleanup_worker, daemon=True)
        thread.start()

    def cleanup_old_files(self):
        """Delete scratch files whose modification time is over 2 hours old.

        Errors are reported with ``print`` and handled per file, so one
        undeletable file (or a file that vanished between listing and
        ``stat``) does not abort the rest of the sweep.
        """
        # Compare epoch timestamps directly: unlike naive local datetimes
        # this is not thrown off by DST / wall-clock changes.
        cutoff_timestamp = time.time() - 2 * 3600

        try:
            candidates = list(self.temp_dir.glob("*"))
        except OSError as e:
            print(f"Error limpieza: {e}")
            return

        for file_path in candidates:
            try:
                if not file_path.is_file():
                    continue
                if file_path.stat().st_mtime < cutoff_timestamp:
                    file_path.unlink()
                    print(f"Eliminado: {file_path.name}")
            except OSError as e:
                print(f"Error limpieza: {e}")

    def detect_moving_watermark(self, frames_sample, variance_threshold=50):
        """Build a base watermark mask from temporal variance across frames.

        Args:
            frames_sample: Non-empty sequence of equally sized 3-channel
                frames (they are converted with ``COLOR_BGR2GRAY``).
            variance_threshold: Cut-off in ``[0, 255]`` applied to the
                min-max normalised variance map. Pixels at or below it count
                as "low variance", so a higher value selects more pixels.
                The default of 50 matches the previously hard-coded value.

        Returns:
            A single-channel ``uint8`` mask (0 or 255) of candidate
            watermark pixels, cleaned with morphological open/close.

        Raises:
            ValueError: If ``frames_sample`` is empty.
        """
        if len(frames_sample) == 0:
            raise ValueError("frames_sample must contain at least one frame")

        # Work in float so the variance is not computed on wrapped uint8.
        frames_stack = np.stack(
            [frame.astype(np.float32) for frame in frames_sample], axis=0
        )

        # Per-pixel variance over time, averaged across colour channels.
        temporal_variance = np.var(frames_stack, axis=0)
        variance_gray = np.mean(temporal_variance, axis=2)

        variance_normalized = cv2.normalize(
            variance_gray, None, 0, 255, cv2.NORM_MINMAX
        ).astype(np.uint8)

        # Keep LOW-variance regions: content that stays similar in all frames.
        threshold = float(np.clip(variance_threshold, 0, 255))
        _, low_variance_mask = cv2.threshold(
            variance_normalized, threshold, 255, cv2.THRESH_BINARY_INV
        )

        # Median intensity over time; mid-range values are treated as
        # candidate semi-transparent overlay pixels.
        frames_gray = [cv2.cvtColor(f, cv2.COLOR_BGR2GRAY) for f in frames_sample]
        median_intensity = np.median(frames_gray, axis=0).astype(np.uint8)
        presence_mask = cv2.inRange(median_intensity, 30, 220)

        watermark_mask = cv2.bitwise_and(low_variance_mask, presence_mask)

        # Open removes speckles, close fills small holes.
        kernel = np.ones((3, 3), np.uint8)
        watermark_mask = cv2.morphologyEx(
            watermark_mask, cv2.MORPH_OPEN, kernel, iterations=2
        )
        watermark_mask = cv2.morphologyEx(
            watermark_mask, cv2.MORPH_CLOSE, kernel, iterations=2
        )
        return watermark_mask

    def track_watermark_position(self, prev_gray, curr_gray, prev_points):
        """Track points between two grayscale frames with Lucas-Kanade flow.

        Args:
            prev_gray: Previous frame, grayscale.
            curr_gray: Current frame, grayscale.
            prev_points: Points to follow, or ``None``.

        Returns:
            The successfully tracked points as a ``float32`` array of shape
            ``(K, 1, 2)``, or ``None`` when there is nothing to track, the
            flow computation yields no points, or fewer than 3 points
            survive.
        """
        if prev_points is None or len(prev_points) == 0:
            return None

        lk_params = dict(
            winSize=(21, 21),
            maxLevel=3,
            criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01),
        )

        next_points, status, _ = cv2.calcOpticalFlowPyrLK(
            prev_gray, curr_gray, prev_points, None, **lk_params
        )
        if next_points is None:
            return None

        # status == 1 marks points for which the flow was found.
        good_new = next_points[status == 1]
        if len(good_new) < 3:
            return None

        return good_new.reshape(-1, 1, 2).astype(np.float32)

    def create_adaptive_mask(self, frame, base_mask, tracked_points=None):
        """Combine the base mask with discs around the tracked points.

        Args:
            frame: Current frame (unused; kept for interface compatibility).
            base_mask: Single-channel base watermark mask; it is copied, not
                modified.
            tracked_points: Optional iterable of tracked point coordinates.

        Returns:
            The dilated, Gaussian-blurred mask.
        """
        mask = base_mask.copy()

        if tracked_points is not None and len(tracked_points) > 0:
            # Paint a filled disc (radius 15) around every tracked point.
            for point in tracked_points:
                x, y = point.ravel()
                cv2.circle(mask, (int(x), int(y)), 15, 255, -1)

        # Grow the mask so it covers the watermark edges, then feather it.
        kernel = np.ones((7, 7), np.uint8)
        mask = cv2.dilate(mask, kernel, iterations=2)
        mask = cv2.GaussianBlur(mask, (21, 21), 0)
        return mask

    def advanced_inpainting(self, frame, mask):
        """Inpaint the masked area with Navier-Stokes, then refine with Telea.

        Args:
            frame: Frame to repair.
            mask: Single-channel mask; values above 127 are inpainted.

        Returns:
            A new frame with the masked area filled in.
        """
        _, binary_mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)

        # Nothing to repair: skip both (expensive) inpainting passes.
        if cv2.countNonZero(binary_mask) == 0:
            return frame.copy()

        result = cv2.inpaint(frame, binary_mask, 5, cv2.INPAINT_NS)
        result = cv2.inpaint(result, binary_mask, 3, cv2.INPAINT_TELEA)
        return result

    def _read_sample_frames(self, cap, total_frames, resize_to, max_samples=20):
        """Read up to ``max_samples`` frames used for watermark detection.

        Frames are evenly spaced over the video when ``total_frames`` is
        known (> 0); otherwise the leading frames are read sequentially.
        Frames are resized to ``resize_to`` (width, height) unless it is
        ``None``.
        """
        samples = []

        if total_frames > 0:
            sample_size = min(max_samples, total_frames)
            for i in range(sample_size):
                cap.set(cv2.CAP_PROP_POS_FRAMES, int(i * total_frames / sample_size))
                ret, frame = cap.read()
                if ret:
                    if resize_to is not None:
                        frame = cv2.resize(frame, resize_to)
                    samples.append(frame)
            return samples

        # Frame count unknown: fall back to the first frames of the video.
        while len(samples) < max_samples:
            ret, frame = cap.read()
            if not ret:
                break
            if resize_to is not None:
                frame = cv2.resize(frame, resize_to)
            samples.append(frame)
        return samples

    def process_video(self, video_path, sensitivity=50, progress=gr.Progress()):
        """Process a whole video, removing moving watermarks frame by frame.

        Args:
            video_path: Path of the input video.
            sensitivity: Detection sensitivity; used (clamped to 0-255) as
                the variance threshold of ``detect_moving_watermark``, so a
                higher value marks more pixels as watermark. 50 reproduces
                the previous fixed behaviour.
            progress: Gradio progress callback.

        Returns:
            ``(output_path, message)`` on success or ``(None, message)`` on
            failure. Exceptions are caught and reported in the message.
        """
        cap = None
        out = None
        try:
            progress(0, desc="Abriendo video...")

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                return None, "❌ Error: No se pudo abrir el video"

            # Keep fps as float: truncating 29.97 to 29 changes the duration.
            fps = float(cap.get(cv2.CAP_PROP_FPS))
            if not np.isfinite(fps) or fps <= 0:
                fps = 30.0  # fallback

            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # Some containers report no (or a bogus) frame count; use 0 for
            # "unknown" so later arithmetic never divides by it.
            raw_frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            if np.isfinite(raw_frame_count) and raw_frame_count > 0:
                total_frames = int(raw_frame_count)
            else:
                total_frames = 0

            # Cap the longest side to avoid timeouts on HuggingFace.
            max_resolution = 1280
            resize_to = None
            if width > max_resolution or height > max_resolution:
                scale = max_resolution / max(width, height)
                width = max(1, int(width * scale))
                height = max(1, int(height * scale))
                resize_to = (width, height)

            progress(0.05, desc="Analizando marca de agua...")

            sample_frames = self._read_sample_frames(cap, total_frames, resize_to)
            if len(sample_frames) == 0:
                return None, "❌ No se pudieron leer frames del video"

            base_watermark_mask = self.detect_moving_watermark(
                sample_frames, variance_threshold=sensitivity
            )

            # Feature points on the mask, used as optical-flow seeds.
            mask_points = cv2.goodFeaturesToTrack(
                base_watermark_mask,
                maxCorners=30,
                qualityLevel=0.01,
                minDistance=10,
            )

            progress(0.1, desc="Procesando frames...")

            # Reopen instead of seeking back: this restarts at frame 0 even
            # for sources where seeking is unreliable.
            cap.release()
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                return None, "❌ Error: No se pudo abrir el video"

            # Microseconds in the name avoid collisions within one second.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            output_path = self.temp_dir / f"cleaned_{timestamp}.mp4"

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
            if not out.isOpened():
                return None, "❌ Error al crear archivo de salida"

            prev_gray = None
            tracked_points = mask_points
            frame_count = 0
            # Report roughly every 2% (every 30 frames if the total is unknown).
            update_interval = max(1, total_frames // 50) if total_frames > 0 else 30

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if resize_to is not None:
                    frame = cv2.resize(frame, resize_to)

                curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

                if prev_gray is not None and tracked_points is not None:
                    tracked_points = self.track_watermark_position(
                        prev_gray, curr_gray, tracked_points
                    )

                adaptive_mask = self.create_adaptive_mask(
                    frame, base_watermark_mask, tracked_points
                )
                out.write(self.advanced_inpainting(frame, adaptive_mask))

                prev_gray = curr_gray
                frame_count += 1

                # Re-seed the tracker from the base points every 30 frames.
                if frame_count % 30 == 0 and mask_points is not None:
                    tracked_points = mask_points.copy()

                if frame_count % update_interval == 0:
                    if total_frames > 0:
                        # Clamp: the reported frame count may be too small.
                        progress_val = min(
                            0.95, 0.1 + (0.85 * frame_count / total_frames)
                        )
                        progress(
                            progress_val,
                            desc=f"Procesando: {frame_count}/{total_frames}",
                        )
                    else:
                        progress(0.1, desc=f"Procesando: {frame_count}")

            progress(1.0, desc="¡Completado!")

            # ':g' prints 30.0 as "30" and 29.97002997 as "29.97".
            return str(output_path), f"✅ Video procesado: {frame_count} frames\n🎯 Resolución: {width}x{height}\n⏱️ FPS: {fps:g}"

        except Exception as e:
            # Top-level boundary: report the failure to the UI, don't crash.
            import traceback
            error_msg = f"❌ Error: {str(e)}\n{traceback.format_exc()}"
            print(error_msg)
            return None, error_msg

        finally:
            # Always free the capture and finalise the output file, also on
            # early returns and exceptions.
            if cap is not None:
                cap.release()
            if out is not None:
                out.release()
# Module-level WatermarkRemover instance used by the Gradio handler below.
# Constructing it creates the scratch directory and starts the cleanup thread.
remover = WatermarkRemover()
def process_video_interface(video_file, sensitivity, progress=gr.Progress()):
    """Gradio click handler: validate the upload and run the remover.

    Args:
        video_file: Path of the uploaded video, or ``None`` if nothing was
            uploaded.
        sensitivity: Value of the sensitivity slider.
        progress: Gradio progress tracker. It is declared on the handler
            itself (as a default-valued parameter) so Gradio can supply the
            tracked instance, and is forwarded to ``process_video``.

    Returns:
        ``(output_path, message)``; ``output_path`` is ``None`` on failure.
    """
    if video_file is None:
        return None, "⚠️ Por favor, carga un video"

    return remover.process_video(video_file, sensitivity, progress)
# Build the Gradio interface.
with gr.Blocks(
    title="Eliminador de Marcas de Agua - REAL",
    theme=gr.themes.Soft(),
) as demo:
    # Page header. The Markdown text is indented to match the code;
    # gr.Markdown is expected to strip the common leading indentation.
    gr.Markdown("""
    # 🎬 Eliminador de Marcas de Agua REAL
    **Sistema avanzado de eliminación de marcas de agua móviles usando:**
    - 🔍 Análisis de varianza temporal
    - 🎯 Optical Flow tracking
    - 🖌️ Inpainting multi-capa (Navier-Stokes + Telea)
    - 🧹 Limpieza automática de archivos (cada 2 horas)
    ---
    ⚠️ **Límites en HuggingFace Spaces:** Videos > 1280p se redimensionan automáticamente
    """)

    with gr.Row():
        # Left column: upload, sensitivity slider, action button and tips.
        with gr.Column(scale=1):
            gr.Markdown("### 📥 Entrada")
            video_input = gr.Video(label="Video con Marca de Agua", format="mp4")
            sensitivity = gr.Slider(
                minimum=10,
                maximum=100,
                value=50,
                step=10,
                label="Sensibilidad de Detección",
                info="Mayor valor = detección más agresiva",
            )
            process_btn = gr.Button(
                "🚀 ELIMINAR MARCA DE AGUA",
                variant="primary",
                size="lg",
            )
            gr.Markdown("""
            ### 💡 Tips:
            - Videos cortos (< 3 min) procesan más rápido
            - Formatos: MP4, AVI, MOV, MKV
            - Si falla la carga, comprime el video primero
            - Aumenta sensibilidad si no detecta la marca
            """)

        # Right column: cleaned video and status text.
        with gr.Column(scale=1):
            gr.Markdown("### 📤 Resultado")
            video_output = gr.Video(label="Video Limpio")
            status_output = gr.Textbox(label="Estado del Proceso", lines=4)

    # Wire the button: (video, sensitivity) -> (cleaned video, status).
    process_btn.click(
        fn=process_video_interface,
        inputs=[video_input, sensitivity],
        outputs=[video_output, status_output],
    )

    # Footer: how it works and known limitations.
    gr.Markdown("""
    ---
    ### 🔧 Cómo Funciona:
    1. **Análisis Temporal**: Examina múltiples frames para identificar patrones de marca de agua
    2. **Tracking Adaptativo**: Sigue el movimiento de la marca usando optical flow
    3. **Inpainting Inteligente**: Rellena las áreas detectadas con contenido del fondo
    4. **Limpieza Automática**: Borra archivos temporales mayores a 2 horas
    ### ⚠️ Limitaciones:
    - Funciona mejor con marcas de agua semi-transparentes
    - Videos muy comprimidos pueden tener resultados variables
    - Marcas de agua muy grandes pueden dejar artefactos
    """)
# Script entry point: enable the request queue (at most 5 waiting requests)
# and serve on all interfaces, port 7860.
if __name__ == "__main__":
    queued_demo = demo.queue(max_size=5)
    queued_demo.launch(server_name="0.0.0.0", server_port=7860)