GMM.py (forked from SleepyBag/Statistical-Learning-Methods)
import numpy as np
from matplotlib import pyplot as plt
class GMM:
    """Gaussian mixture model with diagonal covariance, fit by EM."""
    def __init__(self, k, max_step=2000):
        self.k = k                # number of mixture components
        self.max_step = max_step  # maximum number of EM iterations
        self.epsilon = 1e-8       # small constant for numerical stability and the convergence test
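    # a brief orientation (comments only): training alternates expectation-maximization
    # steps -- predict() computes the responsibilities (E-step) and fit() re-estimates
    # the parameters from them (M-step)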
def fit(self, X):
"""
X: training data of shape [n, feature_size]
"""
        n, feature_size = X.shape
        # initialize the parameters of each gaussian distribution:
        # uniform mixing weights, per-feature std taken from the data,
        # and means sampled around the data mean
        self.prior = np.ones(self.k) / self.k
        self.std = np.repeat(np.std(X, axis=0, keepdims=True), self.k, axis=0)
        self.mean = np.random.normal(X.mean(axis=0), self.std, [self.k, feature_size])
        pre_likelihood = np.zeros([self.k, n])
        for step in range(self.max_step):
            # E-step: responsibilities of each component for each datum, shape [k, n]
            posterior = self.predict(X)
            # M-step: re-estimate means, variances and mixing weights
            # from the responsibility-weighted data
            self.mean = (posterior[:, :, None] * X[None, :, :]).sum(axis=1) / \
                        (posterior.sum(axis=1)[:, None] + self.epsilon)
            var = (posterior[:, :, None] * (X[None, :, :] - self.mean[:, None, :]) ** 2).sum(axis=1) / \
                  (posterior.sum(axis=1)[:, None] + self.epsilon)
            self.std = np.sqrt(var)
            self.prior = posterior.sum(axis=1)
            self.prior /= (self.prior.sum() + self.epsilon)
            # stop when no per-datum likelihood improved by more than epsilon
            if (self.likelihood - pre_likelihood).max() < self.epsilon:
                break
            pre_likelihood = self.likelihood
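    # for reference, the M-step above implements the standard GMM updates, where
    # gamma[k, i] denotes the posterior (responsibility) of component k for datum x_i:
    #   mean_k  = sum_i gamma[k, i] * x_i                  / sum_i gamma[k, i]
    #   var_k   = sum_i gamma[k, i] * (x_i - mean_k) ** 2  / sum_i gamma[k, i]
    #   prior_k = sum_i gamma[k, i] / n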
    def predict(self, X):
        """return the posterior probability of each x belonging to each gaussian distribution"""
        dis = X[None, :, :] - self.mean[:, None, :]
        # per-feature log likelihood, of shape [k, n, feature_size]
        log_likelihood = -dis ** 2 / 2 / (self.std[:, None, :] ** 2 + self.epsilon) \
            - np.log(np.sqrt(2 * np.pi)) - np.log(self.std[:, None, :] + self.epsilon)
        # sum over the feature axis to reduce likelihood to shape [k, n]
        log_likelihood = log_likelihood.sum(-1)
        likelihood = np.exp(log_likelihood)
        self.likelihood = likelihood
        # the posterior of each datum belonging to each distribution, of shape [k, n]
        posterior = self.prior[:, None] * likelihood
        posterior /= (posterior.sum(axis=0, keepdims=True) + self.epsilon)
        return posterior
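    # for reference, the per-component quantity computed above is the diagonal
    # gaussian log density, summed over the feature axis j before exponentiating:
    #   log p_k(x) = sum_j [ -(x_j - mean_kj)**2 / (2 * std_kj**2)
    #                        - log(sqrt(2 * pi)) - log(std_kj) ]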
if __name__ == '__main__':
    def demonstrate(desc, X):
        gmm = GMM(3)
        gmm.fit(X)
        pred = gmm.predict(X).T
        # with k=3 the posterior of each datum is a valid RGB triple,
        # so the soft assignments can be drawn directly as point colors
        plt.scatter(X[:, 0], X[:, 1], color=pred)
        plt.title(desc)
        plt.show()
# ---------------------- Example 1---------------------------------------------
X = np.concatenate([
np.random.normal([0, 0], [.3, .3], [100, 2]),
np.random.normal([0, 1], [.3, .3], [100, 2]),
np.random.normal([1, 0], [.3, .3], [100, 2]),
])
demonstrate("Example 1", X)
# ---------------------- Example 2---------------------------------------------
demonstrate("Example 2: GMM does'nt promise the same result for the same data", X)
# ---------------------- Example 3---------------------------------------------
X = np.concatenate([
np.random.normal([0, 0], [.4, .4], [100, 2]),
np.random.normal([0, 1], [.4, .4], [100, 2]),
np.random.normal([1, 0], [.4, .4], [100, 2]),
])
demonstrate("Example 3", X)
# ---------------------- Example 4---------------------------------------------
X = np.concatenate([
np.random.normal([0, 0], [.4, .4], [100, 2]),
np.random.normal([0, 3], [.4, .4], [100, 2]),
np.random.normal([3, 0], [.4, .4], [100, 2]),
])
demonstrate("Example 4", X)
# ---------------------- Example 5---------------------------------------------
X = np.concatenate([
np.random.normal([0, 0], [.4, .4], [1, 2]),
np.random.normal([0, 3], [.4, .4], [1, 2]),
np.random.normal([3, 0], [.4, .4], [1, 2]),
])
demonstrate("Example 5", X)