diff --git a/.gitignore b/.gitignore index 894a44c..f82a459 100644 --- a/.gitignore +++ b/.gitignore @@ -102,3 +102,5 @@ venv.bak/ # mypy .mypy_cache/ + +.idea/ diff --git a/DDP.py b/DDP.py new file mode 100644 index 0000000..a181cdd --- /dev/null +++ b/DDP.py @@ -0,0 +1,164 @@ +import tempfile +import torch +import torch.distributed as dist +import torch.nn as nn +import torch.optim as optim +import torch.multiprocessing as mp + +from torch.nn.parallel import DistributedDataParallel as DDP +import os + + +def setup(rank, world_size): + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = '29500' + + # initialize the process group + dist.init_process_group("gloo", rank=rank, world_size=world_size) + + # Explicitly setting seed to make sure that models created in two processes + # start from same random weights and biases. + torch.manual_seed(42) + + +def cleanup(): + dist.destroy_process_group() + + +class ToyModel(nn.Module): + def __init__(self): + super(ToyModel, self).__init__() + self.net1 = nn.Linear(10, 10) + self.relu = nn.ReLU() + self.net2 = nn.Linear(10, 5) + + def forward(self, x): + return self.net2(self.relu(self.net1(x))) + + +def demo_basic(rank, world_size): + setup(rank, world_size) + + # setup devices for this process, rank 0 uses GPUs [0, 1, 2, 3] and + # rank 1 uses GPUs [4, 5, 6, 7]. + n = torch.cuda.device_count() // world_size + device_ids = list(range(rank * n, (rank + 1) * n)) + + # create model and move it to device_ids[0] + model = ToyModel().to(device_ids[0]) + # output_device defaults to device_ids[0] + ddp_model = DDP(model, device_ids=device_ids) + + loss_fn = nn.MSELoss() + optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) + + optimizer.zero_grad() + outputs = ddp_model(torch.randn(20, 10)) + labels = torch.randn(20, 5).to(device_ids[0]) + loss_fn(outputs, labels).backward() + optimizer.step() + + cleanup() + + +def run_demo(demo_fn, world_size): + mp.spawn(demo_fn, + args=(world_size,), + nprocs=world_size, + join=True) + + +def demo_checkpoint(rank, world_size): + setup(rank, world_size) + + # setup devices for this process, rank 0 uses GPUs [0, 1, 2, 3] and + # rank 1 uses GPUs [4, 5, 6, 7]. + n = torch.cuda.device_count() // world_size + device_ids = list(range(rank * n, (rank + 1) * n)) + + model = ToyModel().to(device_ids[0]) + # output_device defaults to device_ids[0] + ddp_model = DDP(model, device_ids=device_ids) + + loss_fn = nn.MSELoss() + optimizer = optim.SGD(ddp_model.parameters(), lr=0.001) + + CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint" + if rank == 0: + # All processes should see same parameters as they all start from same + # random parameters and gradients are synchronized in backward passes. + # Therefore, saving it in one process is sufficient. + torch.save(ddp_model.state_dict(), CHECKPOINT_PATH) + + # Use a barrier() to make sure that process 1 loads the model after process + # 0 saves it.
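+ # (barrier() is a collective call: it returns only after every rank in the group has entered it, so rank 0 is guaranteed to have finished torch.save() before any rank loads the checkpoint.)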
+ dist.barrier() + # configure map_location properly + rank0_devices = [x - rank * len(device_ids) for x in device_ids] + device_pairs = zip(rank0_devices, device_ids) + map_location = {'cuda:%d' % x: 'cuda:%d' % y for x, y in device_pairs} + ddp_model.load_state_dict( + torch.load(CHECKPOINT_PATH, map_location=map_location)) + + optimizer.zero_grad() + outputs = ddp_model(torch.randn(20, 10)) + labels = torch.randn(20, 5).to(device_ids[0]) + loss_fn = nn.MSELoss() + loss_fn(outputs, labels).backward() + optimizer.step() + + # Use a barrier() to make sure that all processes have finished reading the + # checkpoint + dist.barrier() + + if rank == 0: + os.remove(CHECKPOINT_PATH) + + cleanup() + + +class ToyMpModel(nn.Module): + def __init__(self, dev0, dev1): + super(ToyMpModel, self).__init__() + self.dev0 = dev0 + self.dev1 = dev1 + self.net1 = torch.nn.Linear(10, 10).to(dev0) + self.relu = torch.nn.ReLU() + self.net2 = torch.nn.Linear(10, 5).to(dev1) + + def forward(self, x): + x = x.to(self.dev0) + x = self.relu(self.net1(x)) + x = x.to(self.dev1) + return self.net2(x) + + +def demo_model_parallel(rank, world_size): + setup(rank, world_size) + + # setup mp_model and devices for this process + dev0 = rank * 2 + dev1 = rank * 2 + 1 + mp_model = ToyMpModel(dev0, dev1) + ddp_mp_model = DDP(mp_model) + + loss_fn = nn.MSELoss() + optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001) + + optimizer.zero_grad() + # outputs will be on dev1 + outputs = ddp_mp_model(torch.randn(20, 10)) + labels = torch.randn(20, 5).to(dev1) + loss_fn(outputs, labels).backward() + optimizer.step() + + cleanup() + + +if __name__ == "__main__": + run_demo(demo_basic, 2) + run_demo(demo_checkpoint, 2) + + if torch.cuda.device_count() >= 8: + run_demo(demo_model_parallel, 4) + diff --git a/allreduce.py b/allreduce.py new file mode 100644 index 0000000..f36b44c --- /dev/null +++ b/allreduce.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +import os +import torch as th +import torch.distributed as dist +from torch.multiprocessing import Process + + +def allreduce(send, recv): + """ Implementation of a ring-reduce. """ + rank = dist.get_rank() + size = dist.get_world_size() + send_buff = send.clone() + recv_buff = send.clone() + accum = send.clone() + #th.cuda.synchronize() + + left = ((rank - 1) + size) % size + right = (rank + 1) % size + + for i in range(size - 1): + if i % 2 == 0: + # Send send_buff + send_req = dist.isend(send_buff, right) + dist.recv(recv_buff, left) + accum[:] += recv_buff[:] + else: + # Send recv_buff + send_req = dist.isend(recv_buff, right) + dist.recv(send_buff, left) + accum[:] += send_buff[:] + send_req.wait() + #th.cuda.synchronize() + recv[:] = accum[:] + + +def run(rank, size): + """ Distributed function to be implemented later. """ +# t = th.ones(2, 2) + t = th.rand(2, 2).cuda() + # for _ in range(10000000): + for _ in range(4): + c = t.clone() + dist.all_reduce(c, dist.reduce_op.SUM) +# allreduce(t, c) + t.set_(c) + print(t) + +def init_processes(rank, size, fn, backend='mpi'): + """ Initialize the distributed environment.
""" + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = '29500' +# dist.init_process_group(backend, rank=rank, world_size=size) + dist.init_process_group(backend, world_size=size) + fn(rank, size) + + +if __name__ == "__main__": + size = 1 + processes = [] + for rank in range(size): + p = Process(target=init_processes, args=(rank, size, run)) + p.start() + processes.append(p) + + for p in processes: + p.join() diff --git a/aws.py b/aws.py new file mode 100644 index 0000000..16ccb17 --- /dev/null +++ b/aws.py @@ -0,0 +1,248 @@ +import time +import sys +import torch + +if __name__ == '__main__': + torch.multiprocessing.set_start_method('spawn') + +import torch.nn as nn +import torch.nn.parallel +import torch.distributed as dist +import torch.optim +import torch.utils.data +import torch.utils.data.distributed +import torchvision.transforms as transforms +import torchvision.datasets as datasets +import torchvision.models as models + +from torch.multiprocessing import Pool, Process + +class AverageMeter(object): + """Computes and stores the average and current value""" + def __init__(self): + self.reset() + + def reset(self): + self.val = 0 + self.avg = 0 + self.sum = 0 + self.count = 0 + + def update(self, val, n=1): + self.val = val + self.sum += val * n + self.count += n + self.avg = self.sum / self.count + +def accuracy(output, target, topk=(1,)): + """Computes the precision@k for the specified values of k""" + with torch.no_grad(): + maxk = max(topk) + batch_size = target.size(0) + + _, pred = output.topk(maxk, 1, True, True) + pred = pred.t() + correct = pred.eq(target.view(1, -1).expand_as(pred)) + + res = [] + for k in topk: + correct_k = correct[:k].view(-1).float().sum(0, keepdim=True) + res.append(correct_k.mul_(100.0 / batch_size)) + return res + + +def train(train_loader, model, criterion, optimizer, epoch): + + batch_time = AverageMeter() + data_time = AverageMeter() + losses = AverageMeter() + top1 = AverageMeter() + top5 = AverageMeter() + + # switch to train mode 转到训练模式 + model.train() + + end = time.time() + for i, (input, target) in enumerate(train_loader): + + # measure data loading time 计算加载数据的时间 + data_time.update(time.time() - end) + + # Create non_blocking tensors for distributed training 为分布式训练创建 non_blocking 张量 + input = input.cuda(non_blocking=True) + target = target.cuda(non_blocking=True) + + # compute output 计算输出 + output = model(input) + loss = criterion(output, target) + + # measure accuracy and record loss 计算准确率并记录 loss + prec1, prec5 = accuracy(output, target, topk=(1, 5)) + losses.update(loss.item(), input.size(0)) + top1.update(prec1[0], input.size(0)) + top5.update(prec5[0], input.size(0)) + + # compute gradients in a backward pass 在反向传播中计算梯度 + optimizer.zero_grad() + loss.backward() + + # Call step of optimizer to update model params 调用一个 optimizer 步骤来更新模型参数 + optimizer.step() + + # measure elapsed time 计算花费的时间 + batch_time.update(time.time() - end) + end = time.time() + + if i % 10 == 0: + print('Epoch: [{0}][{1}/{2}]\t' + 'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t' + 'Data {data_time.val:.3f} ({data_time.avg:.3f})\t' + 'Loss {loss.val:.4f} ({loss.avg:.4f})\t' + 'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t' + 'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format( + epoch, i, len(train_loader), batch_time=batch_time, + data_time=data_time, loss=losses, top1=top1, top5=top5)) + +def adjust_learning_rate(initial_lr, optimizer, epoch): + """Sets the learning rate to the initial LR decayed by 10 every 30 epochs""" + lr = initial_lr * (0.1 ** 
(epoch // 30)) + for param_group in optimizer.param_groups: + param_group['lr'] = lr + + +def validate(val_loader, model, criterion): + + batch_time = AverageMeter() + losses = AverageMeter() + top1 = AverageMeter() + top5 = AverageMeter() + + # switch to evaluate mode 转到验证模式 + model.eval() + + with torch.no_grad(): + end = time.time() + for i, (input, target) in enumerate(val_loader): + + input = input.cuda(non_blocking=True) + target = target.cuda(non_blocking=True) + + # compute output 计算输出 + output = model(input) + loss = criterion(output, target) + + # measure accuracy and record loss 计算准确率并记录 loss + prec1, prec5 = accuracy(output, target, topk=(1, 5)) + losses.update(loss.item(), input.size(0)) + top1.update(prec1[0], input.size(0)) + top5.update(prec5[0], input.size(0)) + + # measure elapsed time 计算花费时间 + batch_time.update(time.time() - end) + end = time.time() + + if i % 100 == 0: + print('Test: [{0}/{1}]\t' + 'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t' + 'Loss {loss.val:.4f} ({loss.avg:.4f})\t' + 'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t' + 'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format( + i, len(val_loader), batch_time=batch_time, loss=losses, + top1=top1, top5=top5)) + + print(' * Prec@1 {top1.avg:.3f} Prec@5 {top5.avg:.3f}' + .format(top1=top1, top5=top5)) + + return top1.avg + +print("Collect Inputs...") + +# Batch Size for training and testing 训练和测试的 batch size +batch_size = 32 + +# Number of additional worker processes for dataloading 数据加载的额外工作进程数 +workers = 2 + +# Number of epochs to train for 训练的 epoch 数 +num_epochs = 2 + +# Starting Learning Rate 初始学习率 +starting_lr = 0.1 + +# Number of distributed processes 分布式进程数 +world_size = 4 + +# Distributed backend type 分布式后端类型 +dist_backend = 'nccl' + +# Url used to setup distributed training 设置分布式训练的 url +dist_url = "tcp://172.31.22.234:23456" + +print("Initialize Process Group...") +# Initialize Process Group 初始化进程组 +# v1 - init with url 使用 url 初始化 +dist.init_process_group(backend=dist_backend, init_method=dist_url, rank=int(sys.argv[1]), world_size=world_size) +# v2 - init with file 使用文件初始化 +# dist.init_process_group(backend="nccl", init_method="file:///home/ubuntu/pt-distributed-tutorial/trainfile", rank=int(sys.argv[1]), world_size=world_size) +# v3 - init with environment variables 使用环境变量初始化 +# dist.init_process_group(backend="nccl", init_method="env://", rank=int(sys.argv[1]), world_size=world_size) + +# Establish Local Rank and set device on this node 设置节点的本地化编号和设备 +local_rank = int(sys.argv[2]) +dp_device_ids = [local_rank] +torch.cuda.set_device(local_rank) + +print("Initialize Model...") +# Construct Model 构建模型 +model = models.resnet18(pretrained=False).cuda() +# Make model DistributedDataParallel +model = torch.nn.parallel.DistributedDataParallel(model, device_ids=dp_device_ids, output_device=local_rank) + +# define loss function (criterion) and optimizer 定义 loss 函数和 optimizer +criterion = nn.CrossEntropyLoss().cuda() +optimizer = torch.optim.SGD(model.parameters(), starting_lr, momentum=0.9, weight_decay=1e-4) + +print("Initialize Dataloaders...") +# Define the transform for the data. Notice, we must resize to 224x224 with this dataset and model. 定义数据的变换。尺寸转为224x224 +transform = transforms.Compose( + [transforms.Resize(224), + transforms.ToTensor(), + transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) + +# Initialize Datasets. 
STL10 will automatically download if not present 初始化数据集。如果没有STL10数据集则会自动下载 +trainset = datasets.STL10(root='./data', split='train', download=True, transform=transform) +valset = datasets.STL10(root='./data', split='test', download=True, transform=transform) + +# Create DistributedSampler to handle distributing the dataset across nodes when training 创建分布式采样器来控制训练中节点间的数据分发 +# This can only be called after torch.distributed.init_process_group is called 这个只能在 torch.distributed.init_process_group 被调用后调用 +train_sampler = torch.utils.data.distributed.DistributedSampler(trainset) + +# Create the Dataloaders to feed data to the training and validation steps 创建数据加载器,在训练和验证步骤中喂数据 +train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=(train_sampler is None), num_workers=workers, pin_memory=False, sampler=train_sampler) +val_loader = torch.utils.data.DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=workers, pin_memory=False) + +best_prec1 = 0 + +for epoch in range(num_epochs): + # Set epoch count for DistributedSampler 为分布式采样器设置 epoch 数 + train_sampler.set_epoch(epoch) + + # Adjust learning rate according to schedule 调整学习率 + adjust_learning_rate(starting_lr, optimizer, epoch) + + # train for one epoch 训练1个 epoch + print("\nBegin Training Epoch {}".format(epoch+1)) + train(train_loader, model, criterion, optimizer, epoch) + + # evaluate on validation set 在验证集上验证 + print("Begin Validation @ Epoch {}".format(epoch+1)) + prec1 = validate(val_loader, model, criterion) + + # remember best prec@1 and save checkpoint if desired 保存最佳的prec@1,如果需要的话保存检查点 + # is_best = prec1 > best_prec1 + best_prec1 = max(prec1, best_prec1) + + print("Epoch Summary: ") + print("\tEpoch Accuracy: {}".format(prec1)) + print("\tBest Accuracy: {}".format(best_prec1)) + diff --git a/gloo.py b/gloo.py new file mode 100644 index 0000000..976f3b1 --- /dev/null +++ b/gloo.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python +import os +import torch as th +import torch.distributed as dist +from torch.multiprocessing import Process + + +def allreduce(send, recv): + """ Implementation of a ring-reduce. """ + rank = dist.get_rank() + size = dist.get_world_size() + send_buff = th.zeros(send.size()) + recv_buff = th.zeros(send.size()) + accum = th.zeros(send.size()) + accum[:] = send[:] + # th.cuda.synchronize() + + left = ((rank - 1) + size) % size + right = (rank + 1) % size + + for i in range(size - 1): + if i % 2 == 0: + # Send send_buff + send_req = dist.isend(send_buff, right) + dist.recv(recv_buff, left) + accum[:] += recv[:] + else: + # Send recv_buff + send_req = dist.isend(recv_buff, right) + dist.recv(send_buff, left) + accum[:] += send[:] + send_req.wait() + # th.cuda.synchronize() + recv[:] = accum[:] + + +def run(rank, size): + """ Distributed function to be implemented later. """ +# t = th.ones(2, 2) + t = th.rand(2, 2).cuda() + # for _ in range(10000000): + for _ in range(4): + c = t.clone() + dist.all_reduce(c, dist.reduce_op.SUM) + # allreduce(t, c) + t.set_(c) + print(t) + + +def init_processes(rank, size, fn, backend='gloo'): + """ Initialize the distributed environment. 
""" + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = '29500' + dist.init_process_group(backend, rank=rank, world_size=size) + fn(rank, size) + + +if __name__ == "__main__": + size = 7 + processes = [] + for rank in range(size): + p = Process(target=init_processes, args=(rank, size, run)) + p.start() + processes.append(p) + + for p in processes: + p.join() + diff --git a/imagenet.py b/imagenet.py new file mode 100644 index 0000000..02d31f3 --- /dev/null +++ b/imagenet.py @@ -0,0 +1,425 @@ +import argparse +import os +import random +import shutil +import time +import warnings + +import torch +import torch.nn as nn +import torch.nn.parallel +import torch.backends.cudnn as cudnn +import torch.distributed as dist +import torch.optim +import torch.multiprocessing as mp +import torch.utils.data +import torch.utils.data.distributed +import torchvision.transforms as transforms +import torchvision.datasets as datasets +import torchvision.models as models + +model_names = sorted(name for name in models.__dict__ + if name.islower() and not name.startswith("__") + and callable(models.__dict__[name])) + +parser = argparse.ArgumentParser(description='PyTorch ImageNet Training') +parser.add_argument('data', metavar='DIR', + help='path to dataset') +parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet18', + choices=model_names, + help='model architecture: ' + + ' | '.join(model_names) + + ' (default: resnet18)') +parser.add_argument('-j', '--workers', default=4, type=int, metavar='N', + help='number of data loading workers (default: 4)') +parser.add_argument('--epochs', default=90, type=int, metavar='N', + help='number of total epochs to run') +parser.add_argument('--start-epoch', default=0, type=int, metavar='N', + help='manual epoch number (useful on restarts)') +parser.add_argument('-b', '--batch-size', default=256, type=int, + metavar='N', + help='mini-batch size (default: 256), this is the total ' + 'batch size of all GPUs on the current node when ' + 'using Data Parallel or Distributed Data Parallel') +parser.add_argument('--lr', '--learning-rate', default=0.1, type=float, + metavar='LR', help='initial learning rate', dest='lr') +parser.add_argument('--momentum', default=0.9, type=float, metavar='M', + help='momentum') +parser.add_argument('--wd', '--weight-decay', default=1e-4, type=float, + metavar='W', help='weight decay (default: 1e-4)', + dest='weight_decay') +parser.add_argument('-p', '--print-freq', default=10, type=int, + metavar='N', help='print frequency (default: 10)') +parser.add_argument('--resume', default='', type=str, metavar='PATH', + help='path to latest checkpoint (default: none)') +parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true', + help='evaluate model on validation set') +parser.add_argument('--pretrained', dest='pretrained', action='store_true', + help='use pre-trained model') +parser.add_argument('--world-size', default=-1, type=int, + help='number of nodes for distributed training') +parser.add_argument('--rank', default=-1, type=int, + help='node rank for distributed training') +parser.add_argument('--dist-url', default='tcp://172.18.146.204:23456', type=str, + help='url used to set up distributed training') +parser.add_argument('--dist-backend', default='nccl', type=str, + help='distributed backend') +parser.add_argument('--seed', default=None, type=int, + help='seed for initializing training. 
') +parser.add_argument('--gpu', default=None, type=int, + help='GPU id to use.') +parser.add_argument('--multiprocessing-distributed', action='store_true', + help='Use multi-processing distributed training to launch ' + 'N processes per node, which has N GPUs. This is the ' + 'fastest way to use PyTorch for either single node or ' + 'multi node data parallel training') + +best_acc1 = 0 + + +def main(): + args = parser.parse_args() + + if args.seed is not None: + random.seed(args.seed) + torch.manual_seed(args.seed) + cudnn.deterministic = True + warnings.warn('You have chosen to seed training. ' + 'This will turn on the CUDNN deterministic setting, ' + 'which can slow down your training considerably! ' + 'You may see unexpected behavior when restarting ' + 'from checkpoints.') + + if args.gpu is not None: + warnings.warn('You have chosen a specific GPU. This will completely ' + 'disable data parallelism.') + + if args.dist_url == "env://" and args.world_size == -1: + args.world_size = int(os.environ["WORLD_SIZE"]) + + args.distributed = args.world_size > 1 or args.multiprocessing_distributed + + ngpus_per_node = torch.cuda.device_count() + if args.multiprocessing_distributed: + # Since we have ngpus_per_node processes per node, the total world_size + # needs to be adjusted accordingly + args.world_size = ngpus_per_node * args.world_size + # Use torch.multiprocessing.spawn to launch distributed processes: the + # main_worker process function + mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args)) + else: + # Simply call main_worker function + main_worker(args.gpu, ngpus_per_node, args) + + +def main_worker(gpu, ngpus_per_node, args): + global best_acc1 + args.gpu = gpu + + if args.gpu is not None: + print("Use GPU: {} for training".format(args.gpu)) + + if args.distributed: + if args.dist_url == "env://" and args.rank == -1: + args.rank = int(os.environ["RANK"]) + if args.multiprocessing_distributed: + # For multiprocessing distributed training, rank needs to be the + # global rank among all the processes + args.rank = args.rank * ngpus_per_node + gpu + dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url, + world_size=args.world_size, rank=args.rank) + # create model + if args.pretrained: + print("=> using pre-trained model '{}'".format(args.arch)) + model = models.__dict__[args.arch](pretrained=True) + else: + print("=> creating model '{}'".format(args.arch)) + model = models.__dict__[args.arch]() + + if args.distributed: + # For multiprocessing distributed, DistributedDataParallel constructor + # should always set the single device scope, otherwise, + # DistributedDataParallel will use all available devices. 
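+            # In this branch each spawned process owns exactly one GPU: torch.cuda.set_device pins the device and DDP is constructed with device_ids=[args.gpu].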
+ if args.gpu is not None: + torch.cuda.set_device(args.gpu) + model.cuda(args.gpu) + # When using a single GPU per process and per + # DistributedDataParallel, we need to divide the batch size + # ourselves based on the total number of GPUs we have + args.batch_size = int(args.batch_size / ngpus_per_node) + args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node) + model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) + else: + model.cuda() + # DistributedDataParallel will divide and allocate batch_size to all + # available GPUs if device_ids are not set + model = torch.nn.parallel.DistributedDataParallel(model) + elif args.gpu is not None: + torch.cuda.set_device(args.gpu) + model = model.cuda(args.gpu) + else: + # DataParallel will divide and allocate batch_size to all available GPUs + if args.arch.startswith('alexnet') or args.arch.startswith('vgg'): + model.features = torch.nn.DataParallel(model.features) + model.cuda() + else: + model = torch.nn.DataParallel(model).cuda() + + # define loss function (criterion) and optimizer + criterion = nn.CrossEntropyLoss().cuda(args.gpu) + + optimizer = torch.optim.SGD(model.parameters(), args.lr, + momentum=args.momentum, + weight_decay=args.weight_decay) + + # optionally resume from a checkpoint + if args.resume: + if os.path.isfile(args.resume): + print("=> loading checkpoint '{}'".format(args.resume)) + if args.gpu is None: + checkpoint = torch.load(args.resume) + else: + # Map model to be loaded to specified single gpu. + loc = 'cuda:{}'.format(args.gpu) + checkpoint = torch.load(args.resume, map_location=loc) + args.start_epoch = checkpoint['epoch'] + best_acc1 = checkpoint['best_acc1'] + if args.gpu is not None: + # best_acc1 may be from a checkpoint from a different GPU + best_acc1 = best_acc1.to(args.gpu) + model.load_state_dict(checkpoint['state_dict']) + optimizer.load_state_dict(checkpoint['optimizer']) + print("=> loaded checkpoint '{}' (epoch {})" + .format(args.resume, checkpoint['epoch'])) + else: + print("=> no checkpoint found at '{}'".format(args.resume)) + + cudnn.benchmark = True + + # Data loading code + traindir = os.path.join(args.data, 'train') + valdir = os.path.join(args.data, 'val') + normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]) + + train_dataset = datasets.ImageFolder( + traindir, + transforms.Compose([ + transforms.RandomResizedCrop(224), + transforms.RandomHorizontalFlip(), + transforms.ToTensor(), + normalize, + ])) + + if args.distributed: + train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) + else: + train_sampler = None + + train_loader = torch.utils.data.DataLoader( + train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None), + num_workers=args.workers, pin_memory=True, sampler=train_sampler) + + val_loader = torch.utils.data.DataLoader( + datasets.ImageFolder(valdir, transforms.Compose([ + transforms.Resize(256), + transforms.CenterCrop(224), + transforms.ToTensor(), + normalize, + ])), + batch_size=args.batch_size, shuffle=False, + num_workers=args.workers, pin_memory=True) + + if args.evaluate: + validate(val_loader, model, criterion, args) + return + + for epoch in range(args.start_epoch, args.epochs): + if args.distributed: + train_sampler.set_epoch(epoch) + adjust_learning_rate(optimizer, epoch, args) + + # train for one epoch + train(train_loader, model, criterion, optimizer, epoch, args) + + # evaluate on validation set + acc1 = validate(val_loader, model, criterion, args) + 
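+        # Note: val_loader is built without a DistributedSampler, so every process evaluates the full validation set and computes its own acc1.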
+ # remember best acc@1 and save checkpoint + is_best = acc1 > best_acc1 + best_acc1 = max(acc1, best_acc1) + + if not args.multiprocessing_distributed or (args.multiprocessing_distributed + and args.rank % ngpus_per_node == 0): + save_checkpoint({ + 'epoch': epoch + 1, + 'arch': args.arch, + 'state_dict': model.state_dict(), + 'best_acc1': best_acc1, + 'optimizer' : optimizer.state_dict(), + }, is_best) + + +def train(train_loader, model, criterion, optimizer, epoch, args): + batch_time = AverageMeter('Time', ':6.3f') + data_time = AverageMeter('Data', ':6.3f') + losses = AverageMeter('Loss', ':.4e') + top1 = AverageMeter('Acc@1', ':6.2f') + top5 = AverageMeter('Acc@5', ':6.2f') + progress = ProgressMeter( + len(train_loader), + [batch_time, data_time, losses, top1, top5], + prefix="Epoch: [{}]".format(epoch)) + + # switch to train mode + model.train() + + end = time.time() + for i, (images, target) in enumerate(train_loader): + # measure data loading time + data_time.update(time.time() - end) + + if args.gpu is not None: + images = images.cuda(args.gpu, non_blocking=True) + target = target.cuda(args.gpu, non_blocking=True) + + # compute output + output = model(images) + loss = criterion(output, target) + + # measure accuracy and record loss + acc1, acc5 = accuracy(output, target, topk=(1, 5)) + losses.update(loss.item(), images.size(0)) + top1.update(acc1[0], images.size(0)) + top5.update(acc5[0], images.size(0)) + + # compute gradient and do SGD step + optimizer.zero_grad() + loss.backward() + optimizer.step() + + # measure elapsed time + batch_time.update(time.time() - end) + end = time.time() + + if i % args.print_freq == 0: + progress.display(i) + + +def validate(val_loader, model, criterion, args): + batch_time = AverageMeter('Time', ':6.3f') + losses = AverageMeter('Loss', ':.4e') + top1 = AverageMeter('Acc@1', ':6.2f') + top5 = AverageMeter('Acc@5', ':6.2f') + progress = ProgressMeter( + len(val_loader), + [batch_time, losses, top1, top5], + prefix='Test: ') + + # switch to evaluate mode + model.eval() + + with torch.no_grad(): + end = time.time() + for i, (images, target) in enumerate(val_loader): + if args.gpu is not None: + images = images.cuda(args.gpu, non_blocking=True) + target = target.cuda(args.gpu, non_blocking=True) + + # compute output + output = model(images) + loss = criterion(output, target) + + # measure accuracy and record loss + acc1, acc5 = accuracy(output, target, topk=(1, 5)) + losses.update(loss.item(), images.size(0)) + top1.update(acc1[0], images.size(0)) + top5.update(acc5[0], images.size(0)) + + # measure elapsed time + batch_time.update(time.time() - end) + end = time.time() + + if i % args.print_freq == 0: + progress.display(i) + + # TODO: this should also be done with the ProgressMeter + print(' * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}' + .format(top1=top1, top5=top5)) + + return top1.avg + + +def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'): + torch.save(state, filename) + if is_best: + shutil.copyfile(filename, 'model_best.pth.tar') + + +class AverageMeter(object): + """Computes and stores the average and current value""" + def __init__(self, name, fmt=':f'): + self.name = name + self.fmt = fmt + self.reset() + + def reset(self): + self.val = 0 + self.avg = 0 + self.sum = 0 + self.count = 0 + + def update(self, val, n=1): + self.val = val + self.sum += val * n + self.count += n + self.avg = self.sum / self.count + + def __str__(self): + fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})' + return 
fmtstr.format(**self.__dict__) + + +class ProgressMeter(object): + def __init__(self, num_batches, meters, prefix=""): + self.batch_fmtstr = self._get_batch_fmtstr(num_batches) + self.meters = meters + self.prefix = prefix + + def display(self, batch): + entries = [self.prefix + self.batch_fmtstr.format(batch)] + entries += [str(meter) for meter in self.meters] + print('\t'.join(entries)) + + def _get_batch_fmtstr(self, num_batches): + num_digits = len(str(num_batches // 1)) + fmt = '{:' + str(num_digits) + 'd}' + return '[' + fmt + '/' + fmt.format(num_batches) + ']' + + +def adjust_learning_rate(optimizer, epoch, args): + """Sets the learning rate to the initial LR decayed by 10 every 30 epochs""" + lr = args.lr * (0.1 ** (epoch // 30)) + for param_group in optimizer.param_groups: + param_group['lr'] = lr + + +def accuracy(output, target, topk=(1,)): + """Computes the accuracy over the k top predictions for the specified values of k""" + with torch.no_grad(): + maxk = max(topk) + batch_size = target.size(0) + + _, pred = output.topk(maxk, 1, True, True) + pred = pred.t() + correct = pred.eq(target.view(1, -1).expand_as(pred)) + + res = [] + for k in topk: + correct_k = correct[:k].view(-1).float().sum(0, keepdim=True) + res.append(correct_k.mul_(100.0 / batch_size)) + return res + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/mnist1n1g.py b/mnist1n1g.py new file mode 100644 index 0000000..bd60b11 --- /dev/null +++ b/mnist1n1g.py @@ -0,0 +1,149 @@ +from __future__ import print_function +import argparse +import torch +import torch.nn as nn +import time + +import torch.nn.parallel +import torch.nn.functional as F +import torch.backends.cudnn as cudnn +import torch.distributed as dist +import torch.utils.data +import torch.utils.data.distributed +import torch.optim as optim +from torchvision import datasets, transforms + +# Training settings +parser = argparse.ArgumentParser(description='PyTorch MNIST Example') +parser.add_argument('--batch-size', type=int, default=1024, metavar='N', + help='input batch size for training (default: 64)') +parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', + help='input batch size for testing (default: 1000)') +parser.add_argument('--epochs', type=int, default=20, metavar='N', + help='number of epochs to train (default: 10)') +parser.add_argument('--lr', type=float, default=0.01, metavar='LR', + help='learning rate (default: 0.01)') +parser.add_argument('--momentum', type=float, default=0.5, metavar='M', + help='SGD momentum (default: 0.5)') +parser.add_argument('--no-cuda', action='store_true', default=False, + help='disables CUDA training') +parser.add_argument('--seed', type=int, default=1, metavar='S', + help='random seed (default: 1)') +parser.add_argument('--log-interval', type=int, default=10, metavar='N', + help='how many batches to wait before logging training status') +parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456') +parser.add_argument('--rank', type=int, default=0) +parser.add_argument('--world-size', type=int, default=1) + +args = parser.parse_args() +args.cuda = not args.no_cuda and torch.cuda.is_available() + +#初始化 +dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank, + group_name="pytorch_test") + +torch.manual_seed(args.seed) +if args.cuda: + torch.cuda.manual_seed(args.seed) + +train_dataset = datasets.MNIST('data', train=True, download=True, + transform=transforms.Compose([ + 
transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])) +# 分发数据 +train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) + +kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {} + +train_loader = torch.utils.data.DataLoader(train_dataset, + batch_size=args.batch_size, shuffle=True, **kwargs) +test_loader = torch.utils.data.DataLoader( + datasets.MNIST('data', train=False, transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])), + batch_size=args.test_batch_size, shuffle=True, **kwargs) + + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x) + +model = Net() +if args.cuda: + # 分发模型 + model.cuda() + model = torch.nn.parallel.DistributedDataParallel(model) + # model = torch.nn.DataParallel(model,device_ids=[0,1,2,3]).cuda() + # model.cuda() + +optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum) + + +def train(epoch): + model.train() + for batch_idx, (data, target) in enumerate(train_loader): + if args.cuda: + data, target = data.cuda(), target.cuda() + # data, target = Variable(data), Variable(target) + optimizer.zero_grad() + output = model(data) + loss = F.nll_loss(output, target) + loss.backward() + optimizer.step() + if batch_idx % args.log_interval == 0: + print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( + epoch, batch_idx * len(data), len(train_loader.dataset), + 100. * batch_idx / len(train_loader), loss.item())) + + +def test(): + model.eval() + test_loss = 0 + correct = 0 + for data, target in test_loader: + if args.cuda: + data, target = data.cuda(), target.cuda() + # data, target = Variable(data, volatile=True), Variable(target) + output = model(data) + test_loss += F.nll_loss(output, target, size_average=False).item() # sum up batch loss + pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability + correct += pred.eq(target.data.view_as(pred)).cpu().sum() + + test_loss /= len(test_loader.dataset) + print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( + test_loss, correct, len(test_loader.dataset), + 100. 
* correct / len(test_loader.dataset))) + +tot_time = 0 + +for epoch in range(1, args.epochs + 1): + # 设置epoch位置,这应该是个为了同步所做的工作 + train_sampler.set_epoch(epoch) + start_cpu_secs = time.time() + #long running + train(epoch) + end_cpu_secs = time.time() + print("Epoch {} of {} took {:.3f}s".format( + epoch, args.epochs, end_cpu_secs - start_cpu_secs)) + tot_time += end_cpu_secs - start_cpu_secs + test() + +torch.save(model.state_dict(), './pth/mnist1n1g.pth') + +print("Total time= {:.3f}s".format(tot_time)) diff --git a/mnist1n1g_test.py b/mnist1n1g_test.py new file mode 100644 index 0000000..902f725 --- /dev/null +++ b/mnist1n1g_test.py @@ -0,0 +1,104 @@ +from __future__ import print_function +import argparse +import torch +import torch.nn as nn +import time + +import torch.nn.parallel +import torch.nn.functional as F +import torch.backends.cudnn as cudnn +import torch.distributed as dist +import torch.utils.data +import torch.utils.data.distributed +import torch.optim as optim +from torchvision import datasets, transforms + +# Training settings +parser = argparse.ArgumentParser(description='PyTorch MNIST Example') +parser.add_argument('--batch-size', type=int, default=1024, metavar='N', + help='input batch size for training (default: 64)') +parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', + help='input batch size for testing (default: 1000)') +parser.add_argument('--epochs', type=int, default=20, metavar='N', + help='number of epochs to train (default: 10)') +parser.add_argument('--lr', type=float, default=0.01, metavar='LR', + help='learning rate (default: 0.01)') +parser.add_argument('--momentum', type=float, default=0.5, metavar='M', + help='SGD momentum (default: 0.5)') +parser.add_argument('--no-cuda', action='store_true', default=False, + help='disables CUDA training') +parser.add_argument('--seed', type=int, default=1, metavar='S', + help='random seed (default: 1)') +parser.add_argument('--log-interval', type=int, default=10, metavar='N', + help='how many batches to wait before logging training status') +parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456') +parser.add_argument('--rank', type=int, default=0) +parser.add_argument('--world-size', type=int, default=1) + +args = parser.parse_args() +args.cuda = not args.no_cuda and torch.cuda.is_available() + +#初始化 +dist.init_process_group(init_method=args.init_method, backend="nccl", world_size=args.world_size, rank=args.rank, + group_name="pytorch_test") + +torch.manual_seed(args.seed) +if args.cuda: + torch.cuda.manual_seed(args.seed) + +kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {} + +test_loader = torch.utils.data.DataLoader( + datasets.MNIST('data', train=False, transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])), + batch_size=args.test_batch_size, shuffle=True, **kwargs) + + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x) + + +model = Net() +if args.cuda: + model.cuda() + model = 
torch.nn.parallel.DistributedDataParallel(model) +model.load_state_dict(torch.load('./pth/mnist1n1g.pth')) + +def test(): + model.eval() + test_loss = 0 + correct = 0 + for data, target in test_loader: + if args.cuda: + data, target = data.cuda(), target.cuda() + # data, target = Variable(data, volatile=True), Variable(target) + output = model(data) + test_loss += F.nll_loss(output, target, size_average=False).item() # sum up batch loss + pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability + correct += pred.eq(target.data.view_as(pred)).cpu().sum() + + test_loss /= len(test_loader.dataset) + print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( + test_loss, correct, len(test_loader.dataset), + 100. * correct / len(test_loader.dataset))) + +test() + + diff --git a/mnist1n2g.py b/mnist1n2g.py new file mode 100644 index 0000000..cb9ff29 --- /dev/null +++ b/mnist1n2g.py @@ -0,0 +1,38 @@ +import os +import torch +import torch.distributed as dist +from torch.multiprocessing import Process + +import torch.nn.functional as F +import torch.backends.cudnn as cudnn +import torch.distributed as dist +import torch.utils.data +import torch.utils.data.distributed +import torch.optim as optim +from torchvision import datasets, transforms +from torch.autograd import Variable + + +def run(rank, size): + """ Distributed function to be implemented later. """ + pass + + +def init_processes(rank, size, fn, backend='tcp'): + """ Initialize the distributed environment. """ + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = '29500' + dist.init_process_group(backend, rank=rank, world_size=size) + fn(rank, size) + + +if __name__ == "__main__": + size = 2 + processes = [] + for rank in range(size): + p = Process(target=init_processes, args=(rank, size, run)) + p.start() + processes.append(p) + + for p in processes: + p.join() \ No newline at end of file diff --git a/mnist1n2g_test.py b/mnist1n2g_test.py new file mode 100644 index 0000000..9338c4d --- /dev/null +++ b/mnist1n2g_test.py @@ -0,0 +1,122 @@ +import os +import argparse +from random import Random + +import torch +import torch.nn as nn +from torch.multiprocessing import Process + +import torch.nn.functional as F +import torch.backends.cudnn as cudnn +import torch.distributed as dist +import torch.utils.data +import torch.utils.data.distributed +import torch.optim as optim +from torchvision import datasets, transforms + +# Training settings +parser = argparse.ArgumentParser(description='PyTorch MNIST Example') +parser.add_argument('--batch-size', type=int, default=1024, metavar='N', + help='input batch size for training (default: 64)') +parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', + help='input batch size for testing (default: 1000)') +parser.add_argument('--epochs', type=int, default=20, metavar='N', + help='number of epochs to train (default: 10)') +parser.add_argument('--lr', type=float, default=0.01, metavar='LR', + help='learning rate (default: 0.01)') +parser.add_argument('--momentum', type=float, default=0.5, metavar='M', + help='SGD momentum (default: 0.5)') +parser.add_argument('--no-cuda', action='store_true', default=False, + help='disables CUDA training') +parser.add_argument('--seed', type=int, default=1, metavar='S', + help='random seed (default: 1)') +parser.add_argument('--log-interval', type=int, default=10, metavar='N', + help='how many batches to wait before logging training status') +parser.add_argument('--init-method', type=str, 
default='tcp://127.0.0.1:23456') +parser.add_argument('--rank', type=int, default=0) +parser.add_argument('--world-size', type=int, default=2) + +args = parser.parse_args() +args.cuda = not args.no_cuda and torch.cuda.is_available() +kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {} + +test_dataset = datasets.MNIST('data', train=False, transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])) + + + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x) + + +model = Net() +if args.cuda: + model.cuda() + model = torch.nn.parallel.DistributedDataParallel(model) +model.load_state_dict(torch.load('./pth/mnist1n1g.pth')) + + +def test(): + model.eval() + test_loss = 0 + correct = 0 + for data, target in test_loader: + if args.cuda: + data, target = data.cuda(), target.cuda() + output = model(data) + test_loss += F.nll_loss(output, target, size_average=False).item() # sum up batch loss + pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability + correct += pred.eq(target.data.view_as(pred)).cpu().sum() + + test_loss /= len(test_loader.dataset) + print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( + test_loss, correct, len(test_loader.dataset), + 100. * correct / len(test_loader.dataset))) + + +def run(rank, size): + print(rank, size) + test() + + +def init_processes(rank, size, fn, backend='nccl'): + """ Initialize the distributed environment. 
""" + # os.environ['MASTER_ADDR'] = '127.0.0.1' + # os.environ['MASTER_PORT'] = '29500' + dist.init_process_group(init_method=args.init_method, backend=backend, rank=rank, world_size=size, group_name="pytorch_test") + fn(rank, size) + + +if __name__ == "__main__": + + processes = [] + for rank in range(args.world_size): + p = Process(target=init_processes, args=(rank, args.world_size, run)) + p.start() + processes.append(p) + + for p in processes: + p.join() + # 分发数据 + test_sampler = torch.utils.data.distributed.DistributedSampler(test_dataset) + test_loader = torch.utils.data.DataLoader(test_dataset, + batch_size=args.test_batch_size, shuffle=True, sampler=test_sampler, + **kwargs) + test_sampler.set_epoch(1) diff --git a/mnist_train.py b/mnist_train.py new file mode 100644 index 0000000..02600e8 --- /dev/null +++ b/mnist_train.py @@ -0,0 +1,414 @@ +import argparse +import os +import random +import shutil +import time +import warnings + +import torch +import torch.nn as nn +import torch.nn.parallel +import torch.backends.cudnn as cudnn +import torch.distributed as dist +import torch.optim +import torch.multiprocessing as mp +import torch.utils.data +import torch.utils.data.distributed +import torchvision.transforms as transforms +import torchvision.datasets as datasets +import torchvision.models as models +import torch.nn.functional as F + + +model_names = sorted(name for name in models.__dict__ + if name.islower() and not name.startswith("__") + and callable(models.__dict__[name])) + +parser = argparse.ArgumentParser(description='PyTorch ImageNet Training') +# parser.add_argument('data', metavar='DIR', default='./', help='path to dataset') +parser.add_argument('-a', '--arch', metavar='ARCH', default='resnet18', + choices=model_names, + help='model architecture: ' + + ' | '.join(model_names) + + ' (default: resnet18)') +parser.add_argument('-j', '--workers', default=4, type=int, metavar='N', + help='number of data loading workers (default: 4)') +parser.add_argument('--epochs', default=20, type=int, metavar='N', + help='number of total epochs to run') +parser.add_argument('--start-epoch', default=0, type=int, metavar='N', + help='manual epoch number (useful on restarts)') +parser.add_argument('-b', '--batch-size', default=1024, type=int, + metavar='N', + help='mini-batch size (default: 256), this is the total ' + 'batch size of all GPUs on the current node when ' + 'using Data Parallel or Distributed Data Parallel') +parser.add_argument('--lr', '--learning-rate', default=0.1, type=float, + metavar='LR', help='initial learning rate', dest='lr') +parser.add_argument('--momentum', default=0.9, type=float, metavar='M', + help='momentum') +parser.add_argument('--wd', '--weight-decay', default=1e-4, type=float, + metavar='W', help='weight decay (default: 1e-4)', + dest='weight_decay') +parser.add_argument('-p', '--print-freq', default=10, type=int, + metavar='N', help='print frequency (default: 10)') +parser.add_argument('--resume', default='', type=str, metavar='PATH', + help='path to latest checkpoint (default: none)') +parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true', + help='evaluate model on validation set') +parser.add_argument('--pretrained', dest='pretrained', action='store_true', + help='use pre-trained model') +parser.add_argument('--world-size', default=-1, type=int, + help='number of nodes for distributed training') +parser.add_argument('--rank', default=-1, type=int, + help='node rank for distributed training') +parser.add_argument('--dist-url', 
default='tcp://172.18.146.204:23455', type=str, + help='url used to set up distributed training') +parser.add_argument('--dist-backend', default='nccl', type=str, + help='distributed backend') +parser.add_argument('--seed', default=None, type=int, + help='seed for initializing training. ') +parser.add_argument('--gpu', default=None, type=int, + help='GPU id to use.') +parser.add_argument('--multiprocessing-distributed', action='store_true', + help='Use multi-processing distributed training to launch ' + 'N processes per node, which has N GPUs. This is the ' + 'fastest way to use PyTorch for either single node or ' + 'multi node data parallel training') + +best_acc = 0 + + +def main(): + args = parser.parse_args() + + if args.seed is not None: + random.seed(args.seed) + torch.manual_seed(args.seed) + cudnn.deterministic = True + warnings.warn('You have chosen to seed training. ' + 'This will turn on the CUDNN deterministic setting, ' + 'which can slow down your training considerably! ' + 'You may see unexpected behavior when restarting ' + 'from checkpoints.') + + if args.gpu is not None: + warnings.warn('You have chosen a specific GPU. This will completely ' + 'disable data parallelism.') + + if args.dist_url == "env://" and args.world_size == -1: + args.world_size = int(os.environ["WORLD_SIZE"]) + + args.distributed = args.world_size > 1 or args.multiprocessing_distributed + + ngpus_per_node = torch.cuda.device_count() + if args.multiprocessing_distributed: + # Since we have ngpus_per_node processes per node, the total world_size + # needs to be adjusted accordingly + args.world_size = ngpus_per_node * args.world_size + # Use torch.multiprocessing.spawn to launch distributed processes: the + # main_worker process function + mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args)) + else: + # Simply call main_worker function + main_worker(args.gpu, ngpus_per_node, args) + + +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x) + + +def main_worker(gpu, ngpus_per_node, args): + global best_acc + args.gpu = gpu + + if args.gpu is not None: + print("Use GPU: {} for training".format(args.gpu)) + + if args.distributed: + if args.dist_url == "env://" and args.rank == -1: + args.rank = int(os.environ["RANK"]) + if args.multiprocessing_distributed: + # For multiprocessing distributed training, rank needs to be the + # global rank among all the processes + args.rank = args.rank * ngpus_per_node + gpu + dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url, + world_size=args.world_size, rank=args.rank) + # create model + # if args.pretrained: + # print("=> using pre-trained model '{}'".format(args.arch)) + # model = models.__dict__[args.arch](pretrained=True) + # else: + # print("=> creating model '{}'".format(args.arch)) + # model = models.__dict__[args.arch]() + + model = Net() + + if args.distributed: + # For multiprocessing distributed, DistributedDataParallel constructor + # should always set the single device scope, otherwise, + # DistributedDataParallel will use all available 
devices. + if args.gpu is not None: + torch.cuda.set_device(args.gpu) + model.cuda(args.gpu) + # When using a single GPU per process and per + # DistributedDataParallel, we need to divide the batch size + # ourselves based on the total number of GPUs we have + args.batch_size = int(args.batch_size / ngpus_per_node) + args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node) + model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) + else: + model.cuda() + # DistributedDataParallel will divide and allocate batch_size to all + # available GPUs if device_ids are not set + model = torch.nn.parallel.DistributedDataParallel(model) + elif args.gpu is not None: + torch.cuda.set_device(args.gpu) + model = model.cuda(args.gpu) + else: + # DataParallel will divide and allocate batch_size to all available GPUs + if args.arch.startswith('alexnet') or args.arch.startswith('vgg'): + model.features = torch.nn.DataParallel(model.features) + model.cuda() + else: + model = torch.nn.DataParallel(model).cuda() + + # define loss function (criterion) and optimizer + criterion = nn.CrossEntropyLoss().cuda(args.gpu) + + optimizer = torch.optim.SGD(model.parameters(), args.lr, + momentum=args.momentum, + weight_decay=args.weight_decay) + + # optionally resume from a checkpoint + if args.resume: + if os.path.isfile(args.resume): + print("=> loading checkpoint '{}'".format(args.resume)) + if args.gpu is None: + checkpoint = torch.load(args.resume) + else: + # Map model to be loaded to specified single gpu. + loc = 'cuda:{}'.format(args.gpu) + checkpoint = torch.load(args.resume, map_location=loc) + args.start_epoch = checkpoint['epoch'] + best_acc = checkpoint['best_acc'] + if args.gpu is not None: + # best_acc may be from a checkpoint from a different GPU + best_acc = best_acc.to(args.gpu) + model.load_state_dict(checkpoint['state_dict']) + optimizer.load_state_dict(checkpoint['optimizer']) + print("=> loaded checkpoint '{}' (epoch {})" + .format(args.resume, checkpoint['epoch'])) + else: + print("=> no checkpoint found at '{}'".format(args.resume)) + + cudnn.benchmark = True + + # Data loading code + train_dataset = datasets.MNIST('data', train=True, download=True, + transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])) + + if args.distributed: + train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) + else: + train_sampler = None + + train_loader = torch.utils.data.DataLoader( + train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None), + num_workers=args.workers, pin_memory=True, sampler=train_sampler) + + val_loader = torch.utils.data.DataLoader( + datasets.MNIST('data', train=False, transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307,), (0.3081,)) + ])), + batch_size=args.batch_size, shuffle=False, + num_workers=args.workers, pin_memory=True) + + if args.evaluate: + validate(val_loader, model, criterion, args) + return + + for epoch in range(args.start_epoch, args.epochs): + if args.distributed: + train_sampler.set_epoch(epoch) + adjust_learning_rate(optimizer, epoch, args) + + # train for one epoch + train(train_loader, model, optimizer, epoch, args) + + # evaluate on validation set + acc = validate(val_loader, model, criterion, args) + + # remember best acc@1 and save checkpoint + is_best = acc > best_acc + best_acc = max(acc, best_acc) + + if not args.multiprocessing_distributed or (args.multiprocessing_distributed + and args.rank % 
ngpus_per_node == 0): + save_checkpoint({ + 'epoch': epoch + 1, + 'arch': args.arch, + 'state_dict': model.state_dict(), + 'best_acc1': best_acc, + 'optimizer' : optimizer.state_dict(), + }, is_best) + + +def train(train_loader, model, optimizer, epoch, args): + batch_time = AverageMeter('Time', ':6.3f') + data_time = AverageMeter('Data', ':6.3f') + losses = AverageMeter('Loss', ':.4e') + progress = ProgressMeter( + len(train_loader), + [batch_time, data_time, losses], + prefix="Epoch: [{}]".format(epoch)) + + # switch to train mode + model.train() + + end = time.time() + for i, (images, target) in enumerate(train_loader): + # measure data loading time + data_time.update(time.time() - end) + + if args.gpu is not None: + images = images.cuda(args.gpu, non_blocking=True) + target = target.cuda(args.gpu, non_blocking=True) + + # compute output + output = model(images) + loss = F.nll_loss(output, target) + + # record loss + losses.update(loss.item(), images.size(0)) + + # compute gradient and do SGD step + optimizer.zero_grad() + loss.backward() + optimizer.step() + + # measure elapsed time + batch_time.update(time.time() - end) + end = time.time() + + if i % args.print_freq == 0: + progress.display(i) + + +def validate(val_loader, model, criterion, args): + batch_time = AverageMeter('Time', ':6.3f') + losses = AverageMeter('Loss', ':.4e') + progress = ProgressMeter( + len(val_loader), + [batch_time, losses], + prefix='Test: ') + + # switch to evaluate mode + model.eval() + correct = 0 + + with torch.no_grad(): + end = time.time() + for i, (images, target) in enumerate(val_loader): + if args.gpu is not None: + images = images.cuda(args.gpu, non_blocking=True) + target = target.cuda(args.gpu, non_blocking=True) + + # compute output + output = model(images) + loss = F.nll_loss(output, target, size_average=False) + # measure accuracy and record loss + losses.update(loss.item(), images.size(0)) + pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability + correct += pred.eq(target.data.view_as(pred)).cpu().sum() + + # measure elapsed time + batch_time.update(time.time() - end) + end = time.time() + + if i % args.print_freq == 0: + progress.display(i) + + acc = 100. 
+    acc = 100. * correct / len(val_loader.dataset)
+    # TODO: this should also be done with the ProgressMeter
+    print('\nRank {}, Test set: Average loss: {:.4f}, Accuracy: ({:.0f}%)\n'.format(
+        args.rank, losses.avg, acc))
+
+    return acc
+
+
+def save_checkpoint(state, is_best, filename='./pth/checkpoint.pth.tar'):
+    torch.save(state, filename)
+    if is_best:
+        shutil.copyfile(filename, './pth/model_best.pth.tar')
+
+
+class AverageMeter(object):
+    """Computes and stores the average and current value"""
+    def __init__(self, name, fmt=':f'):
+        self.name = name
+        self.fmt = fmt
+        self.reset()
+
+    def reset(self):
+        self.val = 0
+        self.avg = 0
+        self.sum = 0
+        self.count = 0
+
+    def update(self, val, n=1):
+        self.val = val
+        self.sum += val * n
+        self.count += n
+        self.avg = self.sum / self.count
+
+    def __str__(self):
+        fmtstr = '{name} {val' + self.fmt + '} ({avg' + self.fmt + '})'
+        return fmtstr.format(**self.__dict__)
+
+
+class ProgressMeter(object):
+    def __init__(self, num_batches, meters, prefix=""):
+        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
+        self.meters = meters
+        self.prefix = prefix
+
+    def display(self, batch):
+        entries = [self.prefix + self.batch_fmtstr.format(batch)]
+        entries += [str(meter) for meter in self.meters]
+        print('\t'.join(entries))
+
+    def _get_batch_fmtstr(self, num_batches):
+        num_digits = len(str(num_batches // 1))
+        fmt = '{:' + str(num_digits) + 'd}'
+        return '[' + fmt + '/' + fmt.format(num_batches) + ']'
+
+
+def adjust_learning_rate(optimizer, epoch, args):
+    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
+    lr = args.lr * (0.1 ** (epoch // 30))
+    for param_group in optimizer.param_groups:
+        param_group['lr'] = lr
+
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/ptp.py b/ptp.py
new file mode 100644
index 0000000..fd44346
--- /dev/null
+++ b/ptp.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+
+import os
+import torch
+import torch.distributed as dist
+from torch.multiprocessing import Process
+
+
+def gather(tensor, rank, tensor_list=None, root=0, group=None):
+    """
+    Sends tensor to the root process, which stores it in tensor_list.
+    """
+    if group is None:
+        group = dist.group.WORLD
+    if rank == root:
+        assert(tensor_list is not None)
+        dist.gather(tensor, gather_list=tensor_list, dst=root, group=group)
+    else:
+        dist.gather(tensor, dst=root, group=group)
+
+def run(rank, size):
+    """ Simple point-to-point communication. """
+    print(dist.get_world_size())
+    tensor = torch.ones(1)
+    tensor_list = [torch.zeros(1) for _ in range(size)]
+    # gather_list may only be passed on the destination rank
+    if rank == 0:
+        dist.gather(tensor, gather_list=tensor_list, dst=0)
+        print('Rank ', rank, ' has data ', sum(tensor_list)[0])
+    else:
+        dist.gather(tensor, dst=0)
+
+def init_processes(rank, size, fn, backend='gloo'):
+    """ Initialize the distributed environment.
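+
+    Every process must see the same MASTER_ADDR/MASTER_PORT pair: rank 0
+    listens on that address and the other ranks connect to it, which is how
+    init_process_group() performs its rendezvous. All processes pass the same
+    world_size together with their own unique rank, after which collectives
+    such as dist.gather() in run() above can be used.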
""" + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = '29500' + dist.init_process_group(backend, rank=rank, world_size=size) + fn(rank, size) + + +if __name__ == "__main__": + size = 2 + processes = [] + for rank in range(size): + p = Process(target=init_processes, args=(rank, size, run)) + p.start() + processes.append(p) + + for p in processes: + p.join() diff --git a/test1.py b/test1.py new file mode 100644 index 0000000..ca476b2 --- /dev/null +++ b/test1.py @@ -0,0 +1,2 @@ +for i in range(1): + print(i) diff --git a/train_dist.py b/train_dist.py new file mode 100644 index 0000000..651986b --- /dev/null +++ b/train_dist.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python + +import os +import torch +import torch.distributed as dist +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim + +from math import ceil +from random import Random +from torch.multiprocessing import Process +from torch.autograd import Variable +from torchvision import datasets, transforms + + +class Partition(object): + """ Dataset-like object, but only access a subset of it. """ + + def __init__(self, data, index): + self.data = data + self.index = index + + def __len__(self): + return len(self.index) + + def __getitem__(self, index): + data_idx = self.index[index] + return self.data[data_idx] + + +class DataPartitioner(object): + """ Partitions a dataset into different chuncks. """ + + def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234): + self.data = data + self.partitions = [] + rng = Random() + rng.seed(seed) + data_len = len(data) + indexes = [x for x in range(0, data_len)] + rng.shuffle(indexes) + + for frac in sizes: + part_len = int(frac * data_len) + self.partitions.append(indexes[0:part_len]) + indexes = indexes[part_len:] + + def use(self, partition): + return Partition(self.data, self.partitions[partition]) + + +class Net(nn.Module): + """ Network architecture. """ + + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(1, 10, kernel_size=5) + self.conv2 = nn.Conv2d(10, 20, kernel_size=5) + self.conv2_drop = nn.Dropout2d() + self.fc1 = nn.Linear(320, 50) + self.fc2 = nn.Linear(50, 10) + + def forward(self, x): + x = F.relu(F.max_pool2d(self.conv1(x), 2)) + x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2)) + x = x.view(-1, 320) + x = F.relu(self.fc1(x)) + x = F.dropout(x, training=self.training) + x = self.fc2(x) + return F.log_softmax(x) + + +def partition_dataset(): + """ Partitioning MNIST """ + dataset = datasets.MNIST( + './data', + train=True, + download=True, + transform=transforms.Compose([ + transforms.ToTensor(), + transforms.Normalize((0.1307, ), (0.3081, )) + ])) + size = dist.get_world_size() + bsz = 128 / float(size) + partition_sizes = [1.0 / size for _ in range(size)] + partition = DataPartitioner(dataset, partition_sizes) + partition = partition.use(dist.get_rank()) + train_set = torch.utils.data.DataLoader( + partition, batch_size=bsz, shuffle=True) + return train_set, bsz + + +def average_gradients(model): + """ Gradient averaging. 
""" + size = float(dist.get_world_size()) + for param in model.parameters(): + dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM, group=0) + param.grad.data /= size + + +def run(rank, size): + """ Distributed Synchronous SGD Example """ + torch.manual_seed(1234) + train_set, bsz = partition_dataset() + model = Net() + model = model +# model = model.cuda(rank) + optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5) + + num_batches = ceil(len(train_set.dataset) / float(bsz)) + for epoch in range(10): + epoch_loss = 0.0 + for data, target in train_set: + data, target = Variable(data), Variable(target) +# data, target = Variable(data.cuda(rank)), Variable(target.cuda(rank)) + optimizer.zero_grad() + output = model(data) + loss = F.nll_loss(output, target) + epoch_loss += loss.data[0] + loss.backward() + average_gradients(model) + optimizer.step() + print('Rank ', + dist.get_rank(), ', epoch ', epoch, ': ', + epoch_loss / num_batches) + + +def init_processes(rank, size, fn, backend='gloo'): + """ Initialize the distributed environment. """ + os.environ['MASTER_ADDR'] = '127.0.0.1' + os.environ['MASTER_PORT'] = '29500' + dist.init_process_group(backend, rank=rank, world_size=size) + fn(rank, size) + + +if __name__ == "__main__": + size = 2 + processes = [] + for rank in range(size): + p = Process(target=init_processes, args=(rank, size, run)) + p.start() + processes.append(p) + + for p in processes: + p.join()