Fix bugs of missing ndarrays in shared_buffer
reminisce committed Jun 3, 2017
1 parent 95cd217 commit 6fc1886
Showing 2 changed files with 99 additions and 58 deletions.
7 changes: 5 additions & 2 deletions src/c_api/c_api_executor.cc
@@ -443,14 +443,17 @@ int MXExecutorSimpleBind(SymbolHandle symbol_handle,
}

if (use_shared_buffer) {
+ ret->ret_vec_str.clear();
+ ret->ret_vec_str.reserve(shared_buffer_map.size());
ret->ret_vec_charp.clear();
ret->ret_vec_charp.reserve(shared_buffer_map.size());
- for (const auto kv : shared_buffer_map) {
+ for (const auto& kv : shared_buffer_map) {
if (kv.second.is_none()) {
LOG(FATAL) << "Shared data NDArray cannot be un-allocated";
}
ret->ret_handles.push_back(new NDArray(kv.second));
- ret->ret_vec_charp.push_back(kv.first.c_str());
+ ret->ret_vec_str.emplace_back(kv.first);
+ ret->ret_vec_charp.push_back(ret->ret_vec_str.back().c_str());
}
*shared_buffer_len = shared_buffer_map.size();
*updated_shared_buffer_handle_list = &(ret->ret_handles[nd_idx]);
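Why the change matters: in the removed loop, the loop variable was a by-value copy of each map entry (const auto kv), so kv.first.c_str() returned a pointer into a temporary string that is destroyed at the end of each iteration, leaving the name pointers returned from MXExecutorSimpleBind dangling, which is consistent with the "missing ndarrays" the commit title describes. The replacement stores a copy of every key in ret->ret_vec_str and points ret->ret_vec_charp at that owned storage, so the char pointers no longer reference temporaries. The following is a minimal, self-contained C++ sketch of the same hazard and fix; the names collect_names_broken, collect_names_fixed, kept_strings, and name_ptrs are illustrative only and are not part of the MXNet code.

#include <cassert>
#include <map>
#include <string>
#include <vector>

// Illustrative stand-ins for ret->ret_vec_str / ret->ret_vec_charp (not the MXNet types).
std::vector<std::string> kept_strings;
std::vector<const char*> name_ptrs;

// Buggy pattern: the pair is copied on every iteration, so c_str() points into a temporary.
void collect_names_broken(const std::map<std::string, int>& buffer) {
  name_ptrs.clear();
  for (const auto kv : buffer) {
    name_ptrs.push_back(kv.first.c_str());  // dangles once this iteration ends
  }
}

// Fixed pattern: copy each key into storage that outlives the loop, then take c_str().
void collect_names_fixed(const std::map<std::string, int>& buffer) {
  kept_strings.clear();
  kept_strings.reserve(buffer.size());  // no reallocation later, so earlier pointers stay valid
  name_ptrs.clear();
  name_ptrs.reserve(buffer.size());
  for (const auto& kv : buffer) {
    kept_strings.emplace_back(kv.first);
    name_ptrs.push_back(kept_strings.back().c_str());
  }
}

int main() {
  std::map<std::string, int> buffer{{"fc_weight", 0}, {"fc_bias", 1}};
  collect_names_fixed(buffer);
  assert(name_ptrs.size() == buffer.size());  // every key survives with a valid pointer
  return 0;
}

Reserving both vectors up front mirrors the real change: without the reserve, a reallocation of kept_strings during emplace_back could invalidate c_str() pointers already pushed for short (SSO) strings.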
150 changes: 94 additions & 56 deletions tests/python/unittest/test_module.py
Expand Up @@ -4,6 +4,7 @@
from functools import reduce
from mxnet.module.executor_group import DataParallelExecutorGroup


def test_module_dtype():
dtype = np.float16
dshape = (3, 8, 7)
@@ -46,6 +47,7 @@ def test_module_input_grads():
assert np.all(b_grad == 2), b_grad
assert np.all(c_grad == 3), c_grad


def test_module_layout():
sym = mx.sym.Variable('data')
sym = mx.sym.Activation(data=sym, act_type='relu', __layout__='TNC')
@@ -63,6 +65,7 @@ def test_module_layout():
for x in mod.get_outputs(merge_multi_context=False)[0]:
assert x.shape == hdshape


def test_save_load():
def dict_equ(a, b):
assert set(a) == set(b)
@@ -102,6 +105,7 @@ def dict_equ(a, b):
dict_equ(mod.get_params()[0], mod2.get_params()[0])
dict_equ(mod._kvstore._updater.states, mod2._updater.states)


def test_module_reshape():
data = mx.sym.Variable('data')
sym = mx.sym.FullyConnected(data, num_hidden=20, name='fc')
@@ -128,6 +132,7 @@ def test_module_reshape():
assert mod.get_outputs()[0].shape == dshape
assert (mod.get_params()[0]['fc_bias'].asnumpy() == -3).all()


def test_module_states():
stack = mx.rnn.SequentialRNNCell()
for i in range(2):
@@ -154,6 +159,7 @@ def test_module_states():
for x1, x2 in zip(out1, out2):
assert not mx.test_utils.almost_equal(x1.asnumpy(), x2.asnumpy(), rtol=1e-3)


def test_module_switch_bucket():
vocab_dim = 5000
num_hidden = 100
@@ -208,6 +214,7 @@ def create_bucketing_module(key):
#the default bucket is expected to reuse the bytes allocated
assert total_bytes_after == total_bytes_before


def test_monitor():
# data iter
mx.random.seed(11)
@@ -255,87 +262,118 @@ def mean_abs(x):
break
assert(mon_result_counts == [2, 2, 1, 6, 6, 4])


def test_executor_group():
def test_create_exec_group(exec_grp_shared, exec_grp_created,
shared_arg_names, extra_input=[], extra_arg=[]):
def get_rnn_sym(num_layers, num_words, num_hidden, num_embed, seq_len):
stack = mx.rnn.SequentialRNNCell()
for i in range(num_layers):
stack.add(mx.rnn.LSTMCell(num_hidden=num_hidden, prefix='lstm_l%d_' % i))
data = mx.sym.Variable('data')
label = mx.sym.Variable('softmax_label')
embed = mx.sym.Embedding(data=data, input_dim=num_words,
output_dim=num_embed, name='embed')

stack.reset()
outputs, states = stack.unroll(seq_len, inputs=embed, merge_outputs=True)

pred = mx.sym.Reshape(outputs, shape=(-1, num_hidden))
pred = mx.sym.FullyConnected(data=pred, num_hidden=num_words, name='pred')

label = mx.sym.Reshape(label, shape=(-1,))
pred = mx.sym.SoftmaxOutput(data=pred, label=label, name='softmax')
return pred

def test_shared_exec_group(exec_grp_shared, exec_grp_created, shared_arg_names=None, extra_args=None):
# Test shared data arrays
for i in range(len(exec_grp_shared.execs)):
# test same shared_data_arrays for two exec groups
shared_data_array1 = exec_grp_shared.shared_data_arrays[i]
shared_data_array2 = exec_grp_created.shared_data_arrays[i]
if extra_args is not None:
assert len(shared_data_array1) == len(extra_args),\
"exec_grp_shared.shared_data_arrays[%d] should have same number of args as extra_args" % i
assert len(shared_data_array1) == len(shared_data_array2),\
"length of shared_data_array of the shared executor group not equal to the created executor group"
for k, v in shared_data_array1.items():
if extra_args is not None:
assert k in extra_args, "arg %s is not in extra_args" % k
assert k in shared_data_array2,\
"arg %s of the shared executor group not in the shared_data_array of the created executor group" % k
assert mx.test_utils.same_array(v, shared_data_array2[k])

for data_name, array in exec_grp_shared.shared_data_arrays[i].items():
assert data_name in exec_grp_created.shared_data_arrays[i], \
"Shared input data '%s' is not in " \
"shared_data_arrays of created executor group." % (data_name)
assert mx.test_utils.same_array(array, exec_grp_created.shared_data_arrays[i][data_name]), \
"Shared input data '%s' does not share memory." % (data_name)
for input_name in extra_input:
assert input_name in exec_grp_created.execs[i].arg_dict, \
"Extra input data '%s' is not in arg_dict of created executor group." % (input_name)

# Test shared argument arrays and gradient arrays
for i in range(len(exec_grp_shared.execs)):
exec1 = exec_grp_shared.execs[i]
exec2 = exec_grp_created.execs[i]
for arg_name in shared_arg_names:
assert arg_name in exec2.arg_dict, \
"Shared argument '%s' is not in arg_dict of created executor group." % (arg_name)
assert mx.test_utils.same_array(exec1.arg_dict[arg_name], exec2.arg_dict[arg_name]), \
"Shared argument '%s' does not share memory." % (arg_name)
for arg_name in extra_arg:
assert arg_name in exec2.arg_dict, \
"Extra argument '%s' is not in arg_dict of created executor group." % (arg_name)
# Test shared argument arrays and gradient arrays
exec_shared = exec_grp_shared.execs[i]
exec_created = exec_grp_created.execs[i]
if shared_arg_names is not None:
# test shared arguments
for arg_name in shared_arg_names:
assert arg_name in exec_created.arg_dict, \
"Shared argument '%s' is not in arg_dict of created executor group." % (arg_name)
assert mx.test_utils.same_array(exec_shared.arg_dict[arg_name], exec_created.arg_dict[arg_name]), \
"Shared argument '%s' does not share memory." % (arg_name)
# test shared argument gradients
for arg_name in shared_arg_names:
assert arg_name in exec_created.grad_dict, \
"Shared argument gradient '%s' is not in " \
"grad_dict of created executor group." % (arg_name)
assert mx.test_utils.same_array(exec_shared.grad_dict[arg_name], exec_created.grad_dict[arg_name]), \
"Shared argument gradient '%s' does not sharing memory." % (arg_name)

for arg_name, grad in exec_grp_shared.grad_req.items():
assert grad == exec_grp_created.grad_req[arg_name], \
"Gradient requirements for shared argument '%s' are inconsistent. " \
"Shared executor group requires '%s' while created executor group requires '%s'" \
%(arg_name, grad, exec_grp_created.grad_req[arg_name])
for arg_name in shared_arg_names:
assert arg_name in exec2.grad_dict, \
"Shared argument gradient '%s' is not in " \
"grad_dict of created executor group." % (arg_name)
assert mx.test_utils.same_array(exec1.grad_dict[arg_name], exec2.grad_dict[arg_name]), \
"Shared argument gradient '%s' does not sharing memory." % (arg_name)

contexts = [mx.cpu(0), mx.cpu(1)]
workload = [1] * len(contexts)
batch_size = 16
num_hidden = 4
data_shapes1 = [('data1', (batch_size, 10))]
data_shapes2 = [('data1', (batch_size, 10)), ('data2', (batch_size, 10))]
label_shapes = [('softmax_label', (batch_size,))]

data1 = mx.sym.Variable('data1')
data2 = mx.sym.Variable('data2')
fc1 = mx.sym.FullyConnected(data=data1, name='fc1', num_hidden=num_hidden)
mlp1 = mx.sym.SoftmaxOutput(data=fc1, name='softmax')
fc1 = mx.sym.FullyConnected(data=data1 + data2, name='fc1', num_hidden=num_hidden)
fc2 = mx.sym.FullyConnected(data=fc1, name='fc2', num_hidden=num_hidden)
mlp2 = mx.sym.SoftmaxOutput(data=fc2, name='softmax')

arg_names = mlp1.list_arguments()
input_names = [name[0] for name in data_shapes1] + [name[0] for name in label_shapes]
shared_arg_names = [name for name in arg_names if name not in input_names]

exec_group1 = DataParallelExecutorGroup(symbol=mlp1, contexts=contexts,
workload=workload, data_shapes=data_shapes1,
batch_size = 32
max_bucket_size = 80
num_words = 1000
num_hidden = 100
num_embed = 200
data_shapes = [('data', (batch_size, max_bucket_size))]
label_shapes = [('softmax_label', (batch_size, max_bucket_size))]

# generate an rnn sym with #layers=3
sym = get_rnn_sym(num_layers=3, num_words=num_words, num_hidden=num_hidden,
num_embed=num_embed, seq_len=max_bucket_size)
arg_names1 = sym.list_arguments()
input_names = [name[0] for name in data_shapes] + [name[0] for name in label_shapes]
shared_arg_names = [name for name in arg_names1 if name not in input_names]
exec_group1 = DataParallelExecutorGroup(symbol=sym, contexts=contexts,
workload=workload, data_shapes=data_shapes,
label_shapes=label_shapes, param_names=shared_arg_names,
for_training=True, inputs_need_grad=False)

# Test two executor groups with the same symbol sharing memory
exec_group2 = DataParallelExecutorGroup(symbol=mlp1, contexts=contexts,
workload=workload, data_shapes=data_shapes1,
# shared_data_arrays should only have input "data" and "softmax_label" arrays
for i in range(len(contexts)):
assert len(exec_group1.shared_data_arrays[i]) == len(input_names),\
"exec_group1.shared_data_arrays[%d] should have the same number of names as in input_names" % i
for name in input_names:
assert name in exec_group1.shared_data_arrays[i],\
"arg %s should be in exec_group1.shared_data_arrays[%d]" % (name, i)

# generate an rnn sym with #layers=5
sym = get_rnn_sym(num_layers=5, num_words=num_words, num_hidden=num_hidden,
num_embed=num_embed, seq_len=max_bucket_size)
arg_names2 = sym.list_arguments()
exec_group2 = DataParallelExecutorGroup(symbol=sym, contexts=contexts,
workload=workload, data_shapes=data_shapes,
label_shapes=label_shapes, param_names=shared_arg_names,
for_training=True, inputs_need_grad=False,
shared_group=exec_group1)
test_create_exec_group(exec_group1, exec_group2, shared_arg_names)
extra_args = [name for name in arg_names2 if name not in shared_arg_names]
test_shared_exec_group(exec_grp_shared=exec_group1, exec_grp_created=exec_group2,
shared_arg_names=shared_arg_names, extra_args=extra_args)

# Test two executor groups with different symbol sharing memory
exec_group3 = DataParallelExecutorGroup(symbol=mlp2, contexts=contexts,
workload=workload, data_shapes=data_shapes2,
label_shapes=label_shapes, param_names=shared_arg_names,
for_training=True, inputs_need_grad=False,
shared_group=exec_group1)
extra_input = ['data2']
extra_arg = ['fc2_weight', 'fc2_bias']
test_create_exec_group(exec_group1, exec_group3, shared_arg_names, extra_input, extra_arg)

if __name__ == '__main__':
test_module_dtype()