# Source: MoonQwen3 / modeling_moonvit.py (Hugging Face Hub)
# Uploaded by shilinxu using huggingface_hub; commit 050b696 (verified); file size 29.3 kB.
import math
from copy import deepcopy
from typing import Union, Tuple, Sequence, Optional, List
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.checkpoint
from transformers.activations import GELUActivation, ACT2FN, PytorchGELUTanh
from transformers.activations import PytorchGELUTanh
from transformers.modeling_utils import PreTrainedModel
from transformers.utils import is_flash_attn_2_available
from .configuration_moonvit import MoonViTConfig
# flash-attn is an optional dependency: bind its varlen kernel when transformers
# reports it as available, otherwise bind the name to None so the module still imports.
if is_flash_attn_2_available():
    from flash_attn import flash_attn_varlen_func
else:
    flash_attn_varlen_func = None
def rotate_half(x):
    """Swap the two halves of the last axis, negating the half that moves to the front.

    The last axis is cut at ``x.shape[-1] // 2``; for ``x = [a, b]`` the result is
    ``[-b, a]``.
    """
    half = x.shape[-1] // 2
    front, back = x[..., :half], x[..., half:]
    return torch.cat((-back, front), dim=-1)
def apply_multimodal_rotary_pos_emb(q, k, cos, sin, mrope_section=(12, 12, 12), unsqueeze_dim=1):
    """Apply sectioned (multimodal) rotary position embedding to ``q`` and ``k``.

    ``cos`` and ``sin`` are split along their last axis into the sections
    ``mrope_section * 2``; chunk ``i`` is taken from index ``i % 3`` of the leading
    axis, and the selected chunks are concatenated back along the last axis.
    The leading axis presumably holds the temporal / height / width position
    streams (as produced by ``get_rope_index``) -- TODO confirm against the caller.

    Args:
        q, k: query / key tensors; must broadcast against the assembled tables.
        cos, sin: rotary tables indexable on axis 0 with ``0..2`` and whose last axis
            has length ``2 * sum(mrope_section)``.
        mrope_section: per-stream section widths. The default is an immutable tuple;
            any sequence of ints (including a list) is accepted.
        unsqueeze_dim: axis inserted into the assembled tables so they broadcast
            over the heads axis of ``q`` / ``k``.

    Returns:
        Tuple ``(q_embed, k_embed)`` with the same shapes as ``q`` and ``k``.
    """
    # Normalise to a list so tuple and list callers behave identically.
    sections = list(mrope_section) * 2

    def _assemble(table):
        chunks = table.split(sections, dim=-1)
        picked = [chunk[i % 3] for i, chunk in enumerate(chunks)]
        return torch.cat(picked, dim=-1).unsqueeze(unsqueeze_dim)

    cos = _assemble(cos)
    sin = _assemble(sin)
    q_embed = (q * cos) + (rotate_half(q) * sin)
    k_embed = (k * cos) + (rotate_half(k) * sin)
    return q_embed, k_embed
def get_rope_index(
    image_token_id,
    video_token_id,
    vision_start_token_id,
    spatial_merge_size: int = 2,
    input_ids: Optional[torch.LongTensor] = None,
    image_grid_thw: Optional[torch.LongTensor] = None,
    video_grid_thw: Optional[torch.LongTensor] = None,
    second_per_grid_ts: Optional[torch.Tensor] = None,
    attention_mask: Optional[torch.Tensor] = None,
) -> tuple[torch.Tensor, torch.Tensor]:
    """Build 3-stream (t, h, w) position ids for sequences mixing text and vision tokens.

    Args:
        image_token_id, video_token_id: token ids marking image / video placeholders.
        vision_start_token_id: token id whose *following* token decides whether a
            vision span is an image or a video.
        spatial_merge_size: divisor applied to the h and w grid sizes.
        input_ids: token ids, indexed as (batch, seq).
        image_grid_thw, video_grid_thw: per-image / per-video (t, h, w) grid sizes,
            consumed in order of appearance across the whole batch.
        second_per_grid_ts: optional per-video temporal scale; 1.0 is used when absent.
        attention_mask: entries equal to 1 mark real tokens; defaults to all ones on
            the vision path.

    Returns:
        ``(position_ids, mrope_position_deltas)``. On the vision path ``position_ids``
        has shape (3, batch, seq) and ``mrope_position_deltas`` has shape (batch, 1).

    NOTE(review): if ``input_ids`` and ``attention_mask`` are both None the final
    fallback branch dereferences ``input_ids`` and fails -- confirm callers never do this.
    """
    mrope_position_deltas = []
    if input_ids is not None and (image_grid_thw is not None or video_grid_thw is not None):
        total_input_ids = input_ids
        if attention_mask is None:
            attention_mask = torch.ones_like(total_input_ids)
        # Initialised to ones: positions masked out by attention_mask keep the value 1.
        position_ids = torch.ones(3,input_ids.shape[0],input_ids.shape[1],dtype=input_ids.dtype,device=input_ids.device)
        # Running cursors into image_grid_thw / video_grid_thw, shared across the batch.
        image_index, video_index = 0, 0
        attention_mask = attention_mask.to(total_input_ids.device)
        # NOTE: the loop variable rebinds the name `input_ids` to one (unpadded) row.
        for i, input_ids in enumerate(total_input_ids):
            input_ids = input_ids[attention_mask[i] == 1]
            image_nums, video_nums = 0, 0
            # The token right after each vision-start marker tells image from video.
            vision_start_indices = torch.argwhere(input_ids == vision_start_token_id).squeeze(1)
            vision_tokens = input_ids[vision_start_indices + 1]
            image_nums = (vision_tokens == image_token_id).sum()
            video_nums = (vision_tokens == video_token_id).sum()
            input_tokens = input_ids.tolist()
            llm_pos_ids_list: list = []
            st = 0  # start of the not-yet-processed part of this row
            remain_images, remain_videos = image_nums, video_nums
            for _ in range(image_nums + video_nums):
                # Locate the next image / video placeholder at or after `st`;
                # len + 1 acts as "not found" so the other modality wins the comparison.
                if image_token_id in input_tokens and remain_images > 0:
                    ed_image = input_tokens.index(image_token_id, st)
                else:
                    ed_image = len(input_tokens) + 1
                if video_token_id in input_tokens and remain_videos > 0:
                    ed_video = input_tokens.index(video_token_id, st)
                else:
                    ed_video = len(input_tokens) + 1
                if ed_image < ed_video:
                    t, h, w = (image_grid_thw[image_index][0],image_grid_thw[image_index][1],image_grid_thw[image_index][2])
                    # Images use a zero temporal scale, so their t index is always 0.
                    second_per_grid_t = 0
                    image_index += 1
                    remain_images -= 1
                    ed = ed_image
                else:
                    t, h, w = (video_grid_thw[video_index][0],video_grid_thw[video_index][1],video_grid_thw[video_index][2])
                    if second_per_grid_ts is not None:
                        second_per_grid_t = second_per_grid_ts[video_index]
                    else:
                        second_per_grid_t = 1.0
                    video_index += 1
                    remain_videos -= 1
                    ed = ed_video
                llm_grid_t, llm_grid_h, llm_grid_w = (t.item(),h.item() // spatial_merge_size,w.item() // spatial_merge_size)
                # Text before the vision span: the same running index on all three streams.
                text_len = ed - st
                st_idx = llm_pos_ids_list[-1].max() + 1 if len(llm_pos_ids_list) > 0 else 0
                llm_pos_ids_list.append(torch.arange(text_len).view(1, -1).expand(3, -1) + st_idx)
                range_tensor = torch.arange(llm_grid_t).view(-1, 1)
                expanded_range = range_tensor.expand(-1, llm_grid_h * llm_grid_w)
                ## normalize type, send to device.
                # NOTE(review): the cast uses range_tensor's integer dtype, so a fractional
                # second_per_grid_t is truncated before the multiplication -- confirm intended.
                second_per_grid_t = torch.as_tensor(second_per_grid_t, dtype=range_tensor.dtype, device=range_tensor.device)
                # NOTE(review): the constant 2 is presumably a tokens-per-second factor -- TODO confirm.
                time_tensor = expanded_range * second_per_grid_t * 2
                time_tensor_long = time_tensor.long()
                t_index = time_tensor_long.flatten()
                h_index = torch.arange(llm_grid_h).view(1, -1, 1).expand(llm_grid_t, -1, llm_grid_w).flatten()
                w_index = torch.arange(llm_grid_w).view(1, 1, -1).expand(llm_grid_t, llm_grid_h, -1).flatten()
                # Vision span: separate t / h / w indices, offset past the preceding text.
                llm_pos_ids_list.append(torch.stack([t_index, h_index, w_index]) + text_len + st_idx)
                st = ed + llm_grid_t * llm_grid_h * llm_grid_w
            # Trailing text after the last vision span.
            if st < len(input_tokens):
                st_idx = llm_pos_ids_list[-1].max() + 1 if len(llm_pos_ids_list) > 0 else 0
                text_len = len(input_tokens) - st
                llm_pos_ids_list.append(torch.arange(text_len).view(1, -1).expand(3, -1) + st_idx)
            llm_positions = torch.cat(llm_pos_ids_list, dim=1).reshape(3, -1)
            # Scatter back into the padded layout; only unmasked slots are written.
            position_ids[..., i, attention_mask[i] == 1] = llm_positions.to(position_ids.device)
            # Delta between the largest position used and the padded row length.
            mrope_position_deltas.append(llm_positions.max() + 1 - len(total_input_ids[i]))
        mrope_position_deltas = torch.tensor(mrope_position_deltas, device=input_ids.device).unsqueeze(1)
        return position_ids, mrope_position_deltas
    else:
        # No vision grids supplied (or no input_ids): plain sequential positions,
        # replicated over the three streams.
        if attention_mask is not None:
            position_ids = attention_mask.long().cumsum(-1) - 1
            position_ids.masked_fill_(attention_mask == 0, 1)
            position_ids = position_ids.unsqueeze(0).expand(3, -1, -1).to(attention_mask.device)
            max_position_ids = position_ids.max(0, keepdim=False)[0].max(-1, keepdim=True)[0]
            mrope_position_deltas = max_position_ids + 1 - attention_mask.shape[-1]
        else:
            position_ids = (torch.arange(input_ids.shape[1], device=input_ids.device).view(1, 1, -1).expand(3, input_ids.shape[0], -1))
            mrope_position_deltas = torch.zeros([input_ids.shape[0], 1],device=input_ids.device,dtype=input_ids.dtype,)
        return position_ids, mrope_position_deltas
def multihead_attention(
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
    q_cu_seqlens: Optional[torch.Tensor] = None,
    k_cu_seqlens: Optional[torch.Tensor] = None,
):
    """Packed (variable-length) multi-head attention using flash attention 2.

    Args:
        q, k, v: packed tensors of shape (tot_seqlens, num_heads, head_dim).
            Only 3-D packed input is accepted; dtype must be bfloat16 or float16.
        q_cu_seqlens (torch.Tensor): cumulative sequence lengths of q.
            The first element should be 0 and the last element should be q.shape[0].
        k_cu_seqlens (torch.Tensor): cumulative sequence lengths of k.
            The first element should be 0 and the last element should be k.shape[0].

    Returns:
        output: shape (tot_seqlens, dim), where dim = num_heads * head_dim.

    Raises:
        ValueError: if either cumulative-length tensor is missing.
        ImportError: if the ``flash_attn`` kernel is not available.
        AssertionError: if shapes / dtypes violate the contract above.
    """
    # The defaults are None only for signature symmetry; the kernel needs both.
    if q_cu_seqlens is None or k_cu_seqlens is None:
        raise ValueError(
            "multihead_attention requires both q_cu_seqlens and k_cu_seqlens"
        )
    # Unified format legal check
    assert q.dim() == k.dim() == v.dim() == 3, "q, k, v must have 3 dims"
    assert q_cu_seqlens[-1] == q.shape[0], "q_cu_seqlens must sum to q.shape[0]"
    assert (
        k_cu_seqlens[-1] == k.shape[0] == v.shape[0]
    ), "k_cu_seqlens must sum to k.shape[0]"
    assert q.dtype in [
        torch.bfloat16,
        torch.float16,
    ], f"unsupported dtype {q.dtype} for multihead attn"
    # The module binds the kernel to None when flash-attn is missing; fail with a
    # clear message instead of "'NoneType' object is not callable".
    if flash_attn_varlen_func is None:
        raise ImportError(
            "flash_attention_2 was selected but the `flash_attn` package is not "
            "available; install flash-attn or use the 'sdpa' or 'eager' implementation"
        )
    max_seqlen_q = (q_cu_seqlens[1:] - q_cu_seqlens[:-1]).max().item()
    max_seqlen_k = (k_cu_seqlens[1:] - k_cu_seqlens[:-1]).max().item()
    attn_out = flash_attn_varlen_func(
        q,
        k,
        v,
        q_cu_seqlens,
        k_cu_seqlens,
        max_seqlen_q,
        max_seqlen_k,
        causal=False,
    )
    # Merge (num_heads, head_dim) into a single feature axis.
    attn_out = attn_out.flatten(start_dim=-2)
    return attn_out
def sdpa_attention(
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
    attention_mask: torch.Tensor,
) -> torch.Tensor:
    """Attention through ``torch.nn.functional.scaled_dot_product_attention``.

    Args:
        q, k, v: tensors laid out as expected by the PyTorch kernel, i.e. with the
            sequence axis second to last.
        attention_mask: mask forwarded unchanged as ``attn_mask``.

    Returns:
        The kernel output with axes 1 and 2 swapped; no dropout is applied.
    """
    context = F.scaled_dot_product_attention(
        q, k, v, attn_mask=attention_mask, dropout_p=0.0
    )
    return context.transpose(1, 2)
def eager_attention(
    q: torch.Tensor,
    k: torch.Tensor,
    v: torch.Tensor,
    q_cu_seqlens: Optional[torch.Tensor] = None,
    k_cu_seqlens: Optional[torch.Tensor] = None,
) -> torch.Tensor:
    """Reference (pure PyTorch) attention over packed sequences.

    Each packed sequence attends only to itself: positions that belong to different
    sequences are masked out before the softmax.

    Args:
        q, k, v: packed tensors indexed as (tot_seqlens, num_heads, head_dim).
        q_cu_seqlens: cumulative sequence boundaries (first element 0, last element
            ``q.shape[0]``). When None, the whole input is treated as one sequence.
        k_cu_seqlens: accepted for signature parity with the flash-attention path;
            not used -- the block mask is built from ``q_cu_seqlens`` only.

    Returns:
        Tensor of shape (tot_seqlens, num_heads * head_dim).
    """
    seq_length = q.shape[0]
    if q_cu_seqlens is None:
        allowed = torch.ones(
            [1, seq_length, seq_length], device=q.device, dtype=torch.bool
        )
    else:
        allowed = torch.zeros(
            [1, seq_length, seq_length], device=q.device, dtype=torch.bool
        )
        for i in range(1, len(q_cu_seqlens)):
            start, end = int(q_cu_seqlens[i - 1]), int(q_cu_seqlens[i])
            allowed[..., start:end, start:end] = True
    # (L, H, D) -> (H, L, D) so matmul runs per head.
    q = q.transpose(0, 1)
    k = k.transpose(0, 1)
    v = v.transpose(0, 1)
    attn_weight = q @ k.transpose(-2, -1) / math.sqrt(q.shape[-1])
    # Adding the boolean mask would only shift in-block logits by +1 and leave
    # cross-sequence logits untouched, so sequences would leak into each other.
    # Fill disallowed positions with the most negative finite value instead
    # (finite, so a row without any allowed key does not turn into NaN).
    attn_weight = attn_weight.masked_fill(
        ~allowed, torch.finfo(attn_weight.dtype).min
    )
    attn_weight = torch.softmax(attn_weight, dim=-1, dtype=torch.float32).to(q.dtype)
    attn_output = attn_weight @ v
    attn_output = attn_output.transpose(0, 1)
    attn_output = attn_output.reshape(seq_length, -1)
    return attn_output
# Dispatch table from the configured attention-implementation name to its function.
# NOTE(review): the entries do not share a call signature -- "sdpa" takes
# (q, k, v, attention_mask) while "flash_attention_2" and "eager" take
# (q, k, v, q_cu_seqlens, k_cu_seqlens); verify what each call site passes.
VL_VISION_ATTENTION_FUNCTIONS = {
    "flash_attention_2": multihead_attention,
    "sdpa": sdpa_attention,
    "eager": eager_attention,
}
def _apply_rope_input_validation(x, freqs_cis):
    """Assert that ``x`` and ``freqs_cis`` are shape- and dtype-compatible for RoPE.

    Requirements checked, in order:
      * ``x`` has exactly one more axis than ``freqs_cis``;
      * all axes of ``x`` except its last two equal all axes of ``freqs_cis`` except
        its last;
      * the last axis of ``x`` is twice the last axis of ``freqs_cis``;
      * ``freqs_cis`` is ``torch.complex64``.

    Raises:
        AssertionError: carrying both shapes (or the offending dtype) on violation.
    """
    both_shapes = (x.shape, freqs_cis.shape)
    assert freqs_cis.ndim + 1 == x.ndim, both_shapes
    assert freqs_cis.shape[:-1] == x.shape[:-2], both_shapes
    assert 2 * freqs_cis.shape[-1] == x.shape[-1], both_shapes
    assert freqs_cis.dtype == torch.complex64, freqs_cis.dtype
def apply_rope(
    xq: torch.Tensor, xk: torch.Tensor, freqs_cis: torch.Tensor
) -> tuple[torch.Tensor, torch.Tensor]:
    """Rotate queries and keys by precomputed complex rotary factors.

    Args: (The leading dimensions of all inputs should be the same)
        xq: query, tensor of shape (..., num_heads, head_dim)
        xk: key, tensor of shape (..., num_heads, head_dim); its number of heads may
            differ from ``xq``'s.
        freqs_cis: tensor of shape (..., head_dim/2), dtype=torch.complex64. It contains
            the precomputed cis(freqs) for each position in the 2D grid.

    Returns:
        xq_out, xk_out: tensors with the shapes and dtypes of ``xq`` and ``xk``.

    Raises:
        AssertionError: if either input fails ``_apply_rope_input_validation``.
    """
    _apply_rope_input_validation(xq, freqs_cis)
    _apply_rope_input_validation(xk, freqs_cis)

    freqs_cis = freqs_cis.unsqueeze(-2)  # ..., 1, head_dim/2
    # Pair consecutive features into complex numbers: ..., num_heads, head_dim/2.
    # Each tensor is reshaped with its OWN leading shape; reusing xq's shape for xk
    # breaks as soon as the two have different head counts.
    xq_ = torch.view_as_complex(xq.float().view(*xq.shape[:-1], -1, 2))
    xk_ = torch.view_as_complex(xk.float().view(*xk.shape[:-1], -1, 2))
    xq_out = torch.view_as_real(xq_ * freqs_cis).flatten(-2)  # ..., num_heads, head_dim
    xk_out = torch.view_as_real(xk_ * freqs_cis).flatten(-2)  # ..., num_heads, head_dim
    return xq_out.type_as(xq), xk_out.type_as(xk)
class Learnable2DInterpPosEmb(nn.Module):
    """Learnable 2-D position embedding that is resampled to each input grid.

    A single ``(height, width, dim)`` table is learned. For a grid of another size the
    table is interpolated (``interpolation_mode``) to that size; a grid of the native
    size uses the table directly.
    """

    def __init__(
        self, height: int, width: int, dim: int, interpolation_mode: str = "bicubic"
    ) -> None:
        super().__init__()
        self.height = height
        self.width = width
        self.interpolation_mode = interpolation_mode
        self.weight = nn.Parameter(torch.empty(height, width, dim))
        self.reset_parameters()

    def reset_parameters(self):
        """Re-initialise the table from a standard normal distribution."""
        nn.init.normal_(self.weight)

    def forward(self, x: torch.Tensor, grid_hws: torch.Tensor) -> torch.Tensor:
        """Add position embeddings to packed patch tokens.

        Args:
            x: packed tokens; its first axis must equal the sum of ``h * w`` over
                ``grid_hws`` and its last axis must equal the embedding dim.
            grid_hws: integer tensor with one ``(h, w)`` row per image.

        Returns:
            ``x`` plus the concatenated per-image position embeddings.
        """
        # `tolist()` yields Python lists while `Tensor.shape` is a tuple subclass, and
        # a list never compares equal to a tuple -- normalise both sides to tuples so
        # the no-interpolation fast path can actually trigger.
        native_hw = tuple(self.weight.shape[:-1])
        pos_embs = []
        for shape in grid_hws.tolist():
            if tuple(shape) == native_hw:
                pos_embs.append(self.weight.flatten(end_dim=1))
            else:
                pos_embs.append(
                    F.interpolate(
                        # (H, W, C) -> (1, C, H, W) as required by F.interpolate.
                        self.weight.permute((2, 0, 1)).unsqueeze(0),
                        size=shape,
                        mode=self.interpolation_mode,
                    )
                    .squeeze(0)
                    .permute((1, 2, 0))
                    .flatten(end_dim=1)
                )
        out = x + torch.cat(pos_embs)
        return out
class MoonVisionPatchEmbed(nn.Module):
    """Patch projection (strided convolution) followed by an interpolated 2-D position embedding."""

    def __init__(
        self,
        out_dim: int,
        in_dim: int = 3,
        patch_size: Union[int, Tuple[int, int]] = (14, 14),
        pos_emb_height: int = 14,
        pos_emb_width: int = 14,
    ):
        super().__init__()
        assert isinstance(
            patch_size, (int, Sequence)
        ), f"Invalid patch_size type: {type(patch_size)}"
        # A bare int means a square patch.
        patch_size = (
            (patch_size, patch_size) if isinstance(patch_size, int) else patch_size
        )
        assert (
            len(patch_size) == 2
        ), f"Expected patch_size to be a tuple of 2, got {patch_size}"
        self.patch_size = patch_size

        # Kernel size == stride, so every patch is projected independently.
        self.proj = nn.Conv2d(
            in_dim, out_dim, kernel_size=patch_size, stride=patch_size
        )
        self.pos_emb = Learnable2DInterpPosEmb(
            height=pos_emb_height, width=pos_emb_width, dim=out_dim
        )

    def forward(self, x: torch.Tensor, grid_hws: torch.Tensor) -> torch.Tensor:
        """Project patches and add position embeddings.

        Args:
            x: packed patches with the patch index on axis 0; presumably
                (L, in_dim, patch_h, patch_w) so the convolution yields one vector
                per patch -- TODO confirm against the image processor.
            grid_hws (N, 2): grid height and width of each image.

        Returns:
            Tensor whose first axis is L and whose remaining axes are flattened.
        """
        num_patches = x.size(0)
        projected = self.proj(x)
        tokens = projected.view(num_patches, -1)
        # apply positional embedding
        return self.pos_emb(tokens, grid_hws)
class Rope2DPosEmb(nn.Module):
    """2D rotary position embedding with multi-resolution support.

    The complex rotation table for the full ``max_height x max_width`` grid is built
    lazily on first use and cached on the instance; ``get_freqs_cis`` slices it for
    the grids of the current batch.

    Refs:
        - RoFormer: https://arxiv.org/abs/2104.09864
        - VisionLLaMA: https://arxiv.org/abs/2403.00522
        - https://github.com/Meituan-AutoML/VisionLLaMA/blob/main/dit/models.py

    Args:
        dim (int): usually the multi-head attention dimension, should be divisible by 4
            (TODO: relax this constraint if needed)
        max_height (int): the maximum height of the 2D grid
        max_width (int): the maximum width of the 2D grid
        theta_base (float): the base of the theta
    """

    def __init__(self, dim: int, max_height: int, max_width: int, theta_base=10000):
        super().__init__()
        self.dim = dim
        assert self.dim % 4 == 0, "dim must be divisible by 4"
        self.max_height = max_height
        self.max_width = max_width
        self.theta_base = theta_base

        # Lazily-built cache; a plain attribute, so it is not part of the state dict.
        self.freqs_cis = None

    def extra_repr(self):
        return f"dim={self.dim}, max_height={self.max_height}, max_width={self.max_width}, theta_base={self.theta_base}"

    def _precompute_freqs_cis(self, device: torch.device) -> torch.Tensor:
        """Calculate the cis(freqs) for each position in the 2D grid.

        Return: complex tensor of shape (max_height, max_width, dim//2) and value:
            width axis:  ret[h, w, 2*i]   = cis(w * theta_base**(-4*i/dim))
            height axis: ret[h, w, 2*i+1] = cis(h * theta_base**(-4*i/dim))
            with (i in [0, dim//4))
            note: `cis` is a mathematical notation defined by cis x = cos x + i sin x,
        """
        N = self.max_height * self.max_width
        flat_pos = torch.arange(0, N).float().to(device)
        x_pos = flat_pos % self.max_width  # column (width) index
        y_pos = flat_pos // self.max_width  # row (height) index
        dim_range = (
            torch.arange(0, self.dim, 4)[: (self.dim // 4)].float().to(device)
        )  # C/4
        freqs = 1.0 / (self.theta_base ** (dim_range / self.dim))
        x_freqs = torch.outer(x_pos, freqs).float()  # N, C/4
        y_freqs = torch.outer(y_pos, freqs).float()  # N, C/4
        x_cis = torch.polar(torch.ones_like(x_freqs), x_freqs)  # N, C/4
        y_cis = torch.polar(torch.ones_like(y_freqs), y_freqs)  # N, C/4
        # N, C/4, 2 -- width factor first, height factor second for every frequency
        freqs_cis = torch.cat(
            [x_cis.unsqueeze(dim=-1), y_cis.unsqueeze(dim=-1)], dim=-1
        )
        # max_height, max_width, C/2
        freqs_cis = freqs_cis.reshape(self.max_height, self.max_width, -1)
        return freqs_cis

    def get_freqs_cis(self, grid_hws: torch.Tensor) -> torch.Tensor:
        """Return the rotation table for the largest (halved) grid, repeated per image.

        Args:
            grid_hws (torch.Tensor): integer tensor with one (height, width) row per image.

        Returns:
            freqs_cis: complex tensor of shape
            ``(len(grid_hws), (max_h // 2) * (max_w // 2), dim // 2)`` where
            ``max_h`` / ``max_w`` are the column-wise maxima of ``grid_hws``. Every
            image gets the same slice.

        Raises:
            AssertionError: if any grid exceeds ``max_height`` / ``max_width`` or has
                a side smaller than 1.

        NOTE(review): the ``// 2`` presumably mirrors a 2x2 patch merge applied before
        the encoder -- TODO confirm; it is hard-coded here.
        """
        # Rebuild when the inputs live on another device than the cached table;
        # otherwise the table stays pinned to whichever device was seen first.
        if self.freqs_cis is None or self.freqs_cis.device != grid_hws.device:
            self.freqs_cis = self._precompute_freqs_cis(grid_hws.device)

        shapes = grid_hws.tolist()
        assert all(
            1 <= h <= self.max_height and 1 <= w <= self.max_width for h, w in shapes
        ), (
            shapes,
            self.max_height,
            self.max_width,
        )
        # NOTE: an earlier variant concatenated one `[:h, :w]` slice per image
        # (packed layout); the current layout is padded, one identical slice per image.
        max_h, max_w = grid_hws.max(dim=0).values.tolist()
        max_h, max_w = max_h // 2, max_w // 2
        freqs_cis = self.freqs_cis[:max_h, :max_w].reshape(-1, self.dim // 2).repeat(len(shapes), 1, 1)
        return freqs_cis
class MLP2(nn.Module):
    """Two-layer perceptron: ``fc1(activation(fc0(x)))``.

    Args:
        dims: [in_dim, hidden_dim, out_dim]
        activation: callable applied between the two linear layers.
        bias: whether to use bias in linear layer.
    """

    def __init__(self, dims: list[int], activation, bias=True):
        super().__init__()
        assert len(dims) == 3
        in_dim, hidden_dim, out_dim = dims
        self.fc0 = nn.Linear(in_dim, hidden_dim, bias=bias)
        self.fc1 = nn.Linear(hidden_dim, out_dim, bias=bias)
        self.activation = activation
        # Truncated-normal weights scaled by fan-in; biases start at zero.
        for layer in (self.fc0, self.fc1):
            fan_in_std = math.sqrt(2 / layer.in_features)
            nn.init.trunc_normal_(layer.weight, std=fan_in_std)
            if layer.bias is not None:
                nn.init.zeros_(layer.bias)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        hidden = self.activation(self.fc0(x))
        return self.fc1(hidden)
class MoonVitEncoderLayer(nn.Module):
    """Pre-norm transformer block: self-attention and an MLP, each with a residual connection."""

    def __init__(
        self,
        layer_idx: int,
        num_heads: int,
        hidden_dim: int,
        mlp_dim: int,
        attn_implementation: str = "eager",
        activation=F.gelu,
        attn_bias: bool = False,
    ):
        super().__init__()
        self.layer_idx = layer_idx
        self.num_heads = num_heads
        self.hidden_dim = hidden_dim
        self.hidden_size_per_attention_head = hidden_dim // num_heads
        self.attn_implementation = attn_implementation

        self.norm0 = nn.LayerNorm(hidden_dim)
        self.norm1 = nn.LayerNorm(hidden_dim)
        self.mlp = MLP2([hidden_dim, mlp_dim, hidden_dim], activation)
        # One fused projection produces q, k and v.
        self.wqkv = nn.Linear(hidden_dim, hidden_dim * 3, bias=attn_bias)
        self.wo = nn.Linear(hidden_dim, hidden_dim, bias=attn_bias)

    def attention_qkvpacked(
        self,
        x: torch.Tensor,
        attention_mask: torch.Tensor,
        rope_freqs_cis: Optional[torch.Tensor] = None,
        past_key_value = None
    ):
        """Self-attention over a padded batch using the fused qkv projection.

        Args:
            x (torch.Tensor): (batch_size, seqlen, hidden_dim)
            attention_mask (torch.Tensor): forwarded unchanged as the fourth
                positional argument of the selected attention function.
            rope_freqs_cis: unpacked into ``(cos, sin)``; despite the ``None`` default
                it must be provided.
            past_key_value: optional cache exposing ``update(k, v, layer_idx)``.

        Returns:
            Tensor of shape (batch_size, seqlen, hidden_dim).
        """
        batch_size, seqlen, hidden_dim = x.shape
        packed = self.wqkv(x).view(
            batch_size, seqlen, 3, self.num_heads, self.hidden_size_per_attention_head
        )
        # Split q / k / v and move heads in front of the sequence axis.
        xq, xk, xv = (part.transpose(1, 2) for part in packed.unbind(dim=2))

        cos, sin = rope_freqs_cis
        xq, xk = apply_multimodal_rotary_pos_emb(xq, xk, cos, sin)

        if past_key_value is not None:
            xk, xv = past_key_value.update(xk, xv, self.layer_idx)

        attend = VL_VISION_ATTENTION_FUNCTIONS[self.attn_implementation]
        context = attend(xq, xk, xv, attention_mask)
        context = context.reshape(batch_size, seqlen, hidden_dim).contiguous()
        return self.wo(context)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor,
        rope_freqs_cis: Union[torch.Tensor, None] = None,
        past_key_value = None
    ) -> torch.Tensor:
        """Run the block.

        Args:
            hidden_states: (batch_size, seqlen, hidden_dim), as unpacked by
                ``attention_qkvpacked``.

        Returns:
            Tensor with the same shape as ``hidden_states``.
        """
        attn_out = self.attention_qkvpacked(
            self.norm0(hidden_states),
            attention_mask,
            rope_freqs_cis=rope_freqs_cis,
            past_key_value=past_key_value,
        )
        after_attn = hidden_states + attn_out
        mlp_out = self.mlp(self.norm1(after_attn))
        return after_attn + mlp_out
class MoonVitEncoder(nn.Module):
    """Stack of ``MoonVitEncoderLayer`` blocks followed by a final LayerNorm."""

    def __init__(
        self,
        hidden_dim: int,
        num_layers: int,
        block_cfg: dict,
    ) -> None:
        super().__init__()

        layers = [
            MoonVitEncoderLayer(layer_idx, **block_cfg)
            for layer_idx in range(num_layers)
        ]
        self.blocks = nn.ModuleList(layers)
        self.final_layernorm = nn.LayerNorm(hidden_dim)
        self.gradient_checkpointing = False

    def forward(self, hidden_states, attention_mask, rope_freqs_cis, past_key_value=None) -> torch.Tensor:
        """Apply every block in order, then the final LayerNorm.

        When ``gradient_checkpointing`` is set and the module is in training mode,
        each block is run through ``torch.utils.checkpoint.checkpoint``.
        """
        use_checkpoint = self.gradient_checkpointing and self.training
        for block in self.blocks:
            if use_checkpoint:
                hidden_states = torch.utils.checkpoint.checkpoint(
                    block.__call__, hidden_states, attention_mask, rope_freqs_cis, past_key_value
                )
            else:
                hidden_states = block(
                    hidden_states,
                    attention_mask,
                    rope_freqs_cis=rope_freqs_cis,
                    past_key_value=past_key_value,
                )

        return self.final_layernorm(hidden_states)
def patch_merger(
    x: torch.Tensor,
    grid_hws: torch.Tensor,
    merge_kernel_size: Sequence[int] = (2, 2),
) -> List[torch.Tensor]:
    """Group each image's patch tokens into ``kernel_h x kernel_w`` neighbourhoods.

    Args:
        x: packed patch tokens; rows are consumed image by image, ``h * w`` rows per
            image in row-major order.
        grid_hws: integer tensor with one (height, width) row per image. Height and
            width must be divisible by the corresponding kernel side.
        merge_kernel_size: ``(kernel_height, kernel_width)``; any 2-element sequence.

    Returns:
        One tensor per image of shape
        ``(new_h * new_w, kernel_h * kernel_w, d_model)`` with
        ``new_h = h // kernel_h`` and ``new_w = w // kernel_w``.
    """
    d_model = x.size(-1)
    # Loop-invariant: unpack the kernel once.
    kernel_height, kernel_width = merge_kernel_size

    outputs = []
    pre_sum = 0
    for x_shape in grid_hws.tolist():
        height, width = x_shape[0], x_shape[1]
        num_patches = height * width
        # Get the current sequence
        seq = x[pre_sum:pre_sum + num_patches]
        # Split both grid axes into (block index, offset within block) ...
        new_height, new_width = height // kernel_height, width // kernel_width
        reshaped_seq = seq.view(
            new_height, kernel_height, new_width, kernel_width, d_model
        )
        # ... then bring the two block indices to the front so each merged token
        # holds its kernel_h * kernel_w members contiguously.
        reshaped_seq = reshaped_seq.permute(0, 2, 1, 3, 4).contiguous()
        padded_seq = reshaped_seq.view(
            new_height * new_width, kernel_height * kernel_width, -1
        )
        outputs.append(padded_seq)
        pre_sum += num_patches

    return outputs
class MultiModalProjector(nn.Module):
    """LayerNorm followed by a two-layer GELU MLP mapping vision features to the text width.

    NOTE(review): ``pre_norm`` normalises over ``config.hidden_size`` features while
    ``linear_1`` expects ``config.hidden_size * kernel_h * kernel_w`` input features,
    and ``forward`` no longer reshapes between them. Unless the merge kernel is 1x1
    these two layers cannot both accept the same tensor -- confirm intended usage.
    """

    def __init__(self, config):
        super().__init__()

        kernel_h = config.merge_kernel_size[0]
        kernel_w = config.merge_kernel_size[1]
        # Width of one merged token: all members of a merge window concatenated.
        self.hidden_size = config.hidden_size * kernel_h * kernel_w

        self.pre_norm = torch.nn.LayerNorm(config.hidden_size, eps=1e-05)
        self.linear_1 = nn.Linear(self.hidden_size, self.hidden_size, bias=True)
        self.act = GELUActivation()
        self.linear_2 = nn.Linear(self.hidden_size, config.text_hidden_size, bias=True)

    def forward(self, image_features: list[torch.Tensor]) -> torch.Tensor:
        """Apply ``pre_norm -> linear_1 -> act -> linear_2`` in sequence.

        NOTE(review): the annotation says ``list[torch.Tensor]`` but the value is
        passed straight to ``LayerNorm``, which presumably requires a single
        tensor -- verify against callers.
        """
        hidden_states = image_features
        for stage in (self.pre_norm, self.linear_1, self.act, self.linear_2):
            hidden_states = stage(hidden_states)
        return hidden_states
class MoonVitPretrainedModel(PreTrainedModel):
    """MoonViT vision tower: patch embedding, 2x2 pixel merge, padded-batch encoder.

    NOTE(review): ``_no_split_modules`` names "PackingTransformer", which is not a
    class defined in this file -- presumably a leftover; confirm the intended module name.
    """

    config_class = MoonViTConfig
    model_type = "moonvit"
    supports_gradient_checkpointing = True
    _no_split_modules = ["PackingTransformer"]
    _supports_flash_attn_2 = True
    _supports_sdpa = True

    def __init__(self, config: MoonViTConfig, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)
        # Work on a private copy so later reads cannot be affected by outside mutation.
        config = deepcopy(config)
        self.merge_kernel_size = config.merge_kernel_size
        self.patch_size = config.patch_size
        self.patch_embed = MoonVisionPatchEmbed(
            out_dim=config.hidden_size,
            patch_size=config.patch_size,
            pos_emb_height=config.init_pos_emb_height,
            pos_emb_width=config.init_pos_emb_width,
        )

        self.rope_2d = Rope2DPosEmb(
            config.hidden_size // config.num_attention_heads, 512, 512
        )

        self.encoder = MoonVitEncoder(
            hidden_dim=config.hidden_size,
            num_layers=config.num_hidden_layers,
            block_cfg={
                "num_heads": config.num_attention_heads,
                "hidden_dim": config.hidden_size,
                "mlp_dim": config.intermediate_size,
                "activation": PytorchGELUTanh(),
                "attn_bias": True,
                "attn_implementation": config._attn_implementation,
            },
        )

        # Fuses the 4 members of each merge window back to hidden_size features.
        self.pixel_merger = nn.Sequential(
            nn.Linear(config.hidden_size*4, config.hidden_size),
            nn.GELU(),
            nn.Linear(config.hidden_size, config.hidden_size)
        )

        # Not applied in forward() (the call there is commented out); kept so the
        # module's parameters / state-dict layout stay unchanged.
        self.projector = nn.Sequential(
            nn.LayerNorm(config.hidden_size),
            nn.Linear(config.hidden_size, config.hidden_size, bias=True),
            nn.GELU(),
            nn.Linear(config.hidden_size, config.text_hidden_size, bias=True),
        )

    def _init_weights(self, module):
        """Initialize the weights"""
        if isinstance(module, nn.Linear):
            nn.init.xavier_uniform_(module.weight)
            # Bias-free linears have `bias is None`; initialising it would raise.
            if module.bias is not None:
                nn.init.normal_(module.bias, std=1e-6)
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(
        self, pixel_values: torch.Tensor, image_grid_hws: torch.Tensor
    ) -> torch.Tensor:
        """Encode a packed batch of image patches.

        Args:
            pixel_values (torch.Tensor): The input pixel values.
            image_grid_hws (torch.Tensor): The grid height and width of each image,
                one (h, w) row per image.

        Returns:
            torch.Tensor: the encoder outputs of all images concatenated along axis 0,
            with padding removed (``h * w // 4`` tokens per image).
        """
        hidden_states = self.patch_embed(pixel_values, image_grid_hws)
        merged_list = patch_merger(
            hidden_states, image_grid_hws, merge_kernel_size=self.merge_kernel_size
        )
        # Concatenate the members of every merge window on the feature axis and fuse.
        # NOTE(review): the factor 4 assumes a 2x2 merge kernel.
        hidden_states = self.pixel_merger(torch.cat(merged_list).view(-1, hidden_states.shape[-1] * 4))

        num_tokens = (image_grid_hws.prod(dim=1) // 4).tolist()
        per_image_states = hidden_states.split(num_tokens, dim=0)

        # Pad every image to the token count of the (max_h, max_w) grid, i.e. the
        # same grid that `rope_2d.get_freqs_cis` slices its table for.
        max_h, max_w = image_grid_hws.max(dim=0).values.tolist()
        max_length = max_h * max_w // 4
        hidden_states = torch.stack([F.pad(h, (0, 0, 0, max_length - h.shape[0])) for h in per_image_states])

        # Boolean padding mask: True for real tokens.
        attention_mask = torch.zeros(len(image_grid_hws), max_length, device=hidden_states.device, dtype=torch.bool)
        for i, count in enumerate(num_tokens):
            attention_mask[i, :count] = True
        from transformers.modeling_attn_mask_utils import _prepare_4d_attention_mask
        attention_mask = _prepare_4d_attention_mask(attention_mask, hidden_states.dtype)

        rope_freqs_cis = self.rope_2d.get_freqs_cis(grid_hws=image_grid_hws)

        hidden_states = self.encoder(hidden_states, attention_mask, rope_freqs_cis)

        # Drop the padding again and re-pack the images along axis 0.
        hidden_states = torch.cat(
            [states[:count] for states, count in zip(hidden_states, num_tokens)]
        )

        # hidden_states = self.projector(hidden_states)
        return hidden_states