diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_backend.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_backend.py
index 2f0c6d29c7ebd..e91428568dcb6 100644
--- a/pytorch_lightning/accelerators/ddp_cpu_spawn_backend.py
+++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_backend.py
@@ -68,19 +68,6 @@ def train(self):
         self.__recover_child_process_weights(model, best_path, last_path)
         return results
 
-    def __recover_child_process_weights(self, model, best_path, last_path):
-        # transfer back the best path to the trainer
-        if self.trainer.checkpoint_callback:
-            self.trainer.checkpoint_callback.best_model_path = best_path
-            # todo, pass also best score
-
-        # load last weights
-        if last_path is not None and not self.trainer.testing:
-            ckpt = torch.load(last_path, map_location=lambda storage, loc: storage)
-            model.load_state_dict(ckpt)
-
-        self.trainer.model = model
-
     def ddp_train(self, process_idx, mp_queue, model):
         """
         Entry point for ddp
@@ -95,9 +82,7 @@ def ddp_train(self, process_idx, mp_queue, model):
             self.trainer.progress_bar_callback.disable()
 
         # determine which process we are and world size
-        self.trainer.local_rank = process_idx
-        self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx
-        self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes
+        self.set_world_ranks(process_idx)
 
         # set warning rank
         rank_zero_only.rank = self.trainer.global_rank
@@ -116,7 +101,7 @@ def ddp_train(self, process_idx, mp_queue, model):
         self.trainer.call_setup_hook(model)
 
         # on world_size=0 let everyone know training is starting
-        if self.trainer.is_global_zero:
+        if self.trainer.is_global_zero and not torch.distributed.is_initialized():
             log.info('-' * 100)
             log.info(f'distributed_backend={self.trainer.distributed_backend}')
             log.info(f'All DDP processes registered. Starting ddp with {self.trainer.world_size} processes')
@@ -126,6 +111,9 @@ def ddp_train(self, process_idx, mp_queue, model):
         if self.trainer.sync_batchnorm:
             model = model.configure_sync_batchnorm(model)
 
+        # move the model to the correct device
+        self.model_to_device(model, process_idx)
+
         # CHOOSE OPTIMIZER
         # allow for lr schedulers as well
         self.setup_optimizers(model)
@@ -137,7 +125,7 @@ def ddp_train(self, process_idx, mp_queue, model):
         model = self.trainer.precision_connector.connect(model)
 
         # DDP spawn already spawned off each process... no need to do anything
-        device_ids = None
+        device_ids = self.get_device_ids()
 
         # allow user to configure ddp
         model = model.configure_ddp(model, device_ids)
@@ -174,7 +162,8 @@ def test_step(self, args):
         return output
 
     def barrier(self, name: str = None):
-        torch_distrib.barrier()
+        if torch_distrib.is_initialized():
+            torch_distrib.barrier()
 
     def broadcast(self, obj, src=0):
         return self.dist.broadcast(obj)
@@ -186,6 +175,31 @@ def early_stopping_should_stop(self, pl_module):
         should_stop = stop == self.trainer.world_size
         return should_stop
 
+    def set_world_ranks(self, process_idx):
+        self.trainer.local_rank = process_idx
+        self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx
+        self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes
+
+    def model_to_device(self, model, process_idx):
+        model.cpu()
+
+    def get_device_ids(self):
+        device_ids = None
+        return device_ids
+
+    def __recover_child_process_weights(self, model, best_path, last_path):
+        # transfer back the best path to the trainer
+        if self.trainer.checkpoint_callback:
+            self.trainer.checkpoint_callback.best_model_path = best_path
+            # todo, pass also best score
+
+        # load last weights
+        if last_path is not None and not self.trainer.testing:
+            ckpt = torch.load(last_path, map_location=lambda storage, loc: storage)
+            model.load_state_dict(ckpt)
+
+        self.trainer.model = model
+
     def transfer_distrib_spawn_state_on_fit_end(self, model, mp_queue, results):
         # track the best model path
         best_model_path = None
diff --git a/tests/backends/test_ddp.py b/tests/backends/test_ddp.py
new file mode 100644
index 0000000000000..91f22c4d7c59d
--- /dev/null
+++ b/tests/backends/test_ddp.py
@@ -0,0 +1,57 @@
+import pytest
+import torch
+import os
+from tests.backends import ddp_model
+from tests.utilities.dist import call_training_script
+
+
+@pytest.mark.parametrize('cli_args', [
+    pytest.param('--max_epochs 1 --gpus 2 --distributed_backend ddp'),
+])
+@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
+def test_multi_gpu_model_ddp_fit_only(tmpdir, cli_args):
+    # call the script
+    std, err = call_training_script(ddp_model, cli_args, 'fit', tmpdir, timeout=120)
+
+    # load the results of the script
+    result_path = os.path.join(tmpdir, 'ddp.result')
+    result = torch.load(result_path)
+
+    # verify the file wrote the expected outputs
+    assert result['status'] == 'complete'
+
+
+@pytest.mark.parametrize('cli_args', [
+    pytest.param('--max_epochs 1 --gpus 2 --distributed_backend ddp'),
+])
+@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
+def test_multi_gpu_model_ddp_test_only(tmpdir, cli_args):
+    # call the script
+    call_training_script(ddp_model, cli_args, 'test', tmpdir)
+
+    # load the results of the script
+    result_path = os.path.join(tmpdir, 'ddp.result')
+    result = torch.load(result_path)
+
+    # verify the file wrote the expected outputs
+    assert result['status'] == 'complete'
+
+
+# @pytest.mark.parametrize('cli_args', [
+#     pytest.param('--max_epochs 1 --gpus 2 --distributed_backend ddp'),
+# ])
+# @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
+# def test_multi_gpu_model_ddp_fit_test(tmpdir, cli_args):
+#     # call the script
+#     call_training_script(ddp_model, cli_args, 'fit_test', tmpdir, timeout=20)
+#
+#     # load the results of the script
+#     result_path = os.path.join(tmpdir, 'ddp.result')
+#     result = torch.load(result_path)
+#
+#     # verify the file wrote the expected outputs
+#     assert result['status'] == 'complete'
+#
+#     model_outs = result['result']
+#     for out in model_outs:
+#         assert out['test_acc'] > 0.90
diff --git a/tests/backends/test_ddp_spawn.py b/tests/backends/test_ddp_spawn.py
new file mode 100644
index 0000000000000..0c5db6b1a0b8b
--- /dev/null
+++ b/tests/backends/test_ddp_spawn.py
@@ -0,0 +1,71 @@
+import pytest
+import torch
+
+import tests.base.develop_pipelines as tpipes
+import tests.base.develop_utils as tutils
+from tests.base import EvalModelTemplate
+from pytorch_lightning.core import memory
+from pytorch_lightning.trainer import Trainer
+
+
+@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
+def test_multi_gpu_early_stop_ddp_spawn(tmpdir):
+    """Make sure DDP works. with early stopping"""
+    tutils.set_random_master_port()
+
+    trainer_options = dict(
+        default_root_dir=tmpdir,
+        early_stop_callback=True,
+        max_epochs=50,
+        limit_train_batches=10,
+        limit_val_batches=10,
+        gpus=[0, 1],
+        distributed_backend='ddp_spawn',
+    )
+
+    model = EvalModelTemplate()
+    tpipes.run_model_test(trainer_options, model)
+
+
+@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
+def test_multi_gpu_model_ddp_spawn(tmpdir):
+    tutils.set_random_master_port()
+
+    trainer_options = dict(
+        default_root_dir=tmpdir,
+        max_epochs=1,
+        limit_train_batches=10,
+        limit_val_batches=10,
+        gpus=[0, 1],
+        distributed_backend='ddp_spawn',
+        progress_bar_refresh_rate=0
+    )
+
+    model = EvalModelTemplate()
+
+    tpipes.run_model_test(trainer_options, model)
+
+    # test memory helper functions
+    memory.get_memory_profile('min_max')
+
+
+@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
+def test_ddp_all_dataloaders_passed_to_fit(tmpdir):
+    """Make sure DDP works with dataloaders passed to fit()"""
+    tutils.set_random_master_port()
+
+    model = EvalModelTemplate()
+    fit_options = dict(train_dataloader=model.train_dataloader(),
+                       val_dataloaders=model.val_dataloader())
+
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        progress_bar_refresh_rate=0,
+        max_epochs=1,
+        limit_train_batches=0.2,
+        limit_val_batches=0.2,
+        gpus=[0, 1],
+        distributed_backend='ddp_spawn'
+    )
+    result = trainer.fit(model, **fit_options)
+    assert result == 1, "DDP doesn't work with dataloaders passed to fit()."
diff --git a/tests/models/data/ddp/train_test_variations.py b/tests/models/data/ddp/train_test_variations.py
deleted file mode 100644
index f37bd27e8a005..0000000000000
--- a/tests/models/data/ddp/train_test_variations.py
+++ /dev/null
@@ -1,44 +0,0 @@
-"""
-Runs either `.fit()` or `.test()` on a single node across multiple gpus.
-"""
-from argparse import ArgumentParser
-
-from pytorch_lightning import Trainer, seed_everything
-from tests.base import EvalModelTemplate
-
-
-def variation_fit(trainer, model):
-    trainer.fit(model)
-
-
-def variation_test(trainer, model):
-    trainer.test(model)
-
-
-def get_variations():
-    variations = [
-        "variation_fit",
-        "variation_test",
-    ]
-    return variations
-
-
-def main():
-    seed_everything(1234)
-    parser = ArgumentParser(add_help=False)
-    parser = Trainer.add_argparse_args(parser)
-    parser.add_argument('--variation', default=variation_fit.__name__)
-    parser.set_defaults(gpus=2)
-    parser.set_defaults(distributed_backend="ddp")
-    args = parser.parse_args()
-
-    model = EvalModelTemplate()
-    trainer = Trainer.from_argparse_args(args)
-
-    # run the chosen variation
-    run_variation = globals()[args.variation]
-    run_variation(trainer, model)
-
-
-if __name__ == '__main__':
-    main()
diff --git a/tests/models/test_gpu.py b/tests/models/test_gpu.py
index 83416444bbe4d..8c2be4cabc594 100644
--- a/tests/models/test_gpu.py
+++ b/tests/models/test_gpu.py
@@ -1,15 +1,10 @@
-import os
-import subprocess
-import sys
 from collections import namedtuple
-from pathlib import Path
 from unittest.mock import patch
 
 import pytest
 import torch
 from torchtext.data import Batch, Dataset, Example, Field, LabelField
 
-import pytorch_lightning
 import tests.base.develop_pipelines as tpipes
 import tests.base.develop_utils as tutils
 from pytorch_lightning import Trainer
@@ -17,9 +12,7 @@
 from pytorch_lightning.utilities import device_parser
 from pytorch_lightning.utilities.exceptions import MisconfigurationException
 from tests.base import EvalModelTemplate
-from tests.models.data.ddp import train_test_variations
 from pytorch_lightning.accelerators.gpu_backend import GPUBackend
-from pytorch_lightning.accelerators.cpu_backend import CPUBackend
 
 
 PRETEND_N_OF_GPUS = 16
@@ -62,25 +55,6 @@ def test_multi_gpu_none_backend(tmpdir):
     tpipes.run_model_test(trainer_options, model)
 
 
-@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
-def test_multi_gpu_early_stop_ddp_spawn(tmpdir):
-    """Make sure DDP works. with early stopping"""
-    tutils.set_random_master_port()
-
-    trainer_options = dict(
-        default_root_dir=tmpdir,
-        early_stop_callback=True,
-        max_epochs=50,
-        limit_train_batches=10,
-        limit_val_batches=10,
-        gpus=[0, 1],
-        distributed_backend='ddp_spawn',
-    )
-
-    model = EvalModelTemplate()
-    tpipes.run_model_test(trainer_options, model)
-
-
 @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
 def test_multi_gpu_model_dp(tmpdir):
     tutils.set_random_master_port()
@@ -103,56 +77,6 @@ def test_multi_gpu_model_dp(tmpdir):
     memory.get_memory_profile('min_max')
 
 
-@pytest.mark.parametrize('cli_args', [
-    pytest.param('--max_epochs 1 --gpus 2 --distributed_backend ddp'),
-])
-@pytest.mark.parametrize('variation', train_test_variations.get_variations())
-@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
-def test_multi_gpu_model_ddp(tmpdir, cli_args, variation):
-    """ Runs a basic training and test run with distributed_backend=ddp. """
-    file = Path(train_test_variations.__file__).absolute()
-    cli_args = cli_args.split(' ') if cli_args else []
-    cli_args += ['--default_root_dir', str(tmpdir)]
-    cli_args += ['--variation', variation]
-    command = [sys.executable, str(file)] + cli_args
-
-    # need to set the PYTHONPATH in case pytorch_lightning was not installed into the environment
-    env = os.environ.copy()
-    env['PYTHONPATH'] = f'{pytorch_lightning.__file__}:' + env.get('PYTHONPATH', '')
-
-    # for running in ddp mode, we need to lauch it's own process or pytest will get stuck
-    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
-
-    std, err = p.communicate(timeout=60)
-    std = std.decode('utf-8').strip()
-    err = err.decode('utf-8').strip()
-    assert std, f"{variation} produced no output"
-    if p.returncode > 0:
-        pytest.fail(err)
-
-
-@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
-def test_multi_gpu_model_ddp_spawn(tmpdir):
-    tutils.set_random_master_port()
-
-    trainer_options = dict(
-        default_root_dir=tmpdir,
-        max_epochs=1,
-        limit_train_batches=10,
-        limit_val_batches=10,
-        gpus=[0, 1],
-        distributed_backend='ddp_spawn',
-        progress_bar_refresh_rate=0
-    )
-
-    model = EvalModelTemplate()
-
-    tpipes.run_model_test(trainer_options, model)
-
-    # test memory helper functions
-    memory.get_memory_profile('min_max')
-
-
 @pytest.mark.skipif(not torch.cuda.is_available(), reason="test requires GPU machine")
 @pytest.mark.parametrize('gpus', [1, [0], [1]])
 def test_single_gpu_model(tmpdir, gpus):
@@ -170,28 +94,6 @@ def test_single_gpu_model(tmpdir, gpus):
     tpipes.run_model_test(trainer_options, model)
 
 
-@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine")
-def test_ddp_all_dataloaders_passed_to_fit(tmpdir):
-    """Make sure DDP works with dataloaders passed to fit()"""
-    tutils.set_random_master_port()
-
-    model = EvalModelTemplate()
-    fit_options = dict(train_dataloader=model.train_dataloader(),
-                       val_dataloaders=model.val_dataloader())
-
-    trainer = Trainer(
-        default_root_dir=tmpdir,
-        progress_bar_refresh_rate=0,
-        max_epochs=1,
-        limit_train_batches=0.2,
-        limit_val_batches=0.2,
-        gpus=[0, 1],
-        distributed_backend='ddp_spawn'
-    )
-    result = trainer.fit(model, **fit_options)
-    assert result == 1, "DDP doesn't work with dataloaders passed to fit()."
-
-
 @pytest.fixture
 def mocked_device_count(monkeypatch):
     def device_count():