| | |
| | | import numpy as np |
| | | import torch |
| | | import torch.nn as nn |
| | | |
| | | try: |
| | | from torch_complex.tensor import ComplexTensor |
| | | except: |
| | |
| | | |
| | | |
| | | @tables.register("frontend_classes", "DefaultFrontend") |
| | | @tables.register("frontend_classes", "EspnetFrontend") |
| | | class DefaultFrontend(nn.Module): |
| | | """Conventional frontend structure for ASR. |
| | | Stft -> WPE -> MVDR-Beamformer -> Power-spec -> Mel-Fbank -> CMVN |
| | | """ |
| | | |
| | | def __init__( |
| | | self, |
| | | fs: int = 16000, |
| | | n_fft: int = 512, |
| | | win_length: int = None, |
| | | hop_length: int = 128, |
| | | window: Optional[str] = "hann", |
| | | center: bool = True, |
| | | normalized: bool = False, |
| | | onesided: bool = True, |
| | | n_mels: int = 80, |
| | | fmin: int = None, |
| | | fmax: int = None, |
| | | htk: bool = False, |
| | | frontend_conf: Optional[dict] = None, |
| | | apply_stft: bool = True, |
| | | use_channel: int = None, |
| | | **kwargs, |
| | | self, |
| | | fs: int = 16000, |
| | | n_fft: int = 512, |
| | | win_length: int = None, |
| | | hop_length: int = 128, |
| | | window: Optional[str] = "hann", |
| | | center: bool = True, |
| | | normalized: bool = False, |
| | | onesided: bool = True, |
| | | n_mels: int = 80, |
| | | fmin: int = None, |
| | | fmax: int = None, |
| | | htk: bool = False, |
| | | frontend_conf: Optional[dict] = None, |
| | | apply_stft: bool = True, |
| | | use_channel: int = None, |
| | | **kwargs, |
| | | ): |
| | | super().__init__() |
| | | |
| | |
| | | return self.n_mels |
| | | |
| | | def forward( |
| | | self, input: torch.Tensor, input_lengths: torch.Tensor |
| | | self, input: torch.Tensor, input_lengths: Union[torch.Tensor, list] |
| | | ) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | if isinstance(input_lengths, list): |
| | | input_lengths = torch.tensor(input_lengths) |
| | | if input.dtype == torch.float64: |
| | | input = input.float() |
| | | # 1. Domain-conversion: e.g. Stft: time -> time-freq |
| | | if self.stft is not None: |
| | | input_stft, feats_lens = self._compute_stft(input, input_lengths) |
| | |
| | | |
| | | # 4. STFT -> Power spectrum |
| | | # h: ComplexTensor(B, T, F) -> torch.Tensor(B, T, F) |
| | | input_power = input_stft.real ** 2 + input_stft.imag ** 2 |
| | | input_power = input_stft.real**2 + input_stft.imag**2 |
| | | |
| | | # 5. Feature transform e.g. Stft -> Log-Mel-Fbank |
| | | # input_power: (Batch, [Channel,] Length, Freq) |
| | |
| | | |
| | | return input_feats, feats_lens |
| | | |
| | | def _compute_stft( |
| | | self, input: torch.Tensor, input_lengths: torch.Tensor |
| | | ) -> torch.Tensor: |
| | | def _compute_stft(self, input: torch.Tensor, input_lengths: torch.Tensor) -> torch.Tensor: |
| | | input_stft, feats_lens = self.stft(input, input_lengths) |
| | | |
| | | assert input_stft.dim() >= 4, input_stft.shape |
| | |
| | | """ |
| | | |
| | | def __init__( |
| | | self, |
| | | fs: int = 16000, |
| | | n_fft: int = 512, |
| | | win_length: int = None, |
| | | hop_length: int = None, |
| | | frame_length: int = None, |
| | | frame_shift: int = None, |
| | | window: Optional[str] = "hann", |
| | | center: bool = True, |
| | | normalized: bool = False, |
| | | onesided: bool = True, |
| | | n_mels: int = 80, |
| | | fmin: int = None, |
| | | fmax: int = None, |
| | | htk: bool = False, |
| | | frontend_conf: Optional[dict] = None, |
| | | apply_stft: bool = True, |
| | | use_channel: int = None, |
| | | lfr_m: int = 1, |
| | | lfr_n: int = 1, |
| | | cmvn_file: str = None, |
| | | mc: bool = True |
| | | self, |
| | | fs: int = 16000, |
| | | n_fft: int = 512, |
| | | win_length: int = None, |
| | | hop_length: int = None, |
| | | frame_length: int = None, |
| | | frame_shift: int = None, |
| | | window: Optional[str] = "hann", |
| | | center: bool = True, |
| | | normalized: bool = False, |
| | | onesided: bool = True, |
| | | n_mels: int = 80, |
| | | fmin: int = None, |
| | | fmax: int = None, |
| | | htk: bool = False, |
| | | frontend_conf: Optional[dict] = None, |
| | | apply_stft: bool = True, |
| | | use_channel: int = None, |
| | | lfr_m: int = 1, |
| | | lfr_n: int = 1, |
| | | cmvn_file: str = None, |
| | | mc: bool = True, |
| | | ): |
| | | super().__init__() |
| | | # Deepcopy (In general, dict shouldn't be used as default arg) |
| | |
| | | self.hop_length = self.hop_length |
| | | else: |
| | | logging.error( |
| | | "Only one of (win_length, hop_length) and (frame_length, frame_shift)" |
| | | "can be set." |
| | | "Only one of (win_length, hop_length) and (frame_length, frame_shift)" "can be set." |
| | | ) |
| | | exit(1) |
| | | |
| | |
| | | return self.n_mels |
| | | |
| | | def forward( |
| | | self, input: torch.Tensor, input_lengths: torch.Tensor |
| | | self, input: torch.Tensor, input_lengths: torch.Tensor |
| | | ) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | # 1. Domain-conversion: e.g. Stft: time -> time-freq |
| | | #import pdb;pdb.set_trace() |
| | | if self.stft is not None: |
| | | input_stft, feats_lens = self._compute_stft(input, input_lengths) |
| | | else: |
| | |
| | | if self.training: |
| | | if self.use_channel is not None: |
| | | input_stft = input_stft[:, :, self.use_channel, :] |
| | | |
| | | |
| | | else: |
| | | # Select 1ch randomly |
| | | ch = np.random.randint(input_stft.size(2)) |
| | |
| | | |
| | | # 4. STFT -> Power spectrum |
| | | # h: ComplexTensor(B, T, F) -> torch.Tensor(B, T, F) |
| | | input_power = input_stft.real ** 2 + input_stft.imag ** 2 |
| | | input_power = input_stft.real**2 + input_stft.imag**2 |
| | | |
| | | # 5. Feature transform e.g. Stft -> Log-Mel-Fbank |
| | | # input_power: (Batch, [Channel,] Length, Freq) |
| | |
| | | input_feats, _ = self.logmel(input_power, feats_lens) |
| | | if self.mc: |
| | | # MFCCA |
| | | if input_feats.dim() ==4: |
| | | if input_feats.dim() == 4: |
| | | bt = input_feats.size(0) |
| | | channel_size = input_feats.size(2) |
| | | input_feats = input_feats.transpose(1,2).reshape(bt*channel_size,-1,80).contiguous() |
| | | feats_lens = feats_lens.repeat(1,channel_size).squeeze() |
| | | input_feats = ( |
| | | input_feats.transpose(1, 2).reshape(bt * channel_size, -1, 80).contiguous() |
| | | ) |
| | | feats_lens = feats_lens.repeat(1, channel_size).squeeze() |
| | | else: |
| | | channel_size = 1 |
| | | return input_feats, feats_lens, channel_size |
| | |
| | | |
| | | return input_feats, feats_lens |
| | | |
| | | def _compute_stft( |
| | | self, input: torch.Tensor, input_lengths: torch.Tensor |
| | | ) -> torch.Tensor: |
| | | def _compute_stft(self, input: torch.Tensor, input_lengths: torch.Tensor) -> torch.Tensor: |
| | | input_stft, feats_lens = self.stft(input, input_lengths) |
| | | |
| | | assert input_stft.dim() >= 4, input_stft.shape |
| | |
| | | return input_stft, feats_lens |
| | | |
| | | def _load_cmvn(self, cmvn_file): |
| | | with open(cmvn_file, 'r', encoding='utf-8') as f: |
| | | with open(cmvn_file, "r", encoding="utf-8") as f: |
| | | lines = f.readlines() |
| | | means_list = [] |
| | | vars_list = [] |
| | | for i in range(len(lines)): |
| | | line_item = lines[i].split() |
| | | if line_item[0] == '<AddShift>': |
| | | if line_item[0] == "<AddShift>": |
| | | line_item = lines[i + 1].split() |
| | | if line_item[0] == '<LearnRateCoef>': |
| | | add_shift_line = line_item[3:(len(line_item) - 1)] |
| | | if line_item[0] == "<LearnRateCoef>": |
| | | add_shift_line = line_item[3 : (len(line_item) - 1)] |
| | | means_list = list(add_shift_line) |
| | | continue |
| | | elif line_item[0] == '<Rescale>': |
| | | elif line_item[0] == "<Rescale>": |
| | | line_item = lines[i + 1].split() |
| | | if line_item[0] == '<LearnRateCoef>': |
| | | rescale_line = line_item[3:(len(line_item) - 1)] |
| | | if line_item[0] == "<LearnRateCoef>": |
| | | rescale_line = line_item[3 : (len(line_item) - 1)] |
| | | vars_list = list(rescale_line) |
| | | continue |
| | | means = np.array(means_list).astype(np.float) |