# Copyright 2024 UC Berkeley Team and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# DISCLAIMER: This file is strongly influenced by https://github.com/ermongroup/ddim

import math
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union

import numpy as np
import torch

from diffusers.configuration_utils import ConfigMixin, register_to_config
from diffusers.utils import BaseOutput
from diffusers.utils.torch_utils import randn_tensor
from diffusers.schedulers.scheduling_utils import KarrasDiffusionSchedulers, SchedulerMixin


@dataclass
class DDPMSchedulerOutput(BaseOutput):
    """
    Output class for the scheduler's `step` function output.

    Args:
        prev_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images):
            Computed sample `(x_{t-1})` of previous timestep. `prev_sample` should be used as next model input in the
            denoising loop.
        pred_original_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images):
            The predicted denoised sample `(x_{0})` based on the model output from the current timestep.
            `pred_original_sample` can be used to preview progress or for guidance.
    """

    prev_sample: torch.Tensor
    pred_original_sample: Optional[torch.Tensor] = None


def betas_for_alpha_bar(
    num_diffusion_timesteps,
    max_beta=0.999,
    alpha_transform_type="cosine",
):
    """
    Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
    (1-beta) over time from t = [0,1].

    Contains a function alpha_bar that takes an argument t and transforms it to the cumulative product of (1-beta) up
    to that part of the diffusion process.

    Args:
        num_diffusion_timesteps (`int`): the number of betas to produce.
        max_beta (`float`): the maximum beta to use; use values lower than 1 to
                     prevent singularities.
        alpha_transform_type (`str`, *optional*, default to `cosine`): the type of noise schedule for alpha_bar.
                     Choose from `cosine` or `exp`

    Returns:
        betas (`torch.Tensor`): float32 tensor of length `num_diffusion_timesteps` holding the betas used by the
        scheduler to step the model outputs

    Raises:
        ValueError: if `alpha_transform_type` is neither `cosine` nor `exp`.
    """
    if alpha_transform_type == "cosine":

        def alpha_bar_fn(t):
            return math.cos((t + 0.008) / 1.008 * math.pi / 2) ** 2

    elif alpha_transform_type == "exp":

        def alpha_bar_fn(t):
            return math.exp(t * -12.0)

    else:
        raise ValueError(
            f"Unsupported alpha_transform_type: {alpha_transform_type}")

    betas = []
    for i in range(num_diffusion_timesteps):
        t1 = i / num_diffusion_timesteps
        t2 = (i + 1) / num_diffusion_timesteps
        # beta_i = 1 - alpha_bar(t_{i+1}) / alpha_bar(t_i), capped at max_beta
        betas.append(min(1 - alpha_bar_fn(t2) / alpha_bar_fn(t1), max_beta))
    return torch.tensor(betas, dtype=torch.float32)


# Copied from diffusers.schedulers.scheduling_ddim.rescale_zero_terminal_snr
def rescale_zero_terminal_snr(betas):
    """
    Rescales betas to have zero terminal SNR Based on https://arxiv.org/pdf/2305.08891.pdf (Algorithm 1)

    Args:
        betas (`torch.Tensor`):
            the betas that the scheduler is being initialized with.

    Returns:
        `torch.Tensor`: rescaled betas with zero terminal SNR
    """
    # Convert betas to alphas_bar_sqrt
    alphas = 1.0 - betas
    alphas_cumprod = torch.cumprod(alphas, dim=0)
    alphas_bar_sqrt = alphas_cumprod.sqrt()

    # Store old values.
    alphas_bar_sqrt_0 = alphas_bar_sqrt[0].clone()
    alphas_bar_sqrt_T = alphas_bar_sqrt[-1].clone()

    # Shift so the last timestep is zero.
    alphas_bar_sqrt -= alphas_bar_sqrt_T

    # Scale so the first timestep is back to the old value.
    alphas_bar_sqrt *= alphas_bar_sqrt_0 / \
        (alphas_bar_sqrt_0 - alphas_bar_sqrt_T)

    # Convert alphas_bar_sqrt to betas
    alphas_bar = alphas_bar_sqrt**2  # Revert sqrt
    alphas = alphas_bar[1:] / alphas_bar[:-1]  # Revert cumprod
    alphas = torch.cat([alphas_bar[0:1], alphas])
    betas = 1 - alphas

    return betas


