# gat_classifier.py
import argparse

import dgl
import dgl.function as fn
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from dgl.nn.pytorch import edge_softmax
from torch.utils.data import DataLoader
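
# Note: this script targets the older DGL API -- it relies on fn.src_mul_edge
# and on edge_softmax being importable from dgl.nn.pytorch. Recent DGL releases
# moved/renamed these (fn.u_mul_e, dgl.nn.functional.edge_softmax), so the
# installed DGL version matters when running this file.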

def collate(samples):
    # The input `samples` is a list of (graph, label) pairs.
    graphs, labels = map(list, zip(*samples))
    batched_graph = dgl.batch(graphs)
    return batched_graph, torch.tensor(labels)
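
# Batching note: dgl.batch merges the sampled graphs into one large disconnected
# graph, so a single forward pass handles the whole minibatch; dgl.mean_nodes in
# GATClassifier.forward later averages node features per original graph, which
# effectively undoes the batching for the readout step.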

class GATLayer(nn.Module):
    def __init__(self,
                 in_dim,
                 out_dim,
                 num_heads,
                 feat_drop=0.,
                 attn_drop=0.,
                 alpha=0.2,
                 agg_activation=F.elu):
        super(GATLayer, self).__init__()
        self.num_heads = num_heads
        self.feat_drop = nn.Dropout(feat_drop)
        self.fc = nn.Linear(in_dim, num_heads * out_dim, bias=False)
        self.attn_l = nn.Parameter(torch.Tensor(size=(num_heads, out_dim, 1)))
        self.attn_r = nn.Parameter(torch.Tensor(size=(num_heads, out_dim, 1)))
        # torch.Tensor allocates uninitialized memory, so initialize the
        # attention parameters explicitly to keep training stable.
        nn.init.xavier_normal_(self.attn_l.data, gain=1.414)
        nn.init.xavier_normal_(self.attn_r.data, gain=1.414)
        self.attn_drop = nn.Dropout(attn_drop)
        self.activation = nn.LeakyReLU(alpha)
        self.softmax = edge_softmax
        self.agg_activation = agg_activation

    def clean_data(self):
        # Remove the temporary node/edge fields created in forward().
        ndata_names = ['ft', 'a1', 'a2']
        edata_names = ['a_drop']
        for name in ndata_names:
            self.g.ndata.pop(name)
        for name in edata_names:
            self.g.edata.pop(name)

    def forward(self, feat, bg):
        # Inputs are of shape V x F, with V the number of nodes and F the
        # dimension of the input features.
        self.g = bg
        h = self.feat_drop(feat)
        # V x K x F', K the number of heads, F' the dim of transformed features
        ft = self.fc(h).reshape((h.shape[0], self.num_heads, -1))
        head_ft = ft.transpose(0, 1)                           # K x V x F'
        a1 = torch.bmm(head_ft, self.attn_l).transpose(0, 1)   # V x K x 1
        a2 = torch.bmm(head_ft, self.attn_r).transpose(0, 1)   # V x K x 1
        self.g.ndata.update({'ft': ft, 'a1': a1, 'a2': a2})
        # 1. compute the unnormalized edge attention scores
        self.g.apply_edges(self.edge_attention)
        # 2. normalize the scores with an edge softmax and apply attention dropout
        self.edge_softmax()
        # 3. aggregate neighbor features weighted by the normalized (and dropped)
        #    attention values
        self.g.update_all(fn.src_mul_edge('ft', 'a_drop', 'ft'), fn.sum('ft', 'ft'))
        ret = self.g.ndata['ft']   # V x K x F'
        ret = ret.flatten(1)       # V x (K * F'), heads concatenated
        if self.agg_activation is not None:
            ret = self.agg_activation(ret)
        # Clean up the temporary ndata and edata fields
        self.clean_data()
        return ret

    def edge_attention(self, edges):
        # Edge UDF computing unnormalized attention values from src and dst
        a = self.activation(edges.src['a1'] + edges.dst['a2'])
        return {'a': a}

    def edge_softmax(self):
        attention = self.softmax(self.g, self.g.edata.pop('a'))
        # Dropout attention scores and save them
        self.g.edata['a_drop'] = self.attn_drop(attention)
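
# How the layer above maps onto the GAT attention formula (explanatory sketch):
# for each head, the unnormalized score on an edge (i -> j) is
#     e_ij = LeakyReLU(a_l . (W h_i) + a_r . (W h_j)),
# which is why forward() precomputes a1 = (W h) . attn_l on the source node and
# a2 = (W h) . attn_r on the destination node and simply adds them per edge.
# edge_softmax then normalizes e_ij over each destination node's incoming edges
# to obtain alpha_ij, and update_all computes h_j' = sum_i alpha_ij * W h_i
# independently for every head.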

class GATClassifier(nn.Module):
    def __init__(self, in_dim, hidden_dim, num_heads, n_classes):
        super(GATClassifier, self).__init__()
        self.layers = nn.ModuleList([
            GATLayer(in_dim, hidden_dim, num_heads),
            GATLayer(hidden_dim * num_heads, hidden_dim, num_heads)
        ])
        self.classify = nn.Linear(hidden_dim * num_heads, n_classes)

    def forward(self, bg):
        # Run the stacked GAT layers on the node features, then mean-pool the
        # node representations of each graph in the batch and classify.
        h = bg.ndata['features']
        for i, gnn in enumerate(self.layers):
            h = gnn(h, bg)
        bg.ndata['features'] = h
        hg = dgl.mean_nodes(bg, 'features')
        return self.classify(hg)
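
# Minimal usage sketch (illustrative only; the toy graph and feature size are
# assumptions that match this file's defaults, and it uses the older
# dgl.DGLGraph construction API):
#
#     g = dgl.DGLGraph()
#     g.add_nodes(4)
#     g.add_edges([0, 1, 2, 3], [1, 2, 3, 0])
#     g.ndata['features'] = torch.randn(4, 200)
#     model = GATClassifier(in_dim=200, hidden_dim=20, num_heads=10, n_classes=2)
#     logits = model(dgl.batch([g]))   # shape: (1, 2)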

def main(bug_type, use_deepbugs_embeddings, dataset_size):
    print('----GAT Classifier Training bug type {} with {}----'.format(
        bug_type,
        'deepbugs embeddings' if use_deepbugs_embeddings else 'random embeddings'))
    # Create training and test sets.
    if dataset_size == 'mini':
        from homogenous_mini_dataset import MiniCorrectAndBuggyDataset
        trainset = MiniCorrectAndBuggyDataset(use_deepbugs_embeddings=use_deepbugs_embeddings, is_training=True, bug_type=bug_type)
        testset = MiniCorrectAndBuggyDataset(use_deepbugs_embeddings=use_deepbugs_embeddings, is_training=False, bug_type=bug_type)
    elif dataset_size == 'full':
        from homogenous_full_dataset import FullCorrectAndBuggyDataset
        trainset = FullCorrectAndBuggyDataset(use_deepbugs_embeddings=use_deepbugs_embeddings, is_training=True, bug_type=bug_type)
        testset = FullCorrectAndBuggyDataset(use_deepbugs_embeddings=use_deepbugs_embeddings, is_training=False, bug_type=bug_type)
    else:
        raise ValueError('Unknown dataset_size: {}'.format(dataset_size))
    data_loader = DataLoader(trainset, batch_size=100, shuffle=True,
                             collate_fn=collate)

    def evaluate():
        # Evaluate the model on the full test set.
        model.eval()
        # Convert a list of (graph, label) tuples to two lists
        test_X, test_Y = map(list, zip(*testset))
        test_bg = dgl.batch(test_X)
        test_Y = torch.tensor(test_Y).float().view(-1, 1)
        model_output = model(test_bg)
        probs_Y = torch.softmax(model_output, 1)
        sampled_Y = torch.multinomial(probs_Y, 1)
        argmax_Y = torch.max(probs_Y, 1)[1].view(-1, 1)
        print('Accuracy of sampled predictions on the test set: {:.4f}%'.format(
            (test_Y == sampled_Y.float()).sum().item() / len(test_Y) * 100))
        print('Accuracy of argmax predictions on the test set: {:.4f}%'.format(
            (test_Y == argmax_Y.float()).sum().item() / len(test_Y) * 100))

    # Create model: in_dim=200 matches the node embedding size, with
    # hidden_dim=20 and 10 attention heads.
    model = GATClassifier(200, 20, 10, trainset.num_classes)
    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.005, weight_decay=5e-4)
    epoch_losses = []
    for epoch in range(20):
        # evaluate() switches the model to eval mode, so switch back each epoch.
        model.train()
        epoch_loss = 0
        for batch_idx, (bg, label) in enumerate(data_loader):
            prediction = model(bg)
            loss = loss_func(prediction, label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.detach().item()
        epoch_loss /= (batch_idx + 1)
        print('Epoch {}, loss {:.4f}'.format(epoch, epoch_loss))
        if epoch % 5 == 0:
            evaluate()
        epoch_losses.append(epoch_loss)
    evaluate()

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--bug_type', help='Type of bug to train on',
        choices=['swapped_args', 'incorrect_binary_operator', 'incorrect_binary_operand', 'all'],
        required=False)
    parser.add_argument(
        '--use_deepbugs_embeddings',
        help="Pass 'True' to use DeepBugs embeddings instead of random embeddings",
        required=False)
    parser.add_argument(
        '--dataset_size', help='Mini or full dataset', choices=['mini', 'full'],
        required=False)
    args = parser.parse_args()
    bug_type = args.bug_type or 'all'
    use_deepbugs_embeddings = args.use_deepbugs_embeddings in ('True', 'true')
    dataset_size = args.dataset_size or 'mini'
    main(bug_type, use_deepbugs_embeddings, dataset_size)
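
# Example invocation (assumes the dataset modules above are available locally):
#     python gat_classifier.py --bug_type swapped_args \
#         --use_deepbugs_embeddings True --dataset_size mini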