| | |
| | | from funasr.models.transformer.utils.add_sos_eos import add_sos_eos |
| | | from funasr.models.transformer.utils.nets_utils import make_pad_mask, pad_list |
| | | from funasr.utils.load_utils import load_audio_text_image_video, extract_fbank |
| | | |
| | | import pdb |
| | | |
| | | if LooseVersion(torch.__version__) >= LooseVersion("1.6.0"): |
| | | from torch.cuda.amp import autocast |
| | |
| | | crit_attn_weight = kwargs.get("crit_attn_weight", 0.0) |
| | | crit_attn_smooth = kwargs.get("crit_attn_smooth", 0.0) |
| | | bias_encoder_dropout_rate = kwargs.get("bias_encoder_dropout_rate", 0.0) |
| | | |
| | | |
| | | if bias_encoder_type == 'lstm': |
| | | self.bias_encoder = torch.nn.LSTM(inner_dim, inner_dim, 1, batch_first=True, dropout=bias_encoder_dropout_rate) |
| | |
| | | # 1. Encoder |
| | | encoder_out, encoder_out_lens = self.encode(speech, speech_lengths) |
| | | |
| | | |
| | | loss_ctc, cer_ctc = None, None |
| | | |
| | | stats = dict() |
| | |
| | | # Collect CTC branch stats |
| | | stats["loss_ctc"] = loss_ctc.detach() if loss_ctc is not None else None |
| | | stats["cer_ctc"] = cer_ctc |
| | | |
| | | |
| | | # 2b. Attention decoder branch |
| | | loss_att, acc_att, cer_att, wer_att, loss_pre, loss_ideal = self._calc_att_clas_loss( |
| | |
| | | ): |
| | | encoder_out_mask = (~make_pad_mask(encoder_out_lens, maxlen=encoder_out.size(1))[:, None, :]).to( |
| | | encoder_out.device) |
| | | |
| | | if self.predictor_bias == 1: |
| | | _, ys_pad = add_sos_eos(ys_pad, self.sos, self.eos, self.ignore_id) |
| | | ys_pad_lens = ys_pad_lens + self.predictor_bias |
| | | |
| | | pre_acoustic_embeds, pre_token_length, _, _ = self.predictor(encoder_out, ys_pad, encoder_out_mask, |
| | | ignore_id=self.ignore_id) |
| | | |
| | | # -1. bias encoder |
| | | if self.use_decoder_embedding: |
| | | hw_embed = self.decoder.embed(hotword_pad) |
| | | else: |
| | | hw_embed = self.bias_embed(hotword_pad) |
| | | |
| | | hw_embed, (_, _) = self.bias_encoder(hw_embed) |
| | | _ind = np.arange(0, hotword_pad.shape[0]).tolist() |
| | | selected = hw_embed[_ind, [i - 1 for i in hotword_lengths.detach().cpu().tolist()]] |
| | |
| | | decoder_outs = self.decoder( |
| | | encoder_out, encoder_out_lens, sematic_embeds, ys_pad_lens, contextual_info=hw_embed, clas_scale=clas_scale |
| | | ) |
| | | |
| | | decoder_out = decoder_outs[0] |
| | | decoder_out = torch.log_softmax(decoder_out, dim=-1) |
| | | return decoder_out, ys_pad_lens |
| | |
| | | **kwargs, |
| | | ): |
| | | # init beamsearch |
| | | |
| | | is_use_ctc = kwargs.get("decoding_ctc_weight", 0.0) > 0.00001 and self.ctc != None |
| | | is_use_lm = kwargs.get("lm_weight", 0.0) > 0.00001 and kwargs.get("lm_file", None) is not None |
| | | if self.beam_search is None and (is_use_lm or is_use_ctc): |
| | |
| | | |
| | | # extract fbank feats |
| | | time1 = time.perf_counter() |
| | | |
| | | audio_sample_list = load_audio_text_image_video(data_in, fs=frontend.fs, audio_fs=kwargs.get("fs", 16000)) |
| | | |
| | | time2 = time.perf_counter() |
| | | meta_data["load_data"] = f"{time2 - time1:0.3f}" |
| | | |
| | | speech, speech_lengths = extract_fbank(audio_sample_list, data_type=kwargs.get("data_type", "sound"), |
| | | frontend=frontend) |
| | | time3 = time.perf_counter() |
| | |
| | | pre_token_length = pre_token_length.round().long() |
| | | if torch.max(pre_token_length) < 1: |
| | | return [] |
| | | |
| | | |
| | | decoder_outs = self.cal_decoder_with_predictor(encoder_out, encoder_out_lens, |
| | | pre_acoustic_embeds, |