This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

[MXNET-#16795] Byteps-KVStore: Integrate BytePS into MXNet as a new type of kvstore backend #17555

Merged
merged 41 commits into from
Apr 7, 2020
Changes from 22 commits
Commits
011f1bb
Add Byteps backend for kvstore
Feb 5, 2020
017a225
Add a temp launcher for byteps backend
Feb 5, 2020
933c8c2
make the test fit for byteps kvstore.
Feb 7, 2020
3a5c78a
final workable test
Feb 7, 2020
f183b50
Remove trashy print and logs
Feb 8, 2020
95c119b
correct comment
Feb 8, 2020
d02da35
add hostfile for ci test
Feb 8, 2020
2bf7bf6
Merge branch 'master' of https://github.com/apache/incubator-mxnet in…
Feb 8, 2020
7bfeb33
add ci test for byteps kvstore
Feb 8, 2020
dcc003b
add visibile devices for byteps-kvstore ci test
Feb 8, 2020
03c505f
add licenses for tools/byteps_launcher.py
Feb 8, 2020
f5f0f6d
syntax error
Feb 8, 2020
e244561
pylint error (remove unused import like logging)
Feb 8, 2020
2dbabb6
pylint error
Feb 8, 2020
cdb6d2e
pylint error
Feb 8, 2020
1901919
enable launching without hostfile (local byteps)
Feb 8, 2020
9223b2a
1. rename byteps_kvstore.py to byteps.py; 2. shorten the launch optio…
Feb 9, 2020
b54c215
edit documentation of KVStoreBase::is_capable(capability); reture fas…
Feb 9, 2020
f359299
pylint error
Feb 9, 2020
1a2b269
remove an error of arg.byteps
Feb 9, 2020
842649e
use --env option to set workers' environment
Feb 9, 2020
38917ee
error in byteps-launcher.py
Feb 9, 2020
9a4e639
remove the unpurposed editing mistake in runtime_functions.sh
Feb 10, 2020
7435a8c
disable cpu support for byteps kvstore.
Feb 10, 2020
1871071
1. format the document to avoid julia doc build error;
Feb 10, 2020
bad53a0
remove the --scheduler_ip and --scheduler_port options in launch.py
Feb 10, 2020
0951911
1. maintain the origin value of broadcast and pushpull
Feb 10, 2020
b55001c
resolve conflcit
piyushghai Mar 23, 2020
a4ec4b8
Add bytePS to CI
eric-haibin-lin Mar 23, 2020
55adf68
add dependency
Mar 24, 2020
d024ca9
+integrationtest_ubuntu_gpu_byteps
Mar 25, 2020
46bfd8d
add byteps pipeline
Mar 25, 2020
a9a13b0
disable a few tests
Mar 25, 2020
6d48642
remove more tests
Mar 25, 2020
73ffc37
fix permission
Mar 25, 2020
97497bc
remove apt-get
Mar 25, 2020
0177bf5
fix python path
Mar 25, 2020
5f5fa85
improve logging
Mar 25, 2020
db173ba
fix printns
Mar 25, 2020
13e917c
add back CI
Mar 25, 2020
01eeab3
Merge remote-tracking branch 'origin/master' into pr-byteps-backend
Apr 6, 2020
3 changes: 2 additions & 1 deletion ci/docker/runtime_functions.sh
@@ -1354,6 +1354,7 @@ integrationtest_ubuntu_gpu_dist_kvstore() {
../../tools/launch.py -n 4 --launcher local python dist_device_sync_kvstore_custom.py
../../tools/launch.py --p3 -n 4 --launcher local python dist_device_sync_kvstore_custom.py
../../tools/launch.py -n 4 --launcher local python dist_sync_kvstore.py --type=init_gpu
../../tools/launch.py -n 1 -s 1 --byteps --env NVIDIA_VISIBLE_DEVICES:0,1 python3 dist_device_sync_kvstore_byteps.py
popd
}

@@ -1781,7 +1782,7 @@ build_julia_docs() {
export LD_PRELOAD='/usr/lib/x86_64-linux-gnu/libjemalloc.so'
export LD_LIBRARY_PATH=/work/mxnet/lib:$LD_LIBRARY_PATH

julia_doc_path='julia/docs/site/'
julia_doc_artifact='docs/_build/julia-artifacts.tgz'

echo "Julia will check for MXNet in $MXNET_HOME/lib"
1 change: 1 addition & 0 deletions python/mxnet/kvstore/__init__.py
@@ -22,3 +22,4 @@
from .kvstore import *
from .base import *
from .kvstore_server import *
from .byteps import *
8 changes: 7 additions & 1 deletion python/mxnet/kvstore/base.py
@@ -314,6 +314,8 @@ def pushpull(self, key, value, out=None, priority=0):
def is_capable(capability):
"""Queries if the KVStore type supports certain capability, such as optimizer algorithm,
gradient compression, sparsity, etc.
If the kvstore does not store weights on the server side, no optimizer is supported
and this function returns False.

Parameters
----------
@@ -428,9 +430,13 @@ def create(name='local'):
No two updates happen on the same weight at the same time. However, the order is not
guaranteed.

``byteps``: Use BytePS as the broadcast/pushpull backend.
This kind of kvstore doesn't store weights, so the kvstore server has no optimizer.
BytePS doesn't support pure CPU training, so be sure to enable GPU training when using this kvstore.

Parameters
----------
name : {'local', 'device', 'nccl', 'dist_sync', 'dist_device_sync', 'dist_async', 'horovod'}
name : {'local', 'device', 'nccl', 'dist_sync', 'dist_device_sync', 'dist_async', 'horovod', 'byteps'}
The type of KVStore.
Returns
-------
210 changes: 210 additions & 0 deletions python/mxnet/kvstore/byteps.py
@@ -0,0 +1,210 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# coding: utf-8
""" BytePS backend for MXNet KVStore"""
from __future__ import absolute_import

from ..ndarray import NDArray
from .base import KVStoreBase

__all__ = ['BytePS']


@KVStoreBase.register
class BytePS(KVStoreBase):
    """BytePS backend for MXNet KVStore interface."""

    def __init__(self):
        """Initializes a new KVStore."""
        try:
            import byteps.mxnet as bps
            self.handle = bps
        except ImportError as err:
            print('Did not find BytePS library. Please install BytePS first')
            raise err
        self.handle.init()

    def broadcast(self, key, value, out, priority=0):
        """ Broadcast the value NDArray at rank 0 to all ranks' out. If out is None,
        the result is stored in `value`.

        Parameters
        ----------
        key : str, or int
            The key.
        value : NDArray, or list of NDArray
            Values corresponding to the key.
        out : NDArray, or list of NDArray
            Values corresponding to the key.

        Examples
        --------
        >>> # broadcast a single key-value pair
        >>> shape = (2,3)
        >>> kv = mx.kv.create('byteps')
        >>> a = mx.nd.zeros(shape)
        >>> kv.broadcast('3', mx.nd.ones(shape)*2, out=a)
        >>> print(a.asnumpy())
        [[ 2. 2. 2.]
        [ 2. 2. 2.]]
        """

        # do not accept list or tuple for key
        assert isinstance(key, (str, int))

        # unpack the list if it contains just one NDArray
        value = value[0] if isinstance(
            value, list) and len(value) == 1 else value
        assert isinstance(
            value, NDArray), "The type of value can only be NDArray or list of NDArray which has only one element."

        # for non-root-rank, assign value with 0, thus the result of pushpull will be
        # equal to the value of root-rank, thus implementing broadcast.
        root_rank = 0
        if self.rank != root_rank:
            value.__imul__(0)
Review comment (Member):

The broadcast API is generic and users don't expect the value of value to be changed. We can optimize for the common case, where users call kv.broadcast('x', value=x, out=x) or kv.broadcast('x', value=x, out=[x]). This is usually used for initializing all network parameters.

# value is a NDArray 
inplace = value is out
if isinstance(out, (list, tuple)) and len(out) == 1:
    inplace = value is out[0]

For this case we can check the value of inplace; if it is true, we modify value in place and call byteps_push_pull. Otherwise, we need to use temporary buffers to perform pushpull.
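
The in-place check described in this comment can be sketched as follows. This is a minimal, self-contained sketch and not the merged implementation: `FakeArray` is a hypothetical stand-in for `NDArray`, and `fake_push_pull` is a hypothetical stand-in for `byteps_push_pull` that is assumed to overwrite each worker's tensor with the element-wise sum across workers. The helper names `is_inplace`, `prepare_buffer`, and `finish` are also invented for illustration.

```python
class FakeArray:
    """Hypothetical stand-in for mx.nd.NDArray holding a flat list of floats."""

    def __init__(self, data):
        self.data = list(data)

    def copy(self):
        return FakeArray(self.data)

    def copyto(self, other):
        other.data = list(self.data)


def fake_push_pull(tensors):
    """Hypothetical stand-in for byteps_push_pull: every worker's tensor is
    overwritten in place with the element-wise sum across workers."""
    total = [sum(vals) for vals in zip(*(t.data for t in tensors))]
    for t in tensors:
        t.data = list(total)


def is_inplace(value, out):
    """True when `out` (or its single element) is the same object as `value`."""
    if isinstance(out, (list, tuple)) and len(out) == 1:
        return value is out[0]
    return value is out


def prepare_buffer(value, out, rank, root_rank=0):
    """Pick the buffer to push: `value` itself for in-place calls, else a copy.
    Non-root ranks zero the buffer so that the sum equals the root's value."""
    buf = value if is_inplace(value, out) else value.copy()
    if rank != root_rank:
        buf.data = [0.0] * len(buf.data)
    return buf


def finish(buf, out):
    """Copy the reduced buffer into every output that is not the buffer itself."""
    outs = out if isinstance(out, (list, tuple)) else [out]
    for o in outs:
        if o is not buf:
            buf.copyto(o)


# Simulate two workers: rank 0 holds the value to broadcast, rank 1 holds other data.
values = [FakeArray([2.0, 2.0]), FakeArray([7.0, 7.0])]
outs = [FakeArray([0.0, 0.0]), FakeArray([0.0, 0.0])]
bufs = [prepare_buffer(v, o, rank) for rank, (v, o) in enumerate(zip(values, outs))]
fake_push_pull(bufs)
for b, o in zip(bufs, outs):
    finish(b, o)
```

In this simulation both `outs` end up holding the root's value, while the caller's `values[1]` on the non-root rank is left unchanged because a temporary copy was zeroed instead.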

        self.handle.byteps_push_pull(value, version=0, priority=priority,
                                     name=str(key), is_average=False)
        # Make sure tensors pushed to MXNet engine get processed such that all
        # workers are synced before starting training.
        value.wait_to_read()

        out = out if isinstance(out, list) else [out]
        for o in out:
            value.copyto(o)

    def pushpull(self, key, value, out=None, priority=0):
        """ Pushes a single value to the store and pulls the aggregated result.
        This function is a coalesced form of the push and pull operations.
        `value` is pushed to the kvstore server for the specified key and the aggregated
        values are pulled from the server to `out`. If `out` is not specified, the pulled
        values are written to `value`.

        Parameters
        ----------
        key : str, or int
            The key.
        value : NDArray, or list of NDArray
            Values corresponding to the key.
        out : NDArray, or list of NDArray
            Values corresponding to the key.
        priority : int, optional
            The priority of the operation.
            Higher priority operations are likely to be executed before other actions.

        Examples
        --------
        >>> # pushpull a single key-value pair
        >>> kv.pushpull('3', mx.nd.ones(shape)*8, out=a)
        >>> print(a.asnumpy())
        [[ 8. 8. 8.]
        [ 8. 8. 8.]]
        """
        # the most common operation operates on one NDArray as `value`, and
        # `out` is set to None, for inplace pushpull.

        assert isinstance(key, (str, int))

        # unpack the list if it contains just one NDArray
        value = value[0] if isinstance(
            value, list) and len(value) == 1 else value
        assert isinstance(
            value, NDArray), "The type of value can only be NDArray or list of NDArray which has only one element."

Review comment (Member):

Users don't expect value to change unless out is value
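
One way to honor this expectation is to reduce into a temporary buffer whenever the caller passed a separate `out`. The sketch below is a hedged illustration only: `Buf` is a hypothetical stand-in for `NDArray`, `pushpull_preserving_value` is an invented helper name, and `doubling_push_pull` is a made-up reducer that pretends two workers pushed identical tensors.

```python
class Buf:
    """Hypothetical stand-in for NDArray: a mutable box around a list."""

    def __init__(self, data):
        self.data = list(data)


def pushpull_preserving_value(push_pull, value, out=None):
    """Reduce `value`, mutating it only when `out` is None or contains `value`."""
    # Normalize `out` to a list of targets; None means "write back to value".
    if out is None:
        targets = [value]
    elif isinstance(out, (list, tuple)):
        targets = list(out)
    else:
        targets = [out]
    # Work in place only when the caller asked for `value` to receive the result.
    work = value if any(t is value for t in targets) else Buf(value.data)
    push_pull(work)
    for t in targets:
        if t is not work:
            t.data = list(work.data)
    return targets


def doubling_push_pull(buf):
    """Hypothetical reducer: pretends two workers pushed identical tensors."""
    buf.data = [2 * x for x in buf.data]


a = Buf([1.0, 1.0])
b = Buf([0.0, 0.0])
pushpull_preserving_value(doubling_push_pull, a, out=b)  # a untouched, b gets the sum
c = Buf([3.0])
pushpull_preserving_value(doubling_push_pull, c)  # in place: c gets the sum
```

The extra copy costs one temporary buffer per call, which is why the common in-place case (`out` omitted or `out is value`) skips it.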

        self.handle.byteps_push_pull(value, version=0, priority=priority,
                                     name=str(key), is_average=False)

        if out is not None:
            out = out if isinstance(out, list) else [out]
            for o in out:
                value.copyto(o)

    @staticmethod
    def is_capable(capability):
        """Queries if the KVStore type supports certain capability, such as optimizer algorithm,
        gradient compression, sparsity, etc.
        As the BytePS server does not store weights, this function returns False for any capability.

        Parameters
        ----------
        capability : str
            The capability to query

        Returns
        -------
        result : bool
            Whether the capability is supported or not.
        """
        return False

    @property
    def type(self):
        """ Returns the type of this kvstore.

        Returns
        -------
        type : str
            the string type
        """
        return 'byteps'

    @property
    def local_rank(self):
        """ Returns the local rank of this worker on the node.

        Returns
        -------
        rank : int
            The local rank of this node, which is in range [0, num_workers_on_current_node())
        """
        return self.handle.local_rank()

    @property
    def rank(self):
        """ Returns the rank of this worker node.

        Returns
        -------
        rank : int
            The rank of this node, which is in range [0, num_workers())
        """
        return self.handle.rank()

    @property
    def num_workers(self):
        """Returns the number of worker nodes.

        Returns
        -------
        size : int
            The number of worker nodes.
        """
        return self.handle.size()

    def set_optimizer(self, optimizer):
        """
        Not implemented yet.
        """
        raise NotImplementedError()

    def save_optimizer_states(self, fname, dump_optimizer=False):
        """
        Not implemented yet.
        """
        raise NotImplementedError()

    def load_optimizer_states(self, fname):
        """
        Not implemented yet.
        """
        raise NotImplementedError()
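
Because `is_capable` always returns `False` for this backend, a caller has to run the optimizer step on the worker rather than on the kvstore. A minimal sketch of that decision, assuming a capability name of `'optimizer'` (an assumption for illustration) and using a hypothetical `StubBytePS` class in place of the real backend so that it runs without BytePS installed:

```python
class StubBytePS:
    """Hypothetical stub mirroring the two members used below."""

    type = 'byteps'

    @staticmethod
    def is_capable(capability):
        # Matches the diff: no capability is reported as supported.
        return False


def update_on_kvstore(kv, capability='optimizer'):
    """Decide whether the optimizer step can run on the kvstore.
    The 'optimizer' capability name is an assumption for illustration."""
    return bool(kv.is_capable(capability))


# With the stub above, the update has to happen on the worker side.
where = 'kvstore' if update_on_kvstore(StubBytePS()) else 'worker'
```

This also explains why `set_optimizer` and the optimizer state methods raise `NotImplementedError` in the diff.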
115 changes: 115 additions & 0 deletions tests/nightly/dist_device_sync_kvstore_byteps.py
@@ -0,0 +1,115 @@
#!/usr/bin/env python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import sys
sys.path.insert(0, "../../python/")
import mxnet as mx
import numpy as np
import numpy.random as rnd
import time
import argparse
from mxnet.log import get_logger
import logging
from mxnet.kvstore import BytePS
logger = get_logger("Byteps-Backend-Test", level=logging.DEBUG)

# parser
parser = argparse.ArgumentParser(description='kvstore test')
parser.add_argument('--name', type=str, default='byteps')
args = parser.parse_args()

def check_diff_to_scalar(A, x, rank=None):
    """ assert A == x"""
    assert(np.sum(np.abs((A - x).asnumpy())) == 0), (rank, A.asnumpy(), x)

# setup
keys = ['3', '5', '7']
init_test_keys = [str(i) for i in range(200,300)]
init_test_keys_big = [str(i) for i in range(300,400)]
init_test_keys_device = [str(i) for i in range(400,500)]
init_test_keys_device_big = [str(i) for i in range(500,600)]

shape = (2, 3)
big_shape = (1200, 1200) # bigger than MXNET_KVSTORE_BIGARRAY_BOUND

kv = mx.kv.create(args.name)
my_rank = kv.rank
my_num_workers = kv.num_workers

has_gpu = mx.context.num_gpus() > 0

def current_context(device=False):
    if has_gpu and device:
        return mx.gpu(kv.local_rank)
    else:
        return mx.current_context()

def test_pushpull():
    num_gpus = 2
    def check_default_keys(nrepeat=3):
        # init kv dns keys
        kv.broadcast('3', mx.nd.ones(shape, ctx=current_context(device=True)), mx.nd.ones(shape, ctx=current_context(device=True)))
        kv.broadcast('99', mx.nd.ones(big_shape, ctx=current_context(device=True)), mx.nd.ones(big_shape, ctx=current_context(device=True)))
        for i in range(nrepeat):
            scale = my_rank + 1
            num = (my_num_workers + 1) * my_num_workers * num_gpus / 2

            arr = mx.nd.ones(shape, ctx=current_context(device=True)) * scale
            # inplace
            kv.pushpull('3', arr)
            check_diff_to_scalar(arr, num)

            big_arr = mx.nd.ones(big_shape, ctx=current_context(device=True)) * scale
            # inplace
            kv.pushpull('99', big_arr)
            check_diff_to_scalar(big_arr, num)

    check_default_keys(nrepeat=3)
    logger.debug('worker ' + str(my_rank) + ' is done')

def test_broadcast():
    def check_broadcast(kv, cur_keys, cur_shape, device=False):
        logger.debug("check_broadcast: {}, {}, {}, {}".format(kv, cur_keys, cur_shape, device))
        ctx = current_context(device=device)
        val = [mx.nd.zeros(cur_shape, ctx) for i in cur_keys]
        for i in range(len(cur_keys)):
            expected = i
            tmpNDarray = [mx.nd.ones(cur_shape, ctx) * i]
            kv.broadcast(cur_keys[i], tmpNDarray, out=val[i])
            check_diff_to_scalar(val[i], expected, my_rank)
        # pass `val` as a lazy formatting argument; the original call had no placeholder for it
        logger.debug("check_broadcast passed: %s", val)
    #check_broadcast(kv, init_test_keys, shape) #Byteps doesn't support pure CPU training
    #check_broadcast(kv, init_test_keys_big, big_shape) #Byteps doesn't support pure CPU training
    check_broadcast(kv, init_test_keys_device, shape, device=True)
    check_broadcast(kv, init_test_keys_device_big, big_shape, device=True)
    logger.debug('worker ' + str(my_rank) + ' is initialized')

def test_type():
    assert kv.type == args.name

if __name__ == "__main__":
    logger.debug("Type Test Begin")
    test_type()
    logger.debug("Type Test Passed")
    logger.debug("Broadcast Test Begin")
    test_broadcast()
    logger.debug("Broadcast Test Passed")
    logger.debug("PushPull Test Begin")
    test_pushpull()
    logger.debug("PushPull Test Passed")
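
The expected value `num` in `test_pushpull` is a closed form. One reading of it, offered here as an assumption rather than a statement about how BytePS counts ranks, is that each rank `r` contributes its scale `r + 1` once per GPU, so the aggregate over `W` ranks and `G` GPUs is `G * (1 + 2 + ... + W) = (W + 1) * W * G / 2`. The sketch below only checks that arithmetic identity; both function names are invented for illustration.

```python
def expected_pushpull_value(num_workers, num_gpus):
    """Closed form used by the test: (W + 1) * W * num_gpus / 2."""
    return (num_workers + 1) * num_workers * num_gpus / 2


def summed_pushpull_value(num_workers, num_gpus):
    """Explicit sum under the stated assumption: rank r contributes (r + 1)
    once for each of num_gpus devices."""
    return sum((rank + 1) * num_gpus for rank in range(num_workers))


# The two forms agree for small worker and GPU counts.
agree = all(
    expected_pushpull_value(w, g) == summed_pushpull_value(w, g)
    for w in range(1, 6)
    for g in range(1, 4)
)
```

Whether this counting matches a given BytePS deployment depends on how `rank()` and `size()` are defined there, which the diff does not spell out.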