| | |
| | | from funasr.modules.positionwise_feed_forward import PositionwiseFeedForward |
| | | from funasr.export.models.modules.feedforward import PositionwiseFeedForward as PositionwiseFeedForward_export |
| | | |
| | | def subsequent_mask(size, device="cpu", dtype=torch.bool): |
| | | """Create mask for subsequent steps (size, size). |
| | | |
| | | :param int size: size of mask |
| | | :param str device: "cpu" or "cuda" or torch.Tensor.device |
| | | :param torch.dtype dtype: result dtype |
| | | :rtype: torch.Tensor |
| | | >>> subsequent_mask(3) |
| | | [[1, 0, 0], |
| | | [1, 1, 0], |
| | | [1, 1, 1]] |
| | | """ |
| | | ret = torch.ones(size, size, device=device, dtype=dtype) |
| | | return torch.tril(ret, out=ret) |
| | | |
| | | class SANMEncoder(nn.Module): |
| | | def __init__( |
| | |
| | | self.num_heads = model.encoders[0].self_attn.h |
| | | self.hidden_size = model.encoders[0].self_attn.linear_out.out_features |
| | | |
| | | def prepare_mask(self, mask): |
| | | def prepare_mask(self, mask, sub_masks): |
| | | mask_3d_btd = mask[:, :, None] |
| | | sub_masks = subsequent_mask(mask.size(-1)).type(torch.float32) |
| | | if len(mask.shape) == 2: |
| | | mask_4d_bhlt = 1 - sub_masks[:, None, None, :] |
| | | elif len(mask.shape) == 3: |
| | | mask_4d_bhlt = 1 - sub_masks[:, None, :] |
| | | mask_4d_bhlt = mask_4d_bhlt * -10000.0 |
| | | mask_4d_bhlt = (1 - sub_masks) * -10000.0 |
| | | |
| | | return mask_3d_btd, mask_4d_bhlt |
| | | |
| | | def forward(self, |
| | | speech: torch.Tensor, |
| | | speech_lengths: torch.Tensor, |
| | | vad_mask: torch.Tensor, |
| | | vad_masks: torch.Tensor, |
| | | sub_masks: torch.Tensor, |
| | | ): |
| | | speech = speech * self._output_size ** 0.5 |
| | | mask = self.make_pad_mask(speech_lengths) |
| | | mask = self.prepare_mask(mask) |
| | | vad_masks = self.prepare_mask(mask, vad_masks) |
| | | mask = self.prepare_mask(mask, sub_masks) |
| | | |
| | | if self.embed is None: |
| | | xs_pad = speech |
| | | else: |
| | |
| | | # encoder_outs = self.model.encoders(xs_pad, mask) |
| | | for layer_idx, encoder_layer in enumerate(self.model.encoders): |
| | | if layer_idx == len(self.model.encoders) - 1: |
| | | mask = (mask[0], vad_mask) |
| | | mask = vad_masks |
| | | encoder_outs = encoder_layer(xs_pad, mask) |
| | | xs_pad, masks = encoder_outs[0], encoder_outs[1] |
| | | |
| | |
| | | def get_output_size(self): |
| | | return self.model.encoders[0].size |
| | | |
| | | def get_dummy_inputs(self): |
| | | feats = torch.randn(1, 100, self.feats_dim) |
| | | return (feats) |
| | | |
| | | def get_input_names(self): |
| | | return ['feats'] |
| | | |
| | | def get_output_names(self): |
| | | return ['encoder_out', 'encoder_out_lens', 'predictor_weight'] |
| | | |
| | | def get_dynamic_axes(self): |
| | | return { |
| | | 'feats': { |
| | | 1: 'feats_length' |
| | | }, |
| | | 'encoder_out': { |
| | | 1: 'enc_out_length' |
| | | }, |
| | | 'predictor_weight': { |
| | | 1: 'pre_out_length' |
| | | } |
| | | |
| | | } |
| | | # def get_dummy_inputs(self): |
| | | # feats = torch.randn(1, 100, self.feats_dim) |
| | | # return (feats) |
| | | # |
| | | # def get_input_names(self): |
| | | # return ['feats'] |
| | | # |
| | | # def get_output_names(self): |
| | | # return ['encoder_out', 'encoder_out_lens', 'predictor_weight'] |
| | | # |
| | | # def get_dynamic_axes(self): |
| | | # return { |
| | | # 'feats': { |
| | | # 1: 'feats_length' |
| | | # }, |
| | | # 'encoder_out': { |
| | | # 1: 'enc_out_length' |
| | | # }, |
| | | # 'predictor_weight': { |
| | | # 1: 'pre_out_length' |
| | | # } |
| | | # |
| | | # } |