From 4ace5a95b052d338947fc88809a440ccd55cf6b4 Mon Sep 17 00:00:00 2001
From: 游雁 <zhifu.gzf@alibaba-inc.com>
Date: Thu, 16 Nov 2023 16:39:52 +0800
Subject: [PATCH] funasr pages
---
funasr/modules/subsampling.py | 331 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 320 insertions(+), 11 deletions(-)
diff --git a/funasr/modules/subsampling.py b/funasr/modules/subsampling.py
index f9a1c16..af33aef 100644
--- a/funasr/modules/subsampling.py
+++ b/funasr/modules/subsampling.py
@@ -5,11 +5,15 @@
# Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
"""Subsampling layer definition."""
-
+import numpy as np
import torch
import torch.nn.functional as F
from funasr.modules.embedding import PositionalEncoding
-
+import logging
+from funasr.modules.streaming_utils.utils import sequence_mask
+from funasr.modules.nets_utils import sub_factor_to_params, pad_to_len
+from typing import Optional, Tuple, Union
+import math
class TooShortUttError(Exception):
"""Raised when the utt is too short for subsampling.
@@ -87,6 +91,72 @@
if x_mask is None:
return x, None
return x, x_mask[:, :, :-2:2][:, :, :-2:2]
+
+ def __getitem__(self, key):
+ """Get item.
+
+ When reset_parameters() is called with use_scaled_pos_enc enabled,
+ return the positional encoding layer (the last element of self.out).
+
+ """
+ if key != -1:
+ raise NotImplementedError("Support only `-1` (for `reset_parameters`).")
+ return self.out[key]
+
+class Conv2dSubsamplingPad(torch.nn.Module):
+ """Convolutional 2D subsampling (to 1/4 length).
+
+ Args:
+ idim (int): Input dimension.
+ odim (int): Output dimension.
+ dropout_rate (float): Dropout rate.
+ pos_enc (torch.nn.Module): Custom position encoding layer.
+
+ """
+
+ def __init__(self, idim, odim, dropout_rate, pos_enc=None):
+ """Construct an Conv2dSubsampling object."""
+ super(Conv2dSubsamplingPad, self).__init__()
+ self.conv = torch.nn.Sequential(
+ torch.nn.Conv2d(1, odim, 3, 2, padding=(0, 0)),
+ torch.nn.ReLU(),
+ torch.nn.Conv2d(odim, odim, 3, 2, padding=(0, 0)),
+ torch.nn.ReLU(),
+ )
+ self.out = torch.nn.Sequential(
+ torch.nn.Linear(odim * (((idim - 1) // 2 - 1) // 2), odim),
+ pos_enc if pos_enc is not None else PositionalEncoding(odim, dropout_rate),
+ )
+ self.pad_fn = torch.nn.ConstantPad1d((0, 4), 0.0)
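+ # right-pad the time axis by 4 frames so the two unpadded stride-2
+ # convolutions above do not drop trailing frames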
+
+ def forward(self, x, x_mask):
+ """Subsample x.
+
+ Args:
+ x (torch.Tensor): Input tensor (#batch, time, idim).
+ x_mask (torch.Tensor): Input mask (#batch, 1, time).
+
+ Returns:
+ torch.Tensor: Subsampled tensor (#batch, time', odim),
+ where time' = time // 4.
+ torch.Tensor: Subsampled mask (#batch, 1, time'),
+ where time' = time // 4.
+
+ """
+ x = x.transpose(1, 2)
+ x = self.pad_fn(x)
+ x = x.transpose(1, 2)
+ x = x.unsqueeze(1) # (b, c, t, f)
+ x = self.conv(x)
+ b, c, t, f = x.size()
+ x = self.out(x.transpose(1, 2).contiguous().view(b, t, c * f))
+ if x_mask is None:
+ return x, None
+ x_len = torch.sum(x_mask[:, 0, :], dim=-1)
+ x_len = (x_len - 1) // 2 + 1
+ x_len = (x_len - 1) // 2 + 1
+ mask = sequence_mask(x_len, None, x_len.dtype, x[0].device)
+ return x, mask[:, None, :]
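+
+ # Minimal usage sketch (assumed shapes; illustration only, not part of the patch):
+ # sub = Conv2dSubsamplingPad(idim=80, odim=256, dropout_rate=0.1)
+ # feats = torch.randn(4, 100, 80) # (batch, time, idim)
+ # in_mask = torch.ones(4, 1, 100) # (batch, 1, time)
+ # out, out_mask = sub(feats, in_mask) # out: (4, 25, 256), out_mask: (4, 1, 25)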
def __getitem__(self, key):
"""Get item.
@@ -267,12 +337,17 @@
"""
- def __init__(self, idim, odim, kernel_size, stride, pad):
+ def __init__(self, idim, odim, kernel_size, stride, pad,
+ tf2torch_tensor_name_prefix_torch: str = "stride_conv",
+ tf2torch_tensor_name_prefix_tf: str = "seq2seq/proj_encoder/downsampling",
+ ):
super(Conv1dSubsampling, self).__init__()
self.conv = torch.nn.Conv1d(idim, odim, kernel_size, stride)
self.pad_fn = torch.nn.ConstantPad1d(pad, 0.0)
self.stride = stride
self.odim = odim
+ self.tf2torch_tensor_name_prefix_torch = tf2torch_tensor_name_prefix_torch
+ self.tf2torch_tensor_name_prefix_tf = tf2torch_tensor_name_prefix_tf
def output_size(self) -> int:
return self.odim
@@ -283,7 +358,8 @@
"""
x = x.transpose(1, 2) # (b, d ,t)
x = self.pad_fn(x)
- x = F.relu(self.conv(x))
+ # leaky_relu with negative_slope=0. is numerically identical to relu
+ x = F.leaky_relu(self.conv(x), negative_slope=0.)
x = x.transpose(1, 2) # (b, t ,d)
if x_len is None:
@@ -292,13 +368,246 @@
x_len = (x_len - 1) // self.stride + 1
return x, x_len
- def __getitem__(self, key):
- """Get item.
+ def gen_tf2torch_map_dict(self):
+ tensor_name_prefix_torch = self.tf2torch_tensor_name_prefix_torch
+ tensor_name_prefix_tf = self.tf2torch_tensor_name_prefix_tf
+ map_dict_local = {
+ ## stride conv (encoder downsampling)
+ "{}.conv.weight".format(tensor_name_prefix_torch):
+ {"name": "{}/conv1d/kernel".format(tensor_name_prefix_tf),
+ "squeeze": None,
+ "transpose": (2, 1, 0),
+ }, # (256,256,3),(3,256,256)
+ "{}.conv.bias".format(tensor_name_prefix_torch):
+ {"name": "{}/conv1d/bias".format(tensor_name_prefix_tf),
+ "squeeze": None,
+ "transpose": None,
+ }, # (256,),(256,)
+ }
+ return map_dict_local
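+
+ # Reading one entry above: the TF kernel "<prefix>/conv1d/kernel" of shape
+ # (3, 256, 256) is transposed with (2, 1, 0) into the torch Conv1d weight
+ # layout (out_channels, in_channels, kernel_size) = (256, 256, 3).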
- When reset_parameters() is called, if use_scaled_pos_enc is used,
- return the positioning encoding.
+ def convert_tf2torch(self,
+ var_dict_tf,
+ var_dict_torch,
+ ):
+
+ map_dict = self.gen_tf2torch_map_dict()
+
+ var_dict_torch_update = dict()
+ for name in sorted(var_dict_torch.keys(), reverse=False):
+ names = name.split('.')
+ if names[0] == self.tf2torch_tensor_name_prefix_torch:
+ name_tf = map_dict[name]["name"]
+ data_tf = var_dict_tf[name_tf]
+ if map_dict[name]["squeeze"] is not None:
+ data_tf = np.squeeze(data_tf, axis=map_dict[name]["squeeze"])
+ if map_dict[name]["transpose"] is not None:
+ data_tf = np.transpose(data_tf, map_dict[name]["transpose"])
+ data_tf = torch.from_numpy(data_tf).type(torch.float32).to("cpu")
+
+ var_dict_torch_update[name] = data_tf
+
+ logging.info(
+ "torch tensor: {}, {}, loading from tf tensor: {}, {}".format(name, data_tf.size(), name_tf,
+ var_dict_tf[name_tf].shape))
+ return var_dict_torch_update
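+
+ # Hedged usage sketch (the TF checkpoint dict below is an assumption, not
+ # part of this patch); the module must be registered on the model under the
+ # name given by tf2torch_tensor_name_prefix_torch ("stride_conv"):
+ # var_dict_torch = model.state_dict() # keys like "stride_conv.conv.weight"
+ # var_dict_tf = {...} # name -> numpy array read from a TF checkpoint
+ # updated = model.stride_conv.convert_tf2torch(var_dict_tf, var_dict_torch)
+
+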
+class StreamingConvInput(torch.nn.Module):
+ """Streaming ConvInput module definition.
+ Args:
+ input_size: Input size.
+ conv_size: Convolution size.
+ subsampling_factor: Subsampling factor.
+ vgg_like: Whether to use a VGG-like network.
+ output_size: Block output dimension.
+ """
+
+ def __init__(
+ self,
+ input_size: int,
+ conv_size: Union[int, Tuple],
+ subsampling_factor: int = 4,
+ vgg_like: bool = True,
+ conv_kernel_size: int = 3,
+ output_size: Optional[int] = None,
+ ) -> None:
+ """Construct a ConvInput object."""
+ super().__init__()
+ if vgg_like:
+ if subsampling_factor == 1:
+ conv_size1, conv_size2 = conv_size
+
+ self.conv = torch.nn.Sequential(
+ torch.nn.Conv2d(1, conv_size1, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
+ torch.nn.ReLU(),
+ torch.nn.Conv2d(conv_size1, conv_size1, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
+ torch.nn.ReLU(),
+ torch.nn.MaxPool2d((1, 2)),
+ torch.nn.Conv2d(conv_size1, conv_size2, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
+ torch.nn.ReLU(),
+ torch.nn.Conv2d(conv_size2, conv_size2, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
+ torch.nn.ReLU(),
+ torch.nn.MaxPool2d((1, 2)),
+ )
+
+ output_proj = conv_size2 * ((input_size // 2) // 2)
+
+ self.subsampling_factor = 1
+
+ self.stride_1 = 1
+
+ self.create_new_mask = self.create_new_vgg_mask
+
+ else:
+ conv_size1, conv_size2 = conv_size
+
+ kernel_1 = subsampling_factor // 2
+
+ self.conv = torch.nn.Sequential(
+ torch.nn.Conv2d(1, conv_size1, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
+ torch.nn.ReLU(),
+ torch.nn.Conv2d(conv_size1, conv_size1, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
+ torch.nn.ReLU(),
+ torch.nn.MaxPool2d((kernel_1, 2)),
+ torch.nn.Conv2d(conv_size1, conv_size2, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
+ torch.nn.ReLU(),
+ torch.nn.Conv2d(conv_size2, conv_size2, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
+ torch.nn.ReLU(),
+ torch.nn.MaxPool2d((2, 2)),
+ )
+
+ output_proj = conv_size2 * ((input_size // 2) // 2)
+
+ self.subsampling_factor = subsampling_factor
+
+ self.create_new_mask = self.create_new_vgg_mask
+
+ self.stride_1 = kernel_1
+
+ else:
+ if subsampling_factor == 1:
+ self.conv = torch.nn.Sequential(
+ torch.nn.Conv2d(1, conv_size, 3, [1,2], [1,0]),
+ torch.nn.ReLU(),
+ torch.nn.Conv2d(conv_size, conv_size, conv_kernel_size, [1,2], [1,0]),
+ torch.nn.ReLU(),
+ )
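+ # Note: the time-axis padding of 1 in the second conv above assumes
+ # conv_kernel_size == 3; other kernel sizes change the output length.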
+
+ output_proj = conv_size * (((input_size - 1) // 2 - 1) // 2)
+
+ self.subsampling_factor = subsampling_factor
+ self.kernel_2 = conv_kernel_size
+ self.stride_2 = 1
+
+ self.create_new_mask = self.create_new_conv2d_mask
+
+ else:
+ kernel_2, stride_2, conv_2_output_size = sub_factor_to_params(
+ subsampling_factor,
+ input_size,
+ )
+
+ self.conv = torch.nn.Sequential(
+ torch.nn.Conv2d(1, conv_size, 3, 2, [1,0]),
+ torch.nn.ReLU(),
+ torch.nn.Conv2d(conv_size, conv_size, kernel_2, stride_2, [(kernel_2-1)//2, 0]),
+ torch.nn.ReLU(),
+ )
+
+ output_proj = conv_size * conv_2_output_size
+
+ self.subsampling_factor = subsampling_factor
+ self.kernel_2 = kernel_2
+ self.stride_2 = stride_2
+
+ self.create_new_mask = self.create_new_conv2d_mask
+
+ self.vgg_like = vgg_like
+ self.min_frame_length = 7
+
+ if output_size is not None:
+ self.output = torch.nn.Linear(output_proj, output_size)
+ self.output_size = output_size
+ else:
+ self.output = None
+ self.output_size = output_proj
+
+ def forward(
+ self, x: torch.Tensor, mask: Optional[torch.Tensor], chunk_size: Optional[int]
+ ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
+ """Encode input sequences.
+ Args:
+ x: ConvInput input sequences. (B, T, D_feats)
+ mask: Mask of input sequences. (B, T)
+ chunk_size: Chunk length (in subsampled frames) for streaming inference; None for full-utterance input.
+ Returns:
+ x: ConvInput output sequences. (B, sub(T), D_out)
+ mask: Mask of output sequences. (B, sub(T))
"""
- if key != -1:
- raise NotImplementedError("Support only `-1` (for `reset_parameters`).")
- return self.out[key]
+ if mask is not None:
+ mask = self.create_new_mask(mask)
+ olens = max(mask.eq(0).sum(1))
+ else:
+ olens = None # slicing with [:, :None] below keeps the full length
+
+ b, t, f = x.size()
+ x = x.unsqueeze(1) # (b, 1, t, f)
+
+ if chunk_size is not None:
+ max_input_length = int(
+ chunk_size * self.subsampling_factor * (math.ceil(float(t) / (chunk_size * self.subsampling_factor) ))
+ )
+ x = torch.stack([pad_to_len(inputs, max_input_length, 1) for inputs in x], dim=0)
+ N_chunks = max_input_length // ( chunk_size * self.subsampling_factor)
+ x = x.view(b * N_chunks, 1, chunk_size * self.subsampling_factor, f)
+
+ x = self.conv(x)
+
+ _, c, _, f = x.size()
+ if chunk_size is not None:
+ x = x.transpose(1, 2).contiguous().view(b, -1, c * f)[:,:olens,:]
+ else:
+ x = x.transpose(1, 2).contiguous().view(b, -1, c * f)
+
+ if self.output is not None:
+ x = self.output(x)
+
+ if mask is None:
+ return x, None
+ return x, mask[:, :olens][:, :x.size(1)]
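+
+ # Sketch of a full-utterance call (values are assumptions for illustration):
+ # conv_in = StreamingConvInput(input_size=80, conv_size=(64, 128),
+ # subsampling_factor=4, vgg_like=True, output_size=256)
+ # x = torch.randn(2, 64, 80) # (B, T, D_feats)
+ # mask = torch.zeros(2, 64) # zeros are counted as valid frames here
+ # y, y_mask = conv_in(x, mask, chunk_size=None) # y: (2, 16, 256)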
+
+ def create_new_vgg_mask(self, mask: torch.Tensor) -> torch.Tensor:
+ """Create a new mask for VGG output sequences.
+ Args:
+ mask: Mask of input sequences. (B, T)
+ Returns:
+ mask: Mask of output sequences. (B, sub(T))
+ """
+ if self.subsampling_factor > 1:
+ vgg1_t_len = mask.size(1) - (mask.size(1) % (self.subsampling_factor // 2 ))
+ mask = mask[:, :vgg1_t_len][:, ::self.subsampling_factor // 2]
+
+ vgg2_t_len = mask.size(1) - (mask.size(1) % 2)
+ mask = mask[:, :vgg2_t_len][:, ::2]
+
+ return mask
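+
+ # Worked example (assumed values): with subsampling_factor=4 and T=10,
+ # the first slice keeps every 2nd frame (10 -> 5), then the length is
+ # trimmed to even and every 2nd frame is kept again (5 -> 4 -> 2).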
+
+ def create_new_conv2d_mask(self, mask: torch.Tensor) -> torch.Tensor:
+ """Create new conformer mask for Conv2d output sequences.
+ Args:
+ mask: Mask of input sequences. (B, T)
+ Returns:
+ mask: Mask of output sequences. (B, sub(T))
+ """
+ if self.subsampling_factor > 1:
+ return mask[:, ::2][:, ::self.stride_2]
+ else:
+ return mask
+
+ def get_size_before_subsampling(self, size: int) -> int:
+ """Return the original size before subsampling for a given size.
+ Args:
+ size: Number of frames after subsampling.
+ Returns:
+ Number of frames before subsampling.
+ """
+ return size * self.subsampling_factor
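+
+ # e.g. with subsampling_factor=4, get_size_before_subsampling(16) == 64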
--
Gitblit v1.9.1