def compute_gaussian_product_coef(sigma1, sigma2):
    """
    Given p1 = N(x_t|x_0, sigma_1**2) and p2 = N(x_t|x_1, sigma_2**2)
    return p1 * p2 = N(x_t| coef1 * x0 + coef2 * x1, var)

    Args:
        sigma1: standard deviation of the first Gaussian (number or tensor).
        sigma2: standard deviation of the second Gaussian (number or tensor).

    Returns:
        Tuple `(coef1, coef2, var)` where `coef1 = sigma2**2 / (sigma1**2 + sigma2**2)`,
        `coef2 = sigma1**2 / (sigma1**2 + sigma2**2)` and `var = sigma1**2 * sigma2**2 / (sigma1**2 + sigma2**2)`.
        `sigma1` and `sigma2` must not both be zero, otherwise the division is undefined.
    """
    denom = sigma1**2 + sigma2**2
    coef1 = sigma2**2 / denom
    coef2 = sigma1**2 / denom
    var = (sigma1**2 * sigma2**2) / denom
    return coef1, coef2, var


class I2SBScheduler(SchedulerMixin, ConfigMixin):
    """
    Scheduler that interpolates between two endpoints `x0` and `x1` using per-timestep Gaussian-product
    coefficients derived from a beta schedule.

    From the betas the constructor precomputes, for every training timestep:

    - `std_fwd`: square root of the cumulative sum of betas from the first timestep up to this one,
    - `std_bwd`: square root of the cumulative sum of betas from this timestep up to the last one,
    - `mu_x0`, `mu_x1`, `std_sb`: the mean coefficients and standard deviation returned by
      `compute_gaussian_product_coef(std_fwd, std_bwd)`.

    Args:
        num_train_timesteps (`int`): number of timesteps in the schedule.
        beta_start (`float`): first beta of the generated schedule.
        beta_end (`float`): last beta of the generated schedule.
        beta_schedule (`str`): one of `linear`, `scaled_linear`, `squaredcos_cap_v2` or `sigmoid`.
        trained_betas (`np.ndarray` or `List[float]`, *optional*): explicit betas; when given, `beta_schedule`,
            `beta_start` and `beta_end` are ignored.
        variance_type (`str`): stored on the instance; `step` only uses it to decide whether a model output with
            twice the sample channels must be split (`learned` / `learned_range`).
        clip_sample (`bool`): clamp the predicted `x_0` to `[-clip_sample_range, clip_sample_range]` in `step`
            (only when `thresholding` is `False`).
        prediction_type (`str`): stored in the config; not read by the code in this file.
        thresholding (`bool`): apply dynamic thresholding to the predicted `x_0` in `step`.
        dynamic_thresholding_ratio (`float`): quantile used by dynamic thresholding.
        clip_sample_range (`float`): clamp range used when `clip_sample` is `True`.
        sample_max_value (`float`): upper bound of the dynamic-thresholding scale.
        timestep_spacing (`str`): `linspace`, `leading` or `trailing`; used by `set_timesteps`.
        steps_offset (`int`): offset added to the timesteps for `leading` spacing.
        rescale_betas_zero_snr (`bool`): rescale the betas with `rescale_zero_terminal_snr` before use.

    Raises:
        NotImplementedError: if `beta_schedule` is not one of the supported names.
    """

    @register_to_config
    def __init__(
        self,
        num_train_timesteps: int = 1000,
        beta_start: float = 0.0001,
        beta_end: float = 0.02,
        beta_schedule: str = "linear",
        trained_betas: Optional[Union[np.ndarray, List[float]]] = None,
        variance_type: str = "fixed_small",
        clip_sample: bool = True,
        prediction_type: str = "epsilon",
        thresholding: bool = False,
        dynamic_thresholding_ratio: float = 0.995,
        clip_sample_range: float = 1.0,
        sample_max_value: float = 1.0,
        timestep_spacing: str = "leading",
        steps_offset: int = 0,
        rescale_betas_zero_snr: bool = False,
    ):
        if trained_betas is not None:
            self.betas = torch.tensor(trained_betas, dtype=torch.float32)
        elif beta_schedule == "linear":
            self.betas = torch.linspace(
                beta_start, beta_end, num_train_timesteps, dtype=torch.float32)
        elif beta_schedule == "scaled_linear":
            # this schedule is very specific to the latent diffusion model.
            self.betas = torch.linspace(
                beta_start**0.5, beta_end**0.5, num_train_timesteps, dtype=torch.float32) ** 2
        elif beta_schedule == "squaredcos_cap_v2":
            # Glide cosine schedule
            self.betas = betas_for_alpha_bar(num_train_timesteps)
        elif beta_schedule == "sigmoid":
            # GeoDiff sigmoid schedule
            betas = torch.linspace(-6, 6, num_train_timesteps)
            self.betas = torch.sigmoid(
                betas) * (beta_end - beta_start) + beta_start
        else:
            raise NotImplementedError(
                f"{beta_schedule} is not implemented for {self.__class__}")

        # Rescale for zero SNR
        if rescale_betas_zero_snr:
            self.betas = rescale_zero_terminal_snr(self.betas)

        # std_fwd[t]: accumulated beta from the first timestep up to t.
        std_fwd = torch.sqrt(torch.cumsum(self.betas, 0))
        # std_bwd[t]: accumulated beta from t up to the last timestep (cumsum over the reversed schedule).
        std_bwd = torch.sqrt(torch.flip(
            torch.cumsum(torch.flip(self.betas, dims=[0]), 0), dims=[0]))
        mu_x0, mu_x1, var = compute_gaussian_product_coef(std_fwd, std_bwd)
        std_sb = torch.sqrt(var)

        self.std_fwd = std_fwd
        self.std_bwd = std_bwd
        self.std_sb = std_sb
        self.mu_x0 = mu_x0
        self.mu_x1 = mu_x1

        # setable values
        self.custom_timesteps = False
        self.num_inference_steps = None
        self.timesteps = torch.from_numpy(
            np.arange(0, num_train_timesteps)[::-1].copy())

        self.variance_type = variance_type

    def scale_model_input(self, sample: torch.Tensor, timestep: Optional[int] = None) -> torch.Tensor:
        """
        Ensures interchangeability with schedulers that need to scale the denoising model input depending on the
        current timestep.

        Args:
            sample (`torch.Tensor`):
                The input sample.
            timestep (`int`, *optional*):
                The current timestep in the diffusion chain.

        Returns:
            `torch.Tensor`:
                A scaled input sample.
        """
        return sample

    def set_timesteps(
        self,
        num_inference_steps: Optional[int] = None,
        device: Union[str, torch.device] = None,
        timesteps: Optional[List[int]] = None,
    ):
        """
        Sets the discrete timesteps used for the diffusion chain (to be run before inference).

        Args:
            num_inference_steps (`int`):
                The number of diffusion steps used when generating samples with a pre-trained model. If used,
                `timesteps` must be `None`.
            device (`str` or `torch.device`, *optional*):
                The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
            timesteps (`List[int]`, *optional*):
                Custom timesteps used to support arbitrary spacing between timesteps. If `None`, then the default
                timestep spacing strategy of equal spacing between timesteps is used. If `timesteps` is passed,
                `num_inference_steps` must be `None`.

        Raises:
            ValueError: if both or neither of `num_inference_steps` / `timesteps` are given, if custom `timesteps`
                are not strictly descending or start at/after `num_train_timesteps`, if `num_inference_steps`
                exceeds `num_train_timesteps`, or if the configured `timestep_spacing` is unknown.
        """
        if num_inference_steps is not None and timesteps is not None:
            raise ValueError(
                "Can only pass one of `num_inference_steps` or `custom_timesteps`.")
        if num_inference_steps is None and timesteps is None:
            # Without this guard the comparison below fails with an opaque `TypeError` (None > int).
            raise ValueError(
                "Must pass exactly one of `num_inference_steps` or `timesteps`.")

        if timesteps is not None:
            for i in range(1, len(timesteps)):
                if timesteps[i] >= timesteps[i - 1]:
                    raise ValueError(
                        "`custom_timesteps` must be in descending order.")

            if timesteps[0] >= self.config.num_train_timesteps:
                raise ValueError(
                    f"`timesteps` must start before `self.config.train_timesteps`:"
                    f" {self.config.num_train_timesteps}."
                )

            timesteps = np.array(timesteps, dtype=np.int64)
            self.custom_timesteps = True
        else:
            if num_inference_steps > self.config.num_train_timesteps:
                raise ValueError(
                    f"`num_inference_steps`: {num_inference_steps} cannot be larger than `self.config.train_timesteps`:"
                    f" {self.config.num_train_timesteps} as the unet model trained with this scheduler can only handle"
                    f" maximal {self.config.num_train_timesteps} timesteps."
                )

            self.num_inference_steps = num_inference_steps
            self.custom_timesteps = False

            # "linspace", "leading", "trailing" corresponds to annotation of Table 2. of https://arxiv.org/abs/2305.08891
            if self.config.timestep_spacing == "linspace":
                timesteps = (
                    np.linspace(0, self.config.num_train_timesteps - 1, num_inference_steps)
                    .round()[::-1]
                    .copy()
                    .astype(np.int64)
                )
            elif self.config.timestep_spacing == "leading":
                step_ratio = self.config.num_train_timesteps // self.num_inference_steps
                # creates integer timesteps by multiplying by ratio
                # casting to int to avoid issues when num_inference_step is power of 3
                timesteps = (np.arange(0, num_inference_steps) * step_ratio).round()[::-1].copy().astype(np.int64)
                timesteps += self.config.steps_offset
            elif self.config.timestep_spacing == "trailing":
                step_ratio = self.config.num_train_timesteps / self.num_inference_steps
                # creates integer timesteps by multiplying by ratio
                # casting to int to avoid issues when num_inference_step is power of 3
                timesteps = np.round(
                    np.arange(self.config.num_train_timesteps, 0, -step_ratio)).astype(np.int64)
                timesteps -= 1
            else:
                raise ValueError(
                    f"{self.config.timestep_spacing} is not supported. Please make sure to choose one of 'linspace', 'leading' or 'trailing'."
                )

        self.timesteps = torch.from_numpy(timesteps).to(device)

    def _get_variance(self, t, predicted_variance=None, variance_type=None):
        """
        Compute a DDPM-style posterior variance for timestep `t`.

        NOTE(review): this method reads `self.alphas_cumprod` and `self.one`, neither of which is assigned by
        `__init__` in this file, and `step` does not call it. As written it will raise `AttributeError` unless
        those attributes are set elsewhere — confirm whether it is still needed.

        Args:
            t: current timestep.
            predicted_variance: variance predicted by the model; used by the `learned` and `learned_range` types.
            variance_type (`str`, *optional*): overrides `self.config.variance_type` when given.

        Returns:
            The variance (or a log-variance for `fixed_large_log` / `learned_range`) for timestep `t`.
        """
        prev_t = self.previous_timestep(t)

        alpha_prod_t = self.alphas_cumprod[t]
        alpha_prod_t_prev = self.alphas_cumprod[prev_t] if prev_t >= 0 else self.one
        current_beta_t = 1 - alpha_prod_t / alpha_prod_t_prev

        # For t > 0, compute predicted variance βt (see formula (6) and (7) from https://arxiv.org/pdf/2006.11239.pdf)
        # and sample from it to get previous sample
        # x_{t-1} ~ N(pred_prev_sample, variance) == add variance to pred_sample
        variance = (1 - alpha_prod_t_prev) / \
            (1 - alpha_prod_t) * current_beta_t

        # we always take the log of variance, so clamp it to ensure it's not 0
        variance = torch.clamp(variance, min=1e-20)

        if variance_type is None:
            variance_type = self.config.variance_type

        # hacks - were probably added for training stability
        if variance_type == "fixed_small":
            variance = variance
        # for rl-diffuser https://arxiv.org/abs/2205.09991
        elif variance_type == "fixed_small_log":
            variance = torch.log(variance)
            variance = torch.exp(0.5 * variance)
        elif variance_type == "fixed_large":
            variance = current_beta_t
        elif variance_type == "fixed_large_log":
            # Glide max_log
            variance = torch.log(current_beta_t)
        elif variance_type == "learned":
            return predicted_variance
        elif variance_type == "learned_range":
            min_log = torch.log(variance)
            max_log = torch.log(current_beta_t)
            frac = (predicted_variance + 1) / 2
            variance = frac * max_log + (1 - frac) * min_log

        return variance

    def _threshold_sample(self, sample: torch.Tensor) -> torch.Tensor:
        """
        "Dynamic thresholding: At each sampling step we set s to a certain percentile absolute pixel value in xt0 (the
        prediction of x_0 at timestep t), and if s > 1, then we threshold xt0 to the range [-s, s] and then divide by
        s. Dynamic thresholding pushes saturated pixels (those near -1 and 1) inwards, thereby actively preventing
        pixels from saturation at each step. We find that dynamic thresholding results in significantly better
        photorealism as well as better image-text alignment, especially when using very large guidance weights."

        https://arxiv.org/abs/2205.11487
        """
        dtype = sample.dtype
        batch_size, channels, *remaining_dims = sample.shape

        if dtype not in (torch.float32, torch.float64):
            # upcast for quantile calculation, and clamp not implemented for cpu half
            sample = sample.float()

        # Flatten sample for doing quantile calculation along each image
        sample = sample.reshape(batch_size, channels * np.prod(remaining_dims))

        abs_sample = sample.abs()  # "a certain percentile absolute pixel value"

        s = torch.quantile(
            abs_sample, self.config.dynamic_thresholding_ratio, dim=1)
        s = torch.clamp(
            s, min=1, max=self.config.sample_max_value
        )  # When clamped to min=1, equivalent to standard clipping to [-1, 1]
        # (batch_size, 1) because clamp will broadcast along dim=0
        s = s.unsqueeze(1)
        # "we threshold xt0 to the range [-s, s] and then divide by s"
        sample = torch.clamp(sample, -s, s) / s

        sample = sample.reshape(batch_size, channels, *remaining_dims)
        sample = sample.to(dtype)

        return sample

    def step(
        self,
        model_output: torch.Tensor,
        timestep: int,
        sample: torch.Tensor,
        is_ode: bool = False,
        generator=None,
        return_dict: bool = True,
    ) -> Union[DDPMSchedulerOutput, Tuple]:
        """
        Predict the sample at the previous timestep from the model output at the current timestep.

        The model output is interpreted as `(x_t - x_0) / std_fwd[t]` (see `compute_label`), so the predicted
        `x_0` is `sample - std_fwd[t] * model_output`. The previous sample is the Gaussian-product mean of the
        predicted `x_0` and the current sample, plus noise unless `is_ode` is set or `timestep` is 0.

        Args:
            model_output (`torch.Tensor`):
                The direct output from learned diffusion model.
            timestep (`int`):
                The current discrete timestep in the diffusion chain.
            sample (`torch.Tensor`):
                A current instance of a sample created by the diffusion process.
            is_ode (`bool`, *optional*, defaults to `False`):
                If `True`, no noise is added and the step is deterministic.
            generator (`torch.Generator`, *optional*):
                A random number generator.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~schedulers.scheduling_ddpm.DDPMSchedulerOutput`] or `tuple`.

        Returns:
            [`~schedulers.scheduling_ddpm.DDPMSchedulerOutput`] or `tuple`:
                If return_dict is `True`, [`~schedulers.scheduling_ddpm.DDPMSchedulerOutput`] is returned, otherwise a
                tuple is returned where the first element is the sample tensor.
        """
        t = timestep
        prev_t = self.previous_timestep(t)

        # 1. Drop the predicted-variance half of the model output when the model predicts it.
        if model_output.shape[1] == sample.shape[1] * 2 and self.variance_type in ["learned", "learned_range"]:
            model_output, _ = torch.split(
                model_output, sample.shape[1], dim=1)

        # 2. Look up the accumulated forward std at the current and previous timestep.
        std_fwd_list = self.std_fwd.to(device=sample.device)
        std_fwd = std_fwd_list[t]
        if prev_t >= 0:
            std_fwd_prev = std_fwd_list[prev_t]
        else:
            # Last step of the chain: `previous_timestep` returns a negative value. Indexing with it would wrap
            # around to the END of the schedule and make `std_delta` NaN. No beta has been accumulated before the
            # first timestep, so the previous std is zero and the step returns the predicted x_0.
            std_fwd_prev = torch.zeros_like(std_fwd)
        std_delta = (std_fwd**2 - std_fwd_prev**2).sqrt()

        # 3. Predict x_0 from the model output.
        pred_original_sample = sample - std_fwd * model_output

        # 4. Clip or threshold "predicted x_0"
        if self.config.thresholding:
            pred_original_sample = self._threshold_sample(pred_original_sample)
        elif self.config.clip_sample:
            pred_original_sample = pred_original_sample.clamp(
                -self.config.clip_sample_range, self.config.clip_sample_range
            )

        # 5. Posterior mean: Gaussian product of the predicted x_0 and the current sample.
        mu_x0, mu_xt, var = compute_gaussian_product_coef(
            std_fwd_prev, std_delta)
        pred_prev_sample = mu_x0 * pred_original_sample + mu_xt * sample

        # 6. Add noise
        variance_noise = 0
        if t > 0 and not is_ode:
            device = model_output.device
            variance_noise = randn_tensor(
                model_output.shape, generator=generator, device=device, dtype=model_output.dtype
            ) * var.sqrt()

        pred_prev_sample = pred_prev_sample + variance_noise

        if not return_dict:
            return (pred_prev_sample,)

        return DDPMSchedulerOutput(prev_sample=pred_prev_sample, pred_original_sample=pred_original_sample)

    def add_noise(
        self,
        x0: torch.Tensor,
        x1: torch.Tensor,
        timesteps: torch.IntTensor,
        is_ode: bool = False,
        noise=None
    ) -> torch.Tensor:
        """
        Sample the intermediate state `x_t` between the two endpoints `x0` and `x1`.

        Computes `mu_x0[t] * x0 + mu_x1[t] * x1` and, unless `is_ode` is set, adds `std_sb[t] * noise`.

        Args:
            x0 (`torch.Tensor`): first endpoint.
            x1 (`torch.Tensor`): second endpoint.
            timesteps (`torch.IntTensor`): timestep indices used to look up the per-timestep coefficients.
            is_ode (`bool`, *optional*, defaults to `False`): if `True`, return the noise-free interpolation.
            noise (`torch.Tensor`, *optional*): noise to use; drawn with `torch.randn_like` when `None`.

        Returns:
            `torch.Tensor`: the interpolated (and optionally noised) sample.
        """
        mu_x0 = self.mu_x0.to(device=x0.device)
        mu_x0 = mu_x0[timesteps]
        mu_x1 = self.mu_x1.to(device=x0.device)
        mu_x1 = mu_x1[timesteps]
        std_sb = self.std_sb.to(device=x0.device)
        std_sb = std_sb[timesteps]

        # Append trailing singleton dims so the per-timestep coefficients broadcast against x0.
        while len(mu_x0.shape) < len(x0.shape):
            mu_x0 = mu_x0.unsqueeze(-1)
            mu_x1 = mu_x1.unsqueeze(-1)
            std_sb = std_sb.unsqueeze(-1)

        xt = mu_x0 * x0 + mu_x1 * x1
        if not is_ode:
            if noise is None:
                noise = torch.randn_like(xt)
            xt = xt + std_sb * noise
        return xt

    def get_velocity(self, sample: torch.Tensor, noise: torch.Tensor, timesteps: torch.IntTensor) -> torch.Tensor:
        """Not supported by this scheduler; always raises `NotImplementedError`."""
        raise NotImplementedError

    def compute_label(self, timesteps, x0, xt):
        """
        Compute the regression target `(xt - x0) / std_fwd[timesteps]`.

        This is the quantity `step` inverts when it recovers the predicted `x_0` from the model output.

        Args:
            timesteps: timestep indices used to look up `std_fwd`.
            x0 (`torch.Tensor`): first endpoint.
            xt (`torch.Tensor`): intermediate sample at `timesteps`.

        Returns:
            `torch.Tensor`: the target, with `std_fwd` broadcast over the trailing dims of `x0`.
        """
        std_fwd = self.std_fwd.to(device=x0.device)
        std_fwd = std_fwd[timesteps]
        while len(std_fwd.shape) < len(x0.shape):
            std_fwd = std_fwd.unsqueeze(-1)
        label = (xt - x0) / std_fwd
        return label

    def __len__(self):
        """Return the number of training timesteps."""
        return self.config.num_train_timesteps

    def previous_timestep(self, timestep):
        """
        Return the timestep that follows `timestep` in the sampling chain.

        With custom timesteps this is the next entry of `self.timesteps`, or `-1` after the last entry. Otherwise
        it is `timestep` minus the stride `num_train_timesteps // num_inference_steps`, which is negative once the
        chain steps past the first training timestep.
        """
        if self.custom_timesteps:
            index = (self.timesteps == timestep).nonzero(as_tuple=True)[0][0]
            if index == self.timesteps.shape[0] - 1:
                prev_t = torch.tensor(-1)
            else:
                prev_t = self.timesteps[index + 1]
        else:
            num_inference_steps = (
                self.num_inference_steps if self.num_inference_steps else self.config.num_train_timesteps
            )
            prev_t = timestep - self.config.num_train_timesteps // num_inference_steps

        return prev_t