From 0cc7af2c894a31f1b6d95d8af7e4efa414e2a11b Mon Sep 17 00:00:00 2001
From: zhifu gao <zhifu.gzf@alibaba-inc.com>
Date: 星期五, 07 四月 2023 15:54:21 +0800
Subject: [PATCH] Merge pull request #328 from alibaba-damo-academy/dev_cmz2
---
funasr/models/encoder/sanm_encoder.py | 232 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 230 insertions(+), 2 deletions(-)
diff --git a/funasr/models/encoder/sanm_encoder.py b/funasr/models/encoder/sanm_encoder.py
index 57890ef..2a3a353 100644
--- a/funasr/models/encoder/sanm_encoder.py
+++ b/funasr/models/encoder/sanm_encoder.py
@@ -10,7 +10,7 @@
from typeguard import check_argument_types
import numpy as np
from funasr.modules.nets_utils import make_pad_mask
-from funasr.modules.attention import MultiHeadedAttention, MultiHeadedAttentionSANM
+from funasr.modules.attention import MultiHeadedAttention, MultiHeadedAttentionSANM, MultiHeadedAttentionSANMwithMask
from funasr.modules.embedding import SinusoidalPositionEncoder
from funasr.modules.layer_norm import LayerNorm
from funasr.modules.multi_layer_conv import Conv1dLinear
@@ -27,7 +27,7 @@
from funasr.modules.subsampling import check_short_utt
from funasr.models.ctc import CTC
from funasr.models.encoder.abs_encoder import AbsEncoder
-
+from funasr.modules.mask import subsequent_mask, vad_mask
class EncoderLayerSANM(nn.Module):
def __init__(
@@ -958,3 +958,231 @@
var_dict_tf[name_tf].shape))
return var_dict_torch_update
+
+
+class SANMVadEncoder(AbsEncoder):
+ """
+ author: Speech Lab, Alibaba Group, China
+
+ """
+
+ def __init__(
+ self,
+ input_size: int,
+ output_size: int = 256,
+ attention_heads: int = 4,
+ linear_units: int = 2048,
+ num_blocks: int = 6,
+ dropout_rate: float = 0.1,
+ positional_dropout_rate: float = 0.1,
+ attention_dropout_rate: float = 0.0,
+ input_layer: Optional[str] = "conv2d",
+ pos_enc_class=SinusoidalPositionEncoder,
+ normalize_before: bool = True,
+ concat_after: bool = False,
+ positionwise_layer_type: str = "linear",
+ positionwise_conv_kernel_size: int = 1,
+ padding_idx: int = -1,
+ interctc_layer_idx: List[int] = [],
+ interctc_use_conditioning: bool = False,
+ kernel_size : int = 11,
+ sanm_shfit : int = 0,
+ selfattention_layer_type: str = "sanm",
+ ):
+ assert check_argument_types()
+ super().__init__()
+ self._output_size = output_size
+
+ if input_layer == "linear":
+ self.embed = torch.nn.Sequential(
+ torch.nn.Linear(input_size, output_size),
+ torch.nn.LayerNorm(output_size),
+ torch.nn.Dropout(dropout_rate),
+ torch.nn.ReLU(),
+ pos_enc_class(output_size, positional_dropout_rate),
+ )
+ elif input_layer == "conv2d":
+ self.embed = Conv2dSubsampling(input_size, output_size, dropout_rate)
+ elif input_layer == "conv2d2":
+ self.embed = Conv2dSubsampling2(input_size, output_size, dropout_rate)
+ elif input_layer == "conv2d6":
+ self.embed = Conv2dSubsampling6(input_size, output_size, dropout_rate)
+ elif input_layer == "conv2d8":
+ self.embed = Conv2dSubsampling8(input_size, output_size, dropout_rate)
+ elif input_layer == "embed":
+ self.embed = torch.nn.Sequential(
+ torch.nn.Embedding(input_size, output_size, padding_idx=padding_idx),
+ SinusoidalPositionEncoder(),
+ )
+ elif input_layer is None:
+ if input_size == output_size:
+ self.embed = None
+ else:
+ self.embed = torch.nn.Linear(input_size, output_size)
+ elif input_layer == "pe":
+ self.embed = SinusoidalPositionEncoder()
+ else:
+ raise ValueError("unknown input_layer: " + input_layer)
+ self.normalize_before = normalize_before
+ if positionwise_layer_type == "linear":
+ positionwise_layer = PositionwiseFeedForward
+ positionwise_layer_args = (
+ output_size,
+ linear_units,
+ dropout_rate,
+ )
+ elif positionwise_layer_type == "conv1d":
+ positionwise_layer = MultiLayeredConv1d
+ positionwise_layer_args = (
+ output_size,
+ linear_units,
+ positionwise_conv_kernel_size,
+ dropout_rate,
+ )
+ elif positionwise_layer_type == "conv1d-linear":
+ positionwise_layer = Conv1dLinear
+ positionwise_layer_args = (
+ output_size,
+ linear_units,
+ positionwise_conv_kernel_size,
+ dropout_rate,
+ )
+ else:
+ raise NotImplementedError("Support only linear or conv1d.")
+
+ if selfattention_layer_type == "selfattn":
+ encoder_selfattn_layer = MultiHeadedAttention
+ encoder_selfattn_layer_args = (
+ attention_heads,
+ output_size,
+ attention_dropout_rate,
+ )
+
+ elif selfattention_layer_type == "sanm":
+ self.encoder_selfattn_layer = MultiHeadedAttentionSANMwithMask
+ encoder_selfattn_layer_args0 = (
+ attention_heads,
+ input_size,
+ output_size,
+ attention_dropout_rate,
+ kernel_size,
+ sanm_shfit,
+ )
+
+ encoder_selfattn_layer_args = (
+ attention_heads,
+ output_size,
+ output_size,
+ attention_dropout_rate,
+ kernel_size,
+ sanm_shfit,
+ )
+
+ self.encoders0 = repeat(
+ 1,
+ lambda lnum: EncoderLayerSANM(
+ input_size,
+ output_size,
+ self.encoder_selfattn_layer(*encoder_selfattn_layer_args0),
+ positionwise_layer(*positionwise_layer_args),
+ dropout_rate,
+ normalize_before,
+ concat_after,
+ ),
+ )
+
+ self.encoders = repeat(
+ num_blocks-1,
+ lambda lnum: EncoderLayerSANM(
+ output_size,
+ output_size,
+ self.encoder_selfattn_layer(*encoder_selfattn_layer_args),
+ positionwise_layer(*positionwise_layer_args),
+ dropout_rate,
+ normalize_before,
+ concat_after,
+ ),
+ )
+ if self.normalize_before:
+ self.after_norm = LayerNorm(output_size)
+
+ self.interctc_layer_idx = interctc_layer_idx
+ if len(interctc_layer_idx) > 0:
+ assert 0 < min(interctc_layer_idx) and max(interctc_layer_idx) < num_blocks
+ self.interctc_use_conditioning = interctc_use_conditioning
+ self.conditioning_layer = None
+ self.dropout = nn.Dropout(dropout_rate)
+
+ def output_size(self) -> int:
+ return self._output_size
+
+ def forward(
+ self,
+ xs_pad: torch.Tensor,
+ ilens: torch.Tensor,
+ vad_indexes: torch.Tensor,
+ prev_states: torch.Tensor = None,
+ ctc: CTC = None,
+ ) -> Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor]]:
+ """Embed positions in tensor.
+
+ Args:
+ xs_pad: input tensor (B, L, D)
+ ilens: input length (B)
+ prev_states: Not to be used now.
+ Returns:
+ position embedded tensor and mask
+ """
+ masks = (~make_pad_mask(ilens)[:, None, :]).to(xs_pad.device)
+ sub_masks = subsequent_mask(masks.size(-1), device=xs_pad.device).unsqueeze(0)
+ no_future_masks = masks & sub_masks
+ xs_pad *= self.output_size()**0.5
+ if self.embed is None:
+ xs_pad = xs_pad
+ elif (isinstance(self.embed, Conv2dSubsampling) or isinstance(self.embed, Conv2dSubsampling2)
+ or isinstance(self.embed, Conv2dSubsampling6) or isinstance(self.embed, Conv2dSubsampling8)):
+ short_status, limit_size = check_short_utt(self.embed, xs_pad.size(1))
+ if short_status:
+ raise TooShortUttError(
+ f"has {xs_pad.size(1)} frames and is too short for subsampling " +
+ f"(it needs more than {limit_size} frames), return empty results",
+ xs_pad.size(1),
+ limit_size,
+ )
+ xs_pad, masks = self.embed(xs_pad, masks)
+ else:
+ xs_pad = self.embed(xs_pad)
+
+ # xs_pad = self.dropout(xs_pad)
+ mask_tup0 = [masks, no_future_masks]
+ encoder_outs = self.encoders0(xs_pad, mask_tup0)
+ xs_pad, _ = encoder_outs[0], encoder_outs[1]
+ intermediate_outs = []
+
+
+ for layer_idx, encoder_layer in enumerate(self.encoders):
+ if layer_idx + 1 == len(self.encoders):
+ # This is last layer.
+ coner_mask = torch.ones(masks.size(0),
+ masks.size(-1),
+ masks.size(-1),
+ device=xs_pad.device,
+ dtype=torch.bool)
+ for word_index, length in enumerate(ilens):
+ coner_mask[word_index, :, :] = vad_mask(masks.size(-1),
+ vad_indexes[word_index],
+ device=xs_pad.device)
+ layer_mask = masks & coner_mask
+ else:
+ layer_mask = no_future_masks
+ mask_tup1 = [masks, layer_mask]
+ encoder_outs = encoder_layer(xs_pad, mask_tup1)
+ xs_pad, layer_mask = encoder_outs[0], encoder_outs[1]
+
+ if self.normalize_before:
+ xs_pad = self.after_norm(xs_pad)
+
+ olens = masks.squeeze(1).sum(1)
+ if len(intermediate_outs) > 0:
+ return (xs_pad, intermediate_outs), olens, None
+ return xs_pad, olens, None
--
Gitblit v1.9.1