-
Notifications
You must be signed in to change notification settings - Fork 2
/
utils.py
144 lines (120 loc) · 5.04 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import numpy as np
from scipy import stats, io, signal, linalg
import matplotlib.pyplot as plt
from scipy.spatial.distance import squareform, pdist
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.svm import SVC
from sklearn import metrics
import warnings
from os.path import join
import os
import math
from sklearn.cluster import KMeans
import torch
from normalize_0_mean_1_std import normalize_0_mean_1_std
from calc_f_stat import calc_f_stat
from multivariate_split import multivariate_split
"""
This code includes the main functionality of the proposed method.
"""
"""
The file includes the primary function which calculates Granger causality using lsNGC.
"""
def lsNGC(inp_series, ar_order=1, k_f=3, k_g=2, normalize=1):
if normalize:
X_normalized=normalize_0_mean_1_std(inp_series)
else:
X_normalized=inp_series.copy()
X_train, Y_train , X_test, Y_test=multivariate_split(X=X_normalized,ar_order=ar_order)
X_train=torch.flatten(X_train, start_dim=1)
km= KMeans(n_clusters= k_f, max_iter= 100, random_state=123)
km.fit(X_train)
cent= km.cluster_centers_
max=0
for i in range(k_f):
for j in range(k_f):
d= np.linalg.norm(cent[i]-cent[j])
if(d> max):
max= d
d= max
sigma= d/math.sqrt(2*k_f)
sig_d=np.zeros((np.shape(X_normalized)[0],np.shape(X_normalized)[0]));
sig=np.zeros((np.shape(X_normalized)[0],np.shape(X_normalized)[0]));
# Z_train_label=Y_train
for i in range(X_normalized.shape[0]):
Z_temp=X_normalized.copy()
Z_train, Z_train_label , _ , _=multivariate_split(X=Z_temp,ar_order=ar_order)
Z_train=torch.flatten(Z_train, start_dim=1)
Z_train_label=torch.flatten(Z_train_label, start_dim=1)
# Obtain phase space Z_s by exclusing time series of of x_s
Z_s_train, Z_s_train_label , _ , _=multivariate_split(X=np.delete(Z_temp,[i],axis=0),ar_order=ar_order)
# Obtain phase space reconstruction of x_s
W_s_train, W_s_train_label , _ , _=multivariate_split(X=np.array([Z_temp[i]]),ar_order=ar_order)
# Flatten data
Z_s_train=torch.flatten(Z_s_train, start_dim=1)
Z_s_train_label=torch.flatten(Z_s_train_label, start_dim=1)
W_s_train=torch.flatten(W_s_train, start_dim=1)
W_s_train_label=torch.flatten(W_s_train_label, start_dim=1)
# Obtain k_g number of cluster centers in the phase space W_s with k-means clustering, will have dim=(k_g * d)
kmg= KMeans(n_clusters= k_g, max_iter= 100, random_state=123)
kmg.fit(W_s_train)
cent_W_s= kmg.cluster_centers_
# Calculate activations for each of the k_g neurons
shape= W_s_train.shape
row= shape[0]
column= k_g
G= np.empty((row,column), dtype= float)
maxg=0
for ii in range(k_g):
for jj in range(k_g):
dg= np.linalg.norm(cent_W_s[ii]-cent_W_s[jj])
if(dg> maxg):
maxg= dg
dg= maxg
sigmag= dg/math.sqrt(2*k_g)
if sigmag==0:
sigmag=1
for ii in range(row):
for jj in range(column):
dist= np.linalg.norm(W_s_train[ii]-cent_W_s[jj])
G[ii][jj]= math.exp(-math.pow(dist,2)/math.pow(2*sigmag,2))
# Generalized radial basis function
g_ws=np.array([G[ii]/sum(G[ii]) for ii in range(len(G))])
# Calculate activations for each of the k_f neurons
shape= Z_s_train.shape
row= shape[0]
column= k_f
F= np.empty((row,column), dtype= float)
for ii in range(row):
for jj in range(column):
cent_temp=cent.copy()
cent_temp=np.delete(cent_temp,np.arange(jj,jj+ar_order),axis=1)
dist= np.linalg.norm(Z_s_train[ii]-cent_temp)
F[ii][jj]= math.exp(-math.pow(dist,2)/math.pow(2*sigma,2))
# Generalized radial basis function
f_zs=np.array([F[ii]/sum(F[ii]) for ii in range(len(F))])
# Prediction in the presence of x_s
num_samples=f_zs.shape[0]
f_new=np.concatenate((0.5*f_zs,0.5*g_ws),axis=1)
GTG= np.dot(f_new.T,f_new)
GTG_inv= np.linalg.pinv(GTG)
fac= np.dot(GTG_inv,f_new.T)
W_presence= np.dot(fac,Z_train_label)
prediction_presence= np.dot(f_new,W_presence)
error_presence=prediction_presence-np.array(Z_train_label)
sig[i,:]=np.diag(np.cov(error_presence.T))
# Prediction without x_s
GTG= np.dot(f_zs.T,f_zs)
GTG_inv= np.linalg.pinv(GTG)
fac= np.dot(GTG_inv,f_zs.T)
W_absence= np.dot(fac,Z_train_label)
prediction_absence= np.dot(f_zs,W_absence)
error_absence=prediction_absence-np.array(Z_train_label)
sig_d[i,:]=np.diag(np.cov(error_absence.T))
# Comupte the Granger causality index
Aff=np.log(np.divide(sig_d,sig))
Aff=(Aff>0)*Aff
np.fill_diagonal(Aff,0)
f_stat=calc_f_stat(sig_d, sig, n=num_samples+1, pu=k_f+k_g, pr=k_f)
np.fill_diagonal(f_stat,0)
return Aff, f_stat