-
Notifications
You must be signed in to change notification settings - Fork 0
/
kernel_kmeans.py
executable file
·123 lines (96 loc) · 3.97 KB
/
kernel_kmeans.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Kernel K-means"""
# Author: Mathieu Blondel <[email protected]>
# License: BSD 3 clause
import numpy as np
from sklearn.base import BaseEstimator, ClusterMixin
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.utils import check_random_state
class KernelKMeans(BaseEstimator, ClusterMixin):
"""
Kernel K-means
Reference
---------
Kernel k-means, Spectral Clustering and Normalized Cuts.
Inderjit S. Dhillon, Yuqiang Guan, Brian Kulis.
KDD 2004.
"""
def __init__(self, n_clusters=3, max_iter=50, tol=1e-3, random_state=None,
kernel="linear", gamma=None, degree=3, coef0=1,
kernel_params=None, verbose=0):
self.n_clusters = n_clusters
self.max_iter = max_iter
self.tol = tol
self.random_state = random_state
self.kernel = kernel
self.gamma = gamma
self.degree = degree
self.coef0 = coef0
self.kernel_params = kernel_params
self.verbose = verbose
@property
def _pairwise(self):
return self.kernel == "precomputed"
def _get_kernel(self, X, Y=None):
if callable(self.kernel):
params = self.kernel_params or {}
else:
params = {"gamma": self.gamma,
"degree": self.degree,
"coef0": self.coef0}
return pairwise_kernels(X, Y, metric=self.kernel,
filter_params=True, **params)
def fit(self, X, y=None, sample_weight=None):
n_samples = X.shape[0]
K = self._get_kernel(X)
sw = sample_weight if sample_weight else np.ones(n_samples)
self.sample_weight_ = sw
rs = check_random_state(self.random_state)
self.labels_ = rs.randint(self.n_clusters, size=n_samples)
dist = np.zeros((n_samples, self.n_clusters))
self.within_distances_ = np.zeros(self.n_clusters)
for it in range(self.max_iter):
dist.fill(0)
self._compute_dist(K, dist, self.within_distances_,
update_within=True)
labels_old = self.labels_
self.labels_ = dist.argmin(axis=1)
# Compute the number of samples whose cluster did not change
# since last iteration.
n_same = np.sum((self.labels_ - labels_old) == 0)
if 1 - float(n_same) / n_samples < self.tol:
if self.verbose:
print(f'Converged at iteration {it + 1}')
break
self.X_fit_ = X
return self
def _compute_dist(self, K, dist, within_distances, update_within):
"""Compute a n_samples x n_clusters distance matrix using the
kernel trick."""
sw = self.sample_weight_
for j in range(self.n_clusters):
mask = self.labels_ == j
if np.sum(mask) == 0:
raise ValueError("Empty cluster found, try smaller n_cluster.")
denom = sw[mask].sum()
denomsq = denom * denom
if update_within:
KK = K[mask][:, mask] # K[mask, mask] does not work.
dist_j = np.sum(np.outer(sw[mask], sw[mask]) * KK / denomsq)
within_distances[j] = dist_j
dist[:, j] += dist_j
else:
dist[:, j] += within_distances[j]
dist[:, j] -= 2 * np.sum(sw[mask] * K[:, mask], axis=1) / denom
def predict(self, X):
K = self._get_kernel(X, self.X_fit_)
n_samples = X.shape[0]
dist = np.zeros((n_samples, self.n_clusters))
self._compute_dist(K, dist, self.within_distances_,
update_within=False)
return dist.argmin(axis=1)
if __name__ == '__main__':
from sklearn.datasets import make_blobs
X, y = make_blobs(n_samples=1000, centers=5, random_state=0)
km = KernelKMeans(n_clusters=5, max_iter=100, random_state=0, verbose=1)
print(km.fit_predict(X)[:10])
print(km.predict(X[:10]))