Rin Arakaki
2024-12-24 1367973f9818d8e15c7bf52ad6ffba4ddb6ac2b2
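Fix "shfit" -> "shift" typos across the SANM encoder, remove the SANMTPEncoder class, and add an optional ctc_linear head to the export encoder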
funasr/models/sanm/encoder.py
@@ -69,7 +69,7 @@
        self.stochastic_depth_rate = stochastic_depth_rate
        self.dropout_rate = dropout_rate
-    def forward(self, x, mask, cache=None, mask_shfit_chunk=None, mask_att_chunk_encoder=None):
+    def forward(self, x, mask, cache=None, mask_shift_chunk=None, mask_att_chunk_encoder=None):
        """Compute encoded features.
        Args:
@@ -106,7 +106,7 @@
                    self.self_attn(
                        x,
                        mask,
-                        mask_shfit_chunk=mask_shfit_chunk,
+                        mask_shift_chunk=mask_shift_chunk,
                        mask_att_chunk_encoder=mask_att_chunk_encoder,
                    ),
                ),
@@ -122,7 +122,7 @@
                    self.self_attn(
                        x,
                        mask,
-                        mask_shfit_chunk=mask_shfit_chunk,
+                        mask_shift_chunk=mask_shift_chunk,
                        mask_att_chunk_encoder=mask_att_chunk_encoder,
                    )
                )
@@ -131,7 +131,7 @@
                    self.self_attn(
                        x,
                        mask,
-                        mask_shfit_chunk=mask_shfit_chunk,
+                        mask_shift_chunk=mask_shift_chunk,
                        mask_att_chunk_encoder=mask_att_chunk_encoder,
                    )
                )
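Note that the rename changes the layer's public forward signature as well as the internal keyword, so external call sites must be updated too. A minimal call-site sketch after this commit; `layer`, `x`, and `mask` are hypothetical stand-ins, not code from the diff:

# Hedged sketch: invoking EncoderLayerSANM.forward with the corrected keyword.
x, mask, cache, mask_shift_chunk, mask_att_chunk_encoder = layer(
    x,
    mask,
    cache=None,
    mask_shift_chunk=None,          # formerly `mask_shfit_chunk`
    mask_att_chunk_encoder=None,
)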
@@ -145,7 +145,7 @@
        if not self.normalize_before:
            x = self.norm2(x)
-        return x, mask, cache, mask_shfit_chunk, mask_att_chunk_encoder
+        return x, mask, cache, mask_shift_chunk, mask_att_chunk_encoder
    def forward_chunk(self, x, cache=None, chunk_size=None, look_back=0):
        """Compute encoded features.
@@ -212,7 +212,7 @@
        interctc_layer_idx: List[int] = [],
        interctc_use_conditioning: bool = False,
        kernel_size: int = 11,
-        sanm_shfit: int = 0,
+        sanm_shift: int = 0,
        lora_list: List[str] = None,
        lora_rank: int = 8,
        lora_alpha: int = 16,
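Because `sanm_shift` is a constructor keyword, the rename also propagates to training configs and to any code instantiating the encoder; the old `sanm_shfit` keyword would now raise a TypeError. A minimal construction sketch, assuming the class defined here is the registered SANMEncoder and using illustrative values:

# Hedged sketch; the class name and all values are assumptions for illustration.
encoder = SANMEncoder(
    input_size=560,
    output_size=512,
    attention_heads=4,
    kernel_size=11,
    sanm_shift=0,  # formerly `sanm_shfit`
)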
@@ -299,7 +299,7 @@
                output_size,
                attention_dropout_rate,
                kernel_size,
-                sanm_shfit,
+                sanm_shift,
                lora_list,
                lora_rank,
                lora_alpha,
@@ -312,7 +312,7 @@
                output_size,
                attention_dropout_rate,
                kernel_size,
-                sanm_shfit,
+                sanm_shift,
                lora_list,
                lora_rank,
                lora_alpha,
@@ -484,226 +484,6 @@
        return xs_pad, ilens, None
-@tables.register("encoder_classes", "SANMTPEncoder")
-class SANMTPEncoder(nn.Module):
-    """
-    Author: Speech Lab of DAMO Academy, Alibaba Group
-    SCAMA: Streaming chunk-aware multihead attention for online end-to-end speech recognition
-    https://arxiv.org/abs/2006.01713
-    """
-    def __init__(
-            self,
-            input_size: int,
-            output_size: int = 256,
-            attention_heads: int = 4,
-            linear_units: int = 2048,
-            num_blocks: int = 6,
-            tp_blocks: int = 0,
-            dropout_rate: float = 0.1,
-            positional_dropout_rate: float = 0.1,
-            attention_dropout_rate: float = 0.0,
-            stochastic_depth_rate: float = 0.0,
-            input_layer: Optional[str] = "conv2d",
-            pos_enc_class=SinusoidalPositionEncoder,
-            normalize_before: bool = True,
-            concat_after: bool = False,
-            positionwise_layer_type: str = "linear",
-            positionwise_conv_kernel_size: int = 1,
-            padding_idx: int = -1,
-            kernel_size: int = 11,
-            sanm_shfit: int = 0,
-            selfattention_layer_type: str = "sanm",
-    ):
-        super().__init__()
-        self._output_size = output_size
-        if input_layer == "linear":
-            self.embed = torch.nn.Sequential(
-                torch.nn.Linear(input_size, output_size),
-                torch.nn.LayerNorm(output_size),
-                torch.nn.Dropout(dropout_rate),
-                torch.nn.ReLU(),
-                eval(pos_enc_class)(output_size, positional_dropout_rate),
-            )
-        elif input_layer == "linear_no_pos":
-            self.embed = torch.nn.Sequential(
-                torch.nn.Linear(input_size, output_size),
-                torch.nn.LayerNorm(output_size),
-                torch.nn.Dropout(dropout_rate),
-                eval(pos_enc_class)(output_size, positional_dropout_rate, use_pos=False),
-            )
-        elif input_layer == "conv2d":
-            self.embed = Conv2dSubsampling(input_size, output_size, dropout_rate)
-        elif input_layer == "conv2d2":
-            self.embed = Conv2dSubsampling2(input_size, output_size, dropout_rate)
-        elif input_layer == "conv2d6":
-            self.embed = Conv2dSubsampling6(input_size, output_size, dropout_rate)
-        elif input_layer == "conv2d8":
-            self.embed = Conv2dSubsampling8(input_size, output_size, dropout_rate)
-        elif input_layer == "embed":
-            self.embed = torch.nn.Sequential(
-                torch.nn.Embedding(input_size, output_size, padding_idx=padding_idx),
-                eval(pos_enc_class)(output_size, positional_dropout_rate),
-            )
-        elif input_layer is None:
-            if input_size == output_size:
-                self.embed = None
-            else:
-                self.embed = torch.nn.Linear(input_size, output_size)
-        elif input_layer == "pe":
-            self.embed = SinusoidalPositionEncoder()
-        elif input_layer == "pe_online":
-            self.embed = StreamSinusoidalPositionEncoder()
-        else:
-            raise ValueError("unknown input_layer: " + input_layer)
-        self.normalize_before = normalize_before
-        if positionwise_layer_type == "linear":
-            positionwise_layer = PositionwiseFeedForward
-            positionwise_layer_args = (
-                output_size,
-                linear_units,
-                dropout_rate,
-            )
-        elif positionwise_layer_type == "conv1d":
-            positionwise_layer = MultiLayeredConv1d
-            positionwise_layer_args = (
-                output_size,
-                linear_units,
-                positionwise_conv_kernel_size,
-                dropout_rate,
-            )
-        elif positionwise_layer_type == "conv1d-linear":
-            positionwise_layer = Conv1dLinear
-            positionwise_layer_args = (
-                output_size,
-                linear_units,
-                positionwise_conv_kernel_size,
-                dropout_rate,
-            )
-        else:
-            raise NotImplementedError("Support only linear or conv1d.")
-        if selfattention_layer_type == "selfattn":
-            encoder_selfattn_layer = MultiHeadedAttention
-            encoder_selfattn_layer_args = (
-                attention_heads,
-                output_size,
-                attention_dropout_rate,
-            )
-        elif selfattention_layer_type == "sanm":
-            encoder_selfattn_layer = MultiHeadedAttentionSANM
-            encoder_selfattn_layer_args0 = (
-                attention_heads,
-                input_size,
-                output_size,
-                attention_dropout_rate,
-                kernel_size,
-                sanm_shfit,
-            )
-            encoder_selfattn_layer_args = (
-                attention_heads,
-                output_size,
-                output_size,
-                attention_dropout_rate,
-                kernel_size,
-                sanm_shfit,
-            )
-        self.encoders0 = repeat(
-            1,
-            lambda lnum: EncoderLayerSANM(
-                input_size,
-                output_size,
-                encoder_selfattn_layer(*encoder_selfattn_layer_args0),
-                positionwise_layer(*positionwise_layer_args),
-                dropout_rate,
-                normalize_before,
-                concat_after,
-            ),
-        )
-        self.encoders = repeat(
-            num_blocks - 1,
-            lambda lnum: EncoderLayerSANM(
-                output_size,
-                output_size,
-                encoder_selfattn_layer(*encoder_selfattn_layer_args),
-                positionwise_layer(*positionwise_layer_args),
-                dropout_rate,
-                normalize_before,
-                concat_after,
-                stochastic_depth_rate,
-            ),
-        )
-        self.tp_encoders = repeat(
-            tp_blocks,
-            lambda lnum: EncoderLayerSANM(
-                output_size,
-                output_size,
-                encoder_selfattn_layer(*encoder_selfattn_layer_args),
-                positionwise_layer(*positionwise_layer_args),
-                dropout_rate,
-                normalize_before,
-                concat_after,
-                stochastic_depth_rate,
-            ),
-        )
-        if self.normalize_before:
-            self.after_norm = LayerNorm(output_size)
-        self.tp_blocks = tp_blocks
-        if self.tp_blocks > 0:
-            self.tp_norm = LayerNorm(output_size)
-    def output_size(self) -> int:
-        return self._output_size
-    def forward(
-            self,
-            xs_pad: torch.Tensor,
-            ilens: torch.Tensor,
-    ) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]:
-        """Embed positions in tensor.
-        Args:
-            xs_pad: input tensor (B, L, D)
-            ilens: input length (B)
-            prev_states: Not to be used now.
-        Returns:
-            position embedded tensor and mask
-        """
-        masks = (~make_pad_mask(ilens)[:, None, :]).to(xs_pad.device)
-        xs_pad *= self.output_size() ** 0.5
-        if self.embed is None:
-            xs_pad = xs_pad
-        elif (
-                isinstance(self.embed, Conv2dSubsampling)
-                or isinstance(self.embed, Conv2dSubsampling2)
-                or isinstance(self.embed, Conv2dSubsampling6)
-                or isinstance(self.embed, Conv2dSubsampling8)
-        ):
-            short_status, limit_size = check_short_utt(self.embed, xs_pad.size(1))
-            if short_status:
-                raise TooShortUttError(
-                    f"has {xs_pad.size(1)} frames and is too short for subsampling "
-                    + f"(it needs more than {limit_size} frames), return empty results",
-                    xs_pad.size(1),
-                    limit_size,
-                )
-            xs_pad, masks = self.embed(xs_pad, masks)
-        else:
-            xs_pad = self.embed(xs_pad)
-        # forward encoder1
-        mask_shfit_chunk, mask_att_chunk_encoder = None, None
-        encoder_outs = self.encoders0(xs_pad, masks, None, mask_shfit_chunk, mask_att_chunk_encoder)
-        xs_pad, masks = encoder_outs[0], encoder_outs[1]
-        encoder_outs = self.encoders(xs_pad, masks, None, mask_shfit_chunk, mask_att_chunk_encoder)
-        xs_pad, masks = encoder_outs[0], encoder_outs[1]
-        if self.normalize_before:
-            xs_pad = self.after_norm(xs_pad)
-        # forward encoder2
-        olens = masks.squeeze(1).sum(1)
-        mask_shfit_chunk2, mask_att_chunk_encoder2 = None, None
-        for layer_idx, encoder_layer in enumerate(self.tp_encoders):
-            encoder_outs = encoder_layer(xs_pad, masks, None, mask_shfit_chunk2, mask_att_chunk_encoder2)
-            xs_pad, masks = encoder_outs[0], encoder_outs[1]
-        if self.tp_blocks > 0:
-            xs_pad = self.tp_norm(xs_pad)
-        return xs_pad, olens
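For readers following the removal: the deleted SANMTPEncoder threads `mask_shfit_chunk` and `mask_att_chunk_encoder` through every layer to realize SCAMA's chunk-aware attention. A self-contained sketch of one way such a chunk attention mask can be built; this helper is illustrative and not part of FunASR:

import torch

def chunk_attention_mask(seq_len: int, chunk_size: int) -> torch.Tensor:
    # Each frame may attend only within its own chunk (no look-back here).
    # Returns a (seq_len, seq_len) boolean mask where True = may attend.
    idx = torch.arange(seq_len)
    return (idx[:, None] // chunk_size) == (idx[None, :] // chunk_size)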
class EncoderLayerSANMExport(nn.Module):
    def __init__(
        self,
@@ -743,6 +523,7 @@
        feats_dim=560,
        model_name="encoder",
        onnx: bool = True,
+        ctc_linear: nn.Module = None,
    ):
        super().__init__()
        self.embed = model.embed
@@ -773,6 +554,8 @@
        self.num_heads = model.encoders[0].self_attn.h
        self.hidden_size = model.encoders[0].self_attn.linear_out.out_features
+        self.ctc_linear = ctc_linear
    def prepare_mask(self, mask):
        mask_3d_btd = mask[:, :, None]
        if len(mask.shape) == 2:
@@ -786,6 +569,7 @@
    def forward(self, speech: torch.Tensor, speech_lengths: torch.Tensor, online: bool = False):
        if not online:
            speech = speech * self._output_size**0.5
        mask = self.make_pad_mask(speech_lengths)
        mask = self.prepare_mask(mask)
        if self.embed is None:
@@ -801,6 +585,10 @@
        xs_pad = self.model.after_norm(xs_pad)
+        if self.ctc_linear is not None:
+            xs_pad = self.ctc_linear(xs_pad)
+            xs_pad = F.softmax(xs_pad, dim=2)
        return xs_pad, speech_lengths
    def get_output_size(self):
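With the new hook, the exported encoder can emit per-frame CTC posteriors directly: `ctc_linear` projects hidden states to the vocabulary and the added `F.softmax` normalizes over dim 2. A hedged usage sketch; the wrapper name `SANMEncoderExport` and the `asr_model.ctc.ctc_lo` attribute are assumptions, not confirmed by this diff:

# Hypothetical wiring of the optional CTC head at export time.
export_enc = SANMEncoderExport(
    asr_model.encoder,
    onnx=True,
    ctc_linear=asr_model.ctc.ctc_lo,  # assumed: the model's CTC projection layer
)
posteriors, olens = export_enc(speech, speech_lengths)  # (B, T', V) probabilities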