| | |
| | | |
| | | def p_pruning(self, A): |
| | | if A.shape[0] * self.pval < 6: |
| | | pval = 6. / A.shape[0] |
| | | pval = 6.0 / A.shape[0] |
| | | else: |
| | | pval = self.pval |
| | | |
| | |
| | | num_of_spk = k_oracle |
| | | else: |
| | | lambda_gap_list = self.getEigenGaps( |
| | | lambdas[self.min_num_spks - 1:self.max_num_spks + 1]) |
| | | lambdas[self.min_num_spks - 1 : self.max_num_spks + 1] |
| | | ) |
| | | num_of_spk = np.argmax(lambda_gap_list) + self.min_num_spks |
| | | |
| | | emb = eig_vecs[:, :num_of_spk] |
| | |
| | | Emphasis On Topological Structure. ICASSP2022 |
| | | """ |
| | | |
| | | def __init__(self, |
| | | n_neighbors=20, |
| | | n_components=60, |
| | | min_samples=10, |
| | | min_cluster_size=10, |
| | | metric='cosine'): |
| | | def __init__( |
| | | self, n_neighbors=20, n_components=60, min_samples=10, min_cluster_size=10, metric="cosine" |
| | | ): |
| | | self.n_neighbors = n_neighbors |
| | | self.n_components = n_components |
| | | self.min_samples = min_samples |
| | |
| | | |
| | | def __call__(self, X): |
| | | import umap.umap_ as umap |
| | | |
| | | umap_X = umap.UMAP( |
| | | n_neighbors=self.n_neighbors, |
| | | min_dist=0.0, |
| | |
| | | labels = HDBSCAN( |
| | | min_samples=self.min_samples, |
| | | min_cluster_size=self.min_cluster_size, |
| | | allow_single_cluster=True).fit_predict(umap_X) |
| | | allow_single_cluster=True, |
| | | ).fit_predict(umap_X) |
| | | return labels |
| | | |
| | | |
| | |
| | | model_config: The model config. |
| | | """ |
| | | |
| | | def __init__(self): |
| | | def __init__(self, merge_thr=0.78): |
| | | super().__init__() |
| | | self.model_config = {'merge_thr':0.78} |
| | | self.model_config = {"merge_thr": merge_thr} |
| | | # self.other_config = kwargs |
| | | |
| | | self.spectral_cluster = SpectralCluster() |
| | |
| | | |
def forward(self, X, **params):
    """Cluster the rows of ``X`` and return one label per row.

    Args:
        X: Array whose ``shape`` has exactly two entries, ``[N, C]``
            (enforced by the assertion below).
        **params: Optional settings. Only ``oracle_num`` is read here; when
            it is present and not ``None`` it is forwarded as the second
            argument to ``self.spectral_cluster`` and the cosine-merge
            step is skipped.

    Returns:
        An all-zero integer array of length ``N`` when ``N < 20``;
        otherwise the labels produced by the selected clusterer,
        optionally post-processed by ``self.merge_by_cos``.

    Raises:
        AssertionError: If ``X`` is not two-dimensional.
    """
    k = params.get("oracle_num")
    assert len(X.shape) == 2, "modelscope error: the shape of input should be [N, C]"

    # Fewer than 20 rows: skip clustering and put every row in cluster 0.
    if X.shape[0] < 20:
        return np.zeros(X.shape[0], dtype="int")

    # Inputs below 2048 rows, or any call with a caller-supplied cluster
    # count, go through spectral clustering; everything else uses
    # UMAP + HDBSCAN.
    if X.shape[0] < 2048 or k is not None:
        labels = self.spectral_cluster(X, k)
    else:
        labels = self.umap_hdbscan_cluster(X)

    # Merge clusters exactly once, and only when the cluster count was
    # not fixed by the caller.
    if k is None and "merge_thr" in self.model_config:
        labels = self.merge_by_cos(labels, X, self.model_config["merge_thr"])

    return labels
| | | |
| | |
| | | spk_center.append(spk_emb) |
| | | assert len(spk_center) > 0 |
| | | spk_center = np.stack(spk_center, axis=0) |
| | | norm_spk_center = spk_center / np.linalg.norm( |
| | | spk_center, axis=1, keepdims=True) |
| | | norm_spk_center = spk_center / np.linalg.norm(spk_center, axis=1, keepdims=True) |
| | | affinity = np.matmul(norm_spk_center, norm_spk_center.T) |
| | | affinity = np.triu(affinity, 1) |
| | | spks = np.unravel_index(np.argmax(affinity), affinity.shape) |