| | |
| | | from funasr.models.transformer.embedding import PositionalEncoding |
| | | from funasr.models.paraformer.decoder import DecoderLayerSANM, ParaformerSANMDecoder |
| | | from funasr.models.sanm.positionwise_feed_forward import PositionwiseFeedForwardDecoderSANM |
| | | from funasr.models.sanm.attention import MultiHeadedAttentionSANMDecoder, MultiHeadedAttentionCrossAtt |
| | | from funasr.models.sanm.attention import ( |
| | | MultiHeadedAttentionSANMDecoder, |
| | | MultiHeadedAttentionCrossAtt, |
| | | ) |
| | | |
| | | |
| | | class ContextualDecoderLayer(torch.nn.Module): |
| | |
| | | self.concat_linear1 = torch.nn.Linear(size + size, size) |
| | | self.concat_linear2 = torch.nn.Linear(size + size, size) |
| | | |
| | | def forward(self, tgt, tgt_mask, memory, memory_mask, cache=None,): |
| | | def forward( |
| | | self, |
| | | tgt, |
| | | tgt_mask, |
| | | memory, |
| | | memory_mask, |
| | | cache=None, |
| | | ): |
| | | # tgt = self.dropout(tgt) |
| | | if isinstance(tgt, Tuple): |
| | | tgt, _ = tgt |
| | |
| | | if self.src_attn is not None: |
| | | if self.normalize_before: |
| | | x = self.norm3(x) |
| | | x = self.dropout(self.src_attn(x, memory, memory_mask)) |
| | | x = self.dropout(self.src_attn(x, memory, memory_mask)) |
| | | return x, tgt_mask, memory, memory_mask, cache |
| | | |
| | | |
| | | @tables.register("decoder_classes", "ContextualParaformerDecoder") |
| | | class ContextualParaformerDecoder(ParaformerSANMDecoder): |
| | |
| | | Paraformer: Fast and Accurate Parallel Transformer for Non-autoregressive End-to-End Speech Recognition |
| | | https://arxiv.org/abs/2206.08317 |
| | | """ |
| | | |
| | | def __init__( |
| | | self, |
| | | vocab_size: int, |
| | |
| | | ) |
| | | |
| | | attention_dim = encoder_output_size |
| | | if input_layer == 'none': |
| | | if input_layer == "none": |
| | | self.embed = None |
| | | if input_layer == "embed": |
| | | self.embed = torch.nn.Sequential( |
| | |
| | | dropout_rate=dropout_rate, |
| | | normalize_before=True, |
| | | ) |
| | | self.bias_output = torch.nn.Conv1d(attention_dim*2, attention_dim, 1, bias=False) |
| | | self.bias_output = torch.nn.Conv1d(attention_dim * 2, attention_dim, 1, bias=False) |
| | | self.last_decoder = ContextualDecoderLayer( |
| | | attention_dim, |
| | | MultiHeadedAttentionSANMDecoder( |
| | | attention_dim, self_attention_dropout_rate, kernel_size, sanm_shfit=sanm_shfit |
| | | ), |
| | | MultiHeadedAttentionCrossAtt( |
| | | attention_heads, attention_dim, src_attention_dropout_rate |
| | | ), |
| | | PositionwiseFeedForwardDecoderSANM(attention_dim, linear_units, dropout_rate), |
| | | dropout_rate, |
| | | normalize_before, |
| | | concat_after, |
| | | ) |
| | | attention_dim, |
| | | MultiHeadedAttentionSANMDecoder( |
| | | attention_dim, self_attention_dropout_rate, kernel_size, sanm_shfit=sanm_shfit |
| | | ), |
| | | MultiHeadedAttentionCrossAtt( |
| | | attention_heads, attention_dim, src_attention_dropout_rate |
| | | ), |
| | | PositionwiseFeedForwardDecoderSANM(attention_dim, linear_units, dropout_rate), |
| | | dropout_rate, |
| | | normalize_before, |
| | | concat_after, |
| | | ) |
| | | if num_blocks - att_layer_num <= 0: |
| | | self.decoders2 = None |
| | | else: |
| | |
| | | memory_mask = myutils.sequence_mask(hlens, device=memory.device)[:, None, :] |
| | | |
| | | x = tgt |
| | | x, tgt_mask, memory, memory_mask, _ = self.decoders( |
| | | x, tgt_mask, memory, memory_mask |
| | | ) |
| | | _, _, x_self_attn, x_src_attn = self.last_decoder( |
| | | x, tgt_mask, memory, memory_mask |
| | | ) |
| | | x, tgt_mask, memory, memory_mask, _ = self.decoders(x, tgt_mask, memory, memory_mask) |
| | | _, _, x_self_attn, x_src_attn = self.last_decoder(x, tgt_mask, memory, memory_mask) |
| | | |
| | | # contextual paraformer related |
| | | contextual_length = torch.Tensor([contextual_info.shape[1]]).int().repeat(hs_pad.shape[0]) |
| | | contextual_mask = myutils.sequence_mask(contextual_length, device=memory.device)[:, None, :] |
| | | cx, tgt_mask, _, _, _ = self.bias_decoder(x_self_attn, tgt_mask, contextual_info, memory_mask=contextual_mask) |
| | | cx, tgt_mask, _, _, _ = self.bias_decoder( |
| | | x_self_attn, tgt_mask, contextual_info, memory_mask=contextual_mask |
| | | ) |
| | | |
| | | if self.bias_output is not None: |
| | | x = torch.cat([x_src_attn, cx*clas_scale], dim=2) |
| | | x = torch.cat([x_src_attn, cx * clas_scale], dim=2) |
| | | x = self.bias_output(x.transpose(1, 2)).transpose(1, 2) # 2D -> D |
| | | x = x_self_attn + self.dropout(x) |
| | | |
| | | if self.decoders2 is not None: |
| | | x, tgt_mask, memory, memory_mask, _ = self.decoders2( |
| | | x, tgt_mask, memory, memory_mask |
| | | ) |
| | | x, tgt_mask, memory, memory_mask, _ = self.decoders2(x, tgt_mask, memory, memory_mask) |
| | | |
| | | x, tgt_mask, memory, memory_mask, _ = self.decoders3( |
| | | x, tgt_mask, memory, memory_mask |
| | | ) |
| | | x, tgt_mask, memory, memory_mask, _ = self.decoders3(x, tgt_mask, memory, memory_mask) |
| | | if self.normalize_before: |
| | | x = self.after_norm(x) |
| | | olens = tgt_mask.sum(1) |
| | |
| | | |
| | | @tables.register("decoder_classes", "ContextualParaformerDecoderExport") |
| | | class ContextualParaformerDecoderExport(torch.nn.Module): |
| | | def __init__(self, model, |
| | | max_seq_len=512, |
| | | model_name='decoder', |
| | | onnx: bool = True, |
| | | **kwargs,): |
| | | def __init__( |
| | | self, |
| | | model, |
| | | max_seq_len=512, |
| | | model_name="decoder", |
| | | onnx: bool = True, |
| | | **kwargs, |
| | | ): |
| | | super().__init__() |
| | | from funasr.utils.torch_function import sequence_mask |
| | | |
| | | self.model = model |
| | | self.make_pad_mask = sequence_mask(max_seq_len, flip=False) |
| | | |
| | | |
| | | from funasr.models.sanm.attention import MultiHeadedAttentionSANMDecoderExport |
| | | from funasr.models.sanm.attention import MultiHeadedAttentionCrossAttExport |
| | | from funasr.models.paraformer.decoder import DecoderLayerSANMExport |
| | | from funasr.models.transformer.positionwise_feed_forward import PositionwiseFeedForwardDecoderSANMExport |
| | | from funasr.models.transformer.positionwise_feed_forward import ( |
| | | PositionwiseFeedForwardDecoderSANMExport, |
| | | ) |
| | | |
| | | for i, d in enumerate(self.model.decoders): |
| | | if isinstance(d.feed_forward, PositionwiseFeedForwardDecoderSANM): |
| | |
| | | if isinstance(d.feed_forward, PositionwiseFeedForwardDecoderSANM): |
| | | d.feed_forward = PositionwiseFeedForwardDecoderSANMExport(d.feed_forward) |
| | | self.model.decoders3[i] = DecoderLayerSANMExport(d) |
| | | |
| | | |
| | | self.output_layer = model.output_layer |
| | | self.after_norm = model.after_norm |
| | | self.model_name = model_name |
| | | |
| | | # bias decoder |
| | | if isinstance(self.model.bias_decoder.src_attn, MultiHeadedAttentionCrossAtt): |
| | | self.model.bias_decoder.src_attn = MultiHeadedAttentionCrossAttExport(self.model.bias_decoder.src_attn) |
| | | self.model.bias_decoder.src_attn = MultiHeadedAttentionCrossAttExport( |
| | | self.model.bias_decoder.src_attn |
| | | ) |
| | | self.bias_decoder = self.model.bias_decoder |
| | | |
| | | |
| | | # last decoder |
| | | if isinstance(self.model.last_decoder.src_attn, MultiHeadedAttentionCrossAtt): |
| | | self.model.last_decoder.src_attn = MultiHeadedAttentionCrossAttExport(self.model.last_decoder.src_attn) |
| | | self.model.last_decoder.src_attn = MultiHeadedAttentionCrossAttExport( |
| | | self.model.last_decoder.src_attn |
| | | ) |
| | | if isinstance(self.model.last_decoder.self_attn, MultiHeadedAttentionSANMDecoder): |
| | | self.model.last_decoder.self_attn = MultiHeadedAttentionSANMDecoderExport(self.model.last_decoder.self_attn) |
| | | self.model.last_decoder.self_attn = MultiHeadedAttentionSANMDecoderExport( |
| | | self.model.last_decoder.self_attn |
| | | ) |
| | | if isinstance(self.model.last_decoder.feed_forward, PositionwiseFeedForwardDecoderSANM): |
| | | self.model.last_decoder.feed_forward = PositionwiseFeedForwardDecoderSANMExport(self.model.last_decoder.feed_forward) |
| | | self.model.last_decoder.feed_forward = PositionwiseFeedForwardDecoderSANMExport( |
| | | self.model.last_decoder.feed_forward |
| | | ) |
| | | self.last_decoder = self.model.last_decoder |
| | | self.bias_output = self.model.bias_output |
| | | self.dropout = self.model.dropout |
| | | |
| | | |
| | | def prepare_mask(self, mask): |
| | | mask_3d_btd = mask[:, :, None] |
| | |
| | | elif len(mask.shape) == 3: |
| | | mask_4d_bhlt = 1 - mask[:, None, :] |
| | | mask_4d_bhlt = mask_4d_bhlt * -10000.0 |
| | | |
| | | |
| | | return mask_3d_btd, mask_4d_bhlt |
| | | |
| | | def forward( |
| | |
| | | # memory_mask = myutils.sequence_mask(hlens, device=memory.device)[:, None, :] |
| | | |
| | | x = tgt |
| | | x, tgt_mask, memory, memory_mask, _ = self.model.decoders( |
| | | x, tgt_mask, memory, memory_mask |
| | | ) |
| | | x, tgt_mask, memory, memory_mask, _ = self.model.decoders(x, tgt_mask, memory, memory_mask) |
| | | |
| | | _, _, x_self_attn, x_src_attn = self.last_decoder( |
| | | x, tgt_mask, memory, memory_mask |
| | | ) |
| | | _, _, x_self_attn, x_src_attn = self.last_decoder(x, tgt_mask, memory, memory_mask) |
| | | |
| | | # contextual paraformer related |
| | | contextual_length = torch.Tensor([bias_embed.shape[1]]).int().repeat(hs_pad.shape[0]) |
| | | # contextual_mask = myutils.sequence_mask(contextual_length, device=memory.device)[:, None, :] |
| | | contextual_mask = self.make_pad_mask(contextual_length) |
| | | contextual_mask, _ = self.prepare_mask(contextual_mask) |
| | | # import pdb; pdb.set_trace() |
| | | contextual_mask = contextual_mask.transpose(2, 1).unsqueeze(1) |
| | | cx, tgt_mask, _, _, _ = self.bias_decoder(x_self_attn, tgt_mask, bias_embed, memory_mask=contextual_mask) |
| | | cx, tgt_mask, _, _, _ = self.bias_decoder( |
| | | x_self_attn, tgt_mask, bias_embed, memory_mask=contextual_mask |
| | | ) |
| | | |
| | | if self.bias_output is not None: |
| | | x = torch.cat([x_src_attn, cx], dim=2) |
| | |
| | | x, tgt_mask, memory, memory_mask, _ = self.model.decoders2( |
| | | x, tgt_mask, memory, memory_mask |
| | | ) |
| | | x, tgt_mask, memory, memory_mask, _ = self.model.decoders3( |
| | | x, tgt_mask, memory, memory_mask |
| | | ) |
| | | x, tgt_mask, memory, memory_mask, _ = self.model.decoders3(x, tgt_mask, memory, memory_mask) |
| | | x = self.after_norm(x) |
| | | x = self.output_layer(x) |
| | | |
| | | return x, ys_in_lens |
| | | |