
        self.lid_int_dict = {24884: 3, 24885: 4, 24888: 7, 24892: 11, 24896: 12, 24992: 13}
        self.textnorm_dict = {"withitn": 14, "woitn": 15}
        self.textnorm_int_dict = {25016: 14, 25017: 15}

    def _get_lid(self, lid):
        # map a language tag to its integer token id
        if lid in self.lid_dict:
            return self.lid_dict[lid]
        else:
            raise ValueError(
                f"The language {lid} is not in {list(self.lid_dict.keys())}"
            )

    def _get_tnid(self, tnid):
        # map a text-normalization tag ("withitn"/"woitn") to its integer token id
        if tnid in self.textnorm_dict:
            return self.textnorm_dict[tnid]
        else:
            raise ValueError(
                f"The textnorm {tnid} is not in {list(self.textnorm_dict.keys())}"
            )

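    # Illustrative examples for the helper above, following directly from the
    # dictionaries defined in this file:
    #   self._get_tnid("withitn") -> 14
    #   self._get_tnid("woitn")   -> 15
    # An unknown tag raises ValueError listing the accepted keys.
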
    def read_tags(self, language_input, textnorm_input):
        # handle language: accept a list of tags, a path to a file with one
        # tag per line, or a single tag string
        if isinstance(language_input, list):
            language_list = [self._get_lid(l) for l in language_input]
        elif isinstance(language_input, str):
            if os.path.exists(language_input):
                with open(language_input, "r") as f:
                    language_list = [self._get_lid(l.strip()) for l in f]
            else:
                language_list = [self._get_lid(language_input)]
        else:
            raise ValueError(
                f"Unsupported type {type(language_input)} for language_input"
            )
        # handle textnorm: same input conventions as language
        if isinstance(textnorm_input, list):
            textnorm_list = [self._get_tnid(tn) for tn in textnorm_input]
        elif isinstance(textnorm_input, str):
            if os.path.exists(textnorm_input):
                with open(textnorm_input, "r") as f:
                    textnorm_list = [self._get_tnid(tn.strip()) for tn in f]
            else:
                textnorm_list = [self._get_tnid(textnorm_input)]
        else:
            raise ValueError(
                f"Unsupported type {type(textnorm_input)} for textnorm_input"
            )
        return language_list, textnorm_list

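    # Hedged sketch of what read_tags returns (the language ids come from
    # self.lid_dict, which is defined outside this excerpt):
    #   read_tags("auto", "woitn")             -> ([lid_dict["auto"]], [15])
    #   read_tags(["auto", "auto"], "withitn") -> ([id, id], [14])
    # A string that names an existing file is read as one tag per line.
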
    def __call__(self, wav_content: Union[str, np.ndarray, List[str]], **kwargs):
        language_input = kwargs.get("language", "auto")
        # `use_itn` is a boolean shortcut for choosing the textnorm tag; an
        # explicit `textnorm` kwarg takes precedence
        textnorm_input = kwargs.get(
            "textnorm", "withitn" if kwargs.get("use_itn", False) else "woitn"
        )
        language_list, textnorm_list = self.read_tags(language_input, textnorm_input)

        waveform_list = self.load_data(wav_content, self.frontend.opts.frame_opts.samp_freq)
        waveform_nums = len(waveform_list)

        assert len(language_list) == 1 or len(language_list) == waveform_nums, \
            "length of parsed language list should be 1 or equal to the number of waveforms"
        assert len(textnorm_list) == 1 or len(textnorm_list) == waveform_nums, \
            "length of parsed textnorm list should be 1 or equal to the number of waveforms"

        asr_res = []
        for beg_idx in range(0, waveform_nums, self.batch_size):
            end_idx = min(waveform_nums, beg_idx + self.batch_size)
            feats, feats_len = self.extract_feat(waveform_list[beg_idx:end_idx])
            B = feats.shape[0]
            # a single parsed tag applies to every waveform (slicing a
            # length-1 list past the first batch would yield an empty list);
            # otherwise take the per-batch slice
            _language_list = language_list if len(language_list) == 1 else language_list[beg_idx:end_idx]
            _textnorm_list = textnorm_list if len(textnorm_list) == 1 else textnorm_list[beg_idx:end_idx]
            # broadcast a single tag across the batch
            if len(_language_list) == 1 and B != 1:
                _language_list = _language_list * B
            if len(_textnorm_list) == 1 and B != 1:
                _textnorm_list = _textnorm_list * B
            ctc_logits, encoder_out_lens = self.infer(
                feats,
                feats_len,
                np.array(_language_list, dtype=np.int32),
                np.array(_textnorm_list, dtype=np.int32),
            )
            # back to torch.Tensor for decoding
            ctc_logits = torch.from_numpy(ctc_logits).float()
            for b in range(B):
                # greedy CTC decoding: argmax over the vocabulary, collapse
                # consecutive repeats, then drop blank tokens
                x = ctc_logits[b, : encoder_out_lens[b].item(), :]
                yseq = x.argmax(dim=-1)
                yseq = torch.unique_consecutive(yseq, dim=-1)

                mask = yseq != self.blank_id
                token_int = yseq[mask].tolist()

                asr_res.append(self.tokenizer.decode(token_int))

        return asr_res

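# Hedged usage sketch: the class name, constructor arguments, and the
# "zh"/"en" language tags below are illustrative assumptions, not part of
# this excerpt.
#
#   model = SenseVoiceModel(model_dir="...", batch_size=2)
#   # single file, automatic language id, inverse text normalization enabled
#   print(model("sample.wav", language="auto", use_itn=True))
#   # batch of files with per-file language tags and no text normalization
#   print(model(["a.wav", "b.wav"], language=["zh", "en"], textnorm="woitn"))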