revert backend types (#3788)
* revert backend types

* todo

* todo
Borda authored Oct 2, 2020
1 parent ab7d9bd commit 62eabdd
Showing 6 changed files with 28 additions and 29 deletions.
30 changes: 14 additions & 16 deletions pytorch_lightning/accelerators/accelerator_connector.py
@@ -2,7 +2,6 @@
 import os
 import torch
 
-from pytorch_lightning.accelerators.base_backend import BackendType
 from pytorch_lightning.utilities import device_parser
 from pytorch_lightning.utilities import rank_zero_only
 from pytorch_lightning.utilities.distributed import rank_zero_warn, rank_zero_info
@@ -51,7 +50,7 @@ def on_trainer_init(
 os.environ["HOROVOD_FUSION_THRESHOLD"] = str(0)
 
 # distributed backend choice
-self.trainer.distributed_backend = BackendType(distributed_backend.lower()) if distributed_backend else None
+self.trainer.distributed_backend = distributed_backend.lower() if distributed_backend else None
 
 # init the default rank if exists
 # we need to call this here or NVIDIA flags and other messaging in init will show on all ranks
@@ -75,7 +74,7 @@ def on_trainer_init(
 
 self.trainer.tpu_id = self.trainer.tpu_cores[0] if isinstance(self.trainer.tpu_cores, list) else None
 
-if num_processes != 1 and distributed_backend != BackendType.DDP_CPU:
+if num_processes != 1 and distributed_backend != "ddp_cpu":
 rank_zero_warn("num_processes is only used for distributed_backend=\"ddp_cpu\". Ignoring it.")
 self.trainer.num_processes = num_processes
 
@@ -103,7 +102,7 @@ def on_trainer_init(
 
 # override dist backend when using tpus
 if self.trainer.on_tpu:
-self.trainer.distributed_backend = BackendType.TPU
+self.trainer.distributed_backend = "tpu"
 self.trainer.use_tpu = True
 
 # init flags for SLURM+DDP to work
@@ -135,8 +134,8 @@ def select_accelerator(self):
 te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ)
 use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed
 
-use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == BackendType.DDP_SPAWN
-use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == BackendType.DDP_CPU
+use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn"
+use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu"
 
 # choose the appropriate accelerator backend
 if self.trainer.use_ddp2:
@@ -154,7 +153,7 @@ def select_accelerator(self):
 elif use_ddp_cpu_spawn:
 accelerator_backend = accelerators.DDPCPUSpawnBackend(self.trainer, nprocs=self.trainer.num_processes)
 
-elif self.trainer.distributed_backend == BackendType.DDP:
+elif self.trainer.distributed_backend == "ddp":
 accelerator_backend = accelerators.DDPBackend(self.trainer, mode='ddp')
 
 elif self.trainer.use_dp:
@@ -196,21 +195,20 @@ def set_distributed_mode(self):
 elif self.trainer.num_gpus > 1:
 rank_zero_warn(
 'You requested multiple GPUs but did not specify a backend, e.g.'
-f' Trainer(distributed_backend={str(BackendType.DP)}|'
-f'{str(BackendType.DDP)}|{str(BackendType.DDP2)}).'
-f' Setting distributed_backend={str(BackendType.DDP_SPAWN)} for you.'
+' Trainer(distributed_backend="dp"|"ddp"|"ddp2").'
+' Setting distributed_backend="ddp_spawn" for you.'
 )
-self.trainer.distributed_backend = BackendType.DDP_SPAWN
+self.trainer.distributed_backend = "ddp_spawn"
 
-if self.trainer.distributed_backend == BackendType.DP:
+if self.trainer.distributed_backend == "dp":
 # do nothing if num_gpus == 0
 if self.trainer.num_gpus == 1:
 self.trainer.use_single_gpu = True
 self.trainer.use_dp = True
 elif self.trainer.num_gpus > 1:
 self.trainer.use_dp = True
 
-elif self.trainer.distributed_backend in (BackendType.DDP, BackendType.DDP_SPAWN):
+elif self.trainer.distributed_backend in ("ddp", "ddp_spawn"):
 if self.trainer.num_gpus == 0:
 if self.trainer.num_nodes > 1 or self.trainer.num_processes > 1:
 self.trainer.use_ddp = True  # ddp_cpu
@@ -221,19 +219,19 @@ def set_distributed_mode(self):
 self.trainer.use_ddp = True
 self.trainer.num_processes = self.trainer.num_gpus
 
-elif self.trainer.distributed_backend == BackendType.DDP2:
+elif self.trainer.distributed_backend == "ddp2":
 # do nothing if num_gpus == 0
 if self.trainer.num_gpus >= 1:
 self.trainer.use_ddp2 = True
-elif self.trainer.distributed_backend == BackendType.DDP_CPU:
+elif self.trainer.distributed_backend == "ddp_cpu":
 if self.trainer.num_gpus > 0:
 rank_zero_warn(
 'You requested one or more GPUs, but set the backend to `ddp_cpu`. Training will not use GPUs.'
 )
 self.trainer.use_ddp = True
 self.trainer.data_parallel_device_ids = None
 self.trainer.on_gpu = False
-elif self.trainer.distributed_backend == BackendType.HOROVOD:
+elif self.trainer.distributed_backend == "horovod":
 self._set_horovod_backend()
 
 # throw error to force user ddp or ddp2 choice
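
For orientation: after this revert, Trainer.distributed_backend once again holds a plain lowercase string instead of a BackendType member, so every comparison in the hunks above is a straight string check. A minimal, self-contained sketch of that pattern in Python (normalize_backend and uses_spawn are illustrative names, not Lightning APIs):

# Illustrative sketch only; these helpers are hypothetical, not part of pytorch_lightning.
def normalize_backend(distributed_backend):
    # Mirrors the reverted line: lower-case the user-supplied value, or keep None.
    return distributed_backend.lower() if distributed_backend else None


def uses_spawn(backend):
    # Plain string membership replaces the old BackendType enum checks.
    return backend in ("ddp_spawn", "ddp_cpu", "tpu")


assert normalize_backend("DDP_Spawn") == "ddp_spawn"
assert uses_spawn("ddp_cpu") and not uses_spawn("ddp")
assert normalize_backend(None) is None
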
5 changes: 4 additions & 1 deletion pytorch_lightning/accelerators/base_backend.py
@@ -186,11 +186,14 @@ def setup_optimizers(self, model):
 self.trainer.optimizer_frequencies = optimizer_frequencies
 
 
+# TODO: allow users to compare with strings even though internally we shall use this Enum to prevent typos...
 class BackendType(Enum):
 DP = 'dp'
 DDP = 'ddp'
 DDP2 = 'ddp2'
 DDP_SPAWN = 'ddp_spawn'
+# decouple distrib and device
 DDP_CPU = 'ddp_cpu'
-TPU = 'tpu'
 HOROVOD = 'horovod'
+# this is rather device
+TPU = 'tpu'
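
The TODO kept in this hunk asks for an Enum that internal code can use for typo safety while users still compare against plain strings. One common way to get that, sketched here as an assumption about a possible follow-up rather than anything this PR implements, is to mix str into the Enum:

from enum import Enum


class BackendTypeSketch(str, Enum):
    # A str-mixin enum member compares equal to its plain string value, so
    # `backend == "ddp_spawn"` keeps working while internal code uses named members.
    DP = 'dp'
    DDP = 'ddp'
    DDP2 = 'ddp2'
    DDP_SPAWN = 'ddp_spawn'
    DDP_CPU = 'ddp_cpu'
    HOROVOD = 'horovod'
    TPU = 'tpu'


assert BackendTypeSketch.DDP_SPAWN == 'ddp_spawn'
assert BackendTypeSketch('ddp') is BackendTypeSketch.DDP
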
4 changes: 2 additions & 2 deletions pytorch_lightning/accelerators/ddp_base_backend.py
@@ -19,7 +19,7 @@
 import torch.distributed as dist
 
 from pytorch_lightning import _logger as log
-from pytorch_lightning.accelerators.base_backend import Accelerator, BackendType
+from pytorch_lightning.accelerators.base_backend import Accelerator
 from pytorch_lightning.utilities import AMPType
 from pytorch_lightning.utilities.cloud_io import atomic_save
 from pytorch_lightning.utilities.distributed import rank_zero_only, rank_zero_warn
@@ -69,7 +69,7 @@ def early_stopping_should_stop(self, pl_module):
 return should_stop
 
 def transfer_distrib_spawn_state_on_fit_end(self, model, mp_queue, results):
-if self.trainer.distributed_backend not in (BackendType.DDP_SPAWN, BackendType.DDP_CPU, BackendType.TPU):
+if self.trainer.distributed_backend not in ("ddp_spawn", "ddp_cpu", "tpu"):
 return
 
 # track the best model path
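
The only change in this file is the guard above: only spawn-style backends need to hand results and the best-model path back to the parent process through mp_queue, so everything else returns early. A reduced standalone sketch of that check (should_transfer_spawn_state is a hypothetical helper, shown only to make the string comparison concrete):

# Hypothetical standalone version of the guard; the real check lives on the
# accelerator and reads self.trainer.distributed_backend.
SPAWN_BACKENDS = ("ddp_spawn", "ddp_cpu", "tpu")


def should_transfer_spawn_state(distributed_backend):
    # Non-spawn backends train in the parent process, so there is nothing to transfer.
    return distributed_backend in SPAWN_BACKENDS


assert should_transfer_spawn_state("ddp_spawn")
assert not should_transfer_spawn_state("ddp")
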
4 changes: 2 additions & 2 deletions pytorch_lightning/accelerators/tpu_backend.py
@@ -19,7 +19,7 @@
 import torch.multiprocessing as mp
 
 from pytorch_lightning import _logger as log
-from pytorch_lightning.accelerators.base_backend import Accelerator, BackendType
+from pytorch_lightning.accelerators.base_backend import Accelerator
 from pytorch_lightning.core import LightningModule
 from pytorch_lightning.utilities import AMPType, rank_zero_info, rank_zero_only, rank_zero_warn
 from pytorch_lightning.utilities.cloud_io import atomic_save
@@ -296,7 +296,7 @@ def load_spawn_weights(self, original_model):
 return loaded_model
 
 def transfer_distrib_spawn_state_on_fit_end(self, model, mp_queue, results):
-if self.trainer.distributed_backend not in (BackendType.DDP_SPAWN, BackendType.DDP_CPU, BackendType.TPU):
+if self.trainer.distributed_backend not in ("ddp_spawn", "ddp_cpu", "tpu"):
 return
 
 # track the best model path
11 changes: 5 additions & 6 deletions pytorch_lightning/trainer/data_loading.py
@@ -21,7 +21,6 @@
 from torch.utils.data.distributed import DistributedSampler
 
 from pytorch_lightning.accelerators.base_backend import Accelerator
-from pytorch_lightning.accelerators.base_backend import BackendType
 from pytorch_lightning.core import LightningModule
 from pytorch_lightning.utilities import rank_zero_warn
 from pytorch_lightning.utilities.data import has_iterable_dataset, has_len
@@ -86,7 +85,7 @@ def _worker_check(self, dataloader: DataLoader, name: str) -> None:
 
 # ddp_spawn + num_workers > 0 don't mix! tell the user
 is_dataloader = isinstance(dataloader, DataLoader)
-using_spawn = self.distributed_backend == BackendType.DDP_SPAWN
+using_spawn = self.distributed_backend == "ddp_spawn"
 if is_dataloader and not on_windows:
 if dataloader.num_workers > 0 and using_spawn:
 rank_zero_warn('Dataloader(num_workers>0) and ddp_spawn do not mix well!'
@@ -149,10 +148,10 @@ def _get_distributed_sampler(self, dataloader, train):
 kwargs = dict(num_replicas=hvd.size(), rank=hvd.rank())
 else:
 world_size = {
-BackendType.DDP: self.num_nodes * self.num_processes,
-BackendType.DDP_SPAWN: self.num_nodes * self.num_processes,
-BackendType.DDP2: self.num_nodes,
-BackendType.DDP_CPU: self.num_processes * self.num_nodes
+"ddp": self.num_nodes * self.num_processes,
+"ddp_spawn": self.num_nodes * self.num_processes,
+"ddp2": self.num_nodes,
+"ddp_cpu": self.num_processes * self.num_nodes
 }
 assert self.distributed_backend is not None
 kwargs = dict(num_replicas=world_size[self.distributed_backend], rank=self.global_rank)
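
The world_size dict above keys the DistributedSampler replica count by the backend string: per-process backends multiply nodes by processes, while ddp2 runs one process per node. A hedged, standalone version of that lookup (sampler_world_size is an illustrative helper, not the Trainer method):

# Illustrative helper mirroring the world_size mapping; not a Lightning API.
def sampler_world_size(distributed_backend, num_nodes, num_processes):
    world_size = {
        "ddp": num_nodes * num_processes,
        "ddp_spawn": num_nodes * num_processes,
        "ddp2": num_nodes,  # one process per node
        "ddp_cpu": num_processes * num_nodes,
    }
    return world_size[distributed_backend]


assert sampler_world_size("ddp", num_nodes=2, num_processes=4) == 8
assert sampler_world_size("ddp2", num_nodes=2, num_processes=4) == 2
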
3 changes: 1 addition & 2 deletions tests/models/test_tpu.py
@@ -5,7 +5,6 @@
 
 import tests.base.develop_pipelines as tpipes
 from pytorch_lightning import Trainer, seed_everything
-from pytorch_lightning.accelerators.base_backend import BackendType
 from pytorch_lightning.utilities.exceptions import MisconfigurationException
 from tests.base import EvalModelTemplate
 from tests.base.datasets import TrialMNIST
@@ -239,7 +238,7 @@ def test_exception_when_no_tpu_found(tmpdir):
 @pytest.mark.parametrize('tpu_cores', [1, 8, [1]])
 def test_distributed_backend_set_when_using_tpu(tmpdir, tpu_cores):
 """Test if distributed_backend is set to `tpu` when tpu_cores is not None"""
-assert Trainer(tpu_cores=tpu_cores).distributed_backend == BackendType.TPU
+assert Trainer(tpu_cores=tpu_cores).distributed_backend == "tpu"
 
 
 @pytest.mark.skipif(not TPU_AVAILABLE, reason="test requires TPU machine")
