import gradio as gr import cv2 import numpy as np from pathlib import Path import tempfile import threading import time from datetime import datetime, timedelta import subprocess import os class WatermarkRemover: def __init__(self): self.temp_dir = Path(tempfile.gettempdir()) / "watermark_removal" self.temp_dir.mkdir(exist_ok=True) self.start_cleanup_thread() def start_cleanup_thread(self): """Inicia thread para limpiar archivos antiguos cada hora""" def cleanup_worker(): while True: time.sleep(3600) self.cleanup_old_files() thread = threading.Thread(target=cleanup_worker, daemon=True) thread.start() def cleanup_old_files(self): """Elimina archivos temporales mayores a 2 horas""" try: cutoff_time = datetime.now() - timedelta(hours=2) for file_path in self.temp_dir.glob("*"): if file_path.is_file(): file_time = datetime.fromtimestamp(file_path.stat().st_mtime) if file_time < cutoff_time: file_path.unlink() print(f"Eliminado: {file_path.name}") except Exception as e: print(f"Error limpieza: {e}") def detect_moving_watermark(self, frames_sample): """ Detecta marca de agua analizando varianza temporal en regiones """ # Convertir a float para cálculos precisos frames_float = [frame.astype(np.float32) for frame in frames_sample] # Calcular varianza temporal por píxel frames_stack = np.stack(frames_float, axis=0) temporal_variance = np.var(frames_stack, axis=0) # Promedio de varianza en canales variance_gray = np.mean(temporal_variance, axis=2) # Normalizar variance_normalized = cv2.normalize(variance_gray, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8) # Encontrar regiones con BAJA varianza (marca de agua se mueve pero mantiene características) # Y alta presencia (aparece en todos los frames) _, low_variance_mask = cv2.threshold(variance_normalized, 50, 255, cv2.THRESH_BINARY_INV) # Calcular máscara de presencia constante frames_gray = [cv2.cvtColor(f, cv2.COLOR_BGR2GRAY) for f in frames_sample] median_intensity = np.median(frames_gray, axis=0).astype(np.uint8) # Detectar regiones semi-transparentes (marca de agua típica) presence_mask = cv2.inRange(median_intensity, 30, 220) # Combinar máscaras watermark_mask = cv2.bitwise_and(low_variance_mask, presence_mask) # Limpiar ruido kernel = np.ones((3, 3), np.uint8) watermark_mask = cv2.morphologyEx(watermark_mask, cv2.MORPH_OPEN, kernel, iterations=2) watermark_mask = cv2.morphologyEx(watermark_mask, cv2.MORPH_CLOSE, kernel, iterations=2) return watermark_mask def track_watermark_position(self, prev_gray, curr_gray, prev_points): """ Rastrea la posición de la marca de agua entre frames usando optical flow """ if prev_points is None or len(prev_points) == 0: return None # Parámetros para Lucas-Kanade optical flow lk_params = dict( winSize=(21, 21), maxLevel=3, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01) ) # Calcular optical flow next_points, status, _ = cv2.calcOpticalFlowPyrLK( prev_gray, curr_gray, prev_points, None, **lk_params ) if next_points is None: return None # Filtrar puntos válidos good_new = next_points[status == 1] if len(good_new) < 3: return None return good_new.reshape(-1, 1, 2).astype(np.float32) def create_adaptive_mask(self, frame, base_mask, tracked_points=None): """ Crea máscara adaptativa para la marca de agua """ mask = base_mask.copy() if tracked_points is not None and len(tracked_points) > 0: # Expandir máscara alrededor de puntos rastreados for point in tracked_points: x, y = point.ravel() cv2.circle(mask, (int(x), int(y)), 15, 255, -1) # Dilatar para cubrir completamente la marca de agua kernel = np.ones((7, 7), np.uint8) mask = cv2.dilate(mask, kernel, iterations=2) # Suavizar bordes de la máscara mask = cv2.GaussianBlur(mask, (21, 21), 0) return mask def advanced_inpainting(self, frame, mask): """ Inpainting mejorado usando múltiples técnicas """ # Convertir máscara a binaria _, binary_mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY) # Aplicar inpainting con algoritmo Navier-Stokes (mejor para texturas) result = cv2.inpaint(frame, binary_mask, 5, cv2.INPAINT_NS) # Segunda pasada con Telea para refinar result = cv2.inpaint(result, binary_mask, 3, cv2.INPAINT_TELEA) return result def process_video(self, video_path, sensitivity=50, progress=gr.Progress()): """ Procesa video completo eliminando marcas de agua móviles """ try: progress(0, desc="Abriendo video...") cap = cv2.VideoCapture(video_path) if not cap.isOpened(): return None, "❌ Error: No se pudo abrir el video" # Propiedades del video fps = int(cap.get(cv2.CAP_PROP_FPS)) if fps == 0: fps = 30 # fallback width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Limitar tamaño para evitar timeouts en HuggingFace max_resolution = 1280 scale = 1.0 if width > max_resolution or height > max_resolution: scale = max_resolution / max(width, height) width = int(width * scale) height = int(height * scale) progress(0.05, desc="Analizando marca de agua...") # Leer muestra reducida de frames sample_frames = [] sample_size = min(20, total_frames) # Reducido para más velocidad for i in range(sample_size): frame_idx = int(i * total_frames / sample_size) cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ret, frame = cap.read() if ret: if scale != 1.0: frame = cv2.resize(frame, (width, height)) sample_frames.append(frame) if len(sample_frames) == 0: cap.release() return None, "❌ No se pudieron leer frames del video" # Detectar máscara base de marca de agua base_watermark_mask = self.detect_moving_watermark(sample_frames) # Encontrar puntos característicos mask_points = cv2.goodFeaturesToTrack( base_watermark_mask, maxCorners=30, # Reducido para rendimiento qualityLevel=0.01, minDistance=10 ) progress(0.1, desc="Procesando frames...") # Reiniciar video cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # Archivo de salida timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_path = self.temp_dir / f"cleaned_{timestamp}.mp4" # Usar MP4V codec (más compatible) fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height)) if not out.isOpened(): cap.release() return None, "❌ Error al crear archivo de salida" prev_gray = None tracked_points = mask_points frame_count = 0 update_interval = max(1, total_frames // 50) # Actualizar progreso cada 2% while True: ret, frame = cap.read() if not ret: break # Redimensionar si es necesario if scale != 1.0: frame = cv2.resize(frame, (width, height)) curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # Actualizar tracking if prev_gray is not None and tracked_points is not None: tracked_points = self.track_watermark_position( prev_gray, curr_gray, tracked_points ) # Crear máscara adaptativa adaptive_mask = self.create_adaptive_mask( frame, base_watermark_mask, tracked_points ) # Aplicar inpainting cleaned_frame = self.advanced_inpainting(frame, adaptive_mask) # Escribir frame out.write(cleaned_frame) prev_gray = curr_gray frame_count += 1 # Re-detectar puntos cada 30 frames if frame_count % 30 == 0 and mask_points is not None: tracked_points = mask_points.copy() # Actualizar progreso menos frecuentemente if frame_count % update_interval == 0: progress_val = 0.1 + (0.85 * frame_count / total_frames) progress(progress_val, desc=f"Procesando: {frame_count}/{total_frames}") cap.release() out.release() progress(1.0, desc="¡Completado!") return str(output_path), f"✅ Video procesado: {frame_count} frames\n🎯 Resolución: {width}x{height}\n⏱️ FPS: {fps}" except Exception as e: import traceback error_msg = f"❌ Error: {str(e)}\n{traceback.format_exc()}" print(error_msg) return None, error_msg # Crear instancia del removedor remover = WatermarkRemover() def process_video_interface(video_file, sensitivity): """Interfaz para Gradio""" if video_file is None: return None, "⚠️ Por favor, carga un video" output_path, message = remover.process_video(video_file, sensitivity) return output_path, message # Crear interfaz de Gradio with gr.Blocks(title="Eliminador de Marcas de Agua - REAL", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🎬 Eliminador de Marcas de Agua REAL **Sistema avanzado de eliminación de marcas de agua móviles usando:** - 🔍 Análisis de varianza temporal - 🎯 Optical Flow tracking - 🖌️ Inpainting multi-capa (Navier-Stokes + Telea) - 🧹 Limpieza automática de archivos (cada 2 horas) --- ⚠️ **Límites en HuggingFace Spaces:** Videos > 1280p se redimensionan automáticamente """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📥 Entrada") video_input = gr.Video( label="Video con Marca de Agua", format="mp4" ) sensitivity = gr.Slider( minimum=10, maximum=100, value=50, step=10, label="Sensibilidad de Detección", info="Mayor valor = detección más agresiva" ) process_btn = gr.Button("🚀 ELIMINAR MARCA DE AGUA", variant="primary", size="lg") gr.Markdown(""" ### 💡 Tips: - Videos cortos (< 3 min) procesan más rápido - Formatos: MP4, AVI, MOV, MKV - Si falla la carga, comprime el video primero - Aumenta sensibilidad si no detecta la marca """) with gr.Column(scale=1): gr.Markdown("### 📤 Resultado") video_output = gr.Video(label="Video Limpio") status_output = gr.Textbox(label="Estado del Proceso", lines=4) process_btn.click( fn=process_video_interface, inputs=[video_input, sensitivity], outputs=[video_output, status_output] ) gr.Markdown(""" --- ### 🔧 Cómo Funciona: 1. **Análisis Temporal**: Examina múltiples frames para identificar patrones de marca de agua 2. **Tracking Adaptativo**: Sigue el movimiento de la marca usando optical flow 3. **Inpainting Inteligente**: Rellena las áreas detectadas con contenido del fondo 4. **Limpieza Automática**: Borra archivos temporales mayores a 2 horas ### ⚠️ Limitaciones: - Funciona mejor con marcas de agua semi-transparentes - Videos muy comprimidos pueden tener resultados variables - Marcas de agua muy grandes pueden dejar artefactos """) if __name__ == "__main__": demo.queue(max_size=5).launch( server_name="0.0.0.0", server_port=7860 )