import os
import sys
import gc
import yaml
import time
import json
import random
import shutil
import warnings
import tempfile
import traceback
import subprocess
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union

import cv2
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

from huggingface_hub import logging as hf_logging

hf_logging.set_verbosity_error()

import torch
import torch.nn.functional as F
import numpy as np
from PIL import Image
from einops import rearrange
from huggingface_hub import hf_hub_download
from safetensors import safe_open

from managers.vae_manager import vae_manager_singleton
from tools.video_encode_tool import video_encode_tool_singleton

from api.aduc_ltx_latent_patch import LTXLatentConditioningPatch, PatchedConditioningItem
LTXV_DEBUG = True
LTXV_FRAME_LOG_EVERY = 8
DEPS_DIR = Path("/data")
LTX_VIDEO_REPO_DIR = DEPS_DIR / "LTX-Video"
RESULTS_DIR = Path("/app/output")
DEFAULT_FPS = 24.0

def _run_setup_script():
    """Run the setup.py script if the LTX-Video repository is missing."""
    setup_script_path = "setup.py"
    if not os.path.exists(setup_script_path):
        print("[DEBUG] 'setup.py' not found. Skipping dependency cloning.")
        return

    print(f"[DEBUG] Repository not found at {LTX_VIDEO_REPO_DIR}. Running setup.py...")
    try:
        subprocess.run([sys.executable, setup_script_path], check=True, capture_output=True, text=True)
        print("[DEBUG] 'setup.py' completed successfully.")
    except subprocess.CalledProcessError as e:
        print(f"[ERROR] Failed to run 'setup.py' (exit code {e.returncode}).\nOutput:\n{e.stdout}\n{e.stderr}")
        sys.exit(1)

def add_deps_to_path(repo_path: Path):
    """Add the repository directory to sys.path so its modules can be imported."""
    resolved_path = str(repo_path.resolve())
    if resolved_path not in sys.path:
        sys.path.insert(0, resolved_path)
        if LTXV_DEBUG:
            print(f"[DEBUG] Added to sys.path: {resolved_path}")


if not LTX_VIDEO_REPO_DIR.exists():
    _run_setup_script()
add_deps_to_path(LTX_VIDEO_REPO_DIR)

# These imports require the LTX-Video repository on sys.path (set up above).
from ltx_video.models.autoencoders.vae_encode import un_normalize_latents, normalize_latents
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXVideoPipeline, adain_filter_latent
from ltx_video.models.autoencoders.latent_upsampler import LatentUpsampler
from transformers import T5EncoderModel, T5Tokenizer, AutoModelForCausalLM, AutoProcessor, AutoTokenizer
from ltx_video.models.autoencoders.causal_video_autoencoder import CausalVideoAutoencoder
from ltx_video.models.transformers.symmetric_patchifier import SymmetricPatchifier
from ltx_video.models.transformers.transformer3d import Transformer3DModel
from ltx_video.schedulers.rf import RectifiedFlowScheduler
from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy
import ltx_video.pipelines.crf_compressor as crf_compressor

def create_latent_upsampler(latent_upsampler_model_path: str, device: str):
    """Load the spatial latent upsampler and move it to the target device."""
    latent_upsampler = LatentUpsampler.from_pretrained(latent_upsampler_model_path)
    latent_upsampler.to(device)
    latent_upsampler.eval()
    return latent_upsampler

def create_ltx_video_pipeline(
    ckpt_path: str,
    precision: str,
    text_encoder_model_name_or_path: str,
    sampler: Optional[str] = None,
    device: Optional[str] = None,
    enhance_prompt: bool = False,
    prompt_enhancer_image_caption_model_name_or_path: Optional[str] = None,
    prompt_enhancer_llm_model_name_or_path: Optional[str] = None,
) -> LTXVideoPipeline:
    ckpt_path = Path(ckpt_path)
    assert ckpt_path.exists(), f"Ckpt path provided (--ckpt_path) {ckpt_path} does not exist"

    # Read the model config embedded in the safetensors metadata.
    with safe_open(ckpt_path, framework="pt") as f:
        metadata = f.metadata()
        config_str = metadata.get("config")
        configs = json.loads(config_str)
        allowed_inference_steps = configs.get("allowed_inference_steps", None)

    vae = CausalVideoAutoencoder.from_pretrained(ckpt_path)
    transformer = Transformer3DModel.from_pretrained(ckpt_path)

    if sampler == "from_checkpoint" or not sampler:
        scheduler = RectifiedFlowScheduler.from_pretrained(ckpt_path)
    else:
        scheduler = RectifiedFlowScheduler(
            sampler=("Uniform" if sampler.lower() == "uniform" else "LinearQuadratic")
        )

    text_encoder = T5EncoderModel.from_pretrained(
        text_encoder_model_name_or_path, subfolder="text_encoder"
    )
    patchifier = SymmetricPatchifier(patch_size=1)
    tokenizer = T5Tokenizer.from_pretrained(
        text_encoder_model_name_or_path, subfolder="tokenizer"
    )

    transformer = transformer.to(device)
    vae = vae.to(device)
    text_encoder = text_encoder.to(device)

    if enhance_prompt:
        prompt_enhancer_image_caption_model = AutoModelForCausalLM.from_pretrained(
            prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True
        )
        prompt_enhancer_image_caption_processor = AutoProcessor.from_pretrained(
            prompt_enhancer_image_caption_model_name_or_path, trust_remote_code=True
        )
        prompt_enhancer_llm_model = AutoModelForCausalLM.from_pretrained(
            prompt_enhancer_llm_model_name_or_path,
            torch_dtype="bfloat16",
        )
        prompt_enhancer_llm_tokenizer = AutoTokenizer.from_pretrained(
            prompt_enhancer_llm_model_name_or_path,
        )
    else:
        prompt_enhancer_image_caption_model = None
        prompt_enhancer_image_caption_processor = None
        prompt_enhancer_llm_model = None
        prompt_enhancer_llm_tokenizer = None

    vae = vae.to(torch.bfloat16)
    if precision == "bfloat16" and transformer.dtype != torch.bfloat16:
        transformer = transformer.to(torch.bfloat16)
    text_encoder = text_encoder.to(torch.bfloat16)

    submodel_dict = {
        "transformer": transformer,
        "patchifier": patchifier,
        "text_encoder": text_encoder,
        "tokenizer": tokenizer,
        "scheduler": scheduler,
        "vae": vae,
        "prompt_enhancer_image_caption_model": prompt_enhancer_image_caption_model,
        "prompt_enhancer_image_caption_processor": prompt_enhancer_image_caption_processor,
        "prompt_enhancer_llm_model": prompt_enhancer_llm_model,
        "prompt_enhancer_llm_tokenizer": prompt_enhancer_llm_tokenizer,
        "allowed_inference_steps": allowed_inference_steps,
    }

    pipeline = LTXVideoPipeline(**submodel_dict)

    # Apply the ADUC patch so ConditioningItems can carry pre-encoded latents
    # instead of pixel tensors.
    LTXLatentConditioningPatch.apply()

    pipeline = pipeline.to(device)
    return pipeline

def calculate_padding(orig_h: int, orig_w: int, target_h: int, target_w: int) -> Tuple[int, int, int, int]:
    """Compute the padding needed to center an image inside new dimensions."""
    pad_h = target_h - orig_h
    pad_w = target_w - orig_w
    pad_top = pad_h // 2
    pad_bottom = pad_h - pad_top
    pad_left = pad_w // 2
    pad_right = pad_w - pad_left
    return (pad_left, pad_right, pad_top, pad_bottom)
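
# Worked example (values chosen for illustration): centering a 480x704 frame
# inside a 512x768 canvas gives pad_h=32 and pad_w=64, i.e. a padding tuple of
# (left=32, right=32, top=16, bottom=16) -- the (W, W, H, H) ordering that
# torch.nn.functional.pad expects for the last two dimensions.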


def log_tensor_info(tensor: torch.Tensor, name: str = "Tensor"):
    """Print detailed information about a tensor for debugging."""
    if not isinstance(tensor, torch.Tensor):
        print(f"\n[INFO] '{name}' is not a tensor.")
        return
    print(f"\n--- Tensor Info: {name} ---")
    print(f" - Shape: {tuple(tensor.shape)}")
    print(f" - Dtype: {tensor.dtype}")
    print(f" - Device: {tensor.device}")
    if tensor.numel() > 0:
        try:
            print(f" - Stats: Min={tensor.min().item():.4f}, Max={tensor.max().item():.4f}, Mean={tensor.mean().item():.4f}")
        except RuntimeError:
            print(" - Stats: could not be computed (e.g. bool tensors).")
    print("-" * 30)


class VideoService:
    """
    Encapsulated service for generating videos with the LTX-Video pipeline.
    Manages model loading, preprocessing, multi-stage generation (low
    resolution, then upscale with denoise), and post-processing.
    """
    def __init__(self):
        """Initialize the service, loading configuration and models."""
        t0 = time.perf_counter()
        print("[INFO] Initializing VideoService...")
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.config = self._load_config("ltxv-13b-0.9.8-distilled-fp8.yaml")

        self.pipeline, self.latent_upsampler = self._load_models_from_hub()
        self._move_models_to_device()

        self.runtime_autocast_dtype = self._get_precision_dtype()
        vae_manager_singleton.attach_pipeline(
            self.pipeline,
            device=self.device,
            autocast_dtype=self.runtime_autocast_dtype,
        )
        self._tmp_dirs = set()
        RESULTS_DIR.mkdir(exist_ok=True)
        print(f"[INFO] VideoService ready. Initialization time: {time.perf_counter()-t0:.2f}s")

    def _load_image_to_tensor_with_resize_and_crop(
        self,
        image_input: Union[str, Image.Image],
        target_height: int = 512,
        target_width: int = 768,
        just_crop: bool = False,
    ) -> torch.Tensor:
        """Load and process an image into a tensor.

        Args:
            image_input: Either a file path (str) or a PIL Image object.
            target_height: Desired height of the output tensor.
            target_width: Desired width of the output tensor.
            just_crop: If True, only crop the image to the target size without resizing.
        """
        if isinstance(image_input, str):
            image = Image.open(image_input).convert("RGB")
        elif isinstance(image_input, Image.Image):
            image = image_input
        else:
            raise ValueError("image_input must be either a file path or a PIL Image object")

        # Center-crop to the target aspect ratio before resizing.
        input_width, input_height = image.size
        aspect_ratio_target = target_width / target_height
        aspect_ratio_frame = input_width / input_height
        if aspect_ratio_frame > aspect_ratio_target:
            new_width = int(input_height * aspect_ratio_target)
            new_height = input_height
            x_start = (input_width - new_width) // 2
            y_start = 0
        else:
            new_width = input_width
            new_height = int(input_width / aspect_ratio_target)
            x_start = 0
            y_start = (input_height - new_height) // 2

        image = image.crop((x_start, y_start, x_start + new_width, y_start + new_height))
        if not just_crop:
            image = image.resize((target_width, target_height))

        # Light blur plus a CRF round-trip to soften compression-like artifacts
        # before VAE encoding.
        image = np.array(image)
        image = cv2.GaussianBlur(image, (3, 3), 0)
        frame_tensor = torch.from_numpy(image).float()
        frame_tensor = crf_compressor.compress(frame_tensor / 255.0) * 255.0
        frame_tensor = frame_tensor.permute(2, 0, 1)
        frame_tensor = (frame_tensor / 127.5) - 1.0
        # Shape: [1, C, 1, H, W] -- a single-frame video batch.
        return frame_tensor.unsqueeze(0).unsqueeze(2)
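
    # Shape/range sanity check (illustrative): a 768x512 RGB input comes back
    # as a float tensor of shape [1, 3, 1, 512, 768] (batch, channels, frames,
    # H, W), with pixel values mapped from [0, 255] to roughly [-1.0, 1.0];
    # the exact range depends on the lossy crf_compressor round-trip above.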

    @torch.no_grad()
    def _image_to_latents(self, image_input: Union[str, Image.Image], height: int, width: int) -> torch.Tensor:
        """
        Encode an image (path or PIL) into a 5D latent tensor.

        Returns: tensor of shape [1, C_lat, 1, H_lat, W_lat].
        """
        print(f"[DEBUG] Encoding image to latents ({height}x{width})...")

        pixel_tensor = self._load_image_to_tensor_with_resize_and_crop(
            image_input, target_height=height, target_width=width
        )
        pixel_tensor_gpu = pixel_tensor.to(self.device, dtype=self.pipeline.vae.dtype)

        with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
            latents = self.pipeline.vae.encode(pixel_tensor_gpu).latent_dist.sample()

        if hasattr(self.pipeline.vae.config, "scaling_factor"):
            latents = latents * self.pipeline.vae.config.scaling_factor

        print(f"[DEBUG] Image encoded to latents with shape: {latents.shape}")
        return latents

    def _prepare_condition_items(self, items_list: List[Tuple], height: int, width: int) -> List[PatchedConditioningItem]:
        """
        Prepare the conditioning items.

        Takes a list of (image, frame index, weight) tuples, encodes each image
        to LATENTS, and builds a list of PatchedConditioningItem objects with
        the tensor stored in `latents`.
        """
        if not items_list:
            return []

        conditioning_items = []
        for media_input, frame_idx, weight in items_list:
            latent_tensor = self._image_to_latents(media_input, height, width)
            safe_frame_idx = int(frame_idx)

            item = PatchedConditioningItem(
                media_frame_number=safe_frame_idx,
                conditioning_strength=float(weight),
                media_item=None,
                latents=latent_tensor,
            )
            conditioning_items.append(item)

        print(f"[INFO] Prepared {len(conditioning_items)} conditioning items with pre-encoded latents.")
        return conditioning_items
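
    # Illustrative call (file names are hypothetical): each entry is
    # (image path or PIL.Image, target frame index, conditioning strength), e.g.
    #
    #   items = service._prepare_condition_items(
    #       [("first_frame.png", 0, 1.0), ("last_frame.png", 72, 0.8)],
    #       height=512, width=768,
    #   )
    #
    # would pin the clip's start to first_frame.png and softly steer frame 72
    # toward last_frame.png.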

    def generate_low_resolution(
        self,
        prompt: str,
        negative_prompt: str,
        height: int,
        width: int,
        duration_secs: float,
        guidance_scale: float,
        seed: Optional[int] = None,
        conditioning_items: Optional[List[PatchedConditioningItem]] = None,
    ) -> Tuple[str, str, int]:
        """
        STAGE 1: Generate a base-resolution video and its latents from a prompt
        and optional conditioning items.
        """
        print("[INFO] Starting STAGE 1: Low-Resolution Generation...")

        used_seed = random.randint(0, 2**32 - 1) if seed is None else int(seed)
        print(f" - Using seed: {used_seed}")

        temp_dir = tempfile.mkdtemp(prefix="ltxv_low_")
        self._register_tmp_dir(temp_dir)
        RESULTS_DIR.mkdir(parents=True, exist_ok=True)

        actual_num_frames = int(round(duration_secs * DEFAULT_FPS))
        # The LTX pipeline expects a frame count of the form 8k + 1; snap the
        # requested count to the nearest valid value. (The previous expression,
        # actual_num_frames // 8 + 1, produced roughly 8x too few frames.)
        num_frames_padded = ((actual_num_frames - 2) // 8 + 1) * 8 + 1
        downscaled_height = height
        downscaled_width = width

        print(f" - Frames: {num_frames_padded} (requested {actual_num_frames}), Duration: {duration_secs}s")
        print(f" - Output dimensions: {downscaled_height}x{downscaled_width}")

        with torch.autocast(device_type=self.device.split(':')[0], dtype=self.runtime_autocast_dtype, enabled=(self.device == 'cuda')):
            first_pass_kwargs = {
                "prompt": prompt,
                "negative_prompt": negative_prompt,
                "height": downscaled_height,
                "width": downscaled_width,
                "num_frames": num_frames_padded,
                "frame_rate": int(DEFAULT_FPS),
                "generator": torch.Generator(device=self.device).manual_seed(used_seed),
                "output_type": "latent",
                "vae_per_channel_normalize": True,
                "is_video": True,
                "conditioning_items": conditioning_items,
                "guidance_scale": float(guidance_scale),
                **(self.config.get("first_pass", {})),
            }

            print(" - Submitting to the LTX pipeline...")
            latents = self.pipeline(**first_pass_kwargs).images
            print(f" - Generated latents with shape: {latents.shape}")

            pixel_tensor = vae_manager_singleton.decode(latents, decode_timestep=float(self.config.get("decode_timestep", 0.05)))
            tensor_path = self._save_latents_to_disk(latents, "latents_low_res", used_seed)
            final_video_path = self._save_video_from_tensor(pixel_tensor, "final_video", used_seed, temp_dir, fps=DEFAULT_FPS)

        self._finalize()

        print("[SUCCESS] STAGE 1 complete.")
        return final_video_path, tensor_path, used_seed

    def _finalize(self):
        """Free GPU memory and remove registered temporary directories."""
        if LTXV_DEBUG:
            print("[DEBUG] Finalize: starting cleanup...")

        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.ipc_collect()

        for d in list(self._tmp_dirs):
            shutil.rmtree(d, ignore_errors=True)
            self._tmp_dirs.remove(d)
            if LTXV_DEBUG:
                print(f"[DEBUG] Temporary directory removed: {d}")
    def _save_latents_to_disk(self, latents_tensor: torch.Tensor, base_filename: str, seed: int) -> str:
        """Save a latent tensor to a .pt file."""
        latents_cpu = latents_tensor.detach().to("cpu")
        tensor_path = RESULTS_DIR / f"{base_filename}_{seed}.pt"
        torch.save(latents_cpu, tensor_path)
        if LTXV_DEBUG:
            print(f"[DEBUG] Latents saved at: {tensor_path}")
        return str(tensor_path)

    def _save_video_from_tensor(self, pixel_tensor: torch.Tensor, base_filename: str, seed: int, temp_dir: str, fps: float = DEFAULT_FPS) -> str:
        """Save a pixel tensor as an MP4 video file."""
        temp_path = os.path.join(temp_dir, f"{base_filename}_{seed}.mp4")
        # Pass the fps argument through (it was previously ignored in favor of
        # the module default) so callers can override the frame rate.
        video_encode_tool_singleton.save_video_from_tensor(pixel_tensor, temp_path, fps=fps)

        final_path = RESULTS_DIR / f"{base_filename}_{seed}.mp4"
        shutil.move(temp_path, final_path)
        print(f"[INFO] Final video saved at: {final_path}")
        return str(final_path)

    def _load_config(self, config_filename: str) -> Dict:
        """Load the YAML configuration file."""
        config_path = LTX_VIDEO_REPO_DIR / "configs" / config_filename
        print(f"[INFO] Loading configuration from: {config_path}")
        with open(config_path, "r") as file:
            return yaml.safe_load(file)

    def _load_models_from_hub(self):
        """Download checkpoints and build the pipeline and latent upsampler."""
        t0 = time.perf_counter()
        LTX_REPO = "Lightricks/LTX-Video"

        print("[INFO] Downloading main checkpoint...")
        self.config["checkpoint_path"] = hf_hub_download(
            repo_id=LTX_REPO, filename=self.config["checkpoint_path"],
            token=os.getenv("HF_TOKEN")
        )
        print(f"[INFO] Main checkpoint at: {self.config['checkpoint_path']}")

        print("[INFO] Building pipeline...")
        pipeline = create_ltx_video_pipeline(
            ckpt_path=self.config["checkpoint_path"],
            precision=self.config["precision"],
            text_encoder_model_name_or_path=self.config["text_encoder_model_name_or_path"],
            sampler=self.config["sampler"],
            device="cpu",
            enhance_prompt=False,
        )
        print("[INFO] Pipeline built.")

        latent_upsampler = None
        if self.config.get("spatial_upscaler_model_path"):
            print("[INFO] Downloading spatial upscaler...")
            self.config["spatial_upscaler_model_path"] = hf_hub_download(
                repo_id=LTX_REPO, filename=self.config["spatial_upscaler_model_path"],
                token=os.getenv("HF_TOKEN")
            )
            print(f"[INFO] Upscaler at: {self.config['spatial_upscaler_model_path']}")

            print("[INFO] Building latent_upsampler...")
            latent_upsampler = create_latent_upsampler(self.config["spatial_upscaler_model_path"], device="cpu")
            print("[INFO] Latent upsampler built.")

        print(f"[INFO] Model loading finished in {time.perf_counter()-t0:.2f}s")
        return pipeline, latent_upsampler
    def _move_models_to_device(self):
        """Move the loaded models to the compute device (GPU/CPU)."""
        print(f"[INFO] Moving models to device: {self.device}")
        self.pipeline.to(self.device)
        if self.latent_upsampler:
            self.latent_upsampler.to(self.device)

    def _get_precision_dtype(self) -> torch.dtype:
        """Determine the autocast dtype based on the configured precision."""
        prec = str(self.config.get("precision", "")).lower()
        if prec in ["float8_e4m3fn", "bfloat16"]:
            return torch.bfloat16
        elif prec == "mixed_precision":
            return torch.float16
        return torch.float32
    @torch.no_grad()
    def _upsample_and_filter_latents(self, latents: torch.Tensor) -> torch.Tensor:
        """Apply spatial upsampling and the AdaIN filter to the latents."""
        if not self.latent_upsampler:
            raise ValueError("Latent Upsampler is not loaded for the upscale operation.")

        latents_unnormalized = un_normalize_latents(latents, self.pipeline.vae, vae_per_channel_normalize=True)
        upsampled_latents_unnormalized = self.latent_upsampler(latents_unnormalized)
        upsampled_latents_normalized = normalize_latents(upsampled_latents_unnormalized, self.pipeline.vae, vae_per_channel_normalize=True)

        return adain_filter_latent(latents=upsampled_latents_normalized, reference_latents=latents)
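
    # Assuming the published LTX spatial upscaler doubles each spatial axis (a
    # property of the upscaler checkpoint, not enforced here), latents of shape
    # [B, C, F, h, w] come back as [B, C, F, 2h, 2w]; adain_filter_latent then
    # re-matches the upsampled latents' per-channel statistics to the low-res
    # reference, keeping colors and contrast consistent across stages.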

    def _prepare_conditioning_tensor_from_path(self, filepath: str, height: int, width: int, padding: Tuple) -> torch.Tensor:
        """Load an image, resize it, apply padding, and move it to the device."""
        tensor = self._load_image_to_tensor_with_resize_and_crop(filepath, height, width)
        tensor = F.pad(tensor, padding)
        return tensor.to(self.device, dtype=self.runtime_autocast_dtype)

    def _calculate_downscaled_dims(self, height: int, width: int) -> Tuple[int, int]:
        """Compute the dimensions for the first (low-resolution) pass."""
        height_padded = ((height - 1) // 8 + 1) * 8
        width_padded = ((width - 1) // 8 + 1) * 8

        downscale_factor = self.config.get("downscale_factor", 0.6666666)
        vae_scale_factor = self.pipeline.vae_scale_factor

        target_w = int(width_padded * downscale_factor)
        downscaled_width = target_w - (target_w % vae_scale_factor)

        target_h = int(height_padded * downscale_factor)
        downscaled_height = target_h - (target_h % vae_scale_factor)

        return downscaled_height, downscaled_width
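
    # Worked example (assuming vae_scale_factor == 32, as in the LTX VAE): for
    # a 720x1280 request, padding to multiples of 8 is a no-op, the 2/3
    # downscale targets roughly 479x853, and snapping each axis down to a
    # multiple of 32 yields 448x832.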

    def _seed_everething(self, seed: int):
        """Seed Python, NumPy, and torch (CUDA/MPS) RNGs for reproducibility."""
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed(seed)
        if torch.backends.mps.is_available():
            torch.mps.manual_seed(seed)
    def _register_tmp_dir(self, dir_path: str):
        """Register a temporary directory for later cleanup."""
        if dir_path and os.path.isdir(dir_path):
            self._tmp_dirs.add(dir_path)
            if LTXV_DEBUG:
                print(f"[DEBUG] Temporary directory registered: {dir_path}")
print("Criando instância do VideoService. O carregamento do modelo começará agora...") |
|
|
video_generation_service = VideoService() |
|
|
print("Instância do VideoService pronta para uso.") |