This repository has been archived by the owner on Nov 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add examples of running MXNet with Horovod (#14286)
* Add examples for MXNet with Horovod * update readme * update examples * update README * update mnist_module example * Update README * update README * update README * update README
- Loading branch information
1 parent
29e13b4
commit 056fce4
Showing
4 changed files
with
1,002 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
<!--- Licensed to the Apache Software Foundation (ASF) under one --> | ||
<!--- or more contributor license agreements. See the NOTICE file --> | ||
<!--- distributed with this work for additional information --> | ||
<!--- regarding copyright ownership. The ASF licenses this file --> | ||
<!--- to you under the Apache License, Version 2.0 (the --> | ||
<!--- "License"); you may not use this file except in compliance --> | ||
<!--- with the License. You may obtain a copy of the License at --> | ||
|
||
<!--- http://www.apache.org/licenses/LICENSE-2.0 --> | ||
|
||
<!--- Unless required by applicable law or agreed to in writing, --> | ||
<!--- software distributed under the License is distributed on an --> | ||
<!--- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY --> | ||
<!--- KIND, either express or implied. See the License for the --> | ||
<!--- specific language governing permissions and limitations --> | ||
<!--- under the License. --> | ||
|
||
# Distributed Training using MXNet with Horovod | ||
[Horovod](https://github.com/horovod/horovod) is a distributed training framework that demonstrates | ||
excellent scaling efficiency for dense models running on a large number of nodes. It currently | ||
supports mainstream deep learning frameworks such as MXNet, TensorFlow, Keras, and PyTorch. | ||
It is created at Uber and currently hosted by the [Linux Foundation Deep Learning](https://lfdl.io)(LF DL). | ||
|
||
MXNet is supported in Horovod 0.16.0 [release](https://eng.uber.com/horovod-pyspark-apache-mxnet-support/). | ||
|
||
## What's New? | ||
Compared with the standard distributed training script in MXNet which uses parameter server to | ||
distribute and aggregate parameters, Horovod uses ring allreduce and/or tree-based allreduce algorithm | ||
to communicate parameters between workers. There is no dedicated server and the communication data size | ||
between workers does not depend on the number of workers. Therefore, it scales well in the case where | ||
there are a large number of workers and network bandwidth is the bottleneck. | ||
|
||
# Install | ||
## Install MXNet | ||
```bash | ||
$ pip install mxnet | ||
``` | ||
**Note**: There is a [known issue](https://github.com/horovod/horovod/issues/884) when running Horovod with MXNet on a Linux system with GCC version 5.X and above. We recommend users to build MXNet from source following this [guide](https://mxnet.incubator.apache.org/install/build_from_source.html) as a workaround for now. Also mxnet-mkl package in 1.4.0 release does not support Horovod. | ||
|
||
## Install Horovod | ||
```bash | ||
$ pip install horovod | ||
``` | ||
|
||
This basic installation is good for laptops and for getting to know Horovod. | ||
If you're installing Horovod on a server with GPUs, read the [Horovod on GPU](https://github.com/horovod/horovod/blob/master/docs/gpus.md) page. | ||
If you want to use Docker, read the [Horovod in Docker](https://github.com/horovod/horovod/blob/master/docs/docker.md) page. | ||
|
||
## Install MPI | ||
MPI is required to run distributed training with Horovod. Install [Open MPI](https://www.open-mpi.org/) or another MPI implementation. | ||
Steps to install Open MPI are listed [here](https://www.open-mpi.org/faq/?category=building#easy-build). | ||
|
||
**Note**: Open MPI 3.1.3 has an issue that may cause hangs. It is recommended | ||
to downgrade to Open MPI 3.1.2 or upgrade to Open MPI 4.0.0. | ||
|
||
# Usage | ||
|
||
To run MXNet with Horovod, make the following additions to your training script: | ||
|
||
1. Run `hvd.init()`. | ||
|
||
2. Pin the context to a processor using `hvd.local_rank()`. | ||
Typically, each Horovod worker is associated with one process. The local rank is a unique ID specifically | ||
for all processes running Horovod job on the same node. | ||
|
||
3. Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by | ||
the number of workers. An increase in learning rate compensates for the increased batch size. | ||
|
||
4. Wrap optimizer in `hvd.DistributedOptimizer`. The distributed optimizer delegates gradient computation | ||
to the original optimizer, averages gradients using *allreduce* or *allgather*, and then applies those averaged | ||
gradients. | ||
|
||
5. Add `hvd.broadcast_parameters` to broadcast initial variable states from rank 0 to all other processes. | ||
This is necessary to ensure consistent initialization of all workers when training is started with random weights or | ||
restored from a checkpoint. | ||
|
||
# Example | ||
|
||
Here we provide the building blocks to train a model using MXNet with Horovod. | ||
The full examples are in [MNIST](gluon_mnist.py) and [ImageNet](resnet50_imagenet.py). | ||
|
||
## Gluon API | ||
```python | ||
from mxnet import autograd, gluon | ||
import mxnet as mx | ||
import horovod.mxnet as hvd | ||
|
||
# Initialize Horovod | ||
hvd.init() | ||
|
||
# Set context to current process | ||
context = mx.cpu(hvd.local_rank()) if args.no_cuda else mx.gpu(hvd.local_rank()) | ||
|
||
num_workers = hvd.size() | ||
|
||
# Build model | ||
model = ... | ||
model.hybridize() | ||
|
||
# Define hyper parameters | ||
optimizer_params = ... | ||
|
||
# Add Horovod Distributed Optimizer | ||
opt = mx.optimizer.create('sgd', **optimizer_params) | ||
opt = hvd.DistributedOptimizer(opt) | ||
|
||
# Initialize parameters | ||
model.initialize(initializer, ctx=context) | ||
|
||
# Fetch and broadcast parameters | ||
params = model.collect_params() | ||
if params is not None: | ||
hvd.broadcast_parameters(params, root_rank=0) | ||
|
||
# Create trainer and loss function | ||
trainer = gluon.Trainer(params, opt, kvstore=None) | ||
loss_fn = ... | ||
|
||
# Train model | ||
for epoch in range(num_epoch): | ||
train_data.reset() | ||
for nbatch, batch in enumerate(train_data, start=1): | ||
data = batch.data[0].as_in_context(context) | ||
label = batch.label[0].as_in_context(context) | ||
with autograd.record(): | ||
output = model(data.astype(dtype, copy=False)) | ||
loss = loss_fn(output, label) | ||
loss.backward() | ||
trainer.step(batch_size) | ||
``` | ||
|
||
## Module API | ||
```python | ||
import mxnet as mx | ||
import horovod.mxnet as hvd | ||
|
||
# Initialize Horovod | ||
hvd.init() | ||
|
||
# Set context to current process | ||
context = mx.cpu(hvd.local_rank()) if args.no_cuda else mx.gpu(hvd.local_rank()) | ||
num_workers = hvd.size() | ||
|
||
# Build model | ||
model = ... | ||
|
||
# Define hyper parameters | ||
optimizer_params = ... | ||
|
||
# Add Horovod Distributed Optimizer | ||
opt = mx.optimizer.create('sgd', **optimizer_params) | ||
opt = hvd.DistributedOptimizer(opt) | ||
|
||
# Initialize parameters | ||
initializer = mx.init.Xavier(rnd_type='gaussian', factor_type="in", | ||
magnitude=2) | ||
model.bind(data_shapes=train_data.provide_data, | ||
label_shapes=train_data.provide_label) | ||
model.init_params(initializer) | ||
|
||
# Fetch and broadcast parameters | ||
(arg_params, aux_params) = model.get_params() | ||
if arg_params: | ||
hvd.broadcast_parameters(arg_params, root_rank=0) | ||
if aux_params: | ||
hvd.broadcast_parameters(aux_params, root_rank=0) | ||
model.set_params(arg_params=arg_params, aux_params=aux_params) | ||
|
||
# Train model | ||
model.fit(train_data, | ||
kvstore=None, | ||
optimizer=opt, | ||
num_epoch=num_epoch) | ||
``` | ||
|
||
|
||
# Running Horovod | ||
|
||
The example commands below show how to run distributed training. See the | ||
[Running Horovod](https://github.com/horovod/horovod/blob/master/docs/running.md) | ||
page for more instructions, including RoCE/InfiniBand tweaks and tips for dealing with hangs. | ||
|
||
1. To run on a machine with 4 CPUs: | ||
|
||
```bash | ||
$ mpirun -np 4 \ | ||
-H localhost:4 \ | ||
-bind-to none -map-by slot \ | ||
python train.py | ||
``` | ||
|
||
2. To run on 2 machines with 4 GPUs each: | ||
|
||
```bash | ||
$ mpirun -np 8 \ | ||
-H server1:4,server2:4 \ | ||
-bind-to none -map-by slot \ | ||
-x NCCL_DEBUG=INFO \ | ||
-mca pml ob1 -mca btl ^openib \ | ||
python train.py | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
import argparse | ||
import logging | ||
import os | ||
import zipfile | ||
import time | ||
|
||
import mxnet as mx | ||
import horovod.mxnet as hvd | ||
from mxnet import autograd, gluon, nd | ||
from mxnet.test_utils import download | ||
|
||
# Training settings | ||
parser = argparse.ArgumentParser(description='MXNet MNIST Example') | ||
|
||
parser.add_argument('--batch-size', type=int, default=64, | ||
help='training batch size (default: 64)') | ||
parser.add_argument('--dtype', type=str, default='float32', | ||
help='training data type (default: float32)') | ||
parser.add_argument('--epochs', type=int, default=5, | ||
help='number of training epochs (default: 5)') | ||
parser.add_argument('--lr', type=float, default=0.01, | ||
help='learning rate (default: 0.01)') | ||
parser.add_argument('--momentum', type=float, default=0.9, | ||
help='SGD momentum (default: 0.9)') | ||
parser.add_argument('--use-gpu', action='store_true', default=False, | ||
help='run training on GPU (default: False)') | ||
args = parser.parse_args() | ||
|
||
logging.basicConfig(level=logging.INFO) | ||
logging.info(args) | ||
|
||
|
||
# Function to get mnist iterator given a rank | ||
def get_mnist_iterator(rank): | ||
data_dir = "data-%d" % rank | ||
if not os.path.isdir(data_dir): | ||
os.makedirs(data_dir) | ||
zip_file_path = download('http://data.mxnet.io/mxnet/data/mnist.zip', | ||
dirname=data_dir) | ||
with zipfile.ZipFile(zip_file_path) as zf: | ||
zf.extractall(data_dir) | ||
|
||
input_shape = (1, 28, 28) | ||
batch_size = args.batch_size | ||
|
||
train_iter = mx.io.MNISTIter( | ||
image="%s/train-images-idx3-ubyte" % data_dir, | ||
label="%s/train-labels-idx1-ubyte" % data_dir, | ||
input_shape=input_shape, | ||
batch_size=batch_size, | ||
shuffle=True, | ||
flat=False, | ||
num_parts=hvd.size(), | ||
part_index=hvd.rank() | ||
) | ||
|
||
val_iter = mx.io.MNISTIter( | ||
image="%s/t10k-images-idx3-ubyte" % data_dir, | ||
label="%s/t10k-labels-idx1-ubyte" % data_dir, | ||
input_shape=input_shape, | ||
batch_size=batch_size, | ||
flat=False, | ||
) | ||
|
||
return train_iter, val_iter | ||
|
||
|
||
# Function to define neural network | ||
def conv_nets(): | ||
net = gluon.nn.HybridSequential() | ||
with net.name_scope(): | ||
net.add(gluon.nn.Conv2D(channels=20, kernel_size=5, activation='relu')) | ||
net.add(gluon.nn.MaxPool2D(pool_size=2, strides=2)) | ||
net.add(gluon.nn.Conv2D(channels=50, kernel_size=5, activation='relu')) | ||
net.add(gluon.nn.MaxPool2D(pool_size=2, strides=2)) | ||
net.add(gluon.nn.Flatten()) | ||
net.add(gluon.nn.Dense(512, activation="relu")) | ||
net.add(gluon.nn.Dense(10)) | ||
return net | ||
|
||
|
||
# Function to evaluate accuracy for a model | ||
def evaluate(model, data_iter, context): | ||
data_iter.reset() | ||
metric = mx.metric.Accuracy() | ||
for _, batch in enumerate(data_iter): | ||
data = batch.data[0].as_in_context(context) | ||
label = batch.label[0].as_in_context(context) | ||
output = model(data.astype(args.dtype, copy=False)) | ||
metric.update([label], [output]) | ||
|
||
return metric.get() | ||
|
||
|
||
# Initialize Horovod | ||
hvd.init() | ||
|
||
# Horovod: pin context to local rank | ||
context = mx.gpu(hvd.local_rank()) if args.use_gpu else mx.cpu(hvd.local_rank()) | ||
num_workers = hvd.size() | ||
|
||
# Load training and validation data | ||
train_data, val_data = get_mnist_iterator(hvd.rank()) | ||
|
||
# Build model | ||
model = conv_nets() | ||
model.cast(args.dtype) | ||
model.hybridize() | ||
|
||
# Define hyper parameters | ||
optimizer_params = {'momentum': args.momentum, | ||
'learning_rate': args.lr * hvd.size(), | ||
'rescale_grad': 1.0 / args.batch_size} | ||
|
||
# Add Horovod Distributed Optimizer | ||
opt = mx.optimizer.create('sgd', **optimizer_params) | ||
opt = hvd.DistributedOptimizer(opt) | ||
|
||
# Initialize parameters | ||
initializer = mx.init.Xavier(rnd_type='gaussian', factor_type="in", | ||
magnitude=2) | ||
model.initialize(initializer, ctx=context) | ||
|
||
# Fetch and broadcast parameters | ||
params = model.collect_params() | ||
if params is not None: | ||
hvd.broadcast_parameters(params, root_rank=0) | ||
|
||
# Create trainer, loss function and train metric | ||
trainer = gluon.Trainer(params, opt, kvstore=None) | ||
loss_fn = gluon.loss.SoftmaxCrossEntropyLoss() | ||
metric = mx.metric.Accuracy() | ||
|
||
# Train model | ||
for epoch in range(args.epochs): | ||
tic = time.time() | ||
train_data.reset() | ||
metric.reset() | ||
for nbatch, batch in enumerate(train_data, start=1): | ||
data = batch.data[0].as_in_context(context) | ||
label = batch.label[0].as_in_context(context) | ||
with autograd.record(): | ||
output = model(data.astype(args.dtype, copy=False)) | ||
loss = loss_fn(output, label) | ||
loss.backward() | ||
trainer.step(args.batch_size) | ||
metric.update([label], [output]) | ||
|
||
if nbatch % 100 == 0: | ||
name, acc = metric.get() | ||
logging.info('[Epoch %d Batch %d] Training: %s=%f' % | ||
(epoch, nbatch, name, acc)) | ||
|
||
if hvd.rank() == 0: | ||
elapsed = time.time() - tic | ||
speed = nbatch * args.batch_size * hvd.size() / elapsed | ||
logging.info('Epoch[%d]\tSpeed=%.2f samples/s\tTime cost=%f', | ||
epoch, speed, elapsed) | ||
|
||
# Evaluate model accuracy | ||
_, train_acc = metric.get() | ||
name, val_acc = evaluate(model, val_data, context) | ||
if hvd.rank() == 0: | ||
logging.info('Epoch[%d]\tTrain: %s=%f\tValidation: %s=%f', epoch, name, | ||
train_acc, name, val_acc) | ||
|
||
if hvd.rank() == 0 and epoch == args.epochs - 1: | ||
assert val_acc > 0.96, "Achieved accuracy (%f) is lower than expected\ | ||
(0.96)" % val_acc |
Oops, something went wrong.