| File was renamed from funasr/models/transformer/transformer_encoder.py |
| | |
| | | import logging
| | | from typing import List, Optional, Tuple
| | |
| | | import torch
| | | from torch import nn
| | | |
| | | from funasr.models.ctc import CTC |
| | | from funasr.models.encoder.abs_encoder import AbsEncoder |
| | | from funasr.models.transformer.attention import MultiHeadedAttention |
| | | from funasr.models.transformer.embedding import PositionalEncoding |
| | | from funasr.models.transformer.layer_norm import LayerNorm |
| | |
| | | from funasr.models.transformer.positionwise_feed_forward import ( |
| | | PositionwiseFeedForward, # noqa: H301 |
| | | ) |
| | | from funasr.models.transformer.utils.multi_layer_conv import Conv1dLinear
| | | from funasr.models.transformer.utils.multi_layer_conv import MultiLayeredConv1d
| | | from funasr.models.transformer.utils.nets_utils import make_pad_mask
| | | from funasr.models.transformer.utils.nets_utils import rename_state_dict
| | | from funasr.models.transformer.utils.repeat import repeat |
| | | from funasr.models.transformer.utils.dynamic_conv import DynamicConvolution |
| | | from funasr.models.transformer.utils.dynamic_conv2d import DynamicConvolution2D |
| | | from funasr.models.transformer.utils.lightconv import LightweightConvolution |
| | | from funasr.models.transformer.utils.lightconv2d import LightweightConvolution2D |
| | | from funasr.models.transformer.utils.subsampling import Conv2dSubsampling |
| | | from funasr.models.transformer.utils.subsampling import Conv2dSubsampling2 |
| | | from funasr.models.transformer.utils.subsampling import Conv2dSubsampling6 |
| | | from funasr.models.transformer.utils.subsampling import Conv2dSubsampling8 |
| | | from funasr.models.transformer.utils.subsampling import TooShortUttError |
| | | from funasr.models.transformer.utils.subsampling import check_short_utt |
| | | |
| | | |
| | | class EncoderLayer(nn.Module):
| | | """Single Transformer encoder block: self-attention plus a position-wise
| | | feed-forward network, each wrapped with a residual connection, dropout,
| | | and LayerNorm (pre-norm or post-norm depending on `normalize_before`)."""
| | |
| | | def __init__(self, size, self_attn, feed_forward, dropout_rate, normalize_before=True, concat_after=False):
| | | super().__init__()
| | | self.self_attn = self_attn
| | | self.feed_forward = feed_forward
| | | self.norm1 = LayerNorm(size)
| | | self.norm2 = LayerNorm(size)
| | | self.dropout = nn.Dropout(dropout_rate)
| | | self.size = size
| | | self.normalize_before = normalize_before
| | | self.concat_after = concat_after
| | | if self.concat_after:
| | | self.concat_linear = nn.Linear(size + size, size)
| | |
| | | def forward(self, x, mask):
| | | # self-attention sub-block with residual connection
| | | residual = x
| | | if self.normalize_before:
| | | x = self.norm1(x)
| | | if self.concat_after:
| | | x = residual + self.concat_linear(torch.cat((x, self.self_attn(x, x, x, mask)), dim=-1))
| | | else:
| | | x = residual + self.dropout(self.self_attn(x, x, x, mask))
| | | if not self.normalize_before:
| | | x = self.norm1(x)
| | | # position-wise feed-forward sub-block with residual connection
| | | residual = x
| | | if self.normalize_before:
| | | x = self.norm2(x)
| | | x = residual + self.dropout(self.feed_forward(x))
| | | if not self.normalize_before:
| | | x = self.norm2(x)
| | | return x, mask
| | | |
| | | |
| | | class TransformerEncoder(AbsEncoder): |
| | | """Transformer encoder module. |
| | | |
| | | Args: |
| | | input_size: input dim |
| | | output_size: dimension of attention |
| | | attention_heads: the number of heads of multi head attention |
| | | linear_units: the number of units of position-wise feed forward |
| | | num_blocks: the number of encoder blocks
| | | dropout_rate: dropout rate |
| | | attention_dropout_rate: dropout rate in attention |
| | | positional_dropout_rate: dropout rate after adding positional encoding |
| | | input_layer: input layer type |
| | | pos_enc_class: PositionalEncoding or ScaledPositionalEncoding |
| | | normalize_before: whether to apply layer_norm before each sub-block (pre-norm)
| | | concat_after: whether to concat attention layer's input and output |
| | | if True, additional linear will be applied. |
| | | i.e. x -> x + linear(concat(x, att(x))) |
| | | if False, no additional linear will be applied. |
| | | i.e. x -> x + att(x) |
| | | positionwise_layer_type: "linear", "conv1d", or "conv1d-linear"
| | | positionwise_conv_kernel_size: kernel size of positionwise conv1d layer |
| | | padding_idx: padding_idx for input_layer=embed |
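| | |
| | | Example (a minimal sketch, assuming 80-dim fbank features and the
| | | default "conv2d" front-end, which subsamples time by roughly 4x):
| | |
| | | >>> encoder = TransformerEncoder(input_size=80)
| | | >>> xs_pad = torch.randn(2, 100, 80)
| | | >>> ilens = torch.tensor([100, 73])
| | | >>> out, olens, _ = encoder(xs_pad, ilens)
| | | >>> out.shape
| | | torch.Size([2, 24, 256])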
| | | """ |
| | | |
| | | def __init__( |
| | | self, |
| | | input_size: int, |
| | | output_size: int = 256, |
| | | attention_heads: int = 4, |
| | | linear_units: int = 2048, |
| | | num_blocks: int = 6, |
| | | dropout_rate: float = 0.1, |
| | | positional_dropout_rate: float = 0.1, |
| | | attention_dropout_rate: float = 0.0, |
| | | input_layer: Optional[str] = "conv2d", |
| | | pos_enc_class=PositionalEncoding, |
| | | normalize_before: bool = True, |
| | | concat_after: bool = False, |
| | | positionwise_layer_type: str = "linear", |
| | | positionwise_conv_kernel_size: int = 1, |
| | | padding_idx: int = -1, |
| | | interctc_layer_idx: List[int] = [], |
| | | interctc_use_conditioning: bool = False, |
| | | ): |
| | | super().__init__() |
| | | self._output_size = output_size |
| | | |
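| | | # Build the input front-end: "linear" projects features, the "conv2dN"
| | | # options subsample time by 1/N via strided convolutions ("conv2d" is 1/4),
| | | # "embed" looks up token embeddings, and None passes features through.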
| | | if input_layer == "linear": |
| | | self.embed = torch.nn.Sequential( |
| | | torch.nn.Linear(input_size, output_size), |
| | | torch.nn.LayerNorm(output_size), |
| | | torch.nn.Dropout(dropout_rate), |
| | | torch.nn.ReLU(), |
| | | pos_enc_class(output_size, positional_dropout_rate), |
| | | ) |
| | | elif input_layer == "conv2d": |
| | | self.embed = Conv2dSubsampling(input_size, output_size, dropout_rate) |
| | | elif input_layer == "conv2d2": |
| | | self.embed = Conv2dSubsampling2(input_size, output_size, dropout_rate) |
| | | elif input_layer == "conv2d6": |
| | | self.embed = Conv2dSubsampling6(input_size, output_size, dropout_rate) |
| | | elif input_layer == "conv2d8": |
| | | self.embed = Conv2dSubsampling8(input_size, output_size, dropout_rate) |
| | | elif input_layer == "embed": |
| | | self.embed = torch.nn.Sequential( |
| | | torch.nn.Embedding(input_size, output_size, padding_idx=padding_idx), |
| | | pos_enc_class(output_size, positional_dropout_rate), |
| | | ) |
| | | elif input_layer is None: |
| | | if input_size == output_size: |
| | | self.embed = None |
| | | else: |
| | | self.embed = torch.nn.Linear(input_size, output_size) |
| | | else: |
| | | raise ValueError("unknown input_layer: " + input_layer) |
| | | self.normalize_before = normalize_before |
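| | | # Select the position-wise feed-forward implementation: plain linear
| | | # layers, multi-layered conv1d, or a conv1d followed by a linear layer.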
| | | if positionwise_layer_type == "linear": |
| | | positionwise_layer = PositionwiseFeedForward |
| | | positionwise_layer_args = ( |
| | | output_size, |
| | | linear_units, |
| | | dropout_rate, |
| | | ) |
| | | elif positionwise_layer_type == "conv1d": |
| | | positionwise_layer = MultiLayeredConv1d |
| | | positionwise_layer_args = ( |
| | | output_size, |
| | | linear_units, |
| | | positionwise_conv_kernel_size, |
| | | dropout_rate, |
| | | ) |
| | | elif positionwise_layer_type == "conv1d-linear": |
| | | positionwise_layer = Conv1dLinear |
| | | positionwise_layer_args = ( |
| | | output_size, |
| | | linear_units, |
| | | positionwise_conv_kernel_size, |
| | | dropout_rate, |
| | | ) |
| | | else: |
| | | raise NotImplementedError("Support only linear, conv1d, or conv1d-linear.")
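| | | # Stack num_blocks identical encoder layers; repeat() returns a
| | | # MultiSequential that threads (x, mask) through every layer.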
| | | self.encoders = repeat( |
| | | num_blocks, |
| | | lambda lnum: EncoderLayer( |
| | | output_size, |
| | | MultiHeadedAttention( |
| | | attention_heads, output_size, attention_dropout_rate |
| | | ), |
| | | positionwise_layer(*positionwise_layer_args), |
| | | dropout_rate, |
| | | normalize_before, |
| | | concat_after, |
| | | ), |
| | | ) |
| | | if self.normalize_before: |
| | | self.after_norm = LayerNorm(output_size) |
| | | |
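| | | # Intermediate CTC (InterCTC): optionally tap encoder outputs at the given
| | | # 1-based layer indices; the conditioning layer is attached later by the
| | | # model that owns the CTC head.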
| | | self.interctc_layer_idx = interctc_layer_idx |
| | | if len(interctc_layer_idx) > 0: |
| | | assert 0 < min(interctc_layer_idx) and max(interctc_layer_idx) < num_blocks |
| | | self.interctc_use_conditioning = interctc_use_conditioning |
| | | self.conditioning_layer = None |
| | | |
| | | def output_size(self) -> int: |
| | | return self._output_size |
| | | |
| | | def forward( |
| | | self, |
| | | xs_pad: torch.Tensor, |
| | | ilens: torch.Tensor, |
| | | prev_states: torch.Tensor = None, |
| | | ctc: CTC = None, |
| | | ) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]: |
| | | """Embed positions in tensor. |
| | | |
| | | Args:
| | | xs_pad: input tensor (B, L, D)
| | | ilens: input length (B)
| | | prev_states: not used yet
| | | ctc: CTC module, used only for intermediate CTC conditioning
| | | Returns:
| | | position embedded tensor and mask
| | | """
| | | masks = (~make_pad_mask(ilens)[:, None, :]).to(xs_pad.device) |
| | | |
| | | if self.embed is None: |
| | | xs_pad = xs_pad |
| | | elif ( |
| | | isinstance(self.embed, Conv2dSubsampling) |
| | | or isinstance(self.embed, Conv2dSubsampling2) |
| | | or isinstance(self.embed, Conv2dSubsampling6) |
| | | or isinstance(self.embed, Conv2dSubsampling8) |
| | | ): |
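| | | # Conv subsampling needs a minimum number of input frames; fail fast
| | | # with an informative error instead of crashing inside the conv stack.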
| | | short_status, limit_size = check_short_utt(self.embed, xs_pad.size(1)) |
| | | if short_status: |
| | | raise TooShortUttError( |
| | | f"has {xs_pad.size(1)} frames and is too short for subsampling " |
| | | + f"(it needs more than {limit_size} frames), return empty results", |
| | | xs_pad.size(1), |
| | | limit_size, |
| | | ) |
| | | xs_pad, masks = self.embed(xs_pad, masks) |
| | | else: |
| | | xs_pad = self.embed(xs_pad) |
| | | |
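| | | # Fast path: no intermediate taps, run the whole stack at once; otherwise
| | | # iterate layer by layer so outputs can be collected at interctc layers.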
| | | intermediate_outs = [] |
| | | if len(self.interctc_layer_idx) == 0: |
| | | xs_pad, masks = self.encoders(xs_pad, masks) |
| | | else: |
| | | for layer_idx, encoder_layer in enumerate(self.encoders): |
| | | xs_pad, masks = encoder_layer(xs_pad, masks) |
| | | |
| | | if layer_idx + 1 in self.interctc_layer_idx: |
| | | encoder_out = xs_pad |
| | | |
| | | # intermediate outputs are also normalized |
| | | if self.normalize_before: |
| | | encoder_out = self.after_norm(encoder_out) |
| | | |
| | | intermediate_outs.append((layer_idx + 1, encoder_out)) |
| | | |
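| | | # Self-conditioned CTC: project the CTC posteriors back into the
| | | # encoder stream so later layers are conditioned on the intermediate
| | | # prediction (Nozaki & Komatsu, 2021).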
| | | if self.interctc_use_conditioning: |
| | | ctc_out = ctc.softmax(encoder_out) |
| | | xs_pad = xs_pad + self.conditioning_layer(ctc_out) |
| | | |
| | | if self.normalize_before: |
| | | xs_pad = self.after_norm(xs_pad) |
| | | |
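| | | # Recover output lengths from the mask, which reflects any subsampling.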
| | | olens = masks.squeeze(1).sum(1) |
| | | if len(intermediate_outs) > 0: |
| | | return (xs_pad, intermediate_outs), olens, None |
| | | return xs_pad, olens, None |
| | | |
| | | |
| | | def _pre_hook( |
| | | state_dict, |
| | | prefix, |
| | | local_metadata, |
| | | strict, |
| | | missing_keys, |
| | | unexpected_keys, |
| | | error_msgs, |
| | | ): |
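| | | # Rename legacy parameter keys so checkpoints saved before the refactor
| | | # still load into the current module layout.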
| | | # https://github.com/espnet/espnet/commit/21d70286c354c66c0350e65dc098d2ee236faccc#diff-bffb1396f038b317b2b64dd96e6d3563 |
| | | rename_state_dict(prefix + "input_layer.", prefix + "embed.", state_dict) |
| | | # https://github.com/espnet/espnet/commit/3d422f6de8d4f03673b89e1caef698745ec749ea#diff-bffb1396f038b317b2b64dd96e6d3563 |
| | | rename_state_dict(prefix + "norm.", prefix + "after_norm.", state_dict) |
| | | |
| | | |
| | | class TransformerEncoder_s0(torch.nn.Module): |
| | | """Transformer encoder module. |
| | | |
| | | Args: |
| | |
| | | conditioning_layer_dim=None, |
| | | ): |
| | | """Construct an Encoder object.""" |
| | | super().__init__()
| | | self._register_load_state_dict_pre_hook(_pre_hook)
| | | |
| | | self.conv_subsampling_factor = 1 |
| | | if input_layer == "linear": |