
Merge pull request #2 from dmlc/master
Merge Back
hetong007 committed Oct 11, 2015
2 parents a777402 + 49a0ff6 commit b7c4401
Showing 16 changed files with 419 additions and 110 deletions.
94 changes: 94 additions & 0 deletions doc/developer-guide/multi_node.md
@@ -0,0 +1,94 @@
# Multi-devices and multi-machines

## Introduction

MXNet uses a two-level *parameter server* for data synchronization.

<img src="https://raw.githubusercontent.com/dmlc/dmlc.github.io/master/img/mxnet/multi-node/ps_arch.png" width="400"/>

- On the first level, data are synchronized over multiple devices within a
single worker machine. A device can be a GPU card, a CPU, or another
computational unit. We often use a sequential consistency model, also known as
BSP, at this level.

- On the second level, data are synchronized over multiple workers via server
machines. We can use either a sequential consistency model for guaranteed
convergence or a (partially) asynchronous model for better system performance.

## KVStore

MXNet implements the two-level parameter server in the *KVStore* class. We
currently provide the following three types. Given a batch size *b*:

| kvstore type | #devices | #workers | #ex per device | #ex per update | max delay |
| :--- | --- | --- | --- | --- | --- |
| `local` | *k* | 1 | *b / k* | *b* | *0* |
| `dist_sync` | *k* | *n* | *b / k* | *b × n* | *0* |
| `dist_async` | *k* | *n* | *b / k* | *b* | inf |

where the number of devices *k* used on a worker may vary across workers, and:

- **number of examples per update**: the number of examples used to compute
the averaged gradient for each update. The larger it is, the slower the
convergence usually becomes.
- **number of examples per device**: the number of examples batched on one
device at a time. The larger it is, the better the system performance usually
becomes.
- **max delay**: the maximal staleness of the weight a worker can see. For a
given worker, a delay *d* on weight *w* means that when this worker uses *w*
(to calculate the gradient), *w* has already been updated *d* times elsewhere.
A larger delay often improves system performance but may slow down
convergence.
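
For illustration, here is a minimal sketch (not part of the original document)
of creating the three store types from the Python API. The `dist_*` stores
assume the job was started with MXNet's distributed launcher; only `create`
and the `type` property come from this commit.

```python
import mxnet as mx

# Single machine, multiple devices: gradients are aggregated locally.
kv = mx.kvstore.create('local')
print(kv.type)  # the kvstore type string

# Multiple machines, synchronous (BSP) updates via the servers.
# kv = mx.kvstore.create('dist_sync')

# Multiple machines, (partially) asynchronous updates via the servers.
# kv = mx.kvstore.create('dist_async')
```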

## Multiple devices on a single machine

The `local` KVStore synchronizes data over multiple devices on a single
machine. It gives the same results (e.g. model accuracy) as the single-device
case. But compared to the latter, if there are *k* devices, then each device
only processes *1 / k* of the examples each time (and consumes only *1 / k* of
the device memory). For example, with *b* = 128 and *k* = 4 GPUs, each GPU
processes 32 examples per batch. We therefore often increase the batch size
*b* for better system performance.

When `local` is used, the system automatically chooses one of the following
three implementations. They differ in where the gradients are averaged over
all devices and where the weight is updated.

| kvstore type | average gradient | perform update |
| :--- | :--- | --- |
| `local_update_cpu` | CPU | CPU |
| `local_allreduce_cpu` | CPU | all devices |
| `local_allreduce_device` | a device | all devices |

They produce (almost) the same results, but may differ in speed.

- `local_update_cpu`: gradients are first copied to main memory, then averaged
on the CPU, and the weight is updated on the CPU. It is suitable when the
average weight size is small but there are many weight arrays, as in the
Google Inception network.

- `local_allreduce_cpu` is similar to `local_update_cpu`, except that the
averaged gradients are copied back to the devices and the weights are then
updated on the devices. It is faster than `local_update_cpu` when the weight
size is large, since the devices can accelerate the update computation (at the
cost of increasing the workload by a factor of *k*). An example is AlexNet on
ImageNet.

- `local_allreduce_device` is similar to `local_allreduce_cpu`, except that
the gradients are averaged on a chosen device. It can take advantage of
device-to-device communication and may accelerate the averaging step. It is
faster than `local_allreduce_cpu` when the gradients are huge, but it requires
more device memory.
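
The sketch below (an illustration added here, not part of the original
document) shows how values from several devices are aggregated through a
`local` store; the two CPU contexts stand in for GPUs.

```python
import mxnet as mx

kv = mx.kvstore.create('local')
shape = (2, 3)
kv.init(3, mx.nd.ones(shape))          # key 3, initial value of ones

# One array per device; replace mx.cpu(i) with mx.gpu(i) when GPUs exist.
devs = [mx.cpu(0), mx.cpu(1)]
grads = [mx.nd.ones(shape, ctx=d) for d in devs]

kv.push(3, grads)                      # the per-device values are summed
out = mx.nd.zeros(shape)
kv.pull(3, out=out)
print(out.asnumpy())                   # each entry equals len(devs)
```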

## Multiple machines

Both `dist_async` and `dist_sync` can handle the multi-machine situation, but
they differ in both semantics and performance.

- `dist_sync`: gradients are first averaged on the servers and then sent back
to the workers for updating the weight. It is similar to `local` with
`update_on_kvstore=false` if we treat each machine as a device. It guarantees
almost identical convergence to the single-machine, single-device case if we
reduce the batch size to *b / n*. However, it requires synchronization between
all workers, and therefore may harm system performance.

- `dist_async`: gradients are sent to the servers, and the weights are updated
there. The weights a worker holds may therefore be stale. This loose data
consistency model reduces the synchronization cost between machines and can
improve system performance, but it may slow down convergence.
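
As a hedged illustration (not part of the original document), each worker
process in a `dist_sync` job might run something like the sketch below. The
training loop and the single key are placeholders; the launcher that starts
the workers and servers is assumed.

```python
import mxnet as mx

kv = mx.kvstore.create('dist_sync')
print('worker %d of %d' % (kv.rank, kv.num_workers))

shape = (2, 3)
kv.init(0, mx.nd.zeros(shape))   # only worker 0's data are actually used
kv._barrier()                    # make sure initialization has finished

weight = mx.nd.zeros(shape)
for _ in range(10):              # placeholder for a real data iterator
    kv.pull(0, out=weight)       # fetch the latest weight from the servers
    grad = mx.nd.ones(shape)     # placeholder for a real gradient
    kv.push(0, grad)             # servers average gradients over workers
```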
8 changes: 8 additions & 0 deletions include/mxnet/c_api.h
@@ -787,6 +787,14 @@ MXNET_DLL int MXKVStoreSetUpdater(KVStoreHandle handle,
MXKVStoreUpdater updater);


/*!
* \brief get the type of the kvstore
* \param handle handle to the KVStore
* \param type a string type
* \return 0 when success, -1 when failure happens
*/
MXNET_DLL int MXKVStoreGetType(KVStoreHandle handle,
const char** type);
//--------------------------------------------
// Part 6: advanced KVStore for multi-machines
//--------------------------------------------
20 changes: 17 additions & 3 deletions include/mxnet/kvstore.h
@@ -25,13 +25,22 @@ class KVStore {

/*!
* \brief Factory function to create a new KVStore.
* \param type The type of the kvstore, can be "local" or "dist"
* - local works for multiple devices on a single machine (single process)
* - dist works for multi-machines (multiple processes)
* \param type The type of the kvstore,
* 'local' : multi-devices on a single machine. can be also
* 'local_update_cpu', 'local_allreduce_cpu'
* 'device' or 'local_allreduce_device' : same as local but uses GPUs for kv
* allreduce
* 'dist_sync' : multi-machines with BSP
* 'dist_async' : multi-machines with partial asynchrony
* \return a new created KVStore.
*/
static KVStore *Create(const char *type = "local");

/**
* \brief return the type
*/
inline const std::string& type() { return type_; }

/*!
* \brief Initialize a list of key-value pair to the store.
*
@@ -269,6 +278,11 @@ class KVStore {
* \brief the user-defined updater
*/
Updater updater_;

/**
* \brief the kvstore type
*/
std::string type_;
};

} // namespace mxnet
4 changes: 2 additions & 2 deletions python/mxnet/context.py
@@ -29,8 +29,8 @@ class Context(object):
"""
# static class variable
default_ctx = None
devtype2str = {1: 'cpu', 2: 'gpu'}
devstr2type = {'cpu': 1, 'gpu': 2}
devtype2str = {1: 'cpu', 2: 'gpu', 3: 'cpu_pinned'}
devstr2type = {'cpu': 1, 'gpu': 2, 'cpu_pinned': 3}
def __init__(self, device_type, device_id=0):
if isinstance(device_type, Context):
self.device_typeid = device_type.device_typeid
44 changes: 31 additions & 13 deletions python/mxnet/kvstore.py
@@ -6,7 +6,7 @@
import pickle
from .ndarray import NDArray
from .base import _LIB
from .base import check_call, c_array, c_str, string_types, mx_uint
from .base import check_call, c_array, c_str, string_types, mx_uint, py_str
from .base import NDArrayHandle, KVStoreHandle
from . import optimizer as opt

@@ -68,7 +68,7 @@ def init(self, key, value):
For each key, one must init it before push and pull.
Only worker 0's (get_rank() == 0) data are used.
Only worker 0's (rank == 0) data are used.
This function returns after data have been initialized successfully
@@ -95,7 +95,7 @@ def init(self, key, value):
>>> keys = [5, 7, 9]
>>> kv.init(keys, [mx.nd.ones(shape)]*len(keys))
"""
if (self.get_rank() == 0):
if (self.rank == 0):
ckeys, cvals = _ctype_key_value(key, value)
check_call(_LIB.MXKVStoreInit(
self.handle, mx_uint(len(ckeys)), ckeys, cvals))
@@ -169,6 +169,9 @@ def push(self, key, value, priority=0):
self.handle, mx_uint(len(ckeys)), ckeys, cvals,
ctypes.c_int(priority)))

# self._wait(key)
# self._barrier()

def pull(self, key, out=None, priority=0):
""" Pull a single value or a sequence of values from the store.
@@ -261,9 +264,23 @@ def set_optimizer(self, optimizer):
raise
self._send_command_to_servers(0, optim_str)
else:
self._set_updater(opt.optimizer_clossure(optimizer))
self._set_updater(opt.get_updater(optimizer))

@property
def type(self):
"""Get the type of this kvstore
Returns
-------
type : str
the string type
"""
kv_type = ctypes.c_char_p()
check_call(_LIB.MXKVStoreGetType(self.handle, ctypes.byref(kv_type)))
return py_str(kv_type.value)

def get_rank(self):
@property
def rank(self):
"""Get the rank of this worker node
Returns
@@ -275,7 +292,8 @@ def get_rank(self):
check_call(_LIB.MXKVStoreGetRank(self.handle, ctypes.byref(rank)))
return rank.value

def get_num_workers(self):
@property
def num_workers(self):
"""Get the number of worker ndoes
Returns
@@ -329,17 +347,17 @@ def _barrier(self):
pulling, we can place a barrier to guarantee that the initialization is
finished.
The following codes run on n machines in parallel
>>> if kv.get_rank() == 0:
... kv.init(keys, values);
... kv.barrier()
... kv.pull(keys, out = values);
But note that this function only blocks the main thread of the workers
until all of them have reached this point. It doesn't guarantee that all
operations issued before are actually finished, such as \ref Push and
\ref Pull. In that case, we need to call \ref Wait or \ref WaitAll
The following codes implement a BSP model
>>> kv.push(keys, values)
... kv._wait(keys)
... kv._barrier()
... kv.pull(keys, out = values);
"""
check_call(_LIB.MXKVStoreBarrier(self.handle))

2 changes: 1 addition & 1 deletion python/mxnet/kvstore_server.py
@@ -31,7 +31,7 @@ def server_controller(cmd_id, cmd_body):
self.kvstore.set_optimizer(optimizer)
else:
print ("server %d, unknown command (%d, %s)" % (
self.kvstore.get_rank(), cmd_id, cmd_body))
self.kvstore.rank, cmd_id, cmd_body))
return server_controller

def run(self):