Skip to content

Commit

Permalink
Aggregate SGD (apache#13346)
Browse files Browse the repository at this point in the history
* Aggregate SGD

* Make OpWrapperGenerator understand Tuple<float>

* Trigger

* Add NNVM Tuple to cpp-package op.h

* Trigger

* Fix pylint aggregate SGD

* Update info about new ENV vars and modify 2 tests that require
update_on_kvstore to be true

* Fix

* Aggregate SGD support for Gluon trainer

* Added text to doc about aggregate update in SGD optimizer

* Docs changes from review
  • Loading branch information
ptrendx authored and haohuw committed Jun 23, 2019
1 parent 72be2c2 commit f07317c
Show file tree
Hide file tree
Showing 10 changed files with 711 additions and 66 deletions.
4 changes: 3 additions & 1 deletion cpp-package/scripts/OpWrapperGenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class Arg:
'double':'double',\
'double or None':'dmlc::optional<double>',\
'Shape or None':'dmlc::optional<Shape>',\
'string':'const std::string&'}
'string':'const std::string&',\
'tuple of <float>':'nnvm::Tuple<mx_float>'}
name = ''
type = ''
description = ''
Expand Down Expand Up @@ -407,6 +408,7 @@ def ParseAllOps():
"#include \"mxnet-cpp/op_util.h\"\n"
"#include \"mxnet-cpp/operator.h\"\n"
"#include \"dmlc/optional.h\"\n"
"#include \"nnvm/tuple.h\"\n"
"\n"
"namespace mxnet {\n"
"namespace cpp {\n"
Expand Down
9 changes: 9 additions & 0 deletions docs/faq/env_var.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
- If true, MXNet tries to use GPU peer-to-peer communication, if available on your device,
when kvstore's type is `device`.

* MXNET_UPDATE_ON_KVSTORE
- Values: 0(false) or 1(true) ```(default=1)```
- If true, weight updates are performed during the communication step, if possible.

## Memonger

* MXNET_BACKWARD_DO_MIRROR
Expand Down Expand Up @@ -218,6 +222,11 @@ When USE_PROFILER is enabled in Makefile or CMake, the following environments ca
- When the array size is bigger than or equal to this threshold, NDArray::Copy(from, to) is implemented by OpenMP with the Recommended OMP Thread Count.
- When the array size is less than this threshold, NDArray::Copy(from, to) is implemented by memcpy in a single thread.

* MXNET_OPTIMIZER_AGGREGATION_SIZE
- Values: Int ```(default=4)```
- Maximum value is 60.
- This variable controls how many weights will be updated in a single call to the optimizer (for optimizers that support aggregation; currently limited to SGD).

Settings for Minimum Memory Usage
---------------------------------
- Make sure ```min(MXNET_EXEC_NUM_TEMP, MXNET_GPU_WORKER_NTHREADS) = 1```
Expand Down
15 changes: 12 additions & 3 deletions python/mxnet/gluon/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class Trainer(object):
See mxnet.KVStore.set_gradient_compression method for more details on gradient compression.
update_on_kvstore : bool, default None
Whether to perform parameter updates on kvstore. If None, then trainer will choose the more
suitable option depending on the type of kvstore.
suitable option depending on the type of kvstore. If the `update_on_kvstore` argument is
provided, environment variable `MXNET_UPDATE_ON_KVSTORE` will be ignored.
Properties
----------
Expand Down Expand Up @@ -393,6 +394,8 @@ def update(self, batch_size, ignore_stale_grad=False):
self._update(ignore_stale_grad)

def _update(self, ignore_stale_grad=False):
updates = [[] for _ in self._updaters]

for i, param in enumerate(self._params):
if param.grad_req == 'null':
continue
Expand All @@ -416,11 +419,17 @@ def _update(self, ignore_stale_grad=False):
self._kvstore.pull(i, param.list_data(), priority=-i)
continue

for upd, arr, grad in zip(self._updaters, param.list_data(), param.list_grad()):
for upd, arr, grad in zip(updates, param.list_data(), param.list_grad()):
if not ignore_stale_grad or arr._fresh_grad:
upd(i, grad, arr)
upd.append((i, grad, arr))
arr._fresh_grad = False

if not (self._kvstore and self._update_on_kvstore):
for updater, upd in zip(self._updaters, updates):
if upd:
i, w, g = zip(*upd)
updater(i, w, g)

def save_states(self, fname):
"""Saves trainer states (e.g. optimizer, momentum) to a file.
Expand Down
10 changes: 7 additions & 3 deletions python/mxnet/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ def _create_kvstore(kvstore, num_device, arg_params):
arg_params : dict of str to `NDArray`.
Model parameter, dict of name to `NDArray` of net's weights.
"""
update_on_kvstore = True
update_on_kvstore = bool(int(os.getenv('MXNET_UPDATE_ON_KVSTORE', "1")))
if kvstore is None:
kv = None
elif isinstance(kvstore, kvs.KVStore):
kv = kvstore
elif isinstance(kvstore, str):
# create kvstore using the string type
if num_device is 1 and 'dist' not in kvstore:
if num_device == 1 and 'dist' not in kvstore:
# no need to use kv for single device and single machine
kv = None
else:
Expand Down Expand Up @@ -162,6 +162,7 @@ def _update_params_on_kvstore(param_arrays, grad_arrays, kvstore, param_names):
def _update_params(param_arrays, grad_arrays, updater, num_device,
kvstore=None, param_names=None):
"""Perform update of param_arrays from grad_arrays not on kvstore."""
updates = [[] for _ in range(num_device)]
for i, pair in enumerate(zip(param_arrays, grad_arrays)):
arg_list, grad_list = pair
if grad_list[0] is None:
Expand All @@ -178,7 +179,10 @@ def _update_params(param_arrays, grad_arrays, updater, num_device,
# state for the same index but on diff devs, TODO(mli)
# use a better solution later
w, g = p
updater(index*num_device+k, g, w)
updates[k].append((index*num_device+k, g, w))
for dev_updates in updates:
i, w, g = zip(*dev_updates)
updater(i, w, g)


def _multiple_callbacks(callbacks, *args, **kwargs):
Expand Down
Loading

0 comments on commit f07317c

Please sign in to comment.