| | |
| | | from typing import Optional |
| | | from typing import Tuple |
| | | from typing import Union |
| | | |
| | | import logging |
| | | import humanfriendly |
| | | import numpy as np |
| | | import torch |
| | | from torch_complex.tensor import ComplexTensor |
| | | from typeguard import check_argument_types |
| | | |
| | | from funasr.layers.log_mel import LogMel |
| | | from funasr.layers.stft import Stft |
| | | from funasr.models.frontend.abs_frontend import AbsFrontend |
| | | from funasr.modules.frontends.frontend import Frontend |
| | | from funasr.utils.get_default_kwargs import get_default_kwargs |
| | | from funasr.modules.nets_utils import make_pad_mask |
| | | |
| | | |
| | | class DefaultFrontend(AbsFrontend): |
| | | """Conventional frontend structure for ASR. |
| | | |
| | | Stft -> WPE -> MVDR-Beamformer -> Power-spec -> Mel-Fbank -> CMVN |
| | | """ |
| | | |
| | |
| | | htk: bool = False, |
| | | frontend_conf: Optional[dict] = get_default_kwargs(Frontend), |
| | | apply_stft: bool = True, |
| | | use_channel: int = None, |
| | | ): |
| | | assert check_argument_types() |
| | | super().__init__() |
| | | if isinstance(fs, str): |
| | | fs = humanfriendly.parse_size(fs) |
| | |
| | | htk=htk, |
| | | ) |
| | | self.n_mels = n_mels |
| | | self.use_channel = use_channel |
| | | self.frontend_type = "default" |
| | | |
| | | def output_size(self) -> int: |
| | |
| | | if input_stft.dim() == 4: |
| | | # h: (B, T, C, F) -> h: (B, T, F) |
| | | if self.training: |
| | | # Select 1ch randomly |
| | | ch = np.random.randint(input_stft.size(2)) |
| | | input_stft = input_stft[:, :, ch, :] |
| | | if self.use_channel is not None: |
| | | input_stft = input_stft[:, :, self.use_channel, :] |
| | | else: |
| | | # Select 1ch randomly |
| | | ch = np.random.randint(input_stft.size(2)) |
| | | input_stft = input_stft[:, :, ch, :] |
| | | else: |
| | | # Use the first channel |
| | | input_stft = input_stft[:, :, 0, :] |
| | |
| | | return input_stft, feats_lens |
| | | |
| | | |
| | | |
| | | |
| | | class MultiChannelFrontend(AbsFrontend): |
| | | """Conventional frontend structure for ASR. |
| | | |
| | | Stft -> WPE -> MVDR-Beamformer -> Power-spec -> Mel-Fbank -> CMVN |
| | | """ |
| | | |
| | |
| | | fs: Union[int, str] = 16000, |
| | | n_fft: int = 512, |
| | | win_length: int = None, |
| | | hop_length: int = 128, |
| | | hop_length: int = None, |
| | | frame_length: int = None, |
| | | frame_shift: int = None, |
| | | window: Optional[str] = "hann", |
| | | center: bool = True, |
| | | normalized: bool = False, |
| | |
| | | htk: bool = False, |
| | | frontend_conf: Optional[dict] = get_default_kwargs(Frontend), |
| | | apply_stft: bool = True, |
| | | frame_length: int = None, |
| | | frame_shift: int = None, |
| | | lfr_m: int = None, |
| | | lfr_n: int = None, |
| | | use_channel: int = None, |
| | | lfr_m: int = 1, |
| | | lfr_n: int = 1, |
| | | cmvn_file: str = None, |
| | | mc: bool = True |
| | | ): |
| | | assert check_argument_types() |
| | | super().__init__() |
| | | if isinstance(fs, str): |
| | | fs = humanfriendly.parse_size(fs) |
| | | |
| | | # Deepcopy (In general, dict shouldn't be used as default arg) |
| | | frontend_conf = copy.deepcopy(frontend_conf) |
| | | self.hop_length = hop_length |
| | | if win_length is None and hop_length is None: |
| | | self.win_length = frame_length * 16 |
| | | self.hop_length = frame_shift * 16 |
| | | elif frame_length is None and frame_shift is None: |
| | | self.win_length = self.win_length |
| | | self.hop_length = self.hop_length |
| | | else: |
| | | logging.error( |
| | | "Only one of (win_length, hop_length) and (frame_length, frame_shift)" |
| | | "can be set." |
| | | ) |
| | | exit(1) |
| | | |
| | | if apply_stft: |
| | | self.stft = Stft( |
| | | n_fft=n_fft, |
| | | win_length=win_length, |
| | | hop_length=hop_length, |
| | | win_length=self.win_length, |
| | | hop_length=self.hop_length, |
| | | center=center, |
| | | window=window, |
| | | normalized=normalized, |
| | |
| | | htk=htk, |
| | | ) |
| | | self.n_mels = n_mels |
| | | self.use_channel = use_channel |
| | | self.mc = mc |
| | | if not self.mc: |
| | | if self.use_channel is not None: |
| | | logging.info("use the channel %d" % (self.use_channel)) |
| | | else: |
| | | logging.info("random select channel") |
| | | self.cmvn_file = cmvn_file |
| | | if self.cmvn_file is not None: |
| | | mean, std = self._load_cmvn(self.cmvn_file) |
| | | self.register_buffer("mean", torch.from_numpy(mean)) |
| | | self.register_buffer("std", torch.from_numpy(std)) |
| | | self.frontend_type = "multichannelfrontend" |
| | | |
| | | def output_size(self) -> int: |
| | |
| | | if self.stft is not None: |
| | | input_stft, feats_lens = self._compute_stft(input, input_lengths) |
| | | else: |
| | | if isinstance(input, ComplexTensor): |
| | | input_stft = input |
| | | else: |
| | | input_stft = ComplexTensor(input[..., 0], input[..., 1]) |
| | | input_stft = ComplexTensor(input[..., 0], input[..., 1]) |
| | | feats_lens = input_lengths |
| | | # 2. [Option] Speech enhancement |
| | | if self.frontend is not None: |
| | | assert isinstance(input_stft, ComplexTensor), type(input_stft) |
| | | # input_stft: (Batch, Length, [Channel], Freq) |
| | | input_stft, _, mask = self.frontend(input_stft, feats_lens) |
| | | |
| | | # 3. [Multi channel case]: Select a channel(sa_asr) |
| | | if input_stft.dim() == 4 and not self.mc: |
| | | # h: (B, T, C, F) -> h: (B, T, F) |
| | | if self.training: |
| | | if self.use_channel is not None: |
| | | input_stft = input_stft[:, :, self.use_channel, :] |
| | | |
| | | else: |
| | | # Select 1ch randomly |
| | | ch = np.random.randint(input_stft.size(2)) |
| | | input_stft = input_stft[:, :, ch, :] |
| | | else: |
| | | # Use the first channel |
| | | input_stft = input_stft[:, :, 0, :] |
| | | |
| | | # 4. STFT -> Power spectrum |
| | | # h: ComplexTensor(B, T, F) -> torch.Tensor(B, T, F) |
| | | input_power = input_stft.real ** 2 + input_stft.imag ** 2 |
| | |
| | | # input_power: (Batch, [Channel,] Length, Freq) |
| | | # -> input_feats: (Batch, Length, Dim) |
| | | input_feats, _ = self.logmel(input_power, feats_lens) |
| | | bt = input_feats.size(0) |
| | | if input_feats.dim() ==4: |
| | | channel_size = input_feats.size(2) |
| | | # batch * channel * T * D |
| | | #pdb.set_trace() |
| | | input_feats = input_feats.transpose(1,2).reshape(bt*channel_size,-1,80).contiguous() |
| | | # input_feats = input_feats.transpose(1,2) |
| | | # batch * channel |
| | | feats_lens = feats_lens.repeat(1,channel_size).squeeze() |
| | | if self.mc: |
| | | # MFCCA |
| | | if input_feats.dim() ==4: |
| | | bt = input_feats.size(0) |
| | | channel_size = input_feats.size(2) |
| | | input_feats = input_feats.transpose(1,2).reshape(bt*channel_size,-1,80).contiguous() |
| | | feats_lens = feats_lens.repeat(1,channel_size).squeeze() |
| | | else: |
| | | channel_size = 1 |
| | | return input_feats, feats_lens, channel_size |
| | | else: |
| | | channel_size = 1 |
| | | return input_feats, feats_lens, channel_size |
| | | # 6. Apply CMVN |
| | | if self.cmvn_file is not None: |
| | | if feats_lens is None: |
| | | feats_lens = input_feats.new_full([input_feats.size(0)], input_feats.size(1)) |
| | | self.mean = self.mean.to(input_feats.device, input_feats.dtype) |
| | | self.std = self.std.to(input_feats.device, input_feats.dtype) |
| | | mask = make_pad_mask(feats_lens, input_feats, 1) |
| | | |
| | | if input_feats.requires_grad: |
| | | input_feats = input_feats + self.mean |
| | | else: |
| | | input_feats += self.mean |
| | | if input_feats.requires_grad: |
| | | input_feats = input_feats.masked_fill(mask, 0.0) |
| | | else: |
| | | input_feats.masked_fill_(mask, 0.0) |
| | | |
| | | input_feats *= self.std |
| | | |
| | | return input_feats, feats_lens |
| | | |
| | | def _compute_stft( |
| | | self, input: torch.Tensor, input_lengths: torch.Tensor |
| | |
| | | # input_stft: (..., F, 2) -> (..., F) |
| | | input_stft = ComplexTensor(input_stft[..., 0], input_stft[..., 1]) |
| | | return input_stft, feats_lens |
| | | |
| | | def _load_cmvn(self, cmvn_file): |
| | | with open(cmvn_file, 'r', encoding='utf-8') as f: |
| | | lines = f.readlines() |
| | | means_list = [] |
| | | vars_list = [] |
| | | for i in range(len(lines)): |
| | | line_item = lines[i].split() |
| | | if line_item[0] == '<AddShift>': |
| | | line_item = lines[i + 1].split() |
| | | if line_item[0] == '<LearnRateCoef>': |
| | | add_shift_line = line_item[3:(len(line_item) - 1)] |
| | | means_list = list(add_shift_line) |
| | | continue |
| | | elif line_item[0] == '<Rescale>': |
| | | line_item = lines[i + 1].split() |
| | | if line_item[0] == '<LearnRateCoef>': |
| | | rescale_line = line_item[3:(len(line_item) - 1)] |
| | | vars_list = list(rescale_line) |
| | | continue |
| | | means = np.array(means_list).astype(np.float) |
| | | vars = np.array(vars_list).astype(np.float) |
| | | return means, vars |