compression: allreduce results for training scripts (#16)
* compression: update cifar100 training script (#15)

* cifar: update cifar script

* cifar: update lr

* cifar: add warmup

* cifar: update parse

* cifar: update

* cifar: add log

* cifar: fix typo

* cifar: fix bug

* cifar: fix lr

* cifar: fix typo

* cifar: update num samples

* reduce: update mnist script

* reduce: update logger

* reduce: fix typo

* reduce: fix typo

* reduce: fix typo

* reduce: fix input

* reduce: add stream logger

* reduce: reduce validation acc

* reduce: fix typo

* reduce: add train acc

* reduce: update log file name

* reduce: update log file name

* reduce: fix typo

* reduce: update gpu name extractor

* reduce: update logging file mode

* reduce: fix overwrite bug

* misc: update gitignore

* reduce: update cifar100 script

* reduce: advance init

* reduce: use gpu context

* reduce: update imagenet script

* reduce: update imagenet script

* reduce: fix typo
jasperzhong committed Jun 23, 2020
1 parent 3712c33 commit f881fb4
Showing 4 changed files with 188 additions and 114 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -116,3 +116,4 @@ venv.bak/

# for development
scripts/
exps/
176 changes: 104 additions & 72 deletions example/mxnet/train_cifar100_byteps_gc.py
@@ -1,18 +1,36 @@
# Copyright 2019 Bytedance Inc. or its affiliates. All Rights Reserved.
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This file is modified from
`gluon-cv/scripts/classification/cifar/train_cifar10.py`"""
import argparse
import logging
import subprocess
import time
import argparse
import byteps.mxnet as bps

import gluoncv as gcv
import matplotlib
import mxnet as mx
from gluoncv.data import transforms as gcv_transforms
from gluoncv.utils import makedirs, TrainingHistory
from gluoncv.model_zoo import get_model
import gluoncv as gcv
from mxnet.gluon.data.vision import transforms
from mxnet.gluon import nn
from gluoncv.utils import LRScheduler, LRSequential, makedirs
from mxnet import autograd as ag
from mxnet import gluon, nd
import mxnet as mx
import numpy as np
import matplotlib
from mxnet import gluon
from mxnet.gluon.data.vision import transforms

import byteps.mxnet as bps

matplotlib.use('Agg')


@@ -32,20 +50,24 @@ def parse_args():
help='model to use. options are resnet and wrn. default is resnet.')
parser.add_argument('-j', '--num-data-workers', dest='num_workers', default=4, type=int,
help='number of preprocessing workers')
parser.add_argument('--num-epochs', type=int, default=3,
parser.add_argument('--num-epochs', type=int, default=200,
help='number of training epochs.')
parser.add_argument('--lr', type=float, default=0.1,
help='learning rate. default is 0.1.')
parser.add_argument('--momentum', type=float, default=0.9,
help='momentum value for optimizer, default is 0.9.')
parser.add_argument('--wd', type=float, default=0.0001,
help='weight decay rate. default is 0.0001.')
parser.add_argument('--wd', type=float, default=0.0005,
help='weight decay rate. default is 0.0005.')
parser.add_argument('--lr-decay', type=float, default=0.1,
help='decay rate of learning rate. default is 0.1.')
parser.add_argument('--lr-decay-period', type=int, default=0,
help='period in epoch for learning rate decays. default is 0 (has no effect).')
parser.add_argument('--lr-decay-epoch', type=str, default='40,60',
help='epochs at which learning rate decays. default is 40,60.')
parser.add_argument('--lr-decay-epoch', type=str, default='100,150',
help='epochs at which learning rate decays. default is 100,150.')
parser.add_argument('--warmup-lr', type=float, default=0.0,
help='starting warmup learning rate. default is 0.0.')
parser.add_argument('--warmup-epochs', type=int, default=0,
help='number of warmup epochs.')
parser.add_argument('--drop-rate', type=float, default=0.0,
help='dropout rate for wide resnet. default is 0.')
parser.add_argument('--mode', type=str,
@@ -56,29 +78,37 @@ def parse_args():
help='directory of saved models')
parser.add_argument('--resume-from', type=str,
help='resume training from the model')
parser.add_argument('--save-plot-dir', type=str, default='.',
help='the path to save the history plot')
parser.add_argument('--logging-file', type=str, default='train_cifar100.log',
parser.add_argument('--logging-file', type=str, default='baseline',
help='name of training log file')
# additional arguments for gradient compression
parser.add_argument('--compressor', type=str, default='',
help='which compressor')
parser.add_argument('--ef', type=str, default=None,
help='enable error-feedback')
parser.add_argument('--ef', type=str, default='',
help='which error-feedback')
parser.add_argument('--compress-momentum', type=str, default='',
help='which compress momentum')
parser.add_argument('--onebit-scaling', action='store_true', default=False,
help='enable scaling for onebit compressor')
parser.add_argument('--k', default=1, type=int,
help='topk or randomk')
parser.add_argument('--fp16-pushpull', action='store_true', default=False,
help='use fp16 compression during pushpull')
parser.add_argument('--compress-momentum', action='store_true', default=False,
help='enable compress momentum.')
opt = parser.parse_args()
return opt


def main():
opt = parse_args()

filehandler = logging.FileHandler(opt.logging_file)
bps.init()

gpu_name = subprocess.check_output(
['nvidia-smi', '--query-gpu=gpu_name', '--format=csv'])
gpu_name = gpu_name.decode('utf8').split('\n')[-2]
gpu_name = '-'.join(gpu_name.split())
filename = "cifar100-%d-%s-%s.log" % (bps.size(),
gpu_name, opt.logging_file)
filehandler = logging.FileHandler(filename, mode='w')
streamhandler = logging.StreamHandler()

logger = logging.getLogger('')
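
For context, the new log-file naming above derives the GPU model from nvidia-smi and embeds the worker count; a minimal sketch of the same parsing, with an illustrative GPU name, worker count, and suffix:

import subprocess

# `nvidia-smi --query-gpu=gpu_name --format=csv` prints a header row, one row
# per GPU, and a trailing newline, e.g.:
#   name
#   Tesla V100-SXM2-16GB
out = subprocess.check_output(
    ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv']).decode('utf8')
gpu_name = out.split('\n')[-2]           # last GPU's name (the final element is '')
gpu_name = '-'.join(gpu_name.split())    # replace spaces with dashes for a file name
filename = "cifar100-%d-%s-%s.log" % (8, gpu_name, "baseline")
# -> cifar100-8-Tesla-V100-SXM2-16GB-baseline.log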
@@ -88,8 +118,6 @@ def main():

logger.info(opt)

bps.init()

batch_size = opt.batch_size
classes = 100

@@ -102,7 +130,20 @@
rank = bps.rank()

lr_decay = opt.lr_decay
lr_decay_epoch = [int(i) for i in opt.lr_decay_epoch.split(',')] + [np.inf]
lr_decay_epoch = [int(i) for i in opt.lr_decay_epoch.split(',')]

num_batches = 50000 // (opt.batch_size * nworker)
lr_scheduler = LRSequential([
LRScheduler('linear', base_lr=opt.warmup_lr,
target_lr=opt.lr * nworker / bps.local_size(),
nepochs=opt.warmup_epochs, iters_per_epoch=num_batches),
LRScheduler('step', base_lr=opt.lr * nworker / bps.local_size(),
target_lr=0,
nepochs=opt.num_epochs - opt.warmup_epochs,
iters_per_epoch=num_batches,
step_epoch=lr_decay_epoch,
step_factor=lr_decay, power=2)
])

model_name = opt.model
if model_name.startswith('cifar_wideresnet'):
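
For reference, the schedule added above chains a linear warmup into a step decay with GluonCV's LRSequential; a minimal standalone sketch with illustrative values (the script itself derives the base LR, warmup epochs, and decay epochs from the command line and the number of workers):

from gluoncv.utils import LRScheduler, LRSequential

iters_per_epoch = 50000 // 256              # CIFAR-100 train size / global batch size (illustrative)
lr_scheduler = LRSequential([
    # linear warmup from 0 to the base learning rate over the first 5 epochs
    LRScheduler('linear', base_lr=0.0, target_lr=0.1,
                nepochs=5, iters_per_epoch=iters_per_epoch),
    # then decay by 10x at epochs 100 and 150 for the remaining epochs
    LRScheduler('step', base_lr=0.1, target_lr=0,
                nepochs=200 - 5, iters_per_epoch=iters_per_epoch,
                step_epoch=[100, 150], step_factor=0.1, power=2),
])
# handed to the trainer below as optimizer_params = {'lr_scheduler': lr_scheduler, ...}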
@@ -113,7 +154,11 @@
net = get_model(model_name, **kwargs)
if opt.resume_from:
net.load_parameters(opt.resume_from, ctx=context)
optimizer = 'sgd'

if opt.compressor:
optimizer = 'sgd'
else:
optimizer = 'nag'

save_period = opt.save_period
if opt.save_dir and save_period:
@@ -123,8 +168,6 @@
save_dir = ''
save_period = 0

plot_path = opt.save_plot_dir

transform_train = transforms.Compose([
gcv_transforms.RandomCrop(32, pad=4),
transforms.RandomFlipLeftRight(),
@@ -158,55 +201,44 @@ def train(epochs, ctx):
train_data = gluon.data.DataLoader(
gluon.data.vision.CIFAR100(train=True).shard(
nworker, rank).transform_first(transform_train),
batch_size=batch_size, shuffle=True, last_batch='discard', num_workers=num_workers)
batch_size=batch_size, shuffle=True, last_batch='discard',
num_workers=num_workers)

val_data = gluon.data.DataLoader(
gluon.data.vision.CIFAR100(train=False).shard(
nworker, rank).transform_first(transform_test),
batch_size=batch_size, shuffle=False, num_workers=num_workers)

params = net.collect_params()
if opt.compressor:
for _, param in params.items():
setattr(param, "byteps_compressor_type", opt.compressor)
if opt.ef:
setattr(param, "byteps_error_feedback_type", opt.ef)
if opt.onebit_scaling:
setattr(
param, "byteps_compressor_onebit_enable_scale", opt.onebit_scaling)
if opt.compress_momentum:
setattr(param, "byteps_momentum_type", "nesterov")
setattr(param, "byteps_momentum_mu", opt.momentum)

optimizer_params = {'learning_rate': opt.lr *
nworker, 'wd': opt.wd, 'momentum': opt.momentum}
if opt.compress_momentum:
del optimizer_params["momentum"]

compression = bps.Compression.fp16 if opt.fp16_pushpull else bps.Compression.none

compression_params = {
"compressor": opt.compressor,
"ef": opt.ef,
"momentum": opt.compress_momentum,
"scaling": opt.onebit_scaling,
"k": opt.k
}

optimizer_params = {'lr_scheduler': lr_scheduler,
'wd': opt.wd, 'momentum': opt.momentum}

trainer = bps.DistributedTrainer(params,
optimizer, optimizer_params, compression=compression)
optimizer,
optimizer_params,
compression_params=compression_params)
metric = mx.metric.Accuracy()
train_metric = mx.metric.Accuracy()
loss_fn = gluon.loss.SoftmaxCrossEntropyLoss()
train_history = TrainingHistory(['training-error', 'validation-error'])

iteration = 0
lr_decay_count = 0

best_val_score = 0

bps.byteps_declare_tensor("acc")
for epoch in range(epochs):
tic = time.time()
train_metric.reset()
metric.reset()
train_loss = 0
num_batch = len(train_data)
alpha = 1

if epoch == lr_decay_epoch[lr_decay_count]:
trainer.set_learning_rate(trainer.learning_rate*lr_decay)
lr_decay_count += 1

for i, batch in enumerate(train_data):
data = gluon.utils.split_and_load(
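
Because the old per-parameter compressor attributes and the new dict-based configuration are interleaved in the hunk above, here is the new path pulled out as one sketch (names follow the diff; this is not a complete program, and `net`, `opt`, and `lr_scheduler` are assumed to be defined as earlier in the script):

import byteps.mxnet as bps

# compression settings filled from the CLI flags added in parse_args()
compression_params = {
    "compressor": opt.compressor,        # value of --compressor, '' disables compression
    "ef": opt.ef,                        # error-feedback type from --ef
    "momentum": opt.compress_momentum,   # compressor-side momentum from --compress-momentum
    "scaling": opt.onebit_scaling,       # --onebit-scaling for the one-bit compressor
    "k": opt.k,                          # --k for top-k / random-k
}
optimizer_params = {'lr_scheduler': lr_scheduler,
                    'wd': opt.wd, 'momentum': opt.momentum}
# optimizer is 'sgd' when a compressor is set, otherwise 'nag' (see the earlier hunk)
trainer = bps.DistributedTrainer(net.collect_params(), optimizer,
                                 optimizer_params,
                                 compression_params=compression_params)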
@@ -223,32 +255,32 @@ def train(epochs, ctx):
train_loss += sum([l.sum().asscalar() for l in loss])

train_metric.update(label, output)
name, acc = train_metric.get()
name, train_acc = train_metric.get()
iteration += 1

train_loss /= batch_size * num_batch
name, acc = train_metric.get()
name, train_acc = train_metric.get()
throughput = int(batch_size * nworker * i / (time.time() - tic))

if rank == 0:
logger.info('[Epoch %d] training: %s=%f' %
(epoch, name, acc))
logger.info('[Epoch %d] speed: %d samples/sec\ttime cost: %f' %
(epoch, throughput, time.time()-tic))
logger.info('[Epoch %d] speed: %d samples/sec\ttime cost: %f lr=%f' %
(epoch, throughput, time.time()-tic, trainer.learning_rate))

name, val_acc = test(ctx, val_data)
if rank == 0:
acc = mx.nd.array([train_acc, val_acc], ctx=ctx[0])
bps.byteps_push_pull(acc, name="acc", is_average=False)
acc /= bps.size()
train_acc, val_acc = acc[0].asscalar(), acc[1].asscalar()
if bps.rank() == 0:
logger.info('[Epoch %d] training: %s=%f' %
(epoch, name, train_acc))
logger.info('[Epoch %d] validation: %s=%f' %
(epoch, name, val_acc))

train_history.update([1-acc, 1-val_acc])
train_history.plot(save_path='%s/%s_history.png' %
(plot_path, model_name))
(epoch, name, val_acc))

if val_acc > best_val_score:
best_val_score = val_acc
net.save_parameters('%s/%.4f-cifar-%s-%d-best.params' %
(save_dir, best_val_score, model_name, epoch))
(save_dir, best_val_score, model_name,
epoch))

if save_period and save_dir and (epoch + 1) % save_period == 0:
net.save_parameters('%s/cifar100-%s-%d.params' %
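
Similarly, the accuracy allreduce that gives this commit its title is spread across interleaved old and new lines above; pulled together, the new pattern looks like the following sketch (assuming the per-worker context `ctx`, logger, and metrics defined earlier in the script):

# declared once, before the epoch loop
bps.byteps_declare_tensor("acc")

# after computing this worker's train_acc / val_acc for an epoch:
acc = mx.nd.array([train_acc, val_acc], ctx=ctx[0])
bps.byteps_push_pull(acc, name="acc", is_average=False)   # sum across all workers
acc /= bps.size()                                          # sum -> global mean
train_acc, val_acc = acc[0].asscalar(), acc[1].asscalar()
if bps.rank() == 0:                                        # only rank 0 logs the averaged numbers
    logger.info('[Epoch %d] training: accuracy=%f' % (epoch, train_acc))
    logger.info('[Epoch %d] validation: accuracy=%f' % (epoch, val_acc))

Note that byteps_push_pull is a collective, so every worker must call it; only the logging, not the reduction, sits behind the rank check.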