| | |
| | | from funasr.models.transformer.utils.add_sos_eos import add_sos_eos |
| | | from funasr.models.transformer.utils.nets_utils import make_pad_mask, pad_list |
| | | from funasr.utils.load_utils import load_audio_text_image_video, extract_fbank |
| | | |
| | | from funasr.train_utils.device_funcs import to_device |
| | | |
| | | @tables.register("model_classes", "Paraformer") |
| | | class Paraformer(torch.nn.Module): |
| | |
| | | self.predictor_bias = predictor_bias |
| | | self.sampling_ratio = sampling_ratio |
| | | self.criterion_pre = mae_loss(normalize_length=length_normalized_loss) |
| | | # self.step_cur = 0 |
| | | # |
| | | |
| | | |
| | | self.share_embedding = share_embedding |
| | | if self.share_embedding: |
| | | self.decoder.embed = None |
| | |
| | | speech, speech_lengths = data_in, data_lengths |
| | | if len(speech.shape) < 3: |
| | | speech = speech[None, :, :] |
| | | if speech_lengths is None: |
| | | if speech_lengths is not None: |
| | | speech_lengths = speech_lengths.squeeze(-1) |
| | | else: |
| | | speech_lengths = speech.shape[1] |
| | | else: |
| | | # extract fbank feats |
| | |
| | | b, n, d = decoder_out.size() |
| | | if isinstance(key[0], (list, tuple)): |
| | | key = key[0] |
| | | if len(key) < b: |
| | | key = key*b |
| | | for i in range(b): |
| | | x = encoder_out[i, :encoder_out_lens[i], :] |
| | | am_scores = decoder_out[i, :pre_token_length[i], :] |
| | |
| | | nbest_hyps = [Hypothesis(yseq=yseq, score=score)] |
| | | for nbest_idx, hyp in enumerate(nbest_hyps): |
| | | ibest_writer = None |
| | | if ibest_writer is None and kwargs.get("output_dir") is not None: |
| | | writer = DatadirWriter(kwargs.get("output_dir")) |
| | | ibest_writer = writer[f"{nbest_idx+1}best_recog"] |
| | | if kwargs.get("output_dir") is not None: |
| | | if not hasattr(self, "writer"): |
| | | self.writer = DatadirWriter(kwargs.get("output_dir")) |
| | | ibest_writer = self.writer[f"{nbest_idx+1}best_recog"] |
| | | # remove sos/eos and get results |
| | | last_pos = -1 |
| | | if isinstance(hyp.yseq, list): |
| | |
| | | if tokenizer is not None: |
| | | # Change integer-ids to tokens |
| | | token = tokenizer.ids2tokens(token_int) |
| | | text = tokenizer.tokens2text(token) |
| | | |
| | | text_postprocessed, _ = postprocess_utils.sentence_postprocess(token) |
| | | text_postprocessed = tokenizer.tokens2text(token) |
| | | if not hasattr(tokenizer, "bpemodel"): |
| | | text_postprocessed, _ = postprocess_utils.sentence_postprocess(token) |
| | | |
| | | result_i = {"key": key[i], "text": text_postprocessed} |
| | | |
| | | |
| | | if ibest_writer is not None: |
| | | ibest_writer["token"][key[i]] = " ".join(token) |
| | | # ibest_writer["text"][key[i]] = text |
| | |
| | | |
| | | return results, meta_data |
| | | |
| | | def export( |
| | | self, |
| | | max_seq_len=512, |
| | | **kwargs, |
| | | ): |
| | | self.device = kwargs.get("device") |
| | | is_onnx = kwargs.get("type", "onnx") == "onnx" |
| | | encoder_class = tables.encoder_classes.get(kwargs["encoder"]+"Export") |
| | | self.encoder = encoder_class(self.encoder, onnx=is_onnx) |
| | | |
| | | predictor_class = tables.predictor_classes.get(kwargs["predictor"]+"Export") |
| | | self.predictor = predictor_class(self.predictor, onnx=is_onnx) |
| | | |
| | | |
| | | decoder_class = tables.decoder_classes.get(kwargs["decoder"]+"Export") |
| | | self.decoder = decoder_class(self.decoder, onnx=is_onnx) |
| | | |
| | | from funasr.utils.torch_function import MakePadMask |
| | | from funasr.utils.torch_function import sequence_mask |
| | | |
| | | |
| | | if is_onnx: |
| | | self.make_pad_mask = MakePadMask(max_seq_len, flip=False) |
| | | else: |
| | | self.make_pad_mask = sequence_mask(max_seq_len, flip=False) |
| | | |
| | | self.forward = self._export_forward |
| | | |
| | | return self |
| | | |
| | | def export_forward( |
| | | self, |
| | | speech: torch.Tensor, |
| | | speech_lengths: torch.Tensor, |
| | | ): |
| | | # a. To device |
| | | batch = {"speech": speech, "speech_lengths": speech_lengths} |
| | | batch = to_device(batch, device=self.device) |
| | | |
| | | enc, enc_len = self.encoder(**batch) |
| | | mask = self.make_pad_mask(enc_len)[:, None, :] |
| | | pre_acoustic_embeds, pre_token_length, alphas, pre_peak_index = self.predictor(enc, mask) |
| | | pre_token_length = pre_token_length.floor().type(torch.int32) |
| | | |
| | | decoder_out, _ = self.decoder(enc, enc_len, pre_acoustic_embeds, pre_token_length) |
| | | decoder_out = torch.log_softmax(decoder_out, dim=-1) |
| | | # sample_ids = decoder_out.argmax(dim=-1) |
| | | |
| | | return decoder_out, pre_token_length |
| | | |
| | | def export_dummy_inputs(self): |
| | | speech = torch.randn(2, 30, 560) |
| | | speech_lengths = torch.tensor([6, 30], dtype=torch.int32) |
| | | return (speech, speech_lengths) |
| | | |
| | | |
| | | def export_input_names(self): |
| | | return ['speech', 'speech_lengths'] |
| | | |
| | | def export_output_names(self): |
| | | return ['logits', 'token_num'] |
| | | |
| | | def export_dynamic_axes(self): |
| | | return { |
| | | 'speech': { |
| | | 0: 'batch_size', |
| | | 1: 'feats_length' |
| | | }, |
| | | 'speech_lengths': { |
| | | 0: 'batch_size', |
| | | }, |
| | | 'logits': { |
| | | 0: 'batch_size', |
| | | 1: 'logits_length' |
| | | }, |
| | | } |
| | | |
| | | def export_name(self, ): |
| | | return "model.onnx" |