| | |
| | | from funasr.models.transformer.utils.nets_utils import make_pad_mask |
| | | from funasr.models.decoder.abs_decoder import AbsDecoder |
| | | from funasr.models.encoder.abs_encoder import AbsEncoder |
| | | from funasr.models.frontend.abs_frontend import AbsFrontend |
| | | from funasr.frontends.abs_frontend import AbsFrontend |
| | | from funasr.models.specaug.abs_specaug import AbsSpecAug |
| | | from funasr.models.specaug.abs_profileaug import AbsProfileAug |
| | | from funasr.layers.abs_normalize import AbsNormalize |
| | |
| | | ) |
| | | self.criterion_bce = SequenceBinaryCrossEntropy(normalize_length=length_normalized_loss) |
| | | self.pse_embedding = self.generate_pse_embedding() |
| | | self.power_weight = torch.from_numpy(2 ** np.arange(max_spk_num)[np.newaxis, np.newaxis, :]).float() |
| | | self.int_token_arr = torch.from_numpy(np.array(self.token_list).astype(int)[np.newaxis, np.newaxis, :]).int() |
| | | self.power_weight = torch.from_numpy( |
| | | 2 ** np.arange(max_spk_num)[np.newaxis, np.newaxis, :] |
| | | ).float() |
| | | self.int_token_arr = torch.from_numpy( |
| | | np.array(self.token_list).astype(int)[np.newaxis, np.newaxis, :] |
| | | ).int() |
| | | self.speaker_discrimination_loss_weight = speaker_discrimination_loss_weight |
| | | self.inter_score_loss_weight = inter_score_loss_weight |
| | | self.forward_steps = 0 |
| | |
| | | def get_regularize_parameters(self): |
| | | to_regularize_parameters, normal_parameters = [], [] |
| | | for name, param in self.named_parameters(): |
| | | if ("encoder" in name and "weight" in name and "bn" not in name and |
| | | ("conv2" in name or "conv1" in name or "conv_sc" in name or "dense" in name) |
| | | if ( |
| | | "encoder" in name |
| | | and "weight" in name |
| | | and "bn" not in name |
| | | and ("conv2" in name or "conv1" in name or "conv_sc" in name or "dense" in name) |
| | | ): |
| | | to_regularize_parameters.append((name, param)) |
| | | else: |
| | |
| | | raw_profile: B, N, D |
| | | raw_binary_labels: B, T, N |
| | | """ |
| | | assert raw_profile.shape[1] == raw_binary_labels.shape[2], \ |
| | | "Num profile: {}, Num label: {}".format(raw_profile.shape[1], raw_binary_labels.shape[-1]) |
| | | assert ( |
| | | raw_profile.shape[1] == raw_binary_labels.shape[2] |
| | | ), "Num profile: {}, Num label: {}".format( |
| | | raw_profile.shape[1], raw_binary_labels.shape[-1] |
| | | ) |
| | | profile = torch.clone(raw_profile) |
| | | binary_labels = torch.clone(raw_binary_labels) |
| | | bsz, num_spk = profile.shape[0], profile.shape[1] |
| | |
| | | # 0b. augment profiles |
| | | if self.profileaug is not None and self.training: |
| | | speech, profile, binary_labels = self.profileaug( |
| | | speech, speech_lengths, |
| | | profile, profile_lengths, |
| | | binary_labels, binary_labels_lengths |
| | | speech, |
| | | speech_lengths, |
| | | profile, |
| | | profile_lengths, |
| | | binary_labels, |
| | | binary_labels_lengths, |
| | | ) |
| | | |
| | | # 1. Calculate power-set encoding (PSE) labels |
| | | pad_bin_labels = F.pad(binary_labels, (0, self.max_spk_num - binary_labels.shape[2]), "constant", 0.0) |
| | | pad_bin_labels = F.pad( |
| | | binary_labels, (0, self.max_spk_num - binary_labels.shape[2]), "constant", 0.0 |
| | | ) |
| | | raw_pse_labels = torch.sum(pad_bin_labels * self.power_weight, dim=2, keepdim=True) |
| | | pse_labels = torch.argmax((raw_pse_labels.int() == self.int_token_arr).float(), dim=2) |
| | | |
| | | # 2. Network forward |
| | | pred, inter_outputs = self.prediction_forward( |
| | | speech, speech_lengths, |
| | | profile, profile_lengths, |
| | | return_inter_outputs=True |
| | | speech, speech_lengths, profile, profile_lengths, return_inter_outputs=True |
| | | ) |
| | | (speech, speech_lengths), (profile, profile_lengths), (ci_score, cd_score) = inter_outputs |
| | | |
| | |
| | | |
| | | loss_diar = self.classification_loss(pred, pse_labels, binary_labels_lengths) |
| | | loss_spk_dis = self.speaker_discrimination_loss(profile, profile_lengths) |
| | | loss_inter_ci, loss_inter_cd = self.internal_score_loss(cd_score, ci_score, pse_labels, binary_labels_lengths) |
| | | loss_inter_ci, loss_inter_cd = self.internal_score_loss( |
| | | cd_score, ci_score, pse_labels, binary_labels_lengths |
| | | ) |
| | | regularizer_loss = None |
| | | if self.model_regularizer_weight > 0 and self.to_regularize_parameters is not None: |
| | | regularizer_loss = self.calculate_regularizer_loss() |
| | | label_mask = make_pad_mask(binary_labels_lengths, maxlen=pse_labels.shape[1]).to(pse_labels.device) |
| | | loss = (loss_diar + self.speaker_discrimination_loss_weight * loss_spk_dis |
| | | + self.inter_score_loss_weight * (loss_inter_ci + loss_inter_cd)) |
| | | label_mask = make_pad_mask(binary_labels_lengths, maxlen=pse_labels.shape[1]).to( |
| | | pse_labels.device |
| | | ) |
| | | loss = ( |
| | | loss_diar |
| | | + self.speaker_discrimination_loss_weight * loss_spk_dis |
| | | + self.inter_score_loss_weight * (loss_inter_ci + loss_inter_cd) |
| | | ) |
| | | # if regularizer_loss is not None: |
| | | # loss = loss + regularizer_loss * self.model_regularizer_weight |
| | | |
| | |
| | | ) = self.calc_diarization_error( |
| | | pred=F.embedding(pred.argmax(dim=2) * (~label_mask), self.pse_embedding), |
| | | label=F.embedding(pse_labels * (~label_mask), self.pse_embedding), |
| | | length=binary_labels_lengths |
| | | length=binary_labels_lengths, |
| | | ) |
| | | |
| | | if speech_scored > 0 and num_frames > 0: |
| | |
| | | return regularizer_loss |
| | | |
| | | def classification_loss( |
| | | self, |
| | | predictions: torch.Tensor, |
| | | labels: torch.Tensor, |
| | | prediction_lengths: torch.Tensor |
| | | self, predictions: torch.Tensor, labels: torch.Tensor, prediction_lengths: torch.Tensor |
| | | ) -> torch.Tensor: |
| | | mask = make_pad_mask(prediction_lengths, maxlen=labels.shape[1]) |
| | | pad_labels = labels.masked_fill( |
| | | mask.to(predictions.device), |
| | | value=self.ignore_id |
| | | ) |
| | | pad_labels = labels.masked_fill(mask.to(predictions.device), value=self.ignore_id) |
| | | loss = self.criterion_diar(predictions.contiguous(), pad_labels) |
| | | |
| | | return loss |
| | | |
| | | def speaker_discrimination_loss( |
| | | self, |
| | | profile: torch.Tensor, |
| | | profile_lengths: torch.Tensor |
| | | self, profile: torch.Tensor, profile_lengths: torch.Tensor |
| | | ) -> torch.Tensor: |
| | | profile_mask = (torch.linalg.norm(profile, ord=2, dim=2, keepdim=True) > 0).float() # (B, N, 1) |
| | | profile_mask = ( |
| | | torch.linalg.norm(profile, ord=2, dim=2, keepdim=True) > 0 |
| | | ).float() # (B, N, 1) |
| | | mask = torch.matmul(profile_mask, profile_mask.transpose(1, 2)) # (B, N, N) |
| | | mask = mask * (1.0 - torch.eye(self.max_spk_num).unsqueeze(0).to(mask)) |
| | | |
| | | eps = 1e-12 |
| | | coding_norm = torch.linalg.norm( |
| | | profile * profile_mask + (1 - profile_mask) * eps, |
| | | dim=2, keepdim=True |
| | | ) * profile_mask |
| | | coding_norm = ( |
| | | torch.linalg.norm( |
| | | profile * profile_mask + (1 - profile_mask) * eps, dim=2, keepdim=True |
| | | ) |
| | | * profile_mask |
| | | ) |
| | | # profile: Batch, N, dim |
| | | cos_theta = F.cosine_similarity(profile.unsqueeze(2), profile.unsqueeze(1), dim=-1, eps=eps) * mask |
| | | cos_theta = ( |
| | | F.cosine_similarity(profile.unsqueeze(2), profile.unsqueeze(1), dim=-1, eps=eps) * mask |
| | | ) |
| | | cos_theta = torch.clip(cos_theta, -1 + eps, 1 - eps) |
| | | loss = (F.relu(mask * coding_norm * (cos_theta - 0.0))).sum() / mask.sum() |
| | | |
| | |
| | | |
| | | def calculate_multi_labels(self, pse_labels, pse_labels_lengths): |
| | | mask = make_pad_mask(pse_labels_lengths, maxlen=pse_labels.shape[1]) |
| | | padding_labels = pse_labels.masked_fill( |
| | | mask.to(pse_labels.device), |
| | | value=0 |
| | | ).to(pse_labels) |
| | | padding_labels = pse_labels.masked_fill(mask.to(pse_labels.device), value=0).to(pse_labels) |
| | | multi_labels = F.embedding(padding_labels, self.pse_embedding) |
| | | |
| | | return multi_labels |
| | | |
| | | def internal_score_loss( |
| | | self, |
| | | cd_score: torch.Tensor, |
| | | ci_score: torch.Tensor, |
| | | pse_labels: torch.Tensor, |
| | | pse_labels_lengths: torch.Tensor |
| | | self, |
| | | cd_score: torch.Tensor, |
| | | ci_score: torch.Tensor, |
| | | pse_labels: torch.Tensor, |
| | | pse_labels_lengths: torch.Tensor, |
| | | ) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | multi_labels = self.calculate_multi_labels(pse_labels, pse_labels_lengths) |
| | | ci_loss = self.criterion_bce(ci_score, multi_labels, pse_labels_lengths) |
| | |
| | | return {"feats": feats, "feats_lengths": feats_lengths} |
| | | |
| | | def encode_speaker( |
| | | self, |
| | | profile: torch.Tensor, |
| | | profile_lengths: torch.Tensor, |
| | | self, |
| | | profile: torch.Tensor, |
| | | profile_lengths: torch.Tensor, |
| | | ) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | with autocast(False): |
| | | if profile.shape[1] < self.max_spk_num: |
| | | profile = F.pad(profile, [0, 0, 0, self.max_spk_num-profile.shape[1], 0, 0], "constant", 0.0) |
| | | profile = F.pad( |
| | | profile, [0, 0, 0, self.max_spk_num - profile.shape[1], 0, 0], "constant", 0.0 |
| | | ) |
| | | profile_mask = (torch.linalg.norm(profile, ord=2, dim=2, keepdim=True) > 0).float() |
| | | profile = F.normalize(profile, dim=2) |
| | | if self.speaker_encoder is not None: |
| | |
| | | return profile, profile_lengths |
| | | |
| | | def encode_speech( |
| | | self, |
| | | speech: torch.Tensor, |
| | | speech_lengths: torch.Tensor, |
| | | self, |
| | | speech: torch.Tensor, |
| | | speech_lengths: torch.Tensor, |
| | | ) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | if self.encoder is not None and self.inputs_type == "raw": |
| | | speech, speech_lengths = self.encode(speech, speech_lengths) |
| | |
| | | return speech, speech_lengths |
| | | |
| | | @staticmethod |
| | | def concate_speech_ivc( |
| | | speech: torch.Tensor, |
| | | ivc: torch.Tensor |
| | | ) -> torch.Tensor: |
| | | def concate_speech_ivc(speech: torch.Tensor, ivc: torch.Tensor) -> torch.Tensor: |
| | | nn, tt = ivc.shape[1], speech.shape[1] |
| | | speech = speech.unsqueeze(dim=1) # B x 1 x T x D |
| | | speech = speech.unsqueeze(dim=1) # B x 1 x T x D |
| | | speech = speech.expand(-1, nn, -1, -1) # B x N x T x D |
| | | ivc = ivc.unsqueeze(dim=2) # B x N x 1 x D |
| | | ivc = ivc.expand(-1, -1, tt, -1) # B x N x T x D |
| | | ivc = ivc.unsqueeze(dim=2) # B x N x 1 x D |
| | | ivc = ivc.expand(-1, -1, tt, -1) # B x N x T x D |
| | | sd_in = torch.cat([speech, ivc], dim=3) # B x N x T x 2D |
| | | return sd_in |
| | | |
| | | def calc_similarity( |
| | | self, |
| | | speech_encoder_outputs: torch.Tensor, |
| | | speaker_encoder_outputs: torch.Tensor, |
| | | seq_len: torch.Tensor = None, |
| | | spk_len: torch.Tensor = None, |
| | | self, |
| | | speech_encoder_outputs: torch.Tensor, |
| | | speaker_encoder_outputs: torch.Tensor, |
| | | seq_len: torch.Tensor = None, |
| | | spk_len: torch.Tensor = None, |
| | | ) -> Tuple[torch.Tensor, torch.Tensor]: |
| | | bb, tt = speech_encoder_outputs.shape[0], speech_encoder_outputs.shape[1] |
| | | d_sph, d_spk = speech_encoder_outputs.shape[2], speaker_encoder_outputs.shape[2] |
| | |
| | | return logits |
| | | |
| | | def prediction_forward( |
| | | self, |
| | | speech: torch.Tensor, |
| | | speech_lengths: torch.Tensor, |
| | | profile: torch.Tensor, |
| | | profile_lengths: torch.Tensor, |
| | | return_inter_outputs: bool = False, |
| | | self, |
| | | speech: torch.Tensor, |
| | | speech_lengths: torch.Tensor, |
| | | profile: torch.Tensor, |
| | | profile_lengths: torch.Tensor, |
| | | return_inter_outputs: bool = False, |
| | | ) -> [torch.Tensor, Optional[list]]: |
| | | # speech encoding |
| | | speech, speech_lengths = self.encode_speech(speech, speech_lengths) |
| | |
| | | logits = self.post_net_forward(similarity, speech_lengths) |
| | | |
| | | if return_inter_outputs: |
| | | return logits, [(speech, speech_lengths), (profile, profile_lengths), (ci_simi, cd_simi)] |
| | | return logits, [ |
| | | (speech, speech_lengths), |
| | | (profile, profile_lengths), |
| | | (ci_simi, cd_simi), |
| | | ] |
| | | return logits |
| | | |
| | | def encode( |