| | |
| | | n_feat, |
| | | dropout_rate, |
| | | kernel_size, |
| | | sanm_shift=0, |
| | | sanm_shfit=0, |
| | | lora_list=None, |
| | | lora_rank=8, |
| | | lora_alpha=16, |
| | |
| | | ) |
| | | # padding |
| | | left_padding = (kernel_size - 1) // 2 |
| | | if sanm_shift > 0: |
| | | left_padding = left_padding + sanm_shift |
| | | if sanm_shfit > 0: |
| | | left_padding = left_padding + sanm_shfit |
| | | right_padding = kernel_size - 1 - left_padding |
| | | self.pad_fn = nn.ConstantPad1d((left_padding, right_padding), 0.0) |
| | | |
| | | def forward_fsmn(self, inputs, mask, mask_shift_chunk=None): |
| | | def forward_fsmn(self, inputs, mask, mask_shfit_chunk=None): |
| | | b, t, d = inputs.size() |
| | | if mask is not None: |
| | | mask = torch.reshape(mask, (b, -1, 1)) |
| | | if mask_shift_chunk is not None: |
| | | mask = mask * mask_shift_chunk |
| | | if mask_shfit_chunk is not None: |
| | | mask = mask * mask_shfit_chunk |
| | | inputs = inputs * mask |
| | | |
| | | x = inputs.transpose(1, 2) |
| | |
| | | |
| | | return self.linear_out(x) # (batch, time1, d_model) |
| | | |
| | | def forward(self, x, mask, mask_shift_chunk=None, mask_att_chunk_encoder=None): |
| | | def forward(self, x, mask, mask_shfit_chunk=None, mask_att_chunk_encoder=None): |
| | | """Compute scaled dot product attention. |
| | | |
| | | Args: |
| | |
| | | |
| | | """ |
| | | q_h, k_h, v_h, v = self.forward_qkv(x) |
| | | fsmn_memory = self.forward_fsmn(v, mask, mask_shift_chunk) |
| | | fsmn_memory = self.forward_fsmn(v, mask, mask_shfit_chunk) |
| | | q_h = q_h * self.d_k ** (-0.5) |
| | | scores = torch.matmul(q_h, k_h.transpose(-2, -1)) |
| | | att_outs = self.forward_attention(v_h, scores, mask, mask_att_chunk_encoder) |
| | |
| | | self.stochastic_depth_rate = stochastic_depth_rate |
| | | self.dropout_rate = dropout_rate |
| | | |
| | | def forward(self, x, mask, cache=None, mask_shift_chunk=None, mask_att_chunk_encoder=None): |
| | | def forward(self, x, mask, cache=None, mask_shfit_chunk=None, mask_att_chunk_encoder=None): |
| | | """Compute encoded features. |
| | | |
| | | Args: |
| | |
| | | self.self_attn( |
| | | x, |
| | | mask, |
| | | mask_shift_chunk=mask_shift_chunk, |
| | | mask_shfit_chunk=mask_shfit_chunk, |
| | | mask_att_chunk_encoder=mask_att_chunk_encoder, |
| | | ), |
| | | ), |
| | |
| | | self.self_attn( |
| | | x, |
| | | mask, |
| | | mask_shift_chunk=mask_shift_chunk, |
| | | mask_shfit_chunk=mask_shfit_chunk, |
| | | mask_att_chunk_encoder=mask_att_chunk_encoder, |
| | | ) |
| | | ) |
| | |
| | | self.self_attn( |
| | | x, |
| | | mask, |
| | | mask_shift_chunk=mask_shift_chunk, |
| | | mask_shfit_chunk=mask_shfit_chunk, |
| | | mask_att_chunk_encoder=mask_att_chunk_encoder, |
| | | ) |
| | | ) |
| | |
| | | if not self.normalize_before: |
| | | x = self.norm2(x) |
| | | |
| | | return x, mask, cache, mask_shift_chunk, mask_att_chunk_encoder |
| | | return x, mask, cache, mask_shfit_chunk, mask_att_chunk_encoder |
| | | |
| | | def forward_chunk(self, x, cache=None, chunk_size=None, look_back=0): |
| | | """Compute encoded features. |
| | |
| | | positionwise_conv_kernel_size: int = 1, |
| | | padding_idx: int = -1, |
| | | kernel_size: int = 11, |
| | | sanm_shift: int = 0, |
| | | sanm_shfit: int = 0, |
| | | selfattention_layer_type: str = "sanm", |
| | | **kwargs, |
| | | ): |
| | |
| | | output_size, |
| | | attention_dropout_rate, |
| | | kernel_size, |
| | | sanm_shift, |
| | | sanm_shfit, |
| | | ) |
| | | encoder_selfattn_layer_args = ( |
| | | attention_heads, |
| | |
| | | output_size, |
| | | attention_dropout_rate, |
| | | kernel_size, |
| | | sanm_shift, |
| | | sanm_shfit, |
| | | ) |
| | | |
| | | self.encoders0 = nn.ModuleList( |
| | |
| | | |
| | | timestamp = [] |
| | | tokens = tokenizer.text2tokens(text)[4:] |
| | | token_back_to_id = tokenizer.tokens2ids(tokens) |
| | | token_ids = [] |
| | | for tok_ls in token_back_to_id: |
| | | if tok_ls: token_ids.extend(tok_ls) |
| | | else: token_ids.append(124) |
| | | |
| | | if len(token_ids) == 0: |
| | | result_i = {"key": key[i], "text": text} |
| | | results.append(result_i) |
| | | continue |
| | | |
| | | logits_speech = self.ctc.softmax(encoder_out)[i, 4 : encoder_out_lens[i].item(), :] |
| | | pred = logits_speech.argmax(-1).cpu() |
| | | logits_speech[pred == self.blank_id, self.blank_id] = 0 |
| | | align = ctc_forced_align( |
| | | logits_speech.unsqueeze(0).float(), |
| | | torch.Tensor(token_int[4:]).unsqueeze(0).long().to(logits_speech.device), |
| | | (encoder_out_lens - 4).long(), |
| | | torch.tensor(len(token_int) - 4).unsqueeze(0).long().to(logits_speech.device), |
| | | torch.Tensor(token_ids).unsqueeze(0).long().to(logits_speech.device), |
| | | (encoder_out_lens[i] - 4).long(), |
| | | torch.tensor(len(token_ids)).unsqueeze(0).long().to(logits_speech.device), |
| | | ignore_id=self.ignore_id, |
| | | ) |
| | | pred = groupby(align[0, : encoder_out_lens[0]]) |
| | | pred = groupby(align[0, : encoder_out_lens[i]]) |
| | | _start = 0 |
| | | token_id = 0 |
| | | ts_max = encoder_out_lens[i] - 4 |
| | |
| | | timestamp.append([tokens[token_id], ts_left, ts_right]) |
| | | token_id += 1 |
| | | _start = _end |
| | | timestamp = self.post(timestamp) |
| | | result_i = {"key": key[i], "text": text, "timestamp": timestamp} |
| | | timestamp, words = self.post(timestamp) |
| | | result_i = {"key": key[i], "text": text, "timestamp": timestamp, "words": words} |
| | | results.append(result_i) |
| | | else: |
| | | result_i = {"key": key[i], "text": text} |
| | |
| | | |
| | | def post(self, timestamp): |
| | | timestamp_new = [] |
| | | words_new = [] |
| | | prev_word = None |
| | | for i, t in enumerate(timestamp): |
| | | word, start, end = t |
| | | start = int(start * 1000) |
| | | end = int(end * 1000) |
| | | if word == "▁": |
| | | continue |
| | | if i == 0: |
| | | # timestamp_new.append([word, start, end]) |
| | | timestamp_new.append([int(start * 1000), int(end * 1000)]) |
| | | elif word.startswith("▁") or len(word) == 1 or not word[1].isalpha(): |
| | | timestamp_new.append([start, end]) |
| | | words_new.append(word) |
| | | elif word.startswith("▁"): |
| | | word = word[1:] |
| | | # timestamp_new.append([word, start, end]) |
| | | timestamp_new.append([int(start * 1000), int(end * 1000)]) |
| | | timestamp_new.append([start, end]) |
| | | words_new.append(word) |
| | | elif prev_word is not None and prev_word.isalpha() and prev_word.isascii() and word.isalpha() and word.isascii(): |
| | | word = prev_word + word |
| | | timestamp_new[-1][1] = end |
| | | words_new[-1] = word |
| | | else: |
| | | # timestamp_new[-1][0] += word |
| | | timestamp_new[-1][1] = int(end * 1000) |
| | | return timestamp_new |
| | | timestamp_new.append([start, end]) |
| | | words_new.append(word) |
| | | prev_word = word |
| | | return timestamp_new, words_new |
| | | |
| | | def export(self, **kwargs): |
| | | from .export_meta import export_rebuild_model |