
Enable bulking test on windows #14392

Merged · 6 commits · Mar 12, 2019
92 changes: 40 additions & 52 deletions tests/python/gpu/test_operator_gpu.py
@@ -2102,63 +2102,51 @@ def test_bilinear_sampler_versions():
assert_almost_equal(exe.grad_dict['grid'].asnumpy(), exe_list[ref_idx].grad_dict['grid'].asnumpy(), rtol=1e-3, atol=1e-5)


-@with_seed()
-@unittest.skip("test fails on windows gpu. temporarily disabled till it gets fixed. tracked at https://github.com/apache/incubator-mxnet/issues/14368")
-def test_bulking():
-    # Return the execution time of a model with the specified limits to the bulked op segments
-    def test_bulking_helper(data_shape, num_ops, num_iterations,
-                            max_fwd_segment_size, max_bwd_segment_size, enable_bulking_in_training):
-        orig_environ = os.environ.copy()
-        try:
-            # Explore different ways of setting the env vars.
-            # The framework does not cache the bulked seg size env var lookups during symbolic.
-            os.environ['MXNET_EXEC_BULK_EXEC_TRAIN'] = str(enable_bulking_in_training)
-            if max_fwd_segment_size == max_bwd_segment_size:
-                os.environ['MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN'] = str(max_fwd_segment_size)
-                os.environ.pop('MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD', None)
-                os.environ.pop('MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD', None)
-            else:
-                os.environ.pop('MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN', None)
-                os.environ['MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD'] = str(max_fwd_segment_size)
-                os.environ['MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD'] = str(max_bwd_segment_size)
-
-            ctx = default_context()
-            # build symbol
-            X = mx.sym.Variable('X')
-            sym = mx.sym.flip(X, axis=0)
-            for _ in range(num_ops-1):
-                sym = mx.sym.flip(sym, axis=0)
-            x = mx.ndarray.zeros(data_shape)
-            dx = mx.ndarray.zeros(data_shape)
-            dy = mx.ndarray.ones(data_shape)
-            exe = sym.bind(ctx=ctx, args=[x], args_grad = {'X':dx})
-
-            # time a number of forward() and backward() executions after some warm-up iterations
-            warmups = 1
-            for i in range(num_iterations+warmups):
-                if i == warmups:
-                    start = time.time()
-                exe.forward(is_train=True)
-                exe.backward(dy)
-                dx.wait_to_read()
-            time_per_iteration = (time.time() - start) / num_iterations
-        finally:
-            os.environ.clear()
-            os.environ.update(orig_environ)
-        return time_per_iteration
-
+# isolated execution bulking test function to be invoked with different env var settings
+def _test_bulking_in_process(seed, time_per_iteration):
    data_shape = (10,)
    num_ops = 1000
    num_iterations = 20

+    ctx = default_context()
+    # build symbol
+    X = mx.sym.Variable('X')
+    sym = mx.sym.flip(X, axis=0)
+    for _ in range(num_ops-1):
+        sym = mx.sym.flip(sym, axis=0)
+    x = mx.ndarray.zeros(data_shape)
+    dx = mx.ndarray.zeros(data_shape)
+    dy = mx.ndarray.ones(data_shape)
+    exe = sym.bind(ctx=ctx, args=[x], args_grad = {'X':dx})
+
+    # time a number of forward() and backward() executions after some warm-up iterations
+    warmups = 1
+    for i in range(num_iterations+warmups):
+        if i == warmups:
+            start = time.time()
+        exe.forward(is_train=True)
+        exe.backward(dy)
+        dx.wait_to_read()
+    time_per_iteration.value = (time.time() - start) / num_iterations
+
+@with_seed()
+def test_bulking():
    # test case format: (max_fwd_segment_size, max_bwd_segment_size, enable_bulking_in_training)
    test_cases = [(0,0,True), (1,1,True), (15,15,False), (15,0,True), (0,15,True), (15,15,True)]
    times = {}
    times_str = ''
    for seg_sizes in test_cases:
-        times[seg_sizes] = test_bulking_helper(data_shape, num_ops, num_iterations,
-                                               seg_sizes[0], seg_sizes[1], seg_sizes[2])
-        times_str +=\
+        # Create shared variable to return measured time from test process
+        time_per_iteration = mp.Manager().Value('d', 0.0)
+        if not run_in_spawned_process(_test_bulking_in_process,
+                                      {'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD' : seg_sizes[0],
+                                       'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD' : seg_sizes[1],
+                                       'MXNET_EXEC_BULK_EXEC_TRAIN' : seg_sizes[2]},
+                                      time_per_iteration):
+            # skip test since the python version can't run it properly. Warning msg was logged.
+            return
+        times[seg_sizes] = time_per_iteration.value
+        times_str += \
            '\n runtime of (fwd,bwd,enable) op seg setting ({},{},{}) =\t{:.1f} msec'.format(
                seg_sizes[0], seg_sizes[1], seg_sizes[2], 1000.0 * times[seg_sizes])

@@ -2170,12 +2158,12 @@ def test_bulking_helper(data_shape, num_ops, num_iterations,
    print(times_str)
    # Non-bulked times[0,0,True], times[1,1,True] and times[15,15,False] should be about the same,
    # slower than both half-bulked times[0,15,True] and times[15,0,True]
-    assert slowest_half_bulked_time < fastest_non_bulked_time,\
-        'A half-bulked exec time is slower than the non-bulked time by {} secs! {}'\
+    assert slowest_half_bulked_time < fastest_non_bulked_time, \
+        'A half-bulked exec time is slower than the non-bulked time by {} secs! {}' \
        .format(slowest_half_bulked_time - fastest_non_bulked_time, times_str)
    # The fully bulked times[15,15,True] should be faster than both half-bulked runs
-    assert fully_bulked_time < fastest_half_bulked_time,\
-        'The fully-bulked exec time is slower than a half-bulked time by {} secs! {}'\
+    assert fully_bulked_time < fastest_half_bulked_time, \
+        'The fully-bulked exec time is slower than a half-bulked time by {} secs! {}' \
        .format(fully_bulked_time - fastest_half_bulked_time, times_str)


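Note on the new structure: each bulking configuration is now timed in its own spawned child process, with the MXNET_EXEC_BULK_EXEC_* settings handed to that process as environment variables, instead of mutating os.environ in the parent as the deleted test_bulking_helper did. Below is a minimal, hypothetical sketch of that pattern. It is not the run_in_spawned_process helper from MXNet's test utilities; the helper name, the seed argument, and the Manager value are assumptions taken from how the test calls it in the diff above.

# Hypothetical sketch only -- not the actual MXNet test helper. It shows how a test
# body can be run in a freshly spawned process with specific environment variables,
# so the MXNET_EXEC_BULK_EXEC_* settings are in place before mxnet initializes there.
import multiprocessing as mp
import os

def _spawned_entry(env, func, args):
    # Child process: install the requested environment first, then run the test body.
    os.environ.update({k: str(v) for k, v in env.items()})
    func(*args)

def run_in_spawned_process_sketch(func, env, *args):
    # Return False when spawning is unavailable, letting the caller skip the test.
    if not hasattr(mp, 'get_context'):
        return False  # e.g. Python 2 has no start-method selection
    ctx = mp.get_context('spawn')  # 'spawn' is the only start method on Windows
    proc = ctx.Process(target=_spawned_entry, args=(env, func, args))
    proc.start()
    proc.join()
    return proc.exitcode == 0

# Usage mirroring the test above (names and the seed value are assumptions):
#   time_per_iteration = mp.Manager().Value('d', 0.0)
#   ok = run_in_spawned_process_sketch(_test_bulking_in_process,
#                                      {'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD': 15,
#                                       'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD': 15,
#                                       'MXNET_EXEC_BULK_EXEC_TRAIN': True},
#                                      0, time_per_iteration)
#   # time_per_iteration.value then holds the measured seconds per iteration.

The 'spawn' start method is the relevant choice here because it is the only one available on Windows, and a fresh interpreter per configuration reads the bulking variables cleanly at startup; returning False when spawning is not supported mirrors the test's behavior of skipping in that case.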