diff --git a/.gitignore b/.gitignore
index 1996f1cbf..3b7bcbc53 100644
--- a/.gitignore
+++ b/.gitignore
@@ -116,3 +116,4 @@ venv.bak/

 # for development
 scripts/
+exps/
diff --git a/example/mxnet/train_cifar100_byteps_gc.py b/example/mxnet/train_cifar100_byteps_gc.py
index 076dcaef9..1039f7325 100644
--- a/example/mxnet/train_cifar100_byteps_gc.py
+++ b/example/mxnet/train_cifar100_byteps_gc.py
@@ -1,18 +1,36 @@
+# Copyright 2019 Bytedance Inc. or its affiliates. All Rights Reserved.
+# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""This file is modified from
+`gluon-cv/scripts/classification/cifar/train_cifar10.py`"""
+import argparse
 import logging
+import subprocess
 import time
-import argparse
-import byteps.mxnet as bps
+
+import gluoncv as gcv
+import matplotlib
+import mxnet as mx
 from gluoncv.data import transforms as gcv_transforms
-from gluoncv.utils import makedirs, TrainingHistory
 from gluoncv.model_zoo import get_model
-import gluoncv as gcv
-from mxnet.gluon.data.vision import transforms
-from mxnet.gluon import nn
+from gluoncv.utils import LRScheduler, LRSequential, makedirs
 from mxnet import autograd as ag
-from mxnet import gluon, nd
-import mxnet as mx
-import numpy as np
-import matplotlib
+from mxnet import gluon
+from mxnet.gluon.data.vision import transforms
+
+import byteps.mxnet as bps
+
 matplotlib.use('Agg')
@@ -32,20 +50,24 @@ def parse_args():
                         help='model to use. options are resnet and wrn. default is resnet.')
     parser.add_argument('-j', '--num-data-workers', dest='num_workers', default=4,
                         type=int, help='number of preprocessing workers')
-    parser.add_argument('--num-epochs', type=int, default=3,
+    parser.add_argument('--num-epochs', type=int, default=200,
                         help='number of training epochs.')
     parser.add_argument('--lr', type=float, default=0.1,
                         help='learning rate. default is 0.1.')
     parser.add_argument('--momentum', type=float, default=0.9,
                         help='momentum value for optimizer, default is 0.9.')
-    parser.add_argument('--wd', type=float, default=0.0001,
-                        help='weight decay rate. default is 0.0001.')
+    parser.add_argument('--wd', type=float, default=0.0005,
+                        help='weight decay rate. default is 0.0005.')
     parser.add_argument('--lr-decay', type=float, default=0.1,
                         help='decay rate of learning rate. default is 0.1.')
     parser.add_argument('--lr-decay-period', type=int, default=0,
                         help='period in epoch for learning rate decays. default is 0 (has no effect).')
-    parser.add_argument('--lr-decay-epoch', type=str, default='40,60',
-                        help='epochs at which learning rate decays. default is 40,60.')
+    parser.add_argument('--lr-decay-epoch', type=str, default='100,150',
+                        help='epochs at which learning rate decays. default is 100,150.')
+    parser.add_argument('--warmup-lr', type=float, default=0.0,
+                        help='starting warmup learning rate. default is 0.0.')
+    parser.add_argument('--warmup-epochs', type=int, default=0,
+                        help='number of warmup epochs.')
     parser.add_argument('--drop-rate', type=float, default=0.0,
                         help='dropout rate for wide resnet. default is 0.')
     parser.add_argument('--mode', type=str,
@@ -56,21 +78,21 @@ def parse_args():
                         help='directory of saved models')
     parser.add_argument('--resume-from', type=str,
                         help='resume training from the model')
-    parser.add_argument('--save-plot-dir', type=str, default='.',
-                        help='the path to save the history plot')
-    parser.add_argument('--logging-file', type=str, default='train_cifar100.log',
+    parser.add_argument('--logging-file', type=str, default='baseline',
                         help='name of training log file')
     # additional arguments for gradient compression
     parser.add_argument('--compressor', type=str, default='',
                         help='which compressor')
-    parser.add_argument('--ef', type=str, default=None,
-                        help='enable error-feedback')
+    parser.add_argument('--ef', type=str, default='',
+                        help='which error-feedback')
+    parser.add_argument('--compress-momentum', type=str, default='',
+                        help='which compress momentum')
     parser.add_argument('--onebit-scaling', action='store_true', default=False,
                         help='enable scaling for onebit compressor')
+    parser.add_argument('--k', default=1, type=int,
+                        help='topk or randomk')
     parser.add_argument('--fp16-pushpull', action='store_true', default=False,
                         help='use fp16 compression during pushpull')
-    parser.add_argument('--compress-momentum', action='store_true', default=False,
-                        help='enable compress momentum.')

     opt = parser.parse_args()
     return opt
@@ -78,7 +100,15 @@ def parse_args():

 def main():
     opt = parse_args()
-    filehandler = logging.FileHandler(opt.logging_file)
+    bps.init()
+
+    gpu_name = subprocess.check_output(
+        ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv'])
+    gpu_name = gpu_name.decode('utf8').split('\n')[-2]
+    gpu_name = '-'.join(gpu_name.split())
+    filename = "cifar100-%d-%s-%s.log" % (bps.size(),
+                                          gpu_name, opt.logging_file)
+    filehandler = logging.FileHandler(filename, mode='w')
     streamhandler = logging.StreamHandler()

     logger = logging.getLogger('')
@@ -88,8 +118,6 @@ def main():

     logger.info(opt)

-    bps.init()
-
     batch_size = opt.batch_size
     classes = 100
@@ -102,7 +130,20 @@ def main():
     rank = bps.rank()

     lr_decay = opt.lr_decay
-    lr_decay_epoch = [int(i) for i in opt.lr_decay_epoch.split(',')] + [np.inf]
+    lr_decay_epoch = [int(i) for i in opt.lr_decay_epoch.split(',')]
+
+    num_batches = 50000 // (opt.batch_size * nworker)
+    lr_scheduler = LRSequential([
+        LRScheduler('linear', base_lr=opt.warmup_lr,
+                    target_lr=opt.lr * nworker / bps.local_size(),
+                    nepochs=opt.warmup_epochs, iters_per_epoch=num_batches),
+        LRScheduler('step', base_lr=opt.lr * nworker / bps.local_size(),
+                    target_lr=0,
+                    nepochs=opt.num_epochs - opt.warmup_epochs,
+                    iters_per_epoch=num_batches,
+                    step_epoch=lr_decay_epoch,
+                    step_factor=lr_decay, power=2)
+    ])

     model_name = opt.model
     if model_name.startswith('cifar_wideresnet'):
@@ -113,7 +154,11 @@ def main():
     net = get_model(model_name, **kwargs)
     if opt.resume_from:
         net.load_parameters(opt.resume_from, ctx=context)
-    optimizer = 'sgd'
+
+    if opt.compressor:
+        optimizer = 'sgd'
+    else:
+        optimizer = 'nag'

     save_period = opt.save_period
     if opt.save_dir and save_period:
@@ -123,8 +168,6 @@ def main():
         save_dir = ''
         save_period = 0

-    plot_path = opt.save_plot_dir
-
     transform_train = transforms.Compose([
         gcv_transforms.RandomCrop(32, pad=4),
         transforms.RandomFlipLeftRight(),
@@ -158,7 +201,8 @@ def train(epochs, ctx):
         train_data = gluon.data.DataLoader(
             gluon.data.vision.CIFAR100(train=True).shard(
                 nworker, rank).transform_first(transform_train),
-            batch_size=batch_size, shuffle=True, last_batch='discard', num_workers=num_workers)
+            batch_size=batch_size, shuffle=True, last_batch='discard',
+            num_workers=num_workers)

         val_data = gluon.data.DataLoader(
             gluon.data.vision.CIFAR100(train=False).shard(
@@ -166,47 +210,35 @@ def train(epochs, ctx):
             batch_size=batch_size, shuffle=False, num_workers=num_workers)

         params = net.collect_params()
-        if opt.compressor:
-            for _, param in params.items():
-                setattr(param, "byteps_compressor_type", opt.compressor)
-                if opt.ef:
-                    setattr(param, "byteps_error_feedback_type", opt.ef)
-                if opt.onebit_scaling:
-                    setattr(
-                        param, "byteps_compressor_onebit_enable_scale", opt.onebit_scaling)
-                if opt.compress_momentum:
-                    setattr(param, "byteps_momentum_type", "nesterov")
-                    setattr(param, "byteps_momentum_mu", opt.momentum)
-
-        optimizer_params = {'learning_rate': opt.lr *
-                            nworker, 'wd': opt.wd, 'momentum': opt.momentum}
-        if opt.compress_momentum:
-            del optimizer_params["momentum"]
-
-        compression = bps.Compression.fp16 if opt.fp16_pushpull else bps.Compression.none
+
+        compression_params = {
+            "compressor": opt.compressor,
+            "ef": opt.ef,
+            "momentum": opt.compress_momentum,
+            "scaling": opt.onebit_scaling,
+            "k": opt.k
+        }
+
+        optimizer_params = {'lr_scheduler': lr_scheduler,
+                            'wd': opt.wd, 'momentum': opt.momentum}
+
         trainer = bps.DistributedTrainer(params,
-                                         optimizer, optimizer_params, compression=compression)
+                                         optimizer,
+                                         optimizer_params,
+                                         compression_params=compression_params)
         metric = mx.metric.Accuracy()
         train_metric = mx.metric.Accuracy()
         loss_fn = gluon.loss.SoftmaxCrossEntropyLoss()
-        train_history = TrainingHistory(['training-error', 'validation-error'])

         iteration = 0
-        lr_decay_count = 0
-
         best_val_score = 0
-
+        bps.byteps_declare_tensor("acc")
         for epoch in range(epochs):
             tic = time.time()
             train_metric.reset()
             metric.reset()
             train_loss = 0
             num_batch = len(train_data)
-            alpha = 1
-
-            if epoch == lr_decay_epoch[lr_decay_count]:
-                trainer.set_learning_rate(trainer.learning_rate*lr_decay)
-                lr_decay_count += 1

             for i, batch in enumerate(train_data):
                 data = gluon.utils.split_and_load(
@@ -223,32 +255,32 @@ def train(epochs, ctx):

                 train_loss += sum([l.sum().asscalar() for l in loss])
                 train_metric.update(label, output)
-                name, acc = train_metric.get()
+                name, train_acc = train_metric.get()
                 iteration += 1

             train_loss /= batch_size * num_batch
-            name, acc = train_metric.get()
+            name, train_acc = train_metric.get()
             throughput = int(batch_size * nworker * i / (time.time() - tic))

-            if rank == 0:
-                logger.info('[Epoch %d] training: %s=%f' %
-                            (epoch, name, acc))
-                logger.info('[Epoch %d] speed: %d samples/sec\ttime cost: %f' %
-                            (epoch, throughput, time.time()-tic))
+            logger.info('[Epoch %d] speed: %d samples/sec\ttime cost: %f lr=%f' %
+                        (epoch, throughput, time.time()-tic, trainer.learning_rate))

             name, val_acc = test(ctx, val_data)
-            if rank == 0:
+            acc = mx.nd.array([train_acc, val_acc], ctx=ctx[0])
+            bps.byteps_push_pull(acc, name="acc", is_average=False)
+            acc /= bps.size()
+            train_acc, val_acc = acc[0].asscalar(), acc[1].asscalar()
+            if bps.rank() == 0:
+                logger.info('[Epoch %d] training: %s=%f' %
+                            (epoch, name, train_acc))
                 logger.info('[Epoch %d] validation: %s=%f' %
-                            (epoch, name, val_acc))
-
-            train_history.update([1-acc, 1-val_acc])
-            train_history.plot(save_path='%s/%s_history.png' %
-                               (plot_path, model_name))
+                            (epoch, name, val_acc))

             if val_acc > best_val_score:
                 best_val_score = val_acc
                 net.save_parameters('%s/%.4f-cifar-%s-%d-best.params' %
-                                    (save_dir, best_val_score, model_name, epoch))
+                                    (save_dir, best_val_score, model_name,
+                                     epoch))

             if save_period and save_dir and (epoch + 1) % save_period == 0:
                 net.save_parameters('%s/cifar100-%s-%d.params' %
diff --git a/example/mxnet/train_gluon_imagenet_byteps_gc.py b/example/mxnet/train_gluon_imagenet_byteps_gc.py
index 1bbb5152a..7da702d6a 100644
--- a/example/mxnet/train_gluon_imagenet_byteps_gc.py
+++ b/example/mxnet/train_gluon_imagenet_byteps_gc.py
@@ -1,22 +1,22 @@
-import byteps.mxnet as bps
-from gluoncv.utils import makedirs, LRSequential, LRScheduler
-from gluoncv.model_zoo import get_model
-from gluoncv.data import imagenet
 import argparse
-import time
 import logging
-import os
 import math
+import os
+import subprocess
+import time

-import numpy as np
-import mxnet as mx
 import gluoncv as gcv
-from mxnet import gluon, nd
+import mxnet as mx
+import numpy as np
+from gluoncv.data import imagenet
+from gluoncv.model_zoo import get_model
+from gluoncv.utils import LRScheduler, LRSequential, makedirs
 from mxnet import autograd as ag
-from mxnet.gluon import nn
+from mxnet import gluon, nd
 from mxnet.gluon.data.vision import transforms
-import gluoncv as gcv
+
+import byteps.mxnet as bps
+

 gcv.utils.check_version('0.6.0')
@@ -134,7 +134,14 @@ def parse_args():

 def main():
     opt = parse_args()
-    filehandler = logging.FileHandler(opt.logging_file)
+    bps.init()
+    gpu_name = subprocess.check_output(
+        ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv'])
+    gpu_name = gpu_name.decode('utf8').split('\n')[-2]
+    gpu_name = '-'.join(gpu_name.split())
+    filename = "imagenet-%d-%s-%s.log" % (bps.size(),
+                                          gpu_name, opt.logging_file)
+    filehandler = logging.FileHandler(filename)
     streamhandler = logging.StreamHandler()

     logger = logging.getLogger('')
@@ -144,8 +151,6 @@ def main():

     logger.info(opt)

-    bps.init()
-
     batch_size = opt.batch_size
     classes = 1000
     num_training_samples = 1281167
@@ -409,7 +414,8 @@ def train(ctx):
         }

         trainer = bps.DistributedTrainer(
-            net.collect_params(), optimizer, optimizer_params, compression_params=compression_params)
+            net.collect_params(), optimizer, optimizer_params,
+            compression_params=compression_params)

         if opt.resume_states is not '':
             trainer.load_states(opt.resume_states)
@@ -428,6 +434,7 @@ def train(ctx):

         best_val_score = 1
+        bps.byteps_declare_tensor("acc")
         for epoch in range(opt.resume_epoch, opt.num_epochs):
             tic = time.time()
             if opt.use_rec:
@@ -493,15 +500,23 @@ def train(ctx):

             train_metric_name, train_metric_score = train_metric.get()
             throughput = int(batch_size * nworker * i / (time.time() - tic))

-            logger.info('[Epoch %d] training: %s=%f' %
-                        (epoch, train_metric_name, train_metric_score))
             logger.info('[Epoch %d] speed: %d samples/sec\ttime cost: %f' %
                         (epoch, throughput, time.time()-tic))

             err_top1_val, err_top5_val = test(ctx, val_data)
-            logger.info('[Epoch %d] validation: err-top1=%f err-top5=%f' %
-                        (epoch, err_top1_val, err_top5_val))
+            acc = mx.nd.array([train_metric_score, err_top1_val, err_top5_val],
+                              ctx=ctx[0])
+            bps.byteps_push_pull(acc, name="acc", is_average=False)
+            acc /= bps.size()
+            train_metric_score, err_top1_val, err_top5_val = acc[0].asscalar(
+            ), acc[1].asscalar(), acc[2].asscalar()
+
+            if bps.rank() == 0:
+                logger.info('[Epoch %d] training: %s=%f' %
+                            (epoch, train_metric_name, train_metric_score))
+                logger.info('[Epoch %d] validation: err-top1=%f err-top5=%f' %
+                            (epoch, err_top1_val, err_top5_val))

             if err_top1_val < best_val_score:
                 best_val_score = err_top1_val
diff --git a/example/mxnet/train_gluon_mnist_byteps_gc.py b/example/mxnet/train_gluon_mnist_byteps_gc.py
index 674e6e276..9604948d5 100644
--- a/example/mxnet/train_gluon_mnist_byteps_gc.py
+++ b/example/mxnet/train_gluon_mnist_byteps_gc.py
@@ -12,20 +12,22 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-"""This file is modified from `horovod/examples/mxnet_mnist.py`, using gluon style MNIST dataset and data_loader."""
-import time
-
+"""This file is modified from `horovod/examples/mxnet_mnist.py`, using gluon
+style MNIST dataset and data_loader."""
 import argparse
 import logging
+import subprocess
+import time

 import mxnet as mx
-import byteps.mxnet as bps
 from mxnet import autograd, gluon, nd
 from mxnet.gluon.data.vision import MNIST

+import byteps.mxnet as bps

 # Higher download speed for chinese users
-# os.environ['MXNET_GLUON_REPO'] = 'https://apache-mxnet.s3.cn-north-1.amazonaws.com.cn/'
+# os.environ['MXNET_GLUON_REPO'] =
+# 'https://apache-mxnet.s3.cn-north-1.amazonaws.com.cn/'

 # Training settings
 parser = argparse.ArgumentParser(description='MXNet MNIST Example')
@@ -58,15 +60,32 @@
                     help='topk or randomk')
 parser.add_argument('--fp16-pushpull', action='store_true', default=False,
                     help='use fp16 compression during pushpull')
+parser.add_argument('--logging-file', type=str, default='baseline',
+                    help='name of training log file')

 args = parser.parse_args()

+
 if not args.no_cuda:
     # Disable CUDA if there are no GPUs.
     if mx.context.num_gpus() == 0:
         args.no_cuda = True

-logging.basicConfig(level=logging.INFO)
-logging.info(args)
+# Initialize BytePS
+bps.init()
+
+gpu_name = subprocess.check_output(
+    ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv'])
+gpu_name = gpu_name.decode('utf8').split('\n')[-2]
+gpu_name = '-'.join(gpu_name.split())
+filename = "mnist-%d-%s-%s.log" % (bps.size(), gpu_name, args.logging_file)
+filehandler = logging.FileHandler(filename, mode='w')
+streamhandler = logging.StreamHandler()
+
+logger = logging.getLogger('')
+logger.setLevel(level=logging.INFO)
+logger.addHandler(filehandler)
+logger.addHandler(streamhandler)
+logger.info(args)
@@ -79,7 +98,8 @@ def dummy_transform(data, label):

 def get_mnist_iterator():
     train_set = MNIST(train=True, transform=dummy_transform)
     train_iter = gluon.data.DataLoader(
-        train_set, args.batch_size, True, num_workers=args.j, last_batch='discard')
+        train_set, args.batch_size, True, num_workers=args.j,
+        last_batch='discard')
     val_set = MNIST(train=False, transform=dummy_transform)
     val_iter = gluon.data.DataLoader(
         val_set, args.batch_size, False, num_workers=args.j)
@@ -116,9 +136,6 @@ def evaluate(model, data_iter, context):
 # Load training and validation data
 train_data, val_data, train_size = get_mnist_iterator()

-# Initialize BytePS
-bps.init()
-
 # BytePS: pin context to local rank
 context = mx.cpu(bps.local_rank()) if args.no_cuda else mx.gpu(
     bps.local_rank())
@@ -155,7 +172,9 @@ def evaluate(model, data_iter, context):
 loss_fn = gluon.loss.SoftmaxCrossEntropyLoss()
 metric = mx.metric.Accuracy()

+total_time = 0
 # Train model
+bps.byteps_declare_tensor("acc")
 for epoch in range(args.epochs):
     tic = time.time()
     metric.reset()
@@ -173,22 +192,29 @@ def evaluate(model, data_iter, context):

         if i % 100 == 0:
             name, acc = metric.get()
-            logging.info('[Epoch %d Batch %d] Training: %s=%f' %
-                         (epoch, i, name, acc))
+            logger.info('[Epoch %d Batch %d] Training: %s=%f' %
+                        (epoch, i, name, acc))

-    if bps.rank() == 0:
-        elapsed = time.time() - tic
-        speed = train_size * num_workers / elapsed
-        logging.info('Epoch[%d]\tSpeed=%.2f samples/s\tTime cost=%f',
-                     epoch, speed, elapsed)
+    elapsed = time.time() - tic
+    total_time += elapsed
+    speed = train_size * num_workers / elapsed
+    logger.info('Epoch[%d]\tSpeed=%.2f samples/s\tTime cost=%f',
+                epoch, speed, elapsed)

     # Evaluate model accuracy
     _, train_acc = metric.get()
     name, val_acc = evaluate(model, val_data, context)
+    acc = mx.nd.array([train_acc, val_acc], ctx=context)
+    bps.byteps_push_pull(acc, name="acc", is_average=False)
+    acc /= bps.size()
+    train_acc, val_acc = acc[0].asscalar(), acc[1].asscalar()
     if bps.rank() == 0:
-        logging.info('Epoch[%d]\tTrain: %s=%f\tValidation: %s=%f', epoch, name,
-                     train_acc, name, val_acc)
+        logger.info('Epoch[%d]\tTrain: %s=%f\tValidation: %s=%f', epoch, name,
+                    train_acc, name, val_acc)
+
+
+if bps.rank() == 0 and epoch == args.epochs - 1:
+    assert val_acc > 0.96, "Achieved accuracy (%f) is lower than expected\
+        (0.96)" % val_acc

-    if bps.rank() == 0 and epoch == args.epochs - 1:
-        assert val_acc > 0.96, "Achieved accuracy (%f) is lower than expected\
-            (0.96)" % val_acc
+logger.info("total time=%.2f", total_time)