from funasr.models.paraformer.search import Hypothesis
from .utils.ctc_alignment import ctc_forced_align


class SinusoidalPositionEncoder(torch.nn.Module):

        ilens: torch.Tensor,
    ):
        """Embed positions in tensor."""
        # mask padded frames; pass maxlen so the mask matches the padded time
        # dimension of xs_pad
        maxlen = xs_pad.shape[1]
        masks = sequence_mask(ilens, maxlen=maxlen, device=ilens.device)[:, None, :]

        # scale embeddings by sqrt(d_model) before positional encoding, as in
        # the standard Transformer
        xs_pad *= self.output_size() ** 0.5


        use_itn = kwargs.get("use_itn", False)
        textnorm = kwargs.get("text_norm", None)
        output_timestamp = kwargs.get("output_timestamp", False)

        if textnorm is None:
            textnorm = "withitn" if use_itn else "woitn"
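        # "withitn" above requests inverse-text-normalized output from the
        # model; "woitn" requests raw output without ITN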
        textnorm_query = self.embed(

            # Change integer-ids to tokens
            text = tokenizer.decode(token_int)

            # result_i = {"key": key[i], "text": text}
            # results.append(result_i)

            if ibest_writer is not None:
                ibest_writer["text"][key[i]] = text

            if output_timestamp:
                from itertools import groupby

                timestamp = []
                # drop the 4 leading special tokens (language/emotion/event/itn)
                # that SenseVoice prepends to the decoded text
                tokens = tokenizer.text2tokens(text)[4:]
                token_back_to_id = tokenizer.tokens2ids(tokens)
                token_ids = []
                for tok_ls in token_back_to_id:
                    if tok_ls:
                        token_ids.extend(tok_ls)
                    else:
                        # token has no vocabulary mapping; fall back to a fixed id
                        token_ids.append(124)

                # frame-level CTC posteriors for this utterance, skipping the
                # 4 frames that correspond to the prepended special tokens
                logits_speech = self.ctc.softmax(encoder_out)[i, 4 : encoder_out_lens[i].item(), :]
                pred = logits_speech.argmax(-1).cpu()
                # zero the blank probability on frames where blank is the argmax,
                # so the forced alignment spreads real tokens over those frames
                logits_speech[pred == self.blank_id, self.blank_id] = 0
                align = ctc_forced_align(
                    logits_speech.unsqueeze(0).float(),
                    torch.tensor(token_ids).unsqueeze(0).long().to(logits_speech.device),
                    (encoder_out_lens[i] - 4).long(),
                    torch.tensor(len(token_ids)).unsqueeze(0).long().to(logits_speech.device),
                    ignore_id=self.ignore_id,
                )
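                # `align` now holds, per encoder frame, the token id (or the CTC
                # blank) on the best path constrained to emit `token_ids`. A toy
                # illustration with invented values: token_ids [5, 9] over six
                # frames could align as [5, 5, 0, 9, 9, 0], i.e. token 5 occupies
                # frames 0-1 and token 9 frames 3-4.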

                # collapse the frame-level alignment into (token_id, run) groups
                pred = groupby(align[0, : encoder_out_lens[i]])
                _start = 0
                token_id = 0
                ts_max = encoder_out_lens[i] - 4
                for pred_token, pred_frame in pred:
                    _end = _start + len(list(pred_frame))
                    if pred_token != 0:  # 0 is the CTC blank in the alignment
                        # one encoder frame = 60 ms; center with a half-frame
                        # (30 ms) shift and clamp to the utterance bounds
                        ts_left = max((_start * 60 - 30) / 1000, 0)
                        ts_right = min((_end * 60 - 30) / 1000, (ts_max * 60 - 30) / 1000)
                        timestamp.append([tokens[token_id], ts_left, ts_right])
                        token_id += 1
                    _start = _end
                timestamp = self.post(timestamp)
                result_i = {"key": key[i], "text": text, "timestamp": timestamp}
                results.append(result_i)
            else:
                result_i = {"key": key[i], "text": text}
                results.append(result_i)
        return results, meta_data
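
    # Shape of the returned results (values illustrative, not real output):
    #   output_timestamp=False -> [{"key": "utt1", "text": "..."}]
    #   output_timestamp=True  -> [{"key": "utt1", "text": "...",
    #                               "timestamp": [[40, 520], [580, 980]]}]
    # where each "timestamp" entry is a [start_ms, end_ms] pair from post().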

    def post(self, timestamp):
        """Convert token-level timestamps (seconds) into word-level
        [start_ms, end_ms] spans, merging ASCII subword pieces into the
        word they continue."""
        timestamp_new = []
        prev_word = None
        for i, t in enumerate(timestamp):
            word, start, end = t
            start = int(start * 1000)
            end = int(end * 1000)
            if word == "▁":
                continue
            if i == 0:
                # strip the word-boundary marker so a following ASCII subword
                # can be merged into this span
                if word.startswith("▁"):
                    word = word[1:]
                # timestamp_new.append([word, start, end])
                timestamp_new.append([start, end])
            elif word.startswith("▁"):
                word = word[1:]
                timestamp_new.append([start, end])
            elif (
                prev_word is not None
                and prev_word.isalpha()
                and prev_word.isascii()
                and word.isalpha()
                and word.isascii()
            ):
                # ASCII letters continuing an ASCII word: extend the previous
                # span instead of opening a new one
                timestamp_new[-1][1] = end
            else:
                # timestamp_new[-1][0] += word
                timestamp_new.append([start, end])
            prev_word = word
        return timestamp_new
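
    # A worked example of post() with invented tokens: input
    #   [["▁he", 0.04, 0.28], ["llo", 0.28, 0.52], ["▁there", 0.58, 0.98]]
    # yields [[40, 520], [580, 980]]: "▁he" opens a span, "llo" (ASCII letters
    # continuing ASCII letters) extends its end, and "▁there" opens a new span.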

    def export(self, **kwargs):
        from .export_meta import export_rebuild_model

        kwargs["max_seq_len"] = 512
        models = export_rebuild_model(model=self, **kwargs)
        return models
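
    # Hedged usage sketch; AutoModel and the kwargs below are assumptions drawn
    # from the wider FunASR API, not from this file:
    #   from funasr import AutoModel
    #   model = AutoModel(model="iic/SenseVoiceSmall")  # model id assumed
    #   onnx_model = model.export(type="onnx", quantize=False)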