#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights Reserved.
# MIT License (https://opensource.org/licenses/MIT)

# Copyright 2019 Shigeki Karita
# Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)

"""Subsampling layer definition."""

import logging
import math
from typing import Optional, Tuple, Union

import numpy as np
import torch
import torch.nn.functional as F

from funasr.models.scama.utils import sequence_mask
from funasr.models.transformer.embedding import PositionalEncoding
from funasr.models.transformer.utils.nets_utils import pad_to_len, sub_factor_to_params


class TooShortUttError(Exception):
    """Raised when the utt is too short for subsampling."""


        conv_size1, conv_size2, conv_size3 = conv_size

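        # First conv stack fragment: every strided block uses stride [1, 2], so only
        # the frequency axis is halved (three times); the time axis is left intact.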
        self.conv = torch.nn.Sequential(
            torch.nn.Conv2d(
                1, conv_size1, conv_kernel_size, stride=1, padding=(conv_kernel_size - 1) // 2
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size1,
                conv_size1,
                conv_kernel_size,
                stride=[1, 2],
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size1,
                conv_size2,
                conv_kernel_size,
                stride=1,
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size2,
                conv_size2,
                conv_kernel_size,
                stride=[1, 2],
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size2,
                conv_size3,
                conv_kernel_size,
                stride=1,
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size3,
                conv_size3,
                conv_kernel_size,
                stride=[1, 2],
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
        )
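
        # Flattened (channels x frequency) feature width after the conv stack,
        # presumably used to size the projection applied to the conv output
        # (the projection layer itself is not shown in this fragment).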
        output_proj = conv_size3 * ((input_size // 2) // 2)

        kernel_1 = int(subsampling_factor / 2)

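        # Second conv stack fragment: time is strided by kernel_1 * 2, i.e. by
        # subsampling_factor (via the [kernel_1, 2] and [2, 2] blocks), and
        # frequency is reduced by 4.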
        self.conv = torch.nn.Sequential(
            torch.nn.Conv2d(
                1, conv_size1, conv_kernel_size, stride=1, padding=(conv_kernel_size - 1) // 2
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size1,
                conv_size1,
                conv_kernel_size,
                stride=[kernel_1, 2],
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size1,
                conv_size2,
                conv_kernel_size,
                stride=1,
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size2,
                conv_size2,
                conv_kernel_size,
                stride=[2, 2],
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size2,
                conv_size3,
                conv_kernel_size,
                stride=1,
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
            torch.nn.Conv2d(
                conv_size3,
                conv_size3,
                conv_kernel_size,
                stride=1,
                padding=(conv_kernel_size - 1) // 2,
            ),
            torch.nn.ReLU(),
        )

        output_proj = conv_size3 * ((input_size // 2) // 2)

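        # (forward fragment) `olens` below is the batch-wide maximum of
        # mask.eq(0).sum(1), i.e. the longest valid length if zero entries of
        # `mask` mark valid frames (an assumption; the mask convention is not
        # shown in this fragment).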
        olens = max(mask.eq(0).sum(1))

        b, t, f = x.size()
        x = x.unsqueeze(1)  # (b, 1, t, f)

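        # Chunk-wise streaming path: pad the time axis up to a multiple of
        # chunk_size * subsampling_factor, then fold each utterance into
        # independent fixed-size windows before running the conv stack.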
        if chunk_size is not None:
            max_input_length = int(
                chunk_size
                * self.subsampling_factor
                * math.ceil(float(t) / (chunk_size * self.subsampling_factor))
            )
            x = map(lambda inputs: pad_to_len(inputs, max_input_length, 1), x)
            x = list(x)
            x = torch.stack(x, dim=0)
            N_chunks = max_input_length // (chunk_size * self.subsampling_factor)
            x = x.view(b * N_chunks, 1, chunk_size * self.subsampling_factor, f)

        x = self.conv(x)

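        # Flatten channels and frequency back into one feature axis; on the chunked
        # path, also trim the frames introduced by padding back down to olens.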
        _, c, _, f = x.size()
        if chunk_size is not None:
            x = x.transpose(1, 2).contiguous().view(b, -1, c * f)[:, :olens, :]
        else:
            x = x.transpose(1, 2).contiguous().view(b, -1, c * f)

        if self.output is not None:
            x = self.output(x)

        return x, mask[:, :olens][:, : x.size(1)]

    def create_new_vgg_mask(self, mask: torch.Tensor) -> torch.Tensor:
        """Create a new mask for VGG output sequences.

        Args:
            mask: Mask of input sequences. (B, T)

        Returns:
            mask: Mask of output sequences. (B, sub(T))
        """
        if self.subsampling_factor > 1:
            return mask[:, ::2][:, :: self.stride_1]
        else:
            return mask

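    # Sanity check (assuming self.stride_1 == subsampling_factor // 2): with
    # subsampling_factor = 4, a (B, 16) mask becomes (B, 8) after [:, ::2] and
    # (B, 4) after [:, ::stride_1], matching the 4x time reduction.
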
    def get_size_before_subsampling(self, size: int) -> int:
        """Return the original size before subsampling for a given size.