| | |
| | | if len(intermediate_outs) > 0: |
| | | return (xs_pad, intermediate_outs), olens, None |
| | | return xs_pad, olens, None |
| | | |
| | | |
class EncoderLayerSANMExport(torch.nn.Module):
    """Export-oriented view of an existing encoder layer.

    The wrapper owns no new parameters: it re-uses the attention,
    feed-forward and normalisation sub-modules of the layer it is built
    from and runs them in a fixed order.
    """

    def __init__(
        self,
        model,
    ):
        """Borrow the sub-modules and size attributes of ``model``."""
        super().__init__()
        # Sub-modules are registered in the same order as before so that
        # iteration over children / state_dict keys is unaffected.
        self.self_attn = model.self_attn
        self.feed_forward = model.feed_forward
        self.norm1 = model.norm1
        self.norm2 = model.norm2
        self.in_size, self.size = model.in_size, model.size

    def forward(self, x, mask):
        """Run the attention sub-block followed by the feed-forward sub-block.

        Args:
            x: Input passed through ``norm1`` and then ``self_attn``.
            mask: Forwarded untouched to ``self_attn``.

        Returns:
            Tuple ``(output, mask)``; ``mask`` is the very object that was
            passed in.
        """
        attended = self.self_attn(self.norm1(x), mask)
        # The skip connection around attention is applied only when
        # ``in_size`` equals ``size``.
        hidden = attended + x if self.in_size == self.size else attended

        # The feed-forward sub-block always has its skip connection.
        projected = self.feed_forward(self.norm2(hidden))
        return projected + hidden, mask
| | | |
@tables.register("encoder_classes", "SANMVadEncoderExport")
class SANMVadEncoderExport(torch.nn.Module):
    """Export wrapper around an encoder that takes explicit VAD and sub masks.

    Construction modifies ``model`` in place: every layer in
    ``model.encoders`` (and in ``model.encoders0`` when that attribute
    exists) is replaced by an ``EncoderLayerSANMExport``, and any
    ``MultiHeadedAttentionSANMwithMask`` attention inside those layers is
    replaced by ``MultiHeadedAttentionSANMExport``.
    """

    def __init__(
        self,
        model,
        max_seq_len=512,
        feats_dim=560,
        model_name='encoder',
        onnx: bool = True,
    ):
        """Wrap ``model`` and convert its layers to their export variants.

        Args:
            model: Encoder to wrap. It must provide ``embed``,
                ``_output_size``, ``encoders`` and ``after_norm``;
                ``encoders0`` is optional.
            max_seq_len: Forwarded to the padding-mask helper.
            feats_dim: Not used by this class; kept so the signature stays
                unchanged.
            model_name: Not used by this class; kept so the signature stays
                unchanged.
            onnx: When true the padding mask is built by ``MakePadMask``,
                otherwise by ``sequence_mask``.
        """
        super().__init__()
        self.embed = model.embed
        self.model = model
        self._output_size = model._output_size

        from funasr.utils.torch_function import MakePadMask
        from funasr.utils.torch_function import sequence_mask

        if onnx:
            self.make_pad_mask = MakePadMask(max_seq_len, flip=False)
        else:
            self.make_pad_mask = sequence_mask(max_seq_len, flip=False)

        from funasr.models.sanm.attention import MultiHeadedAttentionSANMExport

        if hasattr(model, 'encoders0'):
            self._export_layers(self.model.encoders0, MultiHeadedAttentionSANMExport)
        self._export_layers(self.model.encoders, MultiHeadedAttentionSANMExport)

    @staticmethod
    def _export_layers(layers, attn_export_cls):
        """Replace every layer of ``layers`` in place by its export wrapper."""
        for i, layer in enumerate(layers):
            if isinstance(layer.self_attn, MultiHeadedAttentionSANMwithMask):
                layer.self_attn = attn_export_cls(layer.self_attn)
            # Every layer is wrapped, whether or not its attention was
            # swapped, because ``forward`` calls each layer as (x, mask).
            layers[i] = EncoderLayerSANMExport(layer)

    def prepare_mask(self, mask, sub_masks):
        """Build the mask pair handed to the encoder layers.

        Args:
            mask: Padding mask; a trailing axis is appended to it.
            sub_masks: Mask mapped through ``(1 - sub_masks) * -10000.0``,
                so entries equal to 1 become 0 and entries equal to 0
                become -10000.

        Returns:
            Tuple ``(mask_3d_btd, mask_4d_bhlt)``.
            NOTE(review): the names suggest (batch, time, dim) and
            (batch, head, length, time) layouts - confirm against callers.
        """
        mask_3d_btd = mask[:, :, None]
        mask_4d_bhlt = (1 - sub_masks) * -10000.0

        return mask_3d_btd, mask_4d_bhlt

    def forward(self,
                speech: torch.Tensor,
                speech_lengths: torch.Tensor,
                vad_masks: torch.Tensor,
                sub_masks: torch.Tensor,
                ):
        """Encode ``speech``.

        The last layer of ``model.encoders`` receives the mask pair built
        from ``vad_masks``; every other layer receives the pair built from
        ``sub_masks``.

        Args:
            speech: Input features; scaled by ``sqrt(_output_size)`` before
                the optional ``embed`` module is applied.
            speech_lengths: Lengths used to build the padding mask.
            vad_masks: Mask used for the final encoder layer.
            sub_masks: Mask used for all preceding layers.

        Returns:
            Tuple ``(xs_pad, speech_lengths)`` where ``xs_pad`` is the
            output of ``model.after_norm`` and ``speech_lengths`` is
            returned unchanged.
        """
        speech = speech * self._output_size ** 0.5
        pad_mask = self.make_pad_mask(speech_lengths)
        vad_masks = self.prepare_mask(pad_mask, vad_masks)
        mask = self.prepare_mask(pad_mask, sub_masks)

        xs_pad = speech if self.embed is None else self.embed(speech)

        # ``encoders0`` is optional in ``__init__``; treat it the same way
        # here instead of failing with AttributeError at call time.
        encoders0 = getattr(self.model, 'encoders0', None)
        if encoders0 is not None:
            xs_pad = encoders0(xs_pad, mask)[0]

        last_idx = len(self.model.encoders) - 1
        for layer_idx, encoder_layer in enumerate(self.model.encoders):
            if layer_idx == last_idx:
                mask = vad_masks
            xs_pad = encoder_layer(xs_pad, mask)[0]

        xs_pad = self.model.after_norm(xs_pad)

        return xs_pad, speech_lengths

    def get_output_size(self):
        """Return the ``size`` attribute of the first layer in ``model.encoders``."""
        return self.model.encoders[0].size