erasewatermark / app.py
gnosticdev's picture
Update app.py
bd2d1b3 verified
import gradio as gr
import cv2
import numpy as np
from pathlib import Path
import tempfile
import threading
import time
from datetime import datetime, timedelta
import subprocess
import os
class WatermarkRemover:
def __init__(self):
self.temp_dir = Path(tempfile.gettempdir()) / "watermark_removal"
self.temp_dir.mkdir(exist_ok=True)
self.start_cleanup_thread()
def start_cleanup_thread(self):
"""Inicia thread para limpiar archivos antiguos cada hora"""
def cleanup_worker():
while True:
time.sleep(3600)
self.cleanup_old_files()
thread = threading.Thread(target=cleanup_worker, daemon=True)
thread.start()
def cleanup_old_files(self):
"""Elimina archivos temporales mayores a 2 horas"""
try:
cutoff_time = datetime.now() - timedelta(hours=2)
for file_path in self.temp_dir.glob("*"):
if file_path.is_file():
file_time = datetime.fromtimestamp(file_path.stat().st_mtime)
if file_time < cutoff_time:
file_path.unlink()
print(f"Eliminado: {file_path.name}")
except Exception as e:
print(f"Error limpieza: {e}")
def detect_moving_watermark(self, frames_sample):
"""
Detecta marca de agua analizando varianza temporal en regiones
"""
# Convertir a float para cálculos precisos
frames_float = [frame.astype(np.float32) for frame in frames_sample]
# Calcular varianza temporal por píxel
frames_stack = np.stack(frames_float, axis=0)
temporal_variance = np.var(frames_stack, axis=0)
# Promedio de varianza en canales
variance_gray = np.mean(temporal_variance, axis=2)
# Normalizar
variance_normalized = cv2.normalize(variance_gray, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
# Encontrar regiones con BAJA varianza (marca de agua se mueve pero mantiene características)
# Y alta presencia (aparece en todos los frames)
_, low_variance_mask = cv2.threshold(variance_normalized, 50, 255, cv2.THRESH_BINARY_INV)
# Calcular máscara de presencia constante
frames_gray = [cv2.cvtColor(f, cv2.COLOR_BGR2GRAY) for f in frames_sample]
median_intensity = np.median(frames_gray, axis=0).astype(np.uint8)
# Detectar regiones semi-transparentes (marca de agua típica)
presence_mask = cv2.inRange(median_intensity, 30, 220)
# Combinar máscaras
watermark_mask = cv2.bitwise_and(low_variance_mask, presence_mask)
# Limpiar ruido
kernel = np.ones((3, 3), np.uint8)
watermark_mask = cv2.morphologyEx(watermark_mask, cv2.MORPH_OPEN, kernel, iterations=2)
watermark_mask = cv2.morphologyEx(watermark_mask, cv2.MORPH_CLOSE, kernel, iterations=2)
return watermark_mask
def track_watermark_position(self, prev_gray, curr_gray, prev_points):
"""
Rastrea la posición de la marca de agua entre frames usando optical flow
"""
if prev_points is None or len(prev_points) == 0:
return None
# Parámetros para Lucas-Kanade optical flow
lk_params = dict(
winSize=(21, 21),
maxLevel=3,
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
)
# Calcular optical flow
next_points, status, _ = cv2.calcOpticalFlowPyrLK(
prev_gray, curr_gray, prev_points, None, **lk_params
)
if next_points is None:
return None
# Filtrar puntos válidos
good_new = next_points[status == 1]
if len(good_new) < 3:
return None
return good_new.reshape(-1, 1, 2).astype(np.float32)
def create_adaptive_mask(self, frame, base_mask, tracked_points=None):
"""
Crea máscara adaptativa para la marca de agua
"""
mask = base_mask.copy()
if tracked_points is not None and len(tracked_points) > 0:
# Expandir máscara alrededor de puntos rastreados
for point in tracked_points:
x, y = point.ravel()
cv2.circle(mask, (int(x), int(y)), 15, 255, -1)
# Dilatar para cubrir completamente la marca de agua
kernel = np.ones((7, 7), np.uint8)
mask = cv2.dilate(mask, kernel, iterations=2)
# Suavizar bordes de la máscara
mask = cv2.GaussianBlur(mask, (21, 21), 0)
return mask
def advanced_inpainting(self, frame, mask):
"""
Inpainting mejorado usando múltiples técnicas
"""
# Convertir máscara a binaria
_, binary_mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
# Aplicar inpainting con algoritmo Navier-Stokes (mejor para texturas)
result = cv2.inpaint(frame, binary_mask, 5, cv2.INPAINT_NS)
# Segunda pasada con Telea para refinar
result = cv2.inpaint(result, binary_mask, 3, cv2.INPAINT_TELEA)
return result
def process_video(self, video_path, sensitivity=50, progress=gr.Progress()):
"""
Procesa video completo eliminando marcas de agua móviles
"""
try:
progress(0, desc="Abriendo video...")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return None, "❌ Error: No se pudo abrir el video"
# Propiedades del video
fps = int(cap.get(cv2.CAP_PROP_FPS))
if fps == 0:
fps = 30 # fallback
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Limitar tamaño para evitar timeouts en HuggingFace
max_resolution = 1280
scale = 1.0
if width > max_resolution or height > max_resolution:
scale = max_resolution / max(width, height)
width = int(width * scale)
height = int(height * scale)
progress(0.05, desc="Analizando marca de agua...")
# Leer muestra reducida de frames
sample_frames = []
sample_size = min(20, total_frames) # Reducido para más velocidad
for i in range(sample_size):
frame_idx = int(i * total_frames / sample_size)
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if ret:
if scale != 1.0:
frame = cv2.resize(frame, (width, height))
sample_frames.append(frame)
if len(sample_frames) == 0:
cap.release()
return None, "❌ No se pudieron leer frames del video"
# Detectar máscara base de marca de agua
base_watermark_mask = self.detect_moving_watermark(sample_frames)
# Encontrar puntos característicos
mask_points = cv2.goodFeaturesToTrack(
base_watermark_mask,
maxCorners=30, # Reducido para rendimiento
qualityLevel=0.01,
minDistance=10
)
progress(0.1, desc="Procesando frames...")
# Reiniciar video
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
# Archivo de salida
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = self.temp_dir / f"cleaned_{timestamp}.mp4"
# Usar MP4V codec (más compatible)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
if not out.isOpened():
cap.release()
return None, "❌ Error al crear archivo de salida"
prev_gray = None
tracked_points = mask_points
frame_count = 0
update_interval = max(1, total_frames // 50) # Actualizar progreso cada 2%
while True:
ret, frame = cap.read()
if not ret:
break
# Redimensionar si es necesario
if scale != 1.0:
frame = cv2.resize(frame, (width, height))
curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Actualizar tracking
if prev_gray is not None and tracked_points is not None:
tracked_points = self.track_watermark_position(
prev_gray, curr_gray, tracked_points
)
# Crear máscara adaptativa
adaptive_mask = self.create_adaptive_mask(
frame, base_watermark_mask, tracked_points
)
# Aplicar inpainting
cleaned_frame = self.advanced_inpainting(frame, adaptive_mask)
# Escribir frame
out.write(cleaned_frame)
prev_gray = curr_gray
frame_count += 1
# Re-detectar puntos cada 30 frames
if frame_count % 30 == 0 and mask_points is not None:
tracked_points = mask_points.copy()
# Actualizar progreso menos frecuentemente
if frame_count % update_interval == 0:
progress_val = 0.1 + (0.85 * frame_count / total_frames)
progress(progress_val, desc=f"Procesando: {frame_count}/{total_frames}")
cap.release()
out.release()
progress(1.0, desc="¡Completado!")
return str(output_path), f"✅ Video procesado: {frame_count} frames\n🎯 Resolución: {width}x{height}\n⏱️ FPS: {fps}"
except Exception as e:
import traceback
error_msg = f"❌ Error: {str(e)}\n{traceback.format_exc()}"
print(error_msg)
return None, error_msg
# Crear instancia del removedor
remover = WatermarkRemover()
def process_video_interface(video_file, sensitivity):
"""Interfaz para Gradio"""
if video_file is None:
return None, "⚠️ Por favor, carga un video"
output_path, message = remover.process_video(video_file, sensitivity)
return output_path, message
# Crear interfaz de Gradio
with gr.Blocks(title="Eliminador de Marcas de Agua - REAL", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🎬 Eliminador de Marcas de Agua REAL
**Sistema avanzado de eliminación de marcas de agua móviles usando:**
- 🔍 Análisis de varianza temporal
- 🎯 Optical Flow tracking
- 🖌️ Inpainting multi-capa (Navier-Stokes + Telea)
- 🧹 Limpieza automática de archivos (cada 2 horas)
---
⚠️ **Límites en HuggingFace Spaces:** Videos > 1280p se redimensionan automáticamente
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 📥 Entrada")
video_input = gr.Video(
label="Video con Marca de Agua",
format="mp4"
)
sensitivity = gr.Slider(
minimum=10,
maximum=100,
value=50,
step=10,
label="Sensibilidad de Detección",
info="Mayor valor = detección más agresiva"
)
process_btn = gr.Button("🚀 ELIMINAR MARCA DE AGUA", variant="primary", size="lg")
gr.Markdown("""
### 💡 Tips:
- Videos cortos (< 3 min) procesan más rápido
- Formatos: MP4, AVI, MOV, MKV
- Si falla la carga, comprime el video primero
- Aumenta sensibilidad si no detecta la marca
""")
with gr.Column(scale=1):
gr.Markdown("### 📤 Resultado")
video_output = gr.Video(label="Video Limpio")
status_output = gr.Textbox(label="Estado del Proceso", lines=4)
process_btn.click(
fn=process_video_interface,
inputs=[video_input, sensitivity],
outputs=[video_output, status_output]
)
gr.Markdown("""
---
### 🔧 Cómo Funciona:
1. **Análisis Temporal**: Examina múltiples frames para identificar patrones de marca de agua
2. **Tracking Adaptativo**: Sigue el movimiento de la marca usando optical flow
3. **Inpainting Inteligente**: Rellena las áreas detectadas con contenido del fondo
4. **Limpieza Automática**: Borra archivos temporales mayores a 2 horas
### ⚠️ Limitaciones:
- Funciona mejor con marcas de agua semi-transparentes
- Videos muy comprimidos pueden tener resultados variables
- Marcas de agua muy grandes pueden dejar artefactos
""")
if __name__ == "__main__":
demo.queue(max_size=5).launch(
server_name="0.0.0.0",
server_port=7860
)