update sensevoice small with timestamp
| | |
| | | text = rich_transcription_postprocess(res[0]["text"]) |
| | | print(text) |
| | | |
| | | # en with timestamp |
| | | res = model.generate( |
| | | input=f"{model.model_path}/example/en.mp3", |
| | | cache={}, |
| | | language="auto", # "zn", "en", "yue", "ja", "ko", "nospeech" |
| | | use_itn=True, |
| | | batch_size_s=60, |
| | | merge_vad=True, # |
| | | merge_length_s=15, |
| | | output_timestamp=True, |
| | | ) |
| | | print(res) |
| | | text = rich_transcription_postprocess(res[0]["text"]) |
| | | print(text) |
| | | |
| | | # zh |
| | | res = model.generate( |
| | | input=f"{model.model_path}/example/zh.mp3", |
| | |
| | | text = rich_transcription_postprocess(res[0]["text"]) |
| | | print(text) |
| | | |
| | | # zh with timestamp |
| | | res = model.generate( |
| | | input=f"{model.model_path}/example/zh.mp3", |
| | | cache={}, |
| | | language="auto", # "zn", "en", "yue", "ja", "ko", "nospeech" |
| | | use_itn=True, |
| | | batch_size_s=60, |
| | | merge_vad=True, # |
| | | merge_length_s=15, |
| | | output_timestamp=True, |
| | | ) |
| | | print(res) |
| | | text = rich_transcription_postprocess(res[0]["text"]) |
| | | print(text) |
| | | |
| | | # yue |
| | | res = model.generate( |
| | | input=f"{model.model_path}/example/yue.mp3", |
| | |
| | | |
| | | |
| | | from funasr.models.paraformer.search import Hypothesis |
| | | from funasr.models.sense_voice.utils.ctc_alignment import ctc_forced_align |
| | | |
| | | |
| | | class SinusoidalPositionEncoder(torch.nn.Module): |
| | |
| | | |
| | | use_itn = kwargs.get("use_itn", False) |
| | | textnorm = kwargs.get("text_norm", None) |
| | | output_timestamp = kwargs.get("output_timestamp", False) |
| | | |
| | | if textnorm is None: |
| | | textnorm = "withitn" if use_itn else "woitn" |
| | | textnorm_query = self.embed( |
| | |
| | | # Change integer-ids to tokens |
| | | text = tokenizer.decode(token_int) |
| | | |
| | | result_i = {"key": key[i], "text": text} |
| | | results.append(result_i) |
| | | # result_i = {"key": key[i], "text": text} |
| | | # results.append(result_i) |
| | | |
| | | if ibest_writer is not None: |
| | | ibest_writer["text"][key[i]] = text |
| | | |
| | | if output_timestamp: |
| | | from itertools import groupby |
| | | timestamp = [] |
| | | tokens = tokenizer.text2tokens(text)[4:] |
| | | logits_speech = self.ctc.softmax(encoder_out)[i, 4:encoder_out_lens[i].item(), :] |
| | | pred = logits_speech.argmax(-1).cpu() |
| | | logits_speech[pred==self.blank_id, self.blank_id] = 0 |
| | | align = ctc_forced_align( |
| | | logits_speech.unsqueeze(0).float(), |
| | | torch.Tensor(token_int[4:]).unsqueeze(0).long().to(logits_speech.device), |
| | | (encoder_out_lens-4).long(), |
| | | torch.tensor(len(token_int)-4).unsqueeze(0).long().to(logits_speech.device), |
| | | ignore_id=self.ignore_id, |
| | | ) |
| | | pred = groupby(align[0, :encoder_out_lens[0]]) |
| | | _start = 0 |
| | | token_id = 0 |
| | | ts_max = encoder_out_lens[i] - 4 |
| | | for pred_token, pred_frame in pred: |
| | | _end = _start + len(list(pred_frame)) |
| | | if pred_token != 0: |
| | | ts_left = max((_start*60-30)/1000, 0) |
| | | ts_right = min((_end*60-30)/1000, (ts_max*60-30)/1000) |
| | | timestamp.append([tokens[token_id], ts_left, ts_right]) |
| | | token_id += 1 |
| | | _start = _end |
| | | timestamp = self.post(timestamp) |
| | | result_i = {"key": key[i], "text": text, "timestamp": timestamp} |
| | | results.append(result_i) |
| | | else: |
| | | result_i = {"key": key[i], "text": text} |
| | | results.append(result_i) |
| | | return results, meta_data |
| | | |
| | | def post(self, timestamp): |
| | | timestamp_new = [] |
| | | for i, t in enumerate(timestamp): |
| | | word, start, end = t |
| | | if word == '▁': |
| | | continue |
| | | if i == 0: |
| | | # timestamp_new.append([word, start, end]) |
| | | timestamp_new.append([int(start*1000), int(end*1000)]) |
| | | elif word.startswith("▁") or len(word) == 1 or not word[1].isalpha(): |
| | | word = word[1:] |
| | | # timestamp_new.append([word, start, end]) |
| | | timestamp_new.append([int(start*1000), int(end*1000)]) |
| | | else: |
| | | # timestamp_new[-1][0] += word |
| | | timestamp_new[-1][1] = int(end*1000) |
| | | return timestamp_new |
| | | def export(self, **kwargs): |
| | | from .export_meta import export_rebuild_model |
| | | from export_meta import export_rebuild_model |
| | | |
| | | if "max_seq_len" not in kwargs: |
| | | kwargs["max_seq_len"] = 512 |
| | | models = export_rebuild_model(model=self, **kwargs) |
| | | return models |
| | | |
| | | return results, meta_data |
| | | |
| New file |
| | |
| | | import torch |
| | | def ctc_forced_align( |
| | | log_probs: torch.Tensor, |
| | | targets: torch.Tensor, |
| | | input_lengths: torch.Tensor, |
| | | target_lengths: torch.Tensor, |
| | | blank: int = 0, |
| | | ignore_id: int = -1, |
| | | ) -> torch.Tensor: |
| | | """Align a CTC label sequence to an emission. |
| | | Args: |
| | | log_probs (Tensor): log probability of CTC emission output. |
| | | Tensor of shape `(B, T, C)`. where `B` is the batch size, `T` is the input length, |
| | | `C` is the number of characters in alphabet including blank. |
| | | targets (Tensor): Target sequence. Tensor of shape `(B, L)`, |
| | | where `L` is the target length. |
| | | input_lengths (Tensor): |
| | | Lengths of the inputs (max value must each be <= `T`). 1-D Tensor of shape `(B,)`. |
| | | target_lengths (Tensor): |
| | | Lengths of the targets. 1-D Tensor of shape `(B,)`. |
| | | blank_id (int, optional): The index of blank symbol in CTC emission. (Default: 0) |
| | | ignore_id (int, optional): The index of ignore symbol in CTC emission. (Default: -1) |
| | | """ |
| | | targets[targets == ignore_id] = blank |
| | | batch_size, input_time_size, _ = log_probs.size() |
| | | bsz_indices = torch.arange(batch_size, device=input_lengths.device) |
| | | _t_a_r_g_e_t_s_ = torch.cat( |
| | | ( |
| | | torch.stack((torch.full_like(targets, blank), targets), dim=-1).flatten(start_dim=1), |
| | | torch.full_like(targets[:, :1], blank), |
| | | ), |
| | | dim=-1, |
| | | ) |
| | | diff_labels = torch.cat( |
| | | ( |
| | | torch.as_tensor([[False, False]], device=targets.device).expand(batch_size, -1), |
| | | _t_a_r_g_e_t_s_[:, 2:] != _t_a_r_g_e_t_s_[:, :-2], |
| | | ), |
| | | dim=1, |
| | | ) |
| | | neg_inf = torch.tensor(float("-inf"), device=log_probs.device, dtype=log_probs.dtype) |
| | | padding_num = 2 |
| | | padded_t = padding_num + _t_a_r_g_e_t_s_.size(-1) |
| | | best_score = torch.full((batch_size, padded_t), neg_inf, device=log_probs.device, dtype=log_probs.dtype) |
| | | best_score[:, padding_num + 0] = log_probs[:, 0, blank] |
| | | best_score[:, padding_num + 1] = log_probs[bsz_indices, 0, _t_a_r_g_e_t_s_[:, 1]] |
| | | backpointers = torch.zeros((batch_size, input_time_size, padded_t), device=log_probs.device, dtype=targets.dtype) |
| | | for t in range(1, input_time_size): |
| | | prev = torch.stack( |
| | | (best_score[:, 2:], best_score[:, 1:-1], torch.where(diff_labels, best_score[:, :-2], neg_inf)) |
| | | ) |
| | | prev_max_value, prev_max_idx = prev.max(dim=0) |
| | | best_score[:, padding_num:] = log_probs[:, t].gather(-1, _t_a_r_g_e_t_s_) + prev_max_value |
| | | backpointers[:, t, padding_num:] = prev_max_idx |
| | | l1l2 = best_score.gather( |
| | | -1, torch.stack((padding_num + target_lengths * 2 - 1, padding_num + target_lengths * 2), dim=-1) |
| | | ) |
| | | path = torch.zeros((batch_size, input_time_size), device=best_score.device, dtype=torch.long) |
| | | path[bsz_indices, input_lengths - 1] = padding_num + target_lengths * 2 - 1 + l1l2.argmax(dim=-1) |
| | | for t in range(input_time_size - 1, 0, -1): |
| | | target_indices = path[:, t] |
| | | prev_max_idx = backpointers[bsz_indices, t, target_indices] |
| | | path[:, t - 1] += target_indices - prev_max_idx |
| | | alignments = _t_a_r_g_e_t_s_.gather(dim=-1, index=(path - padding_num).clamp(min=0)) |
| | | return alignments |