import torch
import torch.nn.functional as F
from torch import nn

from transformers import WhisperConfig
from transformers.activations import ACT2FN
from transformers.models.whisper.modeling_whisper import WHISPER_ATTENTION_CLASSES

from .coattention import CoAttention
from .layers import CustomDiagonalLinear, CustomLinear, CustomLinearInitialized, Gate


class LowRankApproxSelectFirst(nn.Module):
    """Low-rank bottleneck initialized to approximate `x -> x[..., :d_out]`."""

    def __init__(self, d_in, d_out, rank):
        super().__init__()
        self.d_in = d_in
        self.d_out = d_out
        self.rank = rank
        self.proj_in = nn.Linear(d_in, rank)
        self.proj_out = nn.Linear(rank, d_out)

    def forward(self, x):
        return self.proj_out(self.proj_in(x))

    def _init_weights(self):
        # Rank-k SVD of the rectangular identity, so that proj_out(proj_in(x))
        # is the best rank-k approximation of selecting the first d_out features.
        eye = torch.eye(self.d_out, self.d_in)
        U, S, Vh = torch.linalg.svd(eye, full_matrices=False)

        U_k = U[:, :self.rank]
        S_k = S[:self.rank]
        V_k = Vh[:self.rank, :]

        A = V_k
        B = U_k @ torch.diag(S_k)

        self.proj_in.weight.data.copy_(A)
        self.proj_in.bias.data.zero_()
        self.proj_out.weight.data.copy_(B)
        self.proj_out.bias.data.zero_()


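# Minimal sanity-check sketch (illustrative only, not used by the model code):
# after `_init_weights`, the module reproduces "take the first d_out features"
# exactly whenever rank >= min(d_in, d_out), and its best rank-k approximation
# of that selection otherwise.
#
#   m = LowRankApproxSelectFirst(d_in=8, d_out=4, rank=4)
#   m._init_weights()
#   x = torch.randn(2, 8)
#   assert torch.allclose(m(x), x[:, :4], atol=1e-5)

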
class TACBlock(nn.Module):
    """Transform-average-concatenate (TAC) style block for cross-speaker communication."""

    def __init__(self, config: WhisperConfig, d_int_factor: float = 1.0, num_speakers=2):
        super().__init__()
        d = config.d_model
        d_prime = int(d * d_int_factor)
        self.num_speakers = num_speakers
        self.proj_in_1 = nn.Linear(d, d_prime, bias=True)
        self.proj_in_2 = nn.Linear(d, d_prime, bias=True)
        self.proj_int = nn.Linear(d_prime, d_prime, bias=True)
        self.proj_out_1 = nn.Linear(d + d_prime, d, bias=True)
        self.proj_out_2 = nn.Linear(d + d_prime, d, bias=True)
        self.activation_fn = ACT2FN[config.activation_function]
        self.norms = nn.ModuleList([nn.LayerNorm(d) for _ in range(self.num_speakers)])
        self.gate = Gate(self.num_speakers, 0.05)

    def forward(self, hidden_states: torch.FloatTensor) -> torch.FloatTensor:
        # hidden_states: (B, num_speakers, T, d).
        # Transform each speaker stream, then average across speakers.
        x_proj = torch.stack(
            [
                self.activation_fn(self.proj_in_1(hidden_states[:, 0])),
                self.activation_fn(self.proj_in_2(hidden_states[:, 1])),
            ],
            dim=1,
        )
        x_mean = x_proj.mean(dim=1, keepdim=True)
        z = self.activation_fn(self.proj_int(x_mean))

        # Broadcast the shared representation back to every speaker, concatenate,
        # and apply a gated residual update.
        z_expand = z.expand(-1, self.num_speakers, -1, -1)
        x_cat = torch.cat([hidden_states, z_expand], dim=-1)
        x_out = torch.stack(
            [
                self.norms[0](self.proj_out_1(x_cat[:, 0])),
                self.norms[1](self.proj_out_2(x_cat[:, 1])),
            ],
            dim=1,
        )
        return hidden_states + self.gate(x_out, dim=1)


class CrossAttentionBlock(nn.Module):
    def __init__(self, config: WhisperConfig):
        super().__init__()
        self.embed_dim = config.d_model

        self.num_speakers = getattr(config, "mt_num_speakers", 2)
        if self.num_speakers != 2:
            raise ValueError("CrossAttentionBlock supports only 2 speakers.")

        # One cross-attention module per speaker, each attending to the other speaker's stream.
        self.attn_blocks = nn.ModuleList([
            WHISPER_ATTENTION_CLASSES[config._attn_implementation](
                embed_dim=self.embed_dim,
                num_heads=config.encoder_attention_heads,
                dropout=config.attention_dropout,
                config=config,
            )
            for _ in range(self.num_speakers)
        ])

        self.norms = nn.ModuleList([nn.LayerNorm(self.embed_dim) for _ in range(self.num_speakers)])
        self.gate = Gate(self.num_speakers, 0.01)

    def forward(self, hidden_states):
        # hidden_states: (B, num_speakers, T, d); each speaker queries the other speaker's stream.
        outputs = []
        for s in range(self.num_speakers):
            q = hidden_states[:, s]
            other_s = 1 - s
            kv = hidden_states[:, other_s]

            attn_out, _, _ = self.attn_blocks[s](hidden_states=q, key_value_states=kv)
            outputs.append(self.norms[s](attn_out[:, None, :, :]))
        outputs = torch.cat(outputs, dim=1)

        # Gated residual fusion of the exchanged information.
        outputs_modulated = self.gate(outputs, dim=1) + hidden_states
        return outputs_modulated


def first_init_fun(module):
    # Small Xavier noise plus an identity block that maps the second half of the
    # concatenated [attn_out, q] input onto the first `in_features // 2` output units.
    torch.nn.init.xavier_uniform_(module.weight, gain=0.1)
    half = module.weight.shape[1] // 2
    module.weight.data[:half, half:] += torch.eye(half)
    module.bias.data.zero_()


def second_init_fun(module):
    # Small Xavier noise plus an identity block that passes the first `out_features`
    # hidden units straight through, keeping the freshly initialized FFN close to an
    # identity-like mapping of the query channel.
    torch.nn.init.xavier_uniform_(module.weight, gain=0.1)
    out_dim = module.weight.shape[0]
    module.weight.data[:, :out_dim] += torch.eye(out_dim)
    module.bias.data.zero_()


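# Illustrative sketch of the initialization (an assumption about intent, not part of the
# model): with gain=0.1 the Xavier term is small, so right after initialization an FFN
# built from these two init functions roughly applies the activation to the second half
# of its concatenated input (taking GELU as the activation, Whisper's default):
#
#   d, ffn = 8, 32
#   lin1 = CustomLinearInitialized(2 * d, ffn, init_fun=first_init_fun)
#   lin2 = CustomLinearInitialized(ffn, d, init_fun=second_init_fun)
#   x = torch.randn(3, 2 * d)
#   out = lin2(F.gelu(lin1(x)))   # close to F.gelu(x[:, d:]) up to the small Xavier noise

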
class CrossAttentionEnrollBlockNew(nn.Module):
    def __init__(self, config, layer_norm_eps: float = 1e-5):
        super().__init__()
        self.embed_dim = config.d_model
        self.ffn_dim = config.encoder_ffn_dim

        self.cross_attn = WHISPER_ATTENTION_CLASSES[config._attn_implementation](
            embed_dim=self.embed_dim,
            num_heads=config.encoder_attention_heads,
            dropout=config.attention_dropout,
            config=config,
        )

        # Scalar gate, initialized to zero so the block starts as an identity mapping.
        self.cross_gate = nn.Parameter(torch.zeros(1))

        dropout = getattr(config, "dropout", 0.1)
        self.ffn = nn.Sequential(
            CustomLinearInitialized(self.embed_dim * 2, self.ffn_dim, init_fun=first_init_fun),
            ACT2FN[config.activation_function],
            nn.Dropout(dropout),
            CustomLinearInitialized(self.ffn_dim, self.embed_dim, init_fun=second_init_fun),
            nn.Dropout(dropout),
        )

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """
        Args:
            hidden_states: (B, 2, T, F) - batch, channels, time, features
        Returns:
            Updated hidden states of the same shape
        """
        q_channel = hidden_states[:, 0]
        kv_channel = hidden_states[:, 1]

        # The query channel attends to the enrollment (key/value) channel.
        attn_output = self.cross_attn(
            hidden_states=q_channel,
            key_value_states=kv_channel,
            output_attentions=False,
        )[0]

        q_concat = torch.cat([attn_output, q_channel], dim=-1)

        # Gated residual update of the query channel; the enrollment channel passes through unchanged.
        updated_q = q_channel + torch.tanh(self.cross_gate) * self.ffn(q_concat)

        return torch.stack([updated_q, kv_channel], dim=1)


class CrossAttentionEnrollBlock(nn.Module):
    def __init__(self, config: WhisperConfig):
        super().__init__()
        self.embed_dim = config.d_model

        self.attn_block = WHISPER_ATTENTION_CLASSES[config._attn_implementation](
            embed_dim=self.embed_dim,
            num_heads=config.encoder_attention_heads,
            dropout=config.attention_dropout,
            config=config,
        )

        self.norm = nn.LayerNorm(self.embed_dim)
        self.gate = Gate(1, 0.1)

    def forward(self, hidden_states):
        # hidden_states: (B, 2, T, d); channel 0 is the query stream, channel 1 the enrollment stream.
        q = hidden_states[:, 0]
        kv = hidden_states[:, 1]
        attn_out, _, _ = self.attn_block(hidden_states=q, key_value_states=kv)
        out = self.norm(attn_out)

        # Gated residual update of the query stream only.
        updated_q = self.gate(out[:, None, :, :], dim=1)[:, 0] + q

        result = torch.stack([updated_q, kv], dim=1)
        return result


class CompetitiveCrossAttentionBlock(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embed_dim = config.d_model
        self.num_heads = config.encoder_attention_heads
        self.head_dim = self.embed_dim // self.num_heads
        assert (
            self.head_dim * self.num_heads == self.embed_dim
        ), "embed_dim must be divisible by num_heads"

        self.num_speakers = getattr(config, "mt_num_speakers", 2)
        if self.num_speakers != 2:
            raise ValueError("CompetitiveCrossAttentionBlock supports only 2 speakers.")

        # Projections are shared by both attention directions.
        self.q_proj = nn.Linear(self.embed_dim, self.embed_dim)
        self.k_proj = nn.Linear(self.embed_dim, self.embed_dim)
        self.v_proj = nn.Linear(self.embed_dim, self.embed_dim)

        self.out_proj = nn.Linear(self.embed_dim, self.embed_dim)

        self.norms = nn.ModuleList([nn.LayerNorm(self.embed_dim) for _ in range(self.num_speakers)])
        self.eps = 1e-6
        self.gate = Gate(self.num_speakers, 0.01)

    def _shape(self, tensor, seq_len, batch_size):
        # (B, T, d) -> (B, num_heads, T, head_dim)
        return tensor.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, hidden_states):
        # hidden_states: (B, 2, T, d)
        B, _, T, _ = hidden_states.shape

        h1, h2 = hidden_states[:, 0], hidden_states[:, 1]

        # Speaker 1 queries speaker 2 ...
        Q1 = self.q_proj(h1)
        K2 = self.k_proj(h2)
        V2 = self.v_proj(h2)

        # ... and speaker 2 queries speaker 1.
        Q2 = self.q_proj(h2)
        K1 = self.k_proj(h1)
        V1 = self.v_proj(h1)

        Q1 = self._shape(Q1, T, B)
        K2 = self._shape(K2, T, B)
        V2 = self._shape(V2, T, B)

        Q2 = self._shape(Q2, T, B)
        K1 = self._shape(K1, T, B)
        V1 = self._shape(V1, T, B)

        # Scaled dot-product logits for both directions.
        scale = 1 / (self.head_dim ** 0.5)
        L_1to2 = torch.matmul(Q1, K2.transpose(-1, -2)) * scale
        L_2to1 = torch.matmul(Q2, K1.transpose(-1, -2)) * scale

        S_1to2 = F.softmax(L_1to2, dim=-1)
        S_2to1 = F.softmax(L_2to1, dim=-1)

        # Competitive renormalization: the two attention maps compete elementwise,
        # so A_1to2 + A_2to1 is approximately 1 (up to eps) at every position pair.
        M_joint = S_1to2 + S_2to1 + self.eps
        A_1to2 = S_1to2 / M_joint
        A_2to1 = S_2to1 / M_joint

        H1_attn = torch.matmul(A_1to2, V2)
        H2_attn = torch.matmul(A_2to1, V1)

        # Merge heads back to (B, T, d).
        H1_attn = H1_attn.transpose(1, 2).contiguous().view(B, T, self.embed_dim)
        H2_attn = H2_attn.transpose(1, 2).contiguous().view(B, T, self.embed_dim)

        H1_attn = self.norms[0](self.out_proj(H1_attn))
        H2_attn = self.norms[1](self.out_proj(H2_attn))

        # Gated residual fusion of both updated streams.
        out = hidden_states + self.gate(
            torch.cat([H1_attn[:, None, :, :], H2_attn[:, None, :, :]], dim=1), dim=1
        )

        return out


class CoAttentionWrapper(nn.Module):
    def __init__(self, config, num_speakers=2):
        super().__init__()
        self.coa = CoAttention(
            embed_dim=config.d_model,
            single_dim=config.d_model // 2,
            multi_dim=config.d_model // 4,
            n_heads=config.encoder_attention_heads,
            attn_dropout=config.attention_dropout,
        )
        self.gate = Gate(num_speakers, 0.01)

    def forward(self, coa_input: torch.Tensor) -> torch.Tensor:
        # (B, S, T, F) -> (T, B, S, F) for CoAttention, then back, with a gated residual connection.
        hidden_states = coa_input.permute(2, 0, 1, 3)
        hidden_states = self.coa(hidden_states)
        out = coa_input + self.gate(hidden_states.permute(1, 2, 0, 3), dim=1)
        return out


class SpeakerCommunicationBlock(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.num_speakers = getattr(config, "mt_num_speakers", 2)
        self.embed_dim = config.d_model
        self.scb_method = config.scb_method
        self.config = config

        if self.scb_method == "tac":
            self.method = TACBlock(config)
        elif self.scb_method == "cross_attention":
            self.method = CrossAttentionBlock(config)
        elif self.scb_method == "cross_attention_enroll":
            self.method = CrossAttentionEnrollBlock(config)
        elif self.scb_method == "cross_attention_enroll_new":
            self.method = CrossAttentionEnrollBlockNew(config)
        elif self.scb_method == "competitive_cross_attention":
            self.method = CompetitiveCrossAttentionBlock(config)
        elif self.scb_method == "co_attention":
            self.method = CoAttentionWrapper(config)
        elif self.scb_method == "identity":
            # No cross-speaker communication; per-speaker transformation following
            # the FDDT parameterization flags.
            if config.fddt_bias_only:
                self.method = nn.Parameter(torch.zeros(self.embed_dim))
            elif config.fddt_is_diagonal:
                self.method = CustomDiagonalLinear(self.embed_dim, bias=True, init_eye_val=1.0)
            else:
                self.method = CustomLinear(self.embed_dim, self.embed_dim, bias=True, init_eye_val=1.0)
        else:
            raise ValueError(f"Unsupported scb_method: {self.scb_method}")

    def forward(self, x):
        # x: (B * num_speakers, T, D) with per-speaker sequences stacked along the batch dimension.
        B, T, D = x.shape
        S = self.num_speakers

        # Split the batch into (batch, speaker) so the communication block sees all speakers jointly.
        x_reshaped = x.view(B // S, S, T, D)

        out = self.method(x_reshaped)

        # Merge the speaker dimension back into the batch dimension.
        out_merged = out.view(B, T, D)
        return out_merged
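

# Hedged usage sketch (illustrative; assumes a WhisperConfig extended with the custom
# attributes read above, e.g. `scb_method` and `mt_num_speakers`):
#
#   config = WhisperConfig()
#   config.scb_method = "cross_attention"
#   config.mt_num_speakers = 2
#   scb = SpeakerCommunicationBlock(config)
#   x = torch.randn(2 * config.mt_num_speakers, 50, config.d_model)  # (B * S, T, D)
#   y = scb(x)  # same shape; the speaker streams exchange information internally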