| | |
| | | from funasr.models.frontend.abs_frontend import AbsFrontend |
| | | from typeguard import check_argument_types |
| | | from torch.nn.utils.rnn import pad_sequence |
| | | import kaldifeat |
| | | # import kaldifeat |
| | | |
| | | def load_cmvn(cmvn_file): |
| | | with open(cmvn_file, 'r', encoding='utf-8') as f: |
| | |
| | | return LFR_outputs.type(torch.float32) |
| | | |
| | | |
| | | class WavFrontend_kaldifeat(AbsFrontend): |
| | | """Conventional frontend structure for ASR. |
| | | """ |
| | | |
| | | def __init__( |
| | | self, |
| | | cmvn_file: str = None, |
| | | fs: int = 16000, |
| | | window: str = 'hamming', |
| | | n_mels: int = 80, |
| | | frame_length: int = 25, |
| | | frame_shift: int = 10, |
| | | lfr_m: int = 1, |
| | | lfr_n: int = 1, |
| | | dither: float = 1.0, |
| | | snip_edges: bool = True, |
| | | upsacle_samples: bool = True, |
| | | device: str = 'cpu', |
| | | **kwargs, |
| | | ): |
| | | super().__init__() |
| | | |
| | | opts = kaldifeat.FbankOptions() |
| | | opts.device = device |
| | | opts.frame_opts.samp_freq = fs |
| | | opts.frame_opts.dither = dither |
| | | opts.frame_opts.window_type = window |
| | | opts.frame_opts.frame_shift_ms = float(frame_shift) |
| | | opts.frame_opts.frame_length_ms = float(frame_length) |
| | | opts.mel_opts.num_bins = n_mels |
| | | opts.energy_floor = 0 |
| | | opts.frame_opts.snip_edges = snip_edges |
| | | opts.mel_opts.debug_mel = False |
| | | self.opts = opts |
| | | self.fbank_fn = None |
| | | self.fbank_beg_idx = 0 |
| | | self.reset_fbank_status() |
| | | |
| | | self.lfr_m = lfr_m |
| | | self.lfr_n = lfr_n |
| | | self.cmvn_file = cmvn_file |
| | | self.upsacle_samples = upsacle_samples |
| | | |
| | | def output_size(self) -> int: |
| | | return self.n_mels * self.lfr_m |
| | | |
| | | def forward_fbank( |
| | | self, |
| | | input: torch.Tensor, |
| | | input_lengths: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | batch_size = input.size(0) |
| | | feats = [] |
| | | feats_lens = [] |
| | | for i in range(batch_size): |
| | | waveform_length = input_lengths[i] |
| | | waveform = input[i][:waveform_length] |
| | | waveform = waveform * (1 << 15) |
| | | |
| | | self.fbank_fn.accept_waveform(self.opts.frame_opts.samp_freq, waveform.tolist()) |
| | | frames = self.fbank_fn.num_frames_ready |
| | | frames_cur = frames - self.fbank_beg_idx |
| | | mat = torch.empty([frames_cur, self.opts.mel_opts.num_bins], dtype=torch.float32).to( |
| | | device=self.opts.device) |
| | | for i in range(self.fbank_beg_idx, frames): |
| | | mat[i, :] = self.fbank_fn.get_frame(i) |
| | | self.fbank_beg_idx += frames_cur |
| | | |
| | | feat_length = mat.size(0) |
| | | feats.append(mat) |
| | | feats_lens.append(feat_length) |
| | | |
| | | feats_lens = torch.as_tensor(feats_lens) |
| | | feats_pad = pad_sequence(feats, |
| | | batch_first=True, |
| | | padding_value=0.0) |
| | | return feats_pad, feats_lens |
| | | |
| | | def reset_fbank_status(self): |
| | | self.fbank_fn = kaldifeat.OnlineFbank(self.opts) |
| | | self.fbank_beg_idx = 0 |
| | | |
| | | def forward_lfr_cmvn( |
| | | self, |
| | | input: torch.Tensor, |
| | | input_lengths: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | batch_size = input.size(0) |
| | | feats = [] |
| | | feats_lens = [] |
| | | for i in range(batch_size): |
| | | mat = input[i, :input_lengths[i], :] |
| | | if self.lfr_m != 1 or self.lfr_n != 1: |
| | | mat = apply_lfr(mat, self.lfr_m, self.lfr_n) |
| | | if self.cmvn_file is not None: |
| | | mat = apply_cmvn(mat, self.cmvn_file) |
| | | feat_length = mat.size(0) |
| | | feats.append(mat) |
| | | feats_lens.append(feat_length) |
| | | |
| | | feats_lens = torch.as_tensor(feats_lens) |
| | | feats_pad = pad_sequence(feats, |
| | | batch_first=True, |
| | | padding_value=0.0) |
| | | return feats_pad, feats_lens |
| | | # class WavFrontend_kaldifeat(AbsFrontend): |
| | | # """Conventional frontend structure for ASR. |
| | | # """ |
| | | # |
| | | # def __init__( |
| | | # self, |
| | | # cmvn_file: str = None, |
| | | # fs: int = 16000, |
| | | # window: str = 'hamming', |
| | | # n_mels: int = 80, |
| | | # frame_length: int = 25, |
| | | # frame_shift: int = 10, |
| | | # lfr_m: int = 1, |
| | | # lfr_n: int = 1, |
| | | # dither: float = 1.0, |
| | | # snip_edges: bool = True, |
| | | # upsacle_samples: bool = True, |
| | | # device: str = 'cpu', |
| | | # **kwargs, |
| | | # ): |
| | | # super().__init__() |
| | | # |
| | | # opts = kaldifeat.FbankOptions() |
| | | # opts.device = device |
| | | # opts.frame_opts.samp_freq = fs |
| | | # opts.frame_opts.dither = dither |
| | | # opts.frame_opts.window_type = window |
| | | # opts.frame_opts.frame_shift_ms = float(frame_shift) |
| | | # opts.frame_opts.frame_length_ms = float(frame_length) |
| | | # opts.mel_opts.num_bins = n_mels |
| | | # opts.energy_floor = 0 |
| | | # opts.frame_opts.snip_edges = snip_edges |
| | | # opts.mel_opts.debug_mel = False |
| | | # self.opts = opts |
| | | # self.fbank_fn = None |
| | | # self.fbank_beg_idx = 0 |
| | | # self.reset_fbank_status() |
| | | # |
| | | # self.lfr_m = lfr_m |
| | | # self.lfr_n = lfr_n |
| | | # self.cmvn_file = cmvn_file |
| | | # self.upsacle_samples = upsacle_samples |
| | | # |
| | | # def output_size(self) -> int: |
| | | # return self.n_mels * self.lfr_m |
| | | # |
| | | # def forward_fbank( |
| | | # self, |
| | | # input: torch.Tensor, |
| | | # input_lengths: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | # batch_size = input.size(0) |
| | | # feats = [] |
| | | # feats_lens = [] |
| | | # for i in range(batch_size): |
| | | # waveform_length = input_lengths[i] |
| | | # waveform = input[i][:waveform_length] |
| | | # waveform = waveform * (1 << 15) |
| | | # |
| | | # self.fbank_fn.accept_waveform(self.opts.frame_opts.samp_freq, waveform.tolist()) |
| | | # frames = self.fbank_fn.num_frames_ready |
| | | # frames_cur = frames - self.fbank_beg_idx |
| | | # mat = torch.empty([frames_cur, self.opts.mel_opts.num_bins], dtype=torch.float32).to( |
| | | # device=self.opts.device) |
| | | # for i in range(self.fbank_beg_idx, frames): |
| | | # mat[i, :] = self.fbank_fn.get_frame(i) |
| | | # self.fbank_beg_idx += frames_cur |
| | | # |
| | | # feat_length = mat.size(0) |
| | | # feats.append(mat) |
| | | # feats_lens.append(feat_length) |
| | | # |
| | | # feats_lens = torch.as_tensor(feats_lens) |
| | | # feats_pad = pad_sequence(feats, |
| | | # batch_first=True, |
| | | # padding_value=0.0) |
| | | # return feats_pad, feats_lens |
| | | # |
| | | # def reset_fbank_status(self): |
| | | # self.fbank_fn = kaldifeat.OnlineFbank(self.opts) |
| | | # self.fbank_beg_idx = 0 |
| | | # |
| | | # def forward_lfr_cmvn( |
| | | # self, |
| | | # input: torch.Tensor, |
| | | # input_lengths: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | # batch_size = input.size(0) |
| | | # feats = [] |
| | | # feats_lens = [] |
| | | # for i in range(batch_size): |
| | | # mat = input[i, :input_lengths[i], :] |
| | | # if self.lfr_m != 1 or self.lfr_n != 1: |
| | | # mat = apply_lfr(mat, self.lfr_m, self.lfr_n) |
| | | # if self.cmvn_file is not None: |
| | | # mat = apply_cmvn(mat, self.cmvn_file) |
| | | # feat_length = mat.size(0) |
| | | # feats.append(mat) |
| | | # feats_lens.append(feat_length) |
| | | # |
| | | # feats_lens = torch.as_tensor(feats_lens) |
| | | # feats_pad = pad_sequence(feats, |
| | | # batch_first=True, |
| | | # padding_value=0.0) |
| | | # return feats_pad, feats_lens |