    get_logger,
    read_yaml,
)
from .utils.sentencepiece_tokenizer import SentencepiecesTokenizer
from .utils.frontend import WavFrontend

logging = get_logger()


class SenseVoiceSmall:
    """
    Author: Speech Lab of DAMO Academy, Alibaba Group
    Paraformer: Fast and Accurate Parallel Transformer for Non-autoregressive End-to-End Speech Recognition
    """

    def __init__(
        self,
        model_dir: Union[str, Path] = None,
        batch_size: int = 1,
        device_id: Union[str, int] = "-1",
        quantize: bool = False,
        intra_op_num_threads: int = 4,
        cache_dir: str = None,
        **kwargs,
    ):
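        # If model_dir is not a local path, try to download it from modelscope;
        # if no exported onnx file is found afterwards, export one via funasr.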
        if not Path(model_dir).exists():
            try:
                from modelscope.hub.snapshot_download import snapshot_download
            except ImportError:
                raise ImportError(
                    "Downloading models from modelscope requires the modelscope package. Install it with:\n"
                    "  pip3 install -U modelscope\n"
                    "Users in China can use the mirror:\n"
                    "  pip3 install -U modelscope -i https://mirror.sjtu.edu.cn/pypi/web/simple"
                )
            try:
                model_dir = snapshot_download(model_dir, cache_dir=cache_dir)
            except Exception:
                raise ValueError(
                    "model_dir must be a model name on modelscope or a local path "
                    "downloaded from modelscope, but is {}".format(model_dir)
                )
        if quantize:
            model_file = os.path.join(model_dir, "model_quant.onnx")
        else:
            model_file = os.path.join(model_dir, "model.onnx")
        if not os.path.exists(model_file):
            print(".onnx does not exist, begin to export onnx")
            try:
                from funasr import AutoModel
            except ImportError:
                raise ImportError(
                    "Exporting the onnx model requires the funasr package. Install it with:\n"
                    "  pip3 install -U funasr\n"
                    "Users in China can use the mirror:\n"
                    "  pip3 install -U funasr -i https://mirror.sjtu.edu.cn/pypi/web/simple"
                )

            model = AutoModel(model=model_dir)
            model_dir = model.export(type="onnx", quantize=quantize, **kwargs)
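
        # Load the exported inference config and the CMVN statistics shipped with the model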
        config_file = os.path.join(model_dir, "config.yaml")
        cmvn_file = os.path.join(model_dir, "am.mvn")
        config = read_yaml(config_file)
        # token_list = os.path.join(model_dir, "tokens.json")
        # with open(token_list, "r", encoding="utf-8") as f:
        #     token_list = json.load(f)

        # self.converter = TokenIDConverter(token_list)
        self.tokenizer = SentencepiecesTokenizer(
            bpemodel=os.path.join(model_dir, "chn_jpn_yue_eng_ko_spectok.bpe.model")
        )
        config["frontend_conf"]["cmvn_file"] = cmvn_file
        self.frontend = WavFrontend(**config["frontend_conf"])
        self.ort_infer = OrtInferSession(
            model_file, device_id, intra_op_num_threads=intra_op_num_threads
        )
        self.batch_size = batch_size
        self.blank_id = 0
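        # SenseVoice prompt-token tables: lid_dict maps a language name to the
        # language id fed to the model, textnorm_dict selects inverse text
        # normalization ("withitn") or raw output ("woitn"), and the *_int_dict
        # variants map the corresponding vocabulary token ids to the same values.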
        self.lid_dict = {"auto": 0, "zh": 3, "en": 4, "yue": 7, "ja": 11, "ko": 12, "nospeech": 13}
        self.lid_int_dict = {24884: 3, 24885: 4, 24888: 7, 24892: 11, 24896: 12, 24992: 13}
        self.textnorm_dict = {"withitn": 14, "woitn": 15}
        self.textnorm_int_dict = {25016: 14, 25017: 15}

    def __call__(self, wav_content: Union[str, np.ndarray, List[str]], **kwargs) -> List:
        language = self.lid_dict[kwargs.get("language", "auto")]
        use_itn = kwargs.get("use_itn", False)
        textnorm = kwargs.get("text_norm", None)
        if textnorm is None:
            textnorm = "withitn" if use_itn else "woitn"
        textnorm = self.textnorm_dict[textnorm]

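        # Load the audio; file paths are read and resampled to the frontend's sample rate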
        waveform_list = self.load_data(wav_content, self.frontend.opts.frame_opts.samp_freq)
        waveform_nums = len(waveform_list)
        asr_res = []
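        # Decode in chunks of batch_size; the greedy CTC decode below assumes batch_size == 1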
        for beg_idx in range(0, waveform_nums, self.batch_size):
            end_idx = min(waveform_nums, beg_idx + self.batch_size)
            feats, feats_len = self.extract_feat(waveform_list[beg_idx:end_idx])
            ctc_logits, encoder_out_lens = self.infer(
                feats,
                feats_len,
                np.array(language, dtype=np.int32),
                np.array(textnorm, dtype=np.int32),
            )
            # back to torch.Tensor
            ctc_logits = torch.from_numpy(ctc_logits).float()
            # support batch_size=1 only currently:
            # greedy CTC decode (argmax per frame, collapse repeats)
            x = ctc_logits[0, : encoder_out_lens[0].item(), :]
            yseq = x.argmax(dim=-1)
            yseq = torch.unique_consecutive(yseq, dim=-1)

            mask = yseq != self.blank_id
            token_int = yseq[mask].tolist()
            # map token ids back to text with the sentencepiece model
            asr_res.append(self.tokenizer.decode(token_int))

        return asr_res

    def load_data(self, wav_content: Union[str, np.ndarray, List[str]], fs: int = None) -> List:
        def load_wav(path: str) -> np.ndarray:
            waveform, _ = librosa.load(path, sr=fs)
            return waveform

        if isinstance(wav_content, np.ndarray):
            return [wav_content]
        if isinstance(wav_content, str):
            return [load_wav(wav_content)]
        if isinstance(wav_content, list):
            return [load_wav(path) for path in wav_content]
        raise TypeError(f"The type of {wav_content} is not in [str, np.ndarray, list]")

    def extract_feat(self, waveform_list: List[np.ndarray]) -> Tuple[np.ndarray, np.ndarray]:
        feats, feats_len = [], []
        for waveform in waveform_list:
            speech, _ = self.frontend.fbank(waveform)
            feat, feat_len = self.frontend.lfr_cmvn(speech)
            feats.append(feat)
            feats_len.append(feat_len)
        feats = self.pad_feats(feats, np.max(feats_len))
        feats_len = np.array(feats_len).astype(np.int32)
        return feats, feats_len

    @staticmethod
    def pad_feats(feats: List[np.ndarray], max_feat_len: int) -> np.ndarray:
        def pad_feat(feat: np.ndarray, cur_len: int) -> np.ndarray:
            pad_width = ((0, max_feat_len - cur_len), (0, 0))
            return np.pad(feat, pad_width, "constant", constant_values=0)

        # pad each feature matrix to the longest in the batch, then stack
        feat_res = [pad_feat(feat, feat.shape[0]) for feat in feats]
        feats = np.array(feat_res).astype(np.float32)
        return feats

    def infer(
        self,
        feats: np.ndarray,
        feats_len: np.ndarray,
        language: np.ndarray,
        textnorm: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        # inputs must be passed in the order the exported onnx graph expects
        outputs = self.ort_infer([feats, feats_len, language, textnorm])
        return outputs
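
# Example usage (a minimal sketch; the modelscope id and wav path below are
# illustrative placeholders):
#
#   model = SenseVoiceSmall("iic/SenseVoiceSmall", batch_size=1, quantize=False)
#   res = model("example.wav", language="auto", use_itn=True)
#   print(res)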