[MXNET-apache#16795] Byteps-KVStore: Integrate BytePS into MXNet as a new type of kvstore backend (apache#17555)

* Add Byteps backend for kvstore

* Add a temp launcher for byteps backend

* make the test fit the byteps kvstore.

* final workable test

* Remove trashy print and logs

* correct comment

* add hostfile for ci test

* add ci test for byteps kvstore

* add visible devices for byteps-kvstore ci test

* add licenses for tools/byteps_launcher.py

* syntax error

* pylint error (remove unused import like logging)

* pylint error

* pylint error

* enable launching without hostfile (local byteps)

* 1. rename byteps_kvstore.py to byteps.py; 2. shorten the launch option  to ; 3. add instruction for -H and -SH options for launch; 4. add documentation for byteps kvstore in kvstore/base.py: create(name='local')

* edit documentation of KVStoreBase::is_capable(capability); return False for BytePS(KVStoreBase)::is_capable(any).

* pylint error

* fix an error with args.byteps

* use --env option to set workers' environment

* fix error in byteps_launcher.py

* remove an unintended edit in runtime_functions.sh

* disable cpu support for byteps kvstore.

* 1. format the document to avoid the Julia doc build error;
2. minor change to nightly test;
3. add BytePS copyright declaration in byteps_launcher.py;
4. if args.byteps == True ===> if args.byteps

* remove the --scheduler_ip and --scheduler_port options in launch.py

* 1. preserve the original value passed to broadcast and pushpull
2. optimize when out = value or out = [value]
3. add some missing documentation to avoid doc build errors.

* Add bytePS to CI

* add dependency

* +integrationtest_ubuntu_gpu_byteps

* add byteps pipeline

* disable a few tests

* remove more tests

* fix permission

* remove apt-get

* fix python path

* improve logging

* fix prints

* add back CI

Co-authored-by: Ubuntu <[email protected]>
Co-authored-by: Piyush Ghai <[email protected]>
Co-authored-by: eric-haibin-lin <[email protected]>
Co-authored-by: eric-haibin-lin <--global>
Co-authored-by: Lin <[email protected]>
5 people authored and Vladimir Cherepanov committed Apr 7, 2020
1 parent 9a60542 commit a5a100d
Showing 10 changed files with 624 additions and 3 deletions.
19 changes: 19 additions & 0 deletions ci/docker/runtime_functions.sh
@@ -1004,6 +1004,7 @@ cd_unittest_ubuntu() {
# Adding these here as CI doesn't test all CUDA environments
$python_cmd example/image-classification/test_score.py
integrationtest_ubuntu_gpu_dist_kvstore
integrationtest_ubuntu_gpu_byteps
fi

if [[ ${mxnet_variant} = *mkl ]]; then
@@ -1351,6 +1352,24 @@ integrationtest_ubuntu_gpu_dist_kvstore() {
popd
}

integrationtest_ubuntu_gpu_byteps() {
set -ex
pushd .
export PYTHONPATH=$PWD/python/
export BYTEPS_WITHOUT_PYTORCH=1
export BYTEPS_WITHOUT_TENSORFLOW=1
git clone -b v0.2 https://github.com/bytedance/byteps/ --recursive
cd byteps && python3 setup.py install --user && cd -

export MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
export MXNET_SUBGRAPH_VERBOSE=0
export DMLC_LOG_STACK_TRACE_DEPTH=10
cd tests/nightly/
python3 ../../tools/launch.py -n 1 -s 1 --byteps --env NVIDIA_VISIBLE_DEVICES:0,1 python3 dist_device_sync_kvstore_byteps.py
popd
}


test_ubuntu_cpu_python3() {
set -ex
pushd .
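For context, here is a minimal sketch of the kind of worker script this CI function launches. It is hypothetical (dist_device_sync_kvstore_byteps.py in this commit is the authoritative test) and assumes BytePS is installed and the process was started through tools/launch.py with --byteps, which wires up the scheduler, server, and workers:

import mxnet as mx

# Create the kvstore backend added by this commit; requires a running BytePS job.
kv = mx.kv.create('byteps')
shape = (2, 3)
ctx = mx.gpu(kv.local_rank)  # BytePS only accepts GPU tensors

# broadcast: every rank ends up with rank 0's value
a = mx.nd.zeros(shape, ctx=ctx)
kv.broadcast('3', mx.nd.ones(shape, ctx=ctx) * 2, out=a)
assert (a.asnumpy() == 2).all()

# pushpull sums values across workers (byteps_push_pull with is_average=False)
kv.pushpull('3', mx.nd.ones(shape, ctx=ctx) * 8, out=a)
assert (a.asnumpy() == 8 * kv.num_workers).all()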
14 changes: 14 additions & 0 deletions ci/jenkins/Jenkins_steps.groovy
@@ -1277,6 +1277,20 @@ def test_unix_distributed_kvstore_cpu() {
}]
}

def test_unix_byteps_gpu() {
return ['byteps tests GPU': {
node(NODE_LINUX_GPU) {
ws('workspace/it-byteps') {
timeout(time: max_time, unit: 'MINUTES') {
utils.unpack_and_init('gpu', mx_lib)
utils.docker_run('ubuntu_gpu_cu101', 'integrationtest_ubuntu_gpu_byteps', true)
utils.publish_test_coverage()
}
}
}
}]
}

def test_unix_distributed_kvstore_gpu() {
return ['dist-kvstore tests GPU': {
node(NODE_LINUX_GPU) {
2 changes: 1 addition & 1 deletion ci/jenkins/Jenkinsfile_edge
@@ -44,7 +44,7 @@ core_logic: {

utils.parallel_stage('Tests', [
custom_steps.test_qemu_armv7_cpu()
])
])
}
,
failure_handler: {
1 change: 1 addition & 0 deletions ci/jenkins/Jenkinsfile_unix_gpu
@@ -60,6 +60,7 @@ core_logic: {
custom_steps.test_unix_cpp_package_gpu(),
custom_steps.test_unix_scala_gpu(),
custom_steps.test_unix_distributed_kvstore_gpu(),
custom_steps.test_unix_byteps_gpu(),
custom_steps.test_static_python_gpu(),
custom_steps.test_static_python_gpu_cmake(),
custom_steps.test_unix_python3_gpu_no_tvm_op(),
1 change: 1 addition & 0 deletions python/mxnet/kvstore/__init__.py
@@ -22,3 +22,4 @@
from .kvstore import *
from .base import *
from .kvstore_server import *
from .byteps import *
9 changes: 8 additions & 1 deletion python/mxnet/kvstore/base.py
@@ -313,6 +313,8 @@ def pushpull(self, key, value, out=None, priority=0):
def is_capable(capability):
"""Queries if the KVStore type supports certain capability, such as optimizer algorithm,
gradient compression, sparsity, etc.
If the kvstore does not store weights on the server side, then no optimizer is supported
and this function will return False.
Parameters
----------
@@ -427,10 +429,15 @@ def create(name='local'):
No two updates happen on the same weight at the same time. However, the order is not
guaranteed.
``byteps``: Use BytePS as the broadcast/pushpull backend.
This kind of kvstore does not store weights, so there is no optimizer on the kvstore server.
BytePS does not support pure CPU training, so be sure to enable GPU training when using this kvstore.
Parameters
----------
name : {'local', 'device', 'nccl', 'dist_sync', 'dist_device_sync', 'dist_async', 'horovod'}
name : {'local', 'device', 'nccl', 'dist_sync', 'dist_device_sync', 'dist_async', 'horovod', 'byteps'}
The type of KVStore.
Returns
-------
kv : KVStoreBase
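As a quick illustration of the API documented above, a minimal sketch; it assumes the byteps package is installed, and creating the store outside a job started by the BytePS launcher would fail during init:

import mxnet as mx
from mxnet.kvstore import BytePS

# is_capable is a static method, so no running BytePS job is needed:
# the BytePS server stores no weights, hence no capability is supported.
assert BytePS.is_capable('optimizer') is False

# Inside a job launched with tools/launch.py --byteps, the store is
# created by name like any other backend:
kv = mx.kv.create('byteps')
print(kv.type, kv.rank, kv.num_workers)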
255 changes: 255 additions & 0 deletions python/mxnet/kvstore/byteps.py
@@ -0,0 +1,255 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# coding: utf-8
""" BytePS backend for MXNet KVStore"""
from __future__ import absolute_import

from ..ndarray import NDArray
from .base import KVStoreBase

__all__ = ['BytePS']


@KVStoreBase.register
class BytePS(KVStoreBase):
"""BytePS backend for MXNet KVStore interface."""

def __init__(self):
"""Initializes a new KVStore."""
try:
import byteps.mxnet as bps
self.handle = bps
except ImportError as err:
print('Did not find BytePS library. Please install BytePS first')
raise err
self.handle.init()

def broadcast(self, key, value, out, priority=0):
""" Broadcast the value NDArray at rank 0 to all ranks' out. If out is None,
the result is stored in `value`.
Parameters
----------
key : str, or int
The key.
value : NDArray, or list of NDArray
Values corresponding to the key.
out : NDArray, or list of NDArray
Values corresponding to the key.
Examples
--------
>>> # broadcast a single key-value pair
>>> shape = (2,3)
>>> kv = mx.kv.create('byteps')
>>> a = mx.nd.zeros(shape)
>>> kv.broadcast('3', mx.nd.ones(shape)*2, out=a)
>>> print(a.asnumpy())
[[ 2. 2. 2.]
[ 2. 2. 2.]]
"""
# do not accept list or tuple for key/value
assert isinstance(key, (str, int))

# unpack the list if it contains just one NDArray
value = value[0] if isinstance(
value, list) and len(value) == 1 else value
assert isinstance(
value, NDArray), "value must be an NDArray or a list containing a single NDArray."
assert value.context.device_type == 'gpu', "BytePS KVStore only supports GPU context for broadcast values."

# optimization when out = value or out = [value]
if isinstance(out, (list, tuple)) and len(out) == 1:
inplace = value is out[0]
else:
inplace = value is out

if inplace:
broadcast_value = value
else:
broadcast_value = value.copy()
# on non-root ranks, zero out the value so the pushpull sum equals
# the root rank's value, which implements the broadcast.
root_rank = 0
if self.rank != root_rank:
broadcast_value *= 0
self.handle.byteps_push_pull(broadcast_value, version=0, priority=priority,
name=str(key), is_average=False)
# Make sure tensors pushed to MXNet engine get processed such that all
# workers are synced before starting training.
broadcast_value.wait_to_read()

out = out if isinstance(out, list) else [out]
for o in out:
broadcast_value.copyto(o)

def pushpull(self, key, value, out=None, priority=0):
""" Performs push and pull a single value from the store.
This function is coalesced form of push and pull operations.
`value` is pushed to the kvstore server for the specified keys and the aggregated
values are pulled from the server to `out`. If `out` is not specified the pulled
values are written to `value`.
Parameters
----------
key : str, or int
The key.
value : NDArray, or list of NDArray
Values corresponding to the key.
out : NDArray, or list of NDArray
Values corresponding to the key.
priority : int, optional
The priority of the operation.
Higher priority operations are likely to be executed before other actions.
Examples
--------
>>> # pushpull a single key-value pair
>>> kv.pushpull('3', mx.nd.ones(shape)*8, out=a)
>>> print(a.asnumpy())
[[ 8. 8. 8.]
[ 8. 8. 8.]]
"""
# the most common operation operates on one NDArray as `value`, and
# `out` is set to None, for inplace pushpull.

assert isinstance(key, (str, int))

# unpack the list if it contains just one NDArray
value = value[0] if isinstance(
value, list) and len(value) == 1 else value
assert isinstance(
value, NDArray), "value must be an NDArray or a list containing a single NDArray."
assert value.context.device_type == 'gpu', "BytePS KVStore only supports GPU context for pushpull values."

# optimization when out = value or out = [value]
if isinstance(out, (list, tuple)) and len(out) == 1:
inplace = value is out[0]
else:
inplace = value is out

if inplace:
pushpull_value = value
else:
pushpull_value = value.copy()

self.handle.byteps_push_pull(pushpull_value, version=0, priority=priority,
name=str(key), is_average=False)

if out is not None:
out = out if isinstance(out, list) else [out]
for o in out:
pushpull_value.copyto(o)

@staticmethod
def is_capable(capability):
"""Queries if the KVStore type supports certain capability, such as optimizer algorithm,
gradient compression, sparsity, etc.
As the BytePS server does not store weights, this function returns False for any capability.
Parameters
----------
capability: str
The capability to query
Returns
-------
result : bool
Whether the capability is supported or not.
"""
return False

@property
def type(self):
""" Returns the type of this kvstore.
Returns
-------
type : str
the string type
"""
return 'byteps'

@property
def local_rank(self):
""" Returns the local rank of this worker on the node.
Returns
-------
rank : int
The local rank of this worker on the node, which is in range [0, num_workers_on_current_node())
"""
return self.handle.local_rank()

@property
def rank(self):
""" Returns the rank of this worker node.
Returns
-------
rank : int
The rank of this node, which is in range [0, num_workers())
"""
return self.handle.rank()

@property
def num_workers(self):
"""Returns the number of worker nodes.
Returns
-------
size : int
The number of worker nodes.
"""
return self.handle.size()

def set_optimizer(self, optimizer):
"""
Not implemented yet.
Parameters
----------
optimizer : KVStoreBase
The new optimizer for the store
"""
raise NotImplementedError()

def save_optimizer_states(self, fname, dump_optimizer=False):
"""
Not implemented yet.
Parameters
----------
fname : str
Path to the output states file.
dump_optimizer : bool, default False
Whether to also save the optimizer itself. This would also save optimizer
information such as learning rate and weight decay schedules.
"""
raise NotImplementedError()

def load_optimizer_states(self, fname):
"""
Not implemented yet.
Parameters
----------
fname : str
Path to input states file.
"""
raise NotImplementedError()
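The broadcast above piggybacks on pushpull: every rank except the root multiplies its copy by zero, so the element-wise sum across ranks equals the root's tensor. A dependency-free sketch of that reduction, with a plain numpy sum standing in for byteps_push_pull(is_average=False):

import numpy as np

def simulated_broadcast(values_per_rank, root_rank=0):
    # zero out every contribution except the root's, as BytePS.broadcast does
    contributions = [v if rank == root_rank else np.zeros_like(v)
                     for rank, v in enumerate(values_per_rank)]
    # the pushpull sum: every rank receives the same reduced tensor
    reduced = np.sum(contributions, axis=0)
    return [reduced.copy() for _ in values_per_rank]

ranks = [np.full((2, 3), r + 1.0) for r in range(4)]  # rank r holds (r+1)s
outs = simulated_broadcast(ranks)
assert all((out == 1.0).all() for out in outs)  # all ranks got rank 0's value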