From 1a37fcd6c56878453984678687ab1d0a7e5150a5 Mon Sep 17 00:00:00 2001
From: Shaden Smith
Date: Wed, 20 Jan 2021 19:53:18 +0000
Subject: [PATCH] make test_pipe more stable

---
 tests/unit/test_pipe.py | 62 ++++++++++++++++++++++++++---------------
 1 file changed, 40 insertions(+), 22 deletions(-)

diff --git a/tests/unit/test_pipe.py b/tests/unit/test_pipe.py
index 6902ff827224..30d4314a8441 100755
--- a/tests/unit/test_pipe.py
+++ b/tests/unit/test_pipe.py
@@ -1,4 +1,5 @@
 import os
+import copy
 
 import torch
 import torch.nn as nn
@@ -13,8 +14,7 @@ from deepspeed.runtime.pipe.topology import PipeDataParallelTopology, PipeModelDataParallelTopology
 PipeTopo = PipeDataParallelTopology
 
-import deepspeed.runtime.pipe.module as PipelineModule
-from deepspeed.runtime.pipe.module import LayerSpec
+from deepspeed.runtime.pipe.module import PipelineModule, LayerSpec
 
 from common import distributed_test
 
 
@@ -74,7 +74,13 @@ def forward(self, x, y):
         return self.loss_fn(x, y)
 
 
-class AlexNetPipe(PipelineModule.PipelineModule):
+class AlexNetPipe(AlexNet):
+    def to_layers(self):
+        layers = [*self.features, lambda x: x.view(x.size(0), -1), self.classifier]
+        return layers
+
+
+class AlexNetPipeSpec(PipelineModule):
     def __init__(self, num_classes=10, **kwargs):
         self.num_classes = num_classes
         specs = [
@@ -135,6 +141,9 @@ def train_cifar(model, args, num_steps=400, average_dp_losses=True, fp16=True, s
     with torch.random.fork_rng(devices=[torch.cuda.current_device()]):
         ds_utils.set_random_seed(seed)
 
+        # disable dropout
+        model.eval()
+
         trainset = cifar_trainset(fp16=fp16)
         args.local_rank = dist.get_rank()
 
@@ -148,7 +157,7 @@ def train_cifar(model, args, num_steps=400, average_dp_losses=True, fp16=True, s
         for step in range(num_steps):
             loss = engine.train_batch()
             losses.append(loss.item())
-            if step % 50 == 0:
+            if step % 50 == 0 and dist.get_rank() == 0:
                 print(f'STEP={step} LOSS={loss.item()}')
 
     if average_dp_losses:
@@ -160,18 +169,16 @@ def train_cifar(model, args, num_steps=400, average_dp_losses=True, fp16=True, s
     return losses
 
 
-@pytest.mark.parametrize('base_topo,test_topo',
+@pytest.mark.parametrize('topo',
                          [
-                             (PipeTopo(num_pp=1,
-                                       num_dp=4),
-                              PipeTopo(num_pp=2,
-                                       num_dp=2)),
-                             (PipeTopo(num_pp=1,
-                                       num_dp=4),
-                              PipeTopo(num_pp=4,
-                                       num_dp=1)),
+                             PipeTopo(num_pp=1,
+                                      num_dp=4),
+                             PipeTopo(num_pp=2,
+                                      num_dp=2),
+                             PipeTopo(num_pp=4,
+                                      num_dp=1),
                          ])
-def test_pipe_cifar10_seedlayers(base_topo, test_topo, tmpdir):
+def test_pipe_cifar10(topo, tmpdir):
     config_dict = {
         "train_batch_size": 16,
         "train_micro_batch_size_per_gpu": 4,
@@ -199,21 +206,32 @@ def test_pipe_cifar10_seedlayers(base_topo, test_topo, tmpdir):
     }
     args = args_from_dict(tmpdir, config_dict)
 
+    # Allocate model for consistent initial weights.
+    init_net = AlexNetPipe()
+
     @distributed_test(world_size=4)
-    def _helper(base_topo, test_topo, tmpdir, steps=500):
+    def _helper(topo, tmpdir, steps=500):
         assert steps >= 100
 
-        base_model = AlexNetPipe(num_classes=10,
-                                 topology=base_topo,
-                                 seed_layers=config_dict['pipeline']['seed_layers'])
+        base_net = copy.deepcopy(init_net)
+        base_model = PipelineModule(layers=base_net.to_layers(),
+                                    num_stages=1,
+                                    loss_fn=nn.CrossEntropyLoss())
+
+        # Train with just data parallelism
         base_losses = train_cifar(base_model,
                                   args,
                                   num_steps=steps,
                                   fp16=config_dict['fp16']['enabled'])
 
-        test_model = AlexNetPipe(num_classes=10,
-                                 topology=test_topo,
-                                 seed_layers=config_dict['pipeline']['seed_layers'])
+        test_net = copy.deepcopy(init_net)
+        test_model = PipelineModule(layers=test_net.to_layers(),
+                                    topology=topo,
+                                    loss_fn=nn.CrossEntropyLoss())
+
+        #test_model = AlexNetPipe(num_classes=10,
+        #                         topology=test_topo,
+        #                         seed_layers=config_dict['pipeline']['seed_layers'])
         test_losses = train_cifar(test_model,
                                   args,
                                   num_steps=steps,
@@ -246,4 +264,4 @@ def _helper(base_topo, test_topo, tmpdir, steps=500):
         test_avg = sum(test) / len(test)
         assert rel_diff(base_avg, test_avg) < 0.03
 
-    _helper(base_topo, test_topo, tmpdir)
+    _helper(topo, tmpdir)
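A minimal sketch of the pattern this change relies on: one reference network is built up front, deep-copied per configuration so every topology starts from identical initial weights, and its layers are wrapped in a PipelineModule. The layer sizes and the names ref_net, dp_model, and pp_model below are illustrative only (not the AlexNet used in the test), and PipelineModule assumes torch.distributed is already initialized, which the test's @distributed_test decorator takes care of.

    import copy

    import torch.nn as nn

    from deepspeed.runtime.pipe.module import PipelineModule

    # One reference network; illustrative layers, not the test's AlexNet.
    ref_net = nn.Sequential(nn.Linear(32, 64), nn.ReLU(), nn.Linear(64, 10))

    # Pure data parallelism: all layers in a single pipeline stage.
    dp_model = PipelineModule(layers=list(copy.deepcopy(ref_net)),
                              num_stages=1,
                              loss_fn=nn.CrossEntropyLoss())

    # Pipeline parallelism: the same initial weights split across two stages.
    pp_model = PipelineModule(layers=list(copy.deepcopy(ref_net)),
                              num_stages=2,
                              loss_fn=nn.CrossEntropyLoss())

Because both models start from the same weights and train on the same data, the test can compare their averaged losses directly, which is what makes the 3% tolerance at the end of _helper meaningful.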