| | |
| | | frontend: Optional[AbsFrontend], |
| | | specaug: Optional[AbsSpecAug], |
| | | normalize: Optional[AbsNormalize], |
| | | encoder: AbsEncoder, |
| | | speaker_encoder: AbsEncoder, |
| | | encoder: torch.nn.Module, |
| | | speaker_encoder: Optional[torch.nn.Module], |
| | | ci_scorer: torch.nn.Module, |
| | | cd_scorer: torch.nn.Module, |
| | | cd_scorer: Optional[torch.nn.Module], |
| | | decoder: torch.nn.Module, |
| | | token_list: list, |
| | | lsm_weight: float = 0.1, |
| | |
| | | normalize_length=length_normalized_loss, |
| | | ) |
| | | self.criterion_bce = SequenceBinaryCrossEntropy(normalize_length=length_normalized_loss) |
| | | self.pse_embedding = self.generate_pse_embedding() |
| | | self.power_weight = torch.from_numpy(2 ** np.arange(max_spk_num)[np.newaxis, np.newaxis, :]) |
| | | self.int_token_arr = torch.from_numpy(np.array(self.token_list).astype(int)[np.newaxis, np.newaxis, :]) |
| | | pse_embedding = self.generate_pse_embedding() |
| | | self.register_buffer("pse_embedding", pse_embedding) |
| | | power_weight = torch.from_numpy(2 ** np.arange(max_spk_num)[np.newaxis, np.newaxis, :]).float() |
| | | self.register_buffer("power_weight", power_weight) |
| | | int_token_arr = torch.from_numpy(np.array(self.token_list).astype(int)[np.newaxis, np.newaxis, :]).int() |
| | | self.register_buffer("int_token_arr", int_token_arr) |
| | | self.speaker_discrimination_loss_weight = speaker_discrimination_loss_weight |
| | | self.inter_score_loss_weight = inter_score_loss_weight |
| | | self.forward_steps = 0 |
| | |
| | | def generate_pse_embedding(self): |
| | | embedding = np.zeros((len(self.token_list), self.max_spk_num), dtype=np.float) |
| | | for idx, pse_label in enumerate(self.token_list): |
| | | emb = int2vec(pse_label, vec_dim=self.max_spk_num, dtype=np.float) |
| | | emb = int2vec(int(pse_label), vec_dim=self.max_spk_num, dtype=np.float) |
| | | embedding[idx] = emb |
| | | return torch.from_numpy(embedding) |
| | | |
| | |
| | | ) |
| | | # 2. Calculate power-set encoding (PSE) labels |
| | | raw_pse_labels = torch.sum(binary_labels * self.power_weight, dim=2, keepdim=True) |
| | | pse_labels = torch.argmax(raw_pse_labels == self.int_token_arr, dim=2) |
| | | pse_labels = torch.argmax((raw_pse_labels.int() == self.int_token_arr).float(), dim=2) |
| | | |
| | | # If encoder uses conv* as input_layer (i.e., subsampling), |
| | | # the sequence length of 'pred' might be slightly less than the |
| | |
| | | loss_diar = self.classification_loss(pred, pse_labels, binary_labels_lengths) |
| | | loss_spk_dis = self.speaker_discrimination_loss(profile, profile_lengths) |
| | | loss_inter_ci, loss_inter_cd = self.internal_score_loss(cd_score, ci_score, pse_labels, binary_labels_lengths) |
| | | label_mask = make_pad_mask(binary_labels_lengths, maxlen=pse_labels.shape[1]) |
| | | label_mask = make_pad_mask(binary_labels_lengths, maxlen=pse_labels.shape[1]).to(pse_labels.device) |
| | | loss = (loss_diar + self.speaker_discrimination_loss_weight * loss_spk_dis |
| | | + self.inter_score_loss_weight * (loss_inter_ci + loss_inter_cd)) |
| | | |
| | |
| | | speaker_falarm, |
| | | speaker_error, |
| | | ) = self.calc_diarization_error( |
| | | pred=F.embedding(pred.argmax(dim=2) * label_mask, self.pse_embedding), |
| | | label=F.embedding(pse_labels * label_mask, self.pse_embedding), |
| | | pred=F.embedding(pred.argmax(dim=2) * (~label_mask), self.pse_embedding), |
| | | label=F.embedding(pse_labels * (~label_mask), self.pse_embedding), |
| | | length=binary_labels_lengths |
| | | ) |
| | | |
| | |
| | | labels: torch.Tensor, |
| | | prediction_lengths: torch.Tensor |
| | | ) -> torch.Tensor: |
| | | mask = make_pad_mask(prediction_lengths, maxlen=labels.shape[1]) |
| | | pad_labels = labels.masked_fill( |
| | | make_pad_mask(prediction_lengths, maxlen=labels.shape[1]), |
| | | mask.to(predictions.device), |
| | | value=self.ignore_id |
| | | ) |
| | | loss = self.criterion_diar(predictions, pad_labels) |
| | | loss = self.criterion_diar(predictions.contiguous(), pad_labels) |
| | | |
| | | return loss |
| | | |
| | |
| | | ) -> torch.Tensor: |
| | | profile_mask = (torch.linalg.norm(profile, ord=2, dim=2, keepdim=True) > 0).float() # (B, N, 1) |
| | | mask = torch.matmul(profile_mask, profile_mask.transpose(1, 2)) # (B, N, N) |
| | | mask = mask * (1.0 - torch.eye(self.max_spk_num).unsqueeze(0)) |
| | | mask = mask * (1.0 - torch.eye(self.max_spk_num).unsqueeze(0).to(mask)) |
| | | |
| | | eps = 1e-12 |
| | | coding_norm = torch.linalg.norm( |
| | | profile * profile_mask + (1 - profile_mask) * eps, |
| | | dim=2, keepdim=True |
| | | ) * profile_mask |
| | | cos_theta = F.cosine_similarity(profile, profile, dim=2, eps=eps) * mask |
| | | # profile: Batch, N, dim |
| | | cos_theta = F.cosine_similarity(profile.unsqueeze(2), profile.unsqueeze(1), dim=-1, eps=eps) * mask |
| | | cos_theta = torch.clip(cos_theta, -1 + eps, 1 - eps) |
| | | loss = (F.relu(mask * coding_norm * (cos_theta - 0.0))).sum() / mask.sum() |
| | | |
| | | return loss |
| | | |
| | | def calculate_multi_labels(self, pse_labels, pse_labels_lengths): |
| | | mask = make_pad_mask(pse_labels_lengths, maxlen=pse_labels.shape[1]) |
| | | padding_labels = pse_labels.masked_fill( |
| | | make_pad_mask(pse_labels_lengths, maxlen=pse_labels.shape[1]), |
| | | mask.to(pse_labels.device), |
| | | value=0 |
| | | ).to(pse_labels.dtype) |
| | | ).to(pse_labels) |
| | | multi_labels = F.embedding(padding_labels, self.pse_embedding) |
| | | |
| | | return multi_labels |
| | |
| | | speaker_encoder_outputs: torch.Tensor, |
| | | seq_len: torch.Tensor = None, |
| | | spk_len: torch.Tensor = None, |
| | | ) -> torch.Tensor: |
| | | ) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | bb, tt = speech_encoder_outputs.shape[0], speech_encoder_outputs.shape[1] |
| | | d_sph, d_spk = speech_encoder_outputs.shape[2], speaker_encoder_outputs.shape[2] |
| | | if self.normalize_speech_speaker: |
| | |
| | | ci_simi = self.ci_scorer(ge_in, ge_len)[0] |
| | | else: |
| | | ci_simi = self.ci_scorer(speech_encoder_outputs, speaker_encoder_outputs) |
| | | simi = torch.cat([cd_simi, ci_simi], dim=2) |
| | | |
| | | return simi |
| | | return ci_simi, cd_simi |
| | | |
| | | def post_net_forward(self, simi, seq_len): |
| | | logits = self.decoder(simi, seq_len)[0] |
| | |
| | | # speaker encoding |
| | | profile, profile_lengths = self.encode_speaker(profile, profile_lengths) |
| | | # calculating similarity |
| | | similarity = self.calc_similarity(speech, profile, speech_lengths, profile_lengths) |
| | | ci_simi, cd_simi = self.calc_similarity(speech, profile, speech_lengths, profile_lengths) |
| | | similarity = torch.cat([cd_simi, ci_simi], dim=2) |
| | | # post net forward |
| | | logits = self.post_net_forward(similarity, speech_lengths) |
| | | |
| | | if return_inter_outputs: |
| | | return logits, [(speech, speech_lengths), (profile, profile_lengths), torch.split(similarity, 2)] |
| | | return logits, [(speech, speech_lengths), (profile, profile_lengths), (ci_simi, cd_simi)] |
| | | return logits |
| | | |
| | | def encode( |
| | |
| | | # 4. Forward encoder |
| | | # feats: (Batch, Length, Dim) |
| | | # -> encoder_out: (Batch, Length2, Dim) |
| | | encoder_out, encoder_out_lens, _ = self.encoder(feats, feats_lengths) |
| | | encoder_outputs = self.encoder(feats, feats_lengths) |
| | | encoder_out, encoder_out_lens = encoder_outputs[:2] |
| | | |
| | | assert encoder_out.size(0) == speech.size(0), ( |
| | | encoder_out.size(), |
| | |
| | | |
| | | (batch_size, max_len, num_output) = label.size() |
| | | # mask the padding part |
| | | mask = np.zeros((batch_size, max_len, num_output)) |
| | | for i in range(batch_size): |
| | | mask[i, : length[i], :] = 1 |
| | | mask = ~make_pad_mask(length, maxlen=label.shape[1]).unsqueeze(-1).numpy() |
| | | |
| | | # pred and label have the shape (batch_size, max_len, num_output) |
| | | label_np = label.data.cpu().numpy().astype(int) |