-
Notifications
You must be signed in to change notification settings - Fork 51
/
synthetic.py
79 lines (62 loc) · 2.3 KB
/
synthetic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import numpy as np
from scipy.integrate import odeint
def make_var_stationary(beta, radius=0.97):
'''Rescale coefficients of VAR model to make stable.'''
p = beta.shape[0]
lag = beta.shape[1] // p
bottom = np.hstack((np.eye(p * (lag - 1)), np.zeros((p * (lag - 1), p))))
beta_tilde = np.vstack((beta, bottom))
eigvals = np.linalg.eigvals(beta_tilde)
max_eig = max(np.abs(eigvals))
nonstationary = max_eig > radius
if nonstationary:
return make_var_stationary(0.95 * beta, radius)
else:
return beta
def simulate_var(p, T, lag, sparsity=0.2, beta_value=1.0, sd=0.1, seed=0):
if seed is not None:
np.random.seed(seed)
# Set up coefficients and Granger causality ground truth.
GC = np.eye(p, dtype=int)
beta = np.eye(p) * beta_value
num_nonzero = int(p * sparsity) - 1
for i in range(p):
choice = np.random.choice(p - 1, size=num_nonzero, replace=False)
choice[choice >= i] += 1
beta[i, choice] = beta_value
GC[i, choice] = 1
beta = np.hstack([beta for _ in range(lag)])
beta = make_var_stationary(beta)
# Generate data.
burn_in = 100
errors = np.random.normal(scale=sd, size=(p, T + burn_in))
X = np.zeros((p, T + burn_in))
X[:, :lag] = errors[:, :lag]
for t in range(lag, T + burn_in):
X[:, t] = np.dot(beta, X[:, (t-lag):t].flatten(order='F'))
X[:, t] += + errors[:, t-1]
return X.T[burn_in:], beta, GC
def lorenz(x, t, F):
'''Partial derivatives for Lorenz-96 ODE.'''
p = len(x)
dxdt = np.zeros(p)
for i in range(p):
dxdt[i] = (x[(i+1) % p] - x[(i-2) % p]) * x[(i-1) % p] - x[i] + F
return dxdt
def simulate_lorenz_96(p, T, F=10.0, delta_t=0.1, sd=0.1, burn_in=1000,
seed=0):
if seed is not None:
np.random.seed(seed)
# Use scipy to solve ODE.
x0 = np.random.normal(scale=0.01, size=p)
t = np.linspace(0, (T + burn_in) * delta_t, T + burn_in)
X = odeint(lorenz, x0, t, args=(F,))
X += np.random.normal(scale=sd, size=(T + burn_in, p))
# Set up Granger causality ground truth.
GC = np.zeros((p, p), dtype=int)
for i in range(p):
GC[i, i] = 1
GC[i, (i + 1) % p] = 1
GC[i, (i - 1) % p] = 1
GC[i, (i - 2) % p] = 1
return X[burn_in:], GC