import gradio as gr import torch import numpy as np import tempfile import os from torchvision import transforms from diffusers import LTXLatentUpsamplePipeline #from pipeline_ltx_condition_control import LTXConditionPipeline, LTXVideoCondition from diffusers.pipelines.ltx.pipeline_ltx_condition import LTXConditionPipeline, LTXVideoCondition from diffusers.utils import export_to_video, load_video from torchvision import transforms import random import imageio from PIL import Image, ImageOps import cv2 import shutil import glob from pathlib import Path import warnings import logging warnings.filterwarnings("ignore", category=UserWarning) warnings.filterwarnings("ignore", category=FutureWarning) warnings.filterwarnings("ignore", message=".*") from huggingface_hub import logging as ll ll.set_verbosity_error() ll.set_verbosity_warning() ll.set_verbosity_info() ll.set_verbosity_debug() logger = logging.getLogger("AducDebug") logging.basicConfig(level=logging.DEBUG) logger.setLevel(logging.DEBUG) FPS = 24 dtype = torch.bfloat16 device = "cuda" if torch.cuda.is_available() else "cpu" # Carregamento das pipelines pipeline = LTXConditionPipeline.from_pretrained( "Lightricks/LTX-Video-0.9.8-13B-distilled", offload_state_dict=False, torch_dtype=torch.bfloat16, cache_dir=os.getenv("HF_HOME_CACHE"), token=os.getenv("HF_TOKEN"), ) pipe_upsample = LTXLatentUpsamplePipeline.from_pretrained( "Lightricks/ltxv-spatial-upscaler-0.9.7", cache_dir=os.getenv("HF_HOME_CACHE"), vae=pipeline.vae, torch_dtype=dtype ) pipeline.to(device) pipe_upsample.to(device) pipeline.vae.enable_tiling() current_dir = Path(__file__).parent def cleanup_session_files(request: gr.Request): """Limpa arquivos temporários da sessão quando o usuário se desconecta.""" try: session_id = request.session_hash session_dir = os.path.join("/tmp/gradio", session_id) if os.path.exists(session_dir): shutil.rmtree(session_dir) print(f"Limpou o diretório da sessão: {session_dir}") except Exception as e: print(f"Erro durante a limpeza da sessão: {e}") def read_video(video) -> torch.Tensor: """Lê um arquivo de vídeo e converte para um tensor torch.""" to_tensor_transform = transforms.ToTensor() if isinstance(video, str): video_tensor = torch.stack([to_tensor_transform(img) for img in imageio.get_reader(video)]) else: video_tensor = torch.stack([to_tensor_transform(img) for img in video]) return video_tensor def round_to_nearest_resolution_acceptable_by_vae(height, width, vae_temporal_compression_ratio): """Arredonda a resolução para valores aceitáveis pelo VAE.""" height = height - (height % vae_temporal_compression_ratio) width = width - (width % vae_temporal_compression_ratio) return height, width # A assinatura da função volta a aceitar argumentos individuais para compatibilidade com o Gradio def generate_video( condition_image_1, condition_strength_1, condition_frame_index_1, condition_image_2, condition_strength_2, condition_frame_index_2, prompt, duration=3.0, negative_prompt="worst quality, inconsistent motion, blurry, jittery, distorted", height=768, width=1152, num_inference_steps=7, guidance_scale=1.0, seed=0, randomize_seed=False, progress=gr.Progress(track_tqdm=True) ): if True: # Lógica para agrupar as condições *dentro* da função # Cálculo de frames e resolução num_frames = int(duration * FPS) + 1 temporal_compression = pipeline.vae_temporal_compression_ratio num_frames = ((num_frames - 1) // temporal_compression) * temporal_compression + 1 downscale_factor = 2 / 3 downscaled_height = int(height * downscale_factor) downscaled_width = int(width * downscale_factor) downscaled_height, downscaled_width = round_to_nearest_resolution_acceptable_by_vae( downscaled_height, downscaled_width, pipeline.vae_temporal_compression_ratio ) conditions = [] if condition_image_1 is not None: condition_image_1 = ImageOps.fit(condition_image_1, (downscaled_width, downscaled_height), Image.LANCZOS) conditions.append(LTXVideoCondition( image=condition_image_1, strength=condition_strength_1, frame_index=int(condition_frame_index_1) )) if condition_image_2 is not None: condition_image_2 = ImageOps.fit(condition_image_2, (downscaled_width, downscaled_height), Image.LANCZOS) conditions.append(LTXVideoCondition( image=condition_image_2, strength=condition_strength_2, frame_index=int(condition_frame_index_2) )) pipeline_args = {} if conditions: pipeline_args["conditions"] = conditions if True: # dentro da função generatevideo(), após calcular downscaledheight, downscaledwidth: conditions = [] def image_to_latents(img: Image): # converte PIL→tensor 4-D [C, H, W] tensor = transforms.ToTensor()(img).unsqueeze(0) # [1, C, H, W] tensor = tensor.unsqueeze(2).to(device).to(dtype) # [1, C, 1, H, W] with torch.no_grad(): vae_out = pipeline.vae.encode(tensor) # agora aceita 5-D latents = vae_out.latent_dist.sample() # amostra 5-D [1, C_lat, 1, H', W'] # aplica fator de escala se houver if hasattr(pipeline.vae.config, "scaling_factor"): latents = latents * pipeline.vae.config.scaling_factor return latents # exemplo para primeira condição if condition_image_1 is not None: img1 = ImageOps.fit(condition_image_1, (downscaled_width, downscaled_height), Image.LANCZOS) lat1 = image_to_latents(img1) # agora lat1 → forma [1, C, H', W'], expande dimensão de frames # aqui usamos 1 frame de condicionamento; se quiser mais, repita ou ajuste lat1 = lat1.unsqueeze(2) # [1, C, 1, H', W'] conditions.append( LTXVideoCondition( latents=lat1, strength=condition_strength_1, frame_index=int(condition_frame_index_1), ) ) print (f"condition_image_1 {lat1.shape}") # mesma lógica para condição 2 if condition_image_2 is not None: img2 = ImageOps.fit(condition_image_2, (downscaled_width, downscaled_height), Image.LANCZOS) lat2 = image_to_latents(img2).unsqueeze(2) conditions.append( LTXVideoCondition( latents=lat2, strength=condition_strength_2, frame_index=int(condition_frame_index_2), ) ) print (f"condition_image_2 {lat2.shape}") # Manipulação da seed if randomize_seed: seed = random.randint(0, 2**32 - 1) # ETAPA 1: Geração do vídeo em baixa resolução latents = pipeline( prompt=prompt, negative_prompt=negative_prompt, width=downscaled_width, height=downscaled_height, num_frames=num_frames, timesteps=[1000, 993, 987, 981, 975, 909, 725, 0.03], decode_timestep=0.05, decode_noise_scale=0.025, image_cond_noise_scale=0.0, guidance_scale=guidance_scale, guidance_rescale=0.7, generator=torch.Generator().manual_seed(seed), output_type="latent", **pipeline_args ).frames # ETAPA 2: Upscale dos latentes upscaled_height, upscaled_width = downscaled_height * 2, downscaled_width * 2 upscaled_latents = pipe_upsample( latents=latents, output_type="latent" ).frames conditions = [] if condition_image_1 is not None: condition_image_1 = ImageOps.fit(condition_image_1, (upscaled_width, upscaled_height), Image.LANCZOS) conditions.append(LTXVideoCondition( image=condition_image_1, strength=condition_strength_1, frame_index=int(condition_frame_index_1) )) if condition_image_2 is not None: condition_image_2 = ImageOps.fit(condition_image_2, (upscaled_width, upscaled_height), Image.LANCZOS) conditions.append(LTXVideoCondition( image=condition_image_2, strength=condition_strength_2, frame_index=int(condition_frame_index_2) )) pipeline_args = {} if conditions: pipeline_args["conditions"] = conditions # ETAPA 3: Denoise final em alta resolução final_video_frames_np = pipeline( prompt=prompt, negative_prompt=negative_prompt, width=upscaled_width, height=upscaled_height, num_frames=num_frames, denoise_strength=0.999, timesteps=[1000, 909, 725, 421, 0], latents=upscaled_latents, decode_timestep=0.05, decode_noise_scale=0.025, image_cond_noise_scale=0.0, guidance_scale=guidance_scale, guidance_rescale=0.7, generator=torch.Generator(device="cuda").manual_seed(seed), output_type="np", **pipeline_args ).frames[0] # Exportação para arquivo MP4 video_uint8_frames = [(frame * 255).astype(np.uint8) for frame in final_video_frames_np] output_filename = "output.mp4" with imageio.get_writer(output_filename, fps=FPS, quality=8, macro_block_size=1) as writer: for frame_idx, frame_data in enumerate(video_uint8_frames): progress((frame_idx + 1) / len(video_uint8_frames), desc="Codificando frames do vídeo...") writer.append_data(frame_data) return output_filename, seed # Interface Gráfica com Gradio with gr.Blocks(theme=gr.themes.Ocean(font=[gr.themes.GoogleFont("Lexend Deca"), "sans-serif"]), delete_cache=(60, 900)) as demo: gr.Markdown( """ # Geração de Vídeo com LTX **Crie vídeos a partir de texto e imagens de condição usando o modelo LTX-Video.** """ ) with gr.Row(): with gr.Column(scale=1): prompt = gr.Textbox( label="Prompt", placeholder="Descreva o vídeo que você quer gerar...", lines=3, value="O Coringa em seu icônico terno roxo e cabelo verde, dançando sozinho em um quarto escuro e decadente. Seus movimentos são erráticos e imprevisíveis, alternando entre graciosos e caóticos enquanto ele se perde no momento. A câmera captura seus gestos teatrais, sua dança refletindo sua personalidade desequilibrada. Iluminação temperamental com sombras dançando pelas paredes, criando uma atmosfera de bela loucura." ) with gr.Accordion("Imagem de Condição 1", open=True): condition_image_1 = gr.Image(label="Imagem de Condição 1", type="pil") with gr.Row(): condition_strength_1 = gr.Slider(label="Peso (Strength)", minimum=0.0, maximum=1.0, step=0.05, value=1.0) condition_frame_index_1 = gr.Number(label="Frame", value=0, precision=0) with gr.Accordion("Imagem de Condição 2", open=False): condition_image_2 = gr.Image(label="Imagem de Condição 2", type="pil") with gr.Row(): condition_strength_2 = gr.Slider(label="Peso (Strength)", minimum=0.0, maximum=1.0, step=0.05, value=1.0) condition_frame_index_2 = gr.Number(label="Frame", value=0, precision=0) duration = gr.Slider(label="Duração (segundos)", minimum=1.0, maximum=10.0, step=0.5, value=2) with gr.Accordion("Configurações Avançadas", open=False): negative_prompt = gr.Textbox(label="Prompt Negativo", placeholder="O que você não quer no vídeo...", lines=2, value="pior qualidade, movimento inconsistente, embaçado, tremido, distorcido") with gr.Row(): height = gr.Slider(label="Altura", minimum=256, maximum=1536, step=32, value=768) width = gr.Slider(label="Largura", minimum=256, maximum=1536, step=32, value=1152) num_inference_steps = gr.Slider(label="Passos de Inferência", minimum=5, maximum=10, step=1, value=7, visible=False) with gr.Row(): guidance_scale = gr.Slider(label="Escala de Orientação (Guidance)", minimum=1.0, maximum=5.0, step=0.1, value=1.0) with gr.Row(): randomize_seed = gr.Checkbox(label="Seed Aleatória", value=True) seed = gr.Number(label="Seed", value=0, precision=0) generate_btn = gr.Button("Gerar Vídeo", variant="primary", size="lg") with gr.Column(scale=1): output_video = gr.Video(label="Vídeo Gerado", height=400) # CORREÇÃO: A lista de inputs agora é "plana", contendo apenas componentes do Gradio generate_btn.click( fn=generate_video, inputs=[ condition_image_1, condition_strength_1, condition_frame_index_1, condition_image_2, condition_strength_2, condition_frame_index_2, prompt, duration, negative_prompt, height, width, num_inference_steps, guidance_scale, seed, randomize_seed, ], outputs=[output_video, seed], show_progress=True ) demo.unload(cleanup_session_files) if __name__ == "__main__": demo.queue().launch(server_name="0.0.0.0", server_port=7860, debug=True, show_error=True)