This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

[MXNET-1111] Horovod support for MXNet #12666

Merged: 11 commits, Oct 29, 2018
Changes from 9 commits
2 changes: 1 addition & 1 deletion python/mxnet/module/base_module.py
@@ -625,7 +625,7 @@ def output_shapes(self):
################################################################################
# Parameters of a module
################################################################################
def get_params(self):
def get_params(self, copy_to_cpu=True):
Member:

For all the places where you added this new param, could you please update the function comments to explain what copy_to_cpu is for, like you did in executor_group.py? It may also be good to add some explanation of why it is not used here and where it is going to be used; readers might otherwise be confused about why the param is in the function signature but not used in the implementation.
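A minimal sketch of the kind of docstring addition being asked for here; the wording is illustrative, not taken from the PR:

```python
def get_params(self, copy_to_cpu=True):
    """Gets parameters, which are potentially copies of the actual parameters
    used to do computation on the device.

    Parameters
    ----------
    copy_to_cpu : bool
        Whether to copy the parameters to CPU memory (default True).
        Not used in this base class; concrete modules such as `Module`
        forward it down to `DataParallelExecutorGroup.get_params`, where
        setting it to False keeps the averaged parameters on the device
        and avoids a device-to-CPU copy (useful for Horovod-style allreduce).
    """
    raise NotImplementedError()
```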

Member:

As we discussed offline, based on our tests of distributed training on 1-node and 2-node p3.16xlarge instances, the training accuracy looks normal. We also saved and compared the module parameters from one rank on each node in the 2-node scenario, and they look identical. I think we can remove this copy_to_cpu argument change.
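For reference, a minimal sketch of the kind of cross-rank comparison described above, assuming each rank has already dumped its parameter dict with mx.nd.save; the file names are hypothetical:

```python
import mxnet as mx

# hypothetical dumps written by one rank on each node after training
params_a = mx.nd.load('params_node1_rank0.params')
params_b = mx.nd.load('params_node2_rank0.params')

assert params_a.keys() == params_b.keys()
for name in sorted(params_a):
    identical = (params_a[name] == params_b[name]).min().asscalar() == 1.0
    print(name, 'identical' if identical else 'DIFFERS')
```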

"""Gets parameters, those are potentially copies of the the actual parameters used
to do computation on the device.

2 changes: 1 addition & 1 deletion python/mxnet/module/bucketing_module.py
@@ -161,7 +161,7 @@ def output_shapes(self):
assert self.binded
return self._curr_module.output_shapes

def get_params(self):
def get_params(self, copy_to_cpu=True):
Contributor:

Why is this copy_to_cpu argument needed here, since it is not used? Nor is it declared in BaseModule.

Member:
Please check Line 174 in this file: params = self._curr_module.get_params(). Do we need to change to params = self._curr_module.get_params(copy_to_cpu)?
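If the argument stays, the internal call the reviewer points at would presumably have to forward it. A minimal sketch of that forwarding, with the surrounding method body simplified (not the PR's actual code):

```python
# inside BucketingModule (simplified sketch)
def get_params(self, copy_to_cpu=True):
    assert self.binded and self.params_initialized
    # forward the flag so the currently bound module decides where the
    # averaged parameters end up (CPU vs. the devices themselves)
    params = self._curr_module.get_params(copy_to_cpu)
    self._params_dirty = False
    return params
```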

"""Gets current parameters.

Returns
18 changes: 15 additions & 3 deletions python/mxnet/module/executor_group.py
@@ -412,7 +412,7 @@ def set_params(self, arg_params, aux_params, allow_extra=False):
for exec_ in self.execs:
exec_.copy_params_from(arg_params, aux_params, allow_extra_params=allow_extra)

def get_params(self, arg_params, aux_params):
def get_params(self, arg_params, aux_params, copy_to_cpu):
""" Copy data from each executor to `arg_params` and `aux_params`.

Parameters
@@ -421,17 +421,29 @@ def get_params(self, arg_params, aux_params):
Target parameter arrays.
aux_params : list of NDArray
Target aux arrays.
copy_to_cpu : boolean
Whether or not to copy parameters to CPU (defaults to True).

Notes
-----
- This function will inplace update the NDArrays in arg_params and aux_params.
"""
for name, block in zip(self.param_names, self.param_arrays):
weight = sum(w.copyto(ctx.cpu()) for w in block) / len(block)
if copy_to_cpu:
context = ctx.cpu()
else:
context = block[0].context
Contributor:

It seems this else block is identical to the if block except for the context. Can you fold the if check into the context assignment, so that context just switches between block[0].context and ctx.cpu()?

Contributor (Author):
Fixed
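Presumably the consolidation looks something like the helper below: the if/else collapses into choosing the target context once, and the averaging is written a single time (a standalone sketch with illustrative names, not the committed code):

```python
import mxnet as mx

def _average_block(block, copy_to_cpu=True):
    """Average one parameter's per-device copies on a single target context."""
    context = mx.cpu() if copy_to_cpu else block[0].context
    return sum(w.copyto(context) for w in block) / len(block)

# toy usage: two copies of the same parameter (CPU stands in for two GPUs here)
block = [mx.nd.ones((2, 2)), mx.nd.ones((2, 2)) * 3]
print(_average_block(block, copy_to_cpu=True))   # averaged on mx.cpu()
print(_average_block(block, copy_to_cpu=False))  # averaged on block[0].context
```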

weight = sum(w.copyto(context) for w in block) / len(block)
weight.astype(arg_params[name].dtype).copyto(arg_params[name])
arg_params[name] = arg_params[name].as_in_context(context)
for name, block in zip(self.aux_names, self.aux_arrays):
weight = sum(w.copyto(ctx.cpu()) for w in block) / len(block)
if copy_to_cpu:
context = ctx.cpu()
else:
context = block[0].context
weight = sum(w.copyto(context) for w in block) / len(block)
weight.astype(aux_params[name].dtype).copyto(aux_params[name])
aux_params[name] = aux_params[name].as_in_context(context)

def forward(self, data_batch, is_train=None):
"""Split `data_batch` according to workload and run forward on each devices.
12 changes: 6 additions & 6 deletions python/mxnet/module/module.py
@@ -242,7 +242,7 @@ def output_shapes(self):
assert self.binded
return self._exec_group.get_output_shapes()

def get_params(self):
def get_params(self, copy_to_cpu=True):
"""Gets current parameters.

Returns
@@ -252,8 +252,8 @@ def get_params(self):
"""
assert self.binded and self.params_initialized

if self._params_dirty:
self._sync_params_from_devices()
if not copy_to_cpu or self._params_dirty:
self._sync_params_from_devices(copy_to_cpu)
return (self._arg_params, self._aux_params)

def init_params(self, initializer=Uniform(0.01), arg_params=None, aux_params=None,
@@ -495,7 +495,7 @@ def init_optimizer(self, kvstore='local', optimizer='sgd',
return

if self._params_dirty:
self._sync_params_from_devices()
self._sync_params_from_devices(copy_to_cpu=True)

(kvstore, update_on_kvstore) = \
_create_kvstore(kvstore, len(self._context), self._arg_params)
@@ -772,15 +772,15 @@ def update_metric(self, eval_metric, labels, pre_sliced=False):
"""
self._exec_group.update_metric(eval_metric, labels, pre_sliced)

def _sync_params_from_devices(self):
def _sync_params_from_devices(self, copy_to_cpu):
"""Synchronizes parameters from devices to CPU. This function should be called after
calling `update` that updates the parameters on the devices, before one can read the
latest parameters from ``self._arg_params`` and ``self._aux_params``.

For row_sparse parameters on devices, they are pulled from KVStore with all row ids.

"""
self._exec_group.get_params(self._arg_params, self._aux_params)
self._exec_group.get_params(self._arg_params, self._aux_params, copy_to_cpu)
if self._kvstore and self._update_on_kvstore:
for param_name, param_val in sorted(self._arg_params.items()):
if param_val.stype == 'row_sparse':
2 changes: 1 addition & 1 deletion python/mxnet/module/python_module.py
@@ -93,7 +93,7 @@ def output_shapes(self):
################################################################################
# Parameters of a module
################################################################################
def get_params(self):
def get_params(self, copy_to_cpu=True):
Contributor:
Why is this argument needed here?

"""Gets parameters, those are potentially copies of the the actual parameters used
to do computation on the device. Subclass should override this method if contains
parameters.
2 changes: 1 addition & 1 deletion python/mxnet/module/sequential_module.py
@@ -149,7 +149,7 @@ def output_shapes(self):
assert self.binded
return self._modules[-1].output_shapes

def get_params(self):
def get_params(self, copy_to_cpu=True):
Contributor:
Why is this argument needed here?

Member:
Please check Line 167 in this file: arg, aux = module.get_params(). Do we need to pass in the copy_to_cpu value when calling get_params? arg, aux = module.get_params(copy_to_cpu)?
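If the argument is kept, the loop at that line would presumably forward it as well; a simplified sketch of SequentialModule.get_params with the forwarding in place (illustrative, not the PR's actual body):

```python
# inside SequentialModule (simplified sketch)
def get_params(self, copy_to_cpu=True):
    assert self.binded and self.params_initialized
    arg_params = dict()
    aux_params = dict()
    for module in self._modules:
        arg, aux = module.get_params(copy_to_cpu)  # was: module.get_params()
        arg_params.update(arg)
        aux_params.update(aux)
    return (arg_params, aux_params)
```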

"""Gets current parameters.

Returns
4 changes: 2 additions & 2 deletions src/io/iter_image_recordio_2.cc
@@ -285,9 +285,9 @@ inline bool ImageRecordIOParser2<DType>::ParseNext(DataBatch *out) {
shape_vec.push_back(param_.label_width);
TShape label_shape(shape_vec.begin(), shape_vec.end());

out->data.at(0) = NDArray(data_shape, Context::CPUPinned(0), false,
out->data.at(0) = NDArray(data_shape, Context::CPU(0), false,
Member:

For my own understanding, why do we change from CPUPinned to CPU in this PR? Is it going to cause issues if we stick with CPUPinned when conducting distributed training with Horovod?

Contributor (Author) @ctcyang, Oct 23, 2018:
In my benchmark, it doesn't affect performance.

You may recall we can't use horovod.local_rank() to determine the GPU id, because MXNet cannot have a compile-time dependency on Horovod. But if all GPUs 0-7 then use CPUPinned(0), you get 8 processes starting the CUDA driver on GPU 0, which wastes memory and keeps you from using the largest batch size.
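For context, this is roughly how a Horovod-MXNet training script picks its device per process with the horovod.mxnet API (a minimal sketch; the point is that only the Python script, not libmxnet, knows the local rank):

```python
import horovod.mxnet as hvd
import mxnet as mx

hvd.init()
# one training process per GPU: each process binds to its own local GPU
context = mx.gpu(hvd.local_rank())
print('rank %d of %d using %s' % (hvd.rank(), hvd.size(), context))
```

Since the C++ iterator is built without that rank information, it cannot choose CPUPinned(local_rank()) itself, hence the hard-coded CPUPinned(0) that this change replaces with CPU(0).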

Member:
@ctcyang what did you benchmark? Did you also run kvstore('local') and kvstore('nccl') to verify the perf impact?

Contributor (Author) @ctcyang, Oct 27, 2018:
In my benchmark, I ran kvstore('nccl') and kvstore('device'), and there was no perf difference before and after the change. I did not test kvstore('local').

Member:
I'm afraid the data copy will be more likely to become the bottleneck as GPUs get faster in the next generation. What about adding a ctx option, like https://mxnet.incubator.apache.org/versions/master/api/python/ndarray/ndarray.html?highlight=logis#mxnet.ndarray.zeros, which defaults to cpu_pinned(0)? Horovod users who run into memory issues could then pass either ctx=mx.cpu() or mx.cpu_pinned(local_rank()).
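A sketch of what the suggested option could look like from the user's side. Note the ctx keyword on the iterator is hypothetical; it does not exist in this PR and is shown only to illustrate the suggestion:

```python
import horovod.mxnet as hvd
import mxnet as mx

hvd.init()

# HYPOTHETICAL ctx argument, as suggested above; today the parser's
# context is fixed in C++ rather than exposed to the user.
train_iter = mx.io.ImageRecordIter(
    path_imgrec='train.rec',                          # illustrative path
    data_shape=(3, 224, 224),
    batch_size=128,
    ctx=mx.Context('cpu_pinned', hvd.local_rank()),   # or mx.cpu() to save memory
)
```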

mshadow::DataType<DType>::kFlag);
out->data.at(1) = NDArray(label_shape, Context::CPUPinned(0), false,
out->data.at(1) = NDArray(label_shape, Context::CPU(0), false,
mshadow::DataType<real_t>::kFlag);
unit_size_[0] = param_.data_shape.Size();
unit_size_[1] = param_.label_width;
2 changes: 1 addition & 1 deletion src/kvstore/comm.h
@@ -43,7 +43,7 @@ namespace kvstore {
class Comm {
public:
Comm() {
pinned_ctx_ = Context::CPUPinned(0);
pinned_ctx_ = Context::CPU(0);
}
virtual ~Comm() { }
/**
2 changes: 1 addition & 1 deletion src/kvstore/kvstore_nccl.h
@@ -64,7 +64,7 @@ class KVStoreNCCL : public KVStoreLocal {
KVStoreNCCL() : KVStoreLocal() {
// Due to aggregation, we do not use the Comm interface
comm_ = nullptr;
pinned_ctx_ = Context::CPUPinned(0);
pinned_ctx_ = Context::CPU(0);
Member:

cc @ptrendx @DickJC123: is this OK for NCCL?

Contributor (Author) @ctcyang, Oct 27, 2018:
The changes in src/kvstore/kvstore_nccl.h and src/kvstore/comm.h are artifacts of the PoC API. In the final API the user sets --kv-store None, so this code path won't be used and these changes can be reverted.

inited_ = false;
}
