| | |
| | | import time |
| | | import math |
| | | import torch |
| | | import numpy as np |
| | | from torch import nn |
| | | from enum import Enum |
| | | from dataclasses import dataclass |
| | |
| | | cache["stats"].data_buf_all = torch.cat( |
| | | (cache["stats"].data_buf_all, cache["stats"].waveform[0]) |
| | | ) |
| | | for offset in range( |
| | | 0, cache["stats"].waveform.shape[1] - frame_sample_length + 1, frame_shift_length |
| | | ): |
| | | cache["stats"].decibel.append( |
| | | 10 |
| | | * math.log10( |
| | | (cache["stats"].waveform[0][offset : offset + frame_sample_length]) |
| | | .square() |
| | | .sum() |
| | | + 0.000001 |
| | | ) |
| | | ) |
| | | |
| | | waveform_numpy = cache["stats"].waveform.numpy() |
| | | |
| | | offsets = np.arange(0, waveform_numpy.shape[1] - frame_sample_length + 1, frame_shift_length) |
| | | frames = waveform_numpy[0, offsets[:, np.newaxis] + np.arange(frame_sample_length)] |
| | | |
| | | decibel_numpy = 10 * np.log10(np.sum(np.square(frames), axis=1) + 0.000001) |
| | | decibel_numpy = decibel_numpy.tolist() |
| | | |
| | | cache["stats"].decibel.extend(decibel_numpy) |
| | | |
| | | |
| | | def ComputeScores(self, feats: torch.Tensor, cache: dict = {}) -> None: |
| | | scores = self.encoder(feats, cache=cache["encoder"]).to("cpu") # return B * T * D |
| | |
| | | cur_seg = cache["stats"].output_data_buf[-1] |
| | | if cur_seg.end_ms != start_frm * self.vad_opts.frame_in_ms: |
| | | print("warning\n") |
| | | out_pos = len(cur_seg.buffer) # nothing is currently done with cur_seg.buffer |
| | | data_to_pop = 0 |
| | | if end_point_is_sent_end: |
| | | data_to_pop = expected_sample_number |
| | |
| | | expected_sample_number = len(cache["stats"].data_buf) |
| | | |
| | | cur_seg.doa = 0 |
| | | for sample_cpy_out in range(0, data_to_pop): |
| | | # cur_seg.buffer[out_pos ++] = data_buf_.back(); |
| | | out_pos += 1 |
| | | for sample_cpy_out in range(data_to_pop, expected_sample_number): |
| | | # cur_seg.buffer[out_pos++] = data_buf_.back() |
| | | out_pos += 1 |
| | | if cur_seg.end_ms != start_frm * self.vad_opts.frame_in_ms: |
| | | print("Something wrong with the VAD algorithm\n") |
| | | cache["stats"].data_buf_start_frame += frm_cnt |
| | |
| | | assert len(cache["stats"].sil_pdf_ids) == self.vad_opts.silence_pdf_num |
| | | if len(cache["stats"].sil_pdf_ids) > 0: |
| | | assert len(cache["stats"].scores) == 1 # only batch_size = 1 is supported for testing |
| | | sil_pdf_scores = [ |
| | | cache["stats"].scores[0][t][sil_pdf_id] for sil_pdf_id in cache["stats"].sil_pdf_ids |
| | | ] |
| | | sum_score = sum(sil_pdf_scores) |
| | | """ |
| | | - Change the type of `sum_score` to float. The reason is that `sum_score` is a tensor with a single element, |
| | | and a `torch.Tensor` is slower than a `float` when the tensor has only one element. |
| | | - Put the iteration over `sil_pdf_ids` inside `sum()` to avoid the overhead of creating a new list. |
| | | - The default `sil_pdf_ids` is [0]; the `if` statement avoids the overhead of creating a generator expression |
| | | in that case, which results in a mere (~2%) performance gain. |
| | | """ |
| | | if len(cache["stats"].sil_pdf_ids) > 1: |
| | | sum_score = sum(cache["stats"].scores[0][t][sil_pdf_id].item() for sil_pdf_id in cache["stats"].sil_pdf_ids) |
| | | else: |
| | | sum_score = cache["stats"].scores[0][t][cache["stats"].sil_pdf_ids[0]].item() |
| | | noise_prob = math.log(sum_score) * self.vad_opts.speech_2_noise_ratio |
| | | total_score = 1.0 |
| | | sum_score = total_score - sum_score |