-
Notifications
You must be signed in to change notification settings - Fork 4
/
models.py
121 lines (105 loc) · 5.49 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import torch
import torch.nn as nn
from torch.autograd import Variable
class lstm_rnn_gru(nn.Module):
    """Two stacked recurrent cells (LSTM, RNN or GRU) unrolled one step at a time.

    ``layer1`` maps ``input_size`` features to ``hidden_size`` and ``layer2``
    maps ``hidden_size`` to ``output_size``. The state produced by ``layer2``
    at each step is used directly as the model output for that step.

    NOTE: for the LSTM variant the *cell* state (not the hidden state) of
    ``layer1`` is fed to ``layer2``, and the *cell* state of ``layer2`` is
    emitted as the output. This is kept as-is so that previously trained
    weights keep producing the same results.
    """

    def __init__(self, cell_type="lstm", input_size=1, hidden_size=20, output_size=1, nonlinearity="tanh"):
        """Build the two recurrent cells.

        :param cell_type: one of "lstm", "rnn" or "gru" (case-insensitive)
        :param input_size: number of features fed to the first cell
        :param hidden_size: state size of the first cell
        :param output_size: state size of the second cell (the model output)
        :param nonlinearity: "tanh" or "relu" (case-insensitive); only used
            by the "rnn" cell type
        :raises ValueError: on an unknown ``cell_type`` or ``nonlinearity``
        """
        super(lstm_rnn_gru, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        self.nonlinearity = nonlinearity.lower()
        if self.nonlinearity not in ('tanh', 'relu'):
            raise ValueError(
                "nonlinearity must be 'tanh' or 'relu', got %r" % (nonlinearity,))

        self.cell_type = cell_type.lower()
        if self.cell_type == "lstm":
            self.layer1 = nn.LSTMCell(input_size=self.input_size, hidden_size=self.hidden_size)
            self.layer2 = nn.LSTMCell(input_size=self.hidden_size, hidden_size=self.output_size)
        elif self.cell_type == "rnn":
            self.layer1 = nn.RNNCell(input_size=self.input_size, hidden_size=self.hidden_size,
                                     nonlinearity=self.nonlinearity)
            self.layer2 = nn.RNNCell(input_size=self.hidden_size, hidden_size=self.output_size,
                                     nonlinearity=self.nonlinearity)
        elif self.cell_type == "gru":
            self.layer1 = nn.GRUCell(input_size=self.input_size, hidden_size=self.hidden_size)
            self.layer2 = nn.GRUCell(input_size=self.hidden_size, hidden_size=self.output_size)
        else:
            # A bare string cannot be raised in Python 3; use a real exception.
            raise ValueError("Please enter a good cell type (LSTM/RNN/GRU)")

        # Re-initialise all weight matrices from N(0, 0.1).
        # Biases keep PyTorch's default initialisation (open question from the
        # original author: should something be done about the biases here?).
        for weight in (self.layer1.weight_hh, self.layer1.weight_ih,
                       self.layer2.weight_hh, self.layer2.weight_ih):
            nn.init.normal_(weight, 0.0, 0.1)

    def _init_states(self, input_data):
        """Return zeroed recurrent states and the zero tensor used as first feedback.

        States are allocated on the same device and with the same dtype as
        ``input_data`` so the model works on CPU as well as on any GPU.

        :return: ``(states, initial_output)`` where ``states`` is a pair of
            per-layer states (an ``(h, c)`` tuple per layer for LSTM, a single
            tensor per layer otherwise).
        """
        batch_size = input_data.size(0)

        def zeros(width):
            return input_data.new_zeros(batch_size, width)

        if self.cell_type == "lstm":
            states = ((zeros(self.hidden_size), zeros(self.hidden_size)),
                      (zeros(self.output_size), zeros(self.output_size)))
        else:
            states = (zeros(self.hidden_size), zeros(self.output_size))
        return states, zeros(self.output_size)

    def _step(self, input_t, states):
        """Advance both layers by one time step.

        :return: ``(output, new_states)`` where ``output`` is what the model
            emits for this step.
        """
        state1, state2 = states
        if self.cell_type == "lstm":
            h_t, c_t = self.layer1(input_t, state1)
            # The cell state (not the hidden state) is passed on and emitted.
            h_t2, c_t2 = self.layer2(c_t, state2)
            return c_t2, ((h_t, c_t), (h_t2, c_t2))
        h_t = self.layer1(input_t, state1)
        h_t2 = self.layer2(h_t, state2)
        return h_t2, (h_t, h_t2)

    def forward(self, input_data, future=0):
        """Run the model over a sequence, optionally predicting extra steps.

        Note here that the input is 2-D: it is split along dim 1 into one
        column per time step, and the network is fed one time step at a time.

        :param input_data: 2-D tensor whose dim 0 is the batch and dim 1 the
            time steps
        :param future: number of additional steps to generate by feeding the
            model its own previous output (this requires the output width to
            match what ``layer1`` accepts as input)
        :return: the per-step outputs stacked along dim 1, with dim 2 squeezed
            (so 2-D when ``output_size`` is 1)
        """
        states, last_output = self._init_states(input_data)
        outputs = []

        for input_t in input_data.chunk(input_data.size(1), dim=1):
            last_output, states = self._step(input_t, states)
            outputs.append(last_output)

        for _ in range(future):  # if we should predict the future
            last_output, states = self._step(last_output, states)
            outputs.append(last_output)

        return torch.stack(outputs, 1).squeeze(2)

    def fineTuneModel(self, input_data, start_steps=1):
        """Generate a sequence after seeing only the first ground-truth steps.

        The model receives the first ``start_steps`` time steps from the
        ground truth and is then asked to generate the rest of the sequence
        from its own outputs. The idea is to train the model on its own
        output, in order to enhance its generation ability.

        TODO: to make this more robust, instead of always feeding a fixed
        number of time steps, feed N time steps where N is a random value,
        and ask the model to generate the rest.

        :param input_data: the entire batch (same layout as the ``forward``
            input); only its first ``start_steps`` columns are actually fed
            to the model
        :param start_steps: number of ground-truth time steps fed before the
            model starts consuming its own output
        :return: the per-step outputs stacked along dim 1, with dim 2 squeezed
        """
        states, last_output = self._init_states(input_data)
        outputs = []
        sequence_len = input_data.size(1)

        for i, input_t in enumerate(input_data.chunk(sequence_len, dim=1)):
            if i >= start_steps:
                break
            last_output, states = self._step(input_t, states)
            outputs.append(last_output)

        for _ in range(sequence_len - start_steps):  # Generate the rest of the sequence
            last_output, states = self._step(last_output, states)
            outputs.append(last_output)

        return torch.stack(outputs, 1).squeeze(2)