diff --git a/docs/source/accelerators.rst b/docs/source/accelerators.rst new file mode 100644 index 0000000000000..ee801f2dee28b --- /dev/null +++ b/docs/source/accelerators.rst @@ -0,0 +1,182 @@ +############ +Accelerators +############ +Accelerators connect a Lightning Trainer to arbitrary accelerators (CPUs, GPUs, TPUs, etc). Accelerators +also manage distributed accelerators (like DP, DDP, HPC cluster). + +Accelerators can also be configured to run on arbitrary clusters using Plugins or to link up to arbitrary +computational strategies like 16-bit precision via AMP and Apex. + +---------- + +****************************** +Implement a custom accelerator +****************************** +To link up arbitrary hardware, implement your own Accelerator subclass + +.. code-block:: python + + from pytorch_lightning.accelerators.accelerator import Accelerator + + class MyAccelerator(Accelerator): + def __init__(self, trainer, cluster_environment=None): + super().__init__(trainer, cluster_environment) + self.nickname = 'my_accelator' + + def setup(self): + # find local rank, etc, custom things to implement + + def train(self): + # implement what happens during training + + def training_step(self): + # implement how to do a training_step on this accelerator + + def validation_step(self): + # implement how to do a validation_step on this accelerator + + def test_step(self): + # implement how to do a test_step on this accelerator + + def backward(self, closure_loss, optimizer, opt_idx, *args, **kwargs): + # implement how to do a backward pass with this accelerator + + def barrier(self, name: Optional[str] = None): + # implement this accelerator's barrier + + def broadcast(self, obj, src=0): + # implement this accelerator's broadcast function + + def sync_tensor(self, + tensor: Union[torch.Tensor], + group: Optional[Any] = None, + reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: + # implement how to sync tensors when reducing metrics across accelerators + +******** +Examples +******** +The following examples illustrate customizing accelerators. + +Example 1: Arbitrary HPC cluster +================================ +To link any accelerator with an arbitrary cluster (SLURM, Condor, etc), pass in a Cluster Plugin which will be passed +into any accelerator. + +First, implement your own ClusterEnvironment. Here is the torch elastic implementation. + +.. code-block:: python + + import os + from pytorch_lightning import _logger as log + from pytorch_lightning.utilities import rank_zero_warn + from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment + + class TorchElasticEnvironment(ClusterEnvironment): + + def __init__(self): + super().__init__() + + def master_address(self): + if "MASTER_ADDR" not in os.environ: + rank_zero_warn( + "MASTER_ADDR environment variable is not defined. Set as localhost" + ) + os.environ["MASTER_ADDR"] = "127.0.0.1" + log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") + master_address = os.environ.get('MASTER_ADDR') + return master_address + + def master_port(self): + if "MASTER_PORT" not in os.environ: + rank_zero_warn( + "MASTER_PORT environment variable is not defined. Set as 12910" + ) + os.environ["MASTER_PORT"] = "12910" + log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") + + port = os.environ.get('MASTER_PORT') + return port + + def world_size(self): + return os.environ.get('WORLD_SIZE') + + def local_rank(self): + return int(os.environ['LOCAL_RANK']) + +Now, pass it into the trainer which will use Torch Elastic across your accelerator of choice. + +.. code-block:: python + + cluster = TorchElasticEnvironment() + accelerator = MyAccelerator() + trainer = Trainer(plugins=[cluster], accelerator=MyAccelerator()) + +In this example, MyAccelerator can define arbitrary hardware (like IPUs or TPUs) and links it to an arbitrary +compute cluster. + +------------ + +********************** +Available Accelerators +********************** + +CPU Accelerator +=============== + +.. autoclass:: pytorch_lightning.accelerators.cpu_accelerator.CPUAccelerator + :noindex: + +DDP Accelerator +=============== + +.. autoclass:: pytorch_lightning.accelerators.ddp_accelerator.DDPAccelerator + :noindex: + +DDP2 Accelerator +================ + +.. autoclass:: pytorch_lightning.accelerators.ddp2_accelerator.DDP2Accelerator + :noindex: + +DDP CPU HPC Accelerator +======================= + +.. autoclass:: pytorch_lightning.accelerators.ddp_cpu_hpc_accelerator.DDPCPUHPCAccelerator + :noindex: + +DDP CPU Spawn Accelerator +========================= + +.. autoclass:: pytorch_lightning.accelerators.ddp_cpu_spawn_accelerator.DDPCPUSpawnAccelerator + :noindex: + +DDP HPC Accelerator +=================== + +.. autoclass:: pytorch_lightning.accelerators.ddp_hpc_accelerator.DDPHPCAccelerator + :noindex: + +DDP Spawn Accelerator +===================== + +.. autoclass:: pytorch_lightning.accelerators.ddp_spawn_accelerator.DDPSpawnAccelerator + :noindex: + +GPU Accelerator +=============== + +.. autoclass:: pytorch_lightning.accelerators.gpu_accelerator.GPUAccelerator + :noindex: + +Horovod Accelerator +=================== + +.. autoclass:: pytorch_lightning.accelerators.horovod_accelerator.HorovodAccelerator + :noindex: + +TPU Accelerator +=============== + +.. autoclass:: pytorch_lightning.accelerators.tpu_accelerator.TPUAccelerator + :noindex: diff --git a/docs/source/index.rst b/docs/source/index.rst index 6ea7709a8e72f..8f0642c6ad771 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -39,6 +39,7 @@ PyTorch Lightning Documentation :name: docs :caption: Optional extensions + accelerators callbacks datamodules logging diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index 408c979d1ff2e..dc0b0bf63a98d 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -221,11 +221,13 @@ def sync_tensor(self, reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: """ Function to reduce a tensor from several distributed processes to one aggregated tensor. + Args: tensor: the tensor to sync and reduce group: the process group to gather results from. Defaults to all processes (world) reduce_op: the reduction operation. Defaults to sum. Can also be a string of 'avg', 'mean' to calculate the mean during reduction. + Return: reduced value """ diff --git a/pytorch_lightning/accelerators/cpu_accelerator.py b/pytorch_lightning/accelerators/cpu_accelerator.py index 3ce6f7315feb8..083b5193ff8f3 100644 --- a/pytorch_lightning/accelerators/cpu_accelerator.py +++ b/pytorch_lightning/accelerators/cpu_accelerator.py @@ -21,6 +21,15 @@ class CPUAccelerator(Accelerator): def __init__(self, trainer, cluster_environment=None): + """ + Runs training on CPU + + Example:: + + # default + trainer = Trainer(accelerator=CPUAccelerator()) + + """ super().__init__(trainer, cluster_environment) self.nickname = None diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 4341a3b30e2fb..2da9747a9be92 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -38,6 +38,15 @@ class DDP2Accelerator(Accelerator): def __init__(self, trainer, cluster_environment=None, ddp_plugin=None): + """ + Runs training using DDP2 strategy on a cluster + + Example:: + + # default + trainer = Trainer(accelerator=DDP2Accelerator()) + + """ super().__init__(trainer, cluster_environment, ddp_plugin) self.task_idx = None self.dist = LightningDistributed() diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index b127fdd40c934..f99cd1149e5ae 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -47,6 +47,15 @@ class DDPAccelerator(Accelerator): def __init__(self, trainer, cluster_environment=None, ddp_plugin=None): + """ + Runs training using DDP strategy on a single machine (manually, not via cluster start) + + Example:: + + # default + trainer = Trainer(accelerator=DDPAccelerator()) + + """ super().__init__(trainer, cluster_environment, ddp_plugin) self.task_idx = None self._has_spawned_children = False @@ -304,4 +313,7 @@ def sync_tensor(self, tensor: Union[torch.Tensor], group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None) -> torch.Tensor: + """ + + """ return sync_ddp_if_available(tensor, group, reduce_op) diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index fab87750eb4ed..7b43dc9f6b68a 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -26,6 +26,15 @@ class DDPCPUHPCAccelerator(DDPHPCAccelerator): def __init__(self, trainer, cluster_environment=None, ddp_plugin=None): + """ + Runs training using DDP (with CPUs) strategy on a cluster + + Example:: + + # default + trainer = Trainer(accelerator=DDPCPUHPCAccelerator()) + + """ super().__init__(trainer, cluster_environment, ddp_plugin) self.nickname = 'ddp_cpu' diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index 64e326b7ee0fc..221ed5769c35e 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -40,6 +40,15 @@ class DDPCPUSpawnAccelerator(Accelerator): def __init__(self, trainer, nprocs, cluster_environment=None, ddp_plugin=None): + """ + Runs training using DDP (on a single machine or manually on multiple machines), using mp.spawn + + Example:: + + # default + trainer = Trainer(accelerator=DDPCPUSpawnAccelerator()) + + """ super().__init__(trainer, cluster_environment, ddp_plugin) self.mp_queue = None self.nprocs = nprocs diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 23d7778c1fff7..b6d813f978943 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -39,6 +39,15 @@ class DDPHPCAccelerator(Accelerator): def __init__(self, trainer, cluster_environment=None, ddp_plugin=None): + """ + Runs training using DDP on an HPC cluster + + Example:: + + # default + trainer = Trainer(accelerator=DDPHPCAccelerator()) + + """ super().__init__(trainer, cluster_environment, ddp_plugin) self.task_idx = None self._has_spawned_children = False diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index 2e0bac46c4c20..a30d266ec1b2f 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -43,6 +43,15 @@ class DDPSpawnAccelerator(Accelerator): def __init__(self, trainer, nprocs, cluster_environment=None, ddp_plugin=None): + """ + Runs training using DDP using mp.spawn via manual launch (not cluster launch) + + Example:: + + # default + trainer = Trainer(accelerator=DDPSpawnAccelerator()) + + """ super().__init__(trainer, cluster_environment, ddp_plugin) self.mp_queue = None self.nprocs = nprocs diff --git a/pytorch_lightning/accelerators/dp_accelerator.py b/pytorch_lightning/accelerators/dp_accelerator.py index 0a6eac607d79c..2f6c5dce97c46 100644 --- a/pytorch_lightning/accelerators/dp_accelerator.py +++ b/pytorch_lightning/accelerators/dp_accelerator.py @@ -26,6 +26,15 @@ class DataParallelAccelerator(Accelerator): def __init__(self, trainer, cluster_environment=None): + """ + Runs training using DP via manual start (not HPC cluster) + + Example:: + + # default + trainer = Trainer(accelerator=DataParallelAccelerator()) + + """ super().__init__(trainer, cluster_environment) self.model_autocast_original_forward = None self.dist = LightningDistributed() diff --git a/pytorch_lightning/accelerators/gpu_accelerator.py b/pytorch_lightning/accelerators/gpu_accelerator.py index e5611767547a1..e66f5bcb8b48c 100644 --- a/pytorch_lightning/accelerators/gpu_accelerator.py +++ b/pytorch_lightning/accelerators/gpu_accelerator.py @@ -23,6 +23,15 @@ class GPUAccelerator(Accelerator): amp_backend: AMPType def __init__(self, trainer, cluster_environment=None): + """ + Runs training using a single GPU + + Example:: + + # default + trainer = Trainer(accelerator=GPUAccelerator()) + + """ super().__init__(trainer, cluster_environment) self.dist = LightningDistributed() self.nickname = None diff --git a/pytorch_lightning/accelerators/horovod_accelerator.py b/pytorch_lightning/accelerators/horovod_accelerator.py index e5314a983f9db..3d9191914566d 100644 --- a/pytorch_lightning/accelerators/horovod_accelerator.py +++ b/pytorch_lightning/accelerators/horovod_accelerator.py @@ -33,6 +33,15 @@ class HorovodAccelerator(Accelerator): amp_backend: AMPType def __init__(self, trainer, cluster_environment=None): + """ + Runs training using horovod + + Example:: + + # default + trainer = Trainer(accelerator=HorovodAccelerator()) + + """ super().__init__(trainer, cluster_environment) self.nickname = 'horovod' diff --git a/pytorch_lightning/accelerators/tpu_accelerator.py b/pytorch_lightning/accelerators/tpu_accelerator.py index b60cd5a9dfc9c..5f4e6cc22cacd 100644 --- a/pytorch_lightning/accelerators/tpu_accelerator.py +++ b/pytorch_lightning/accelerators/tpu_accelerator.py @@ -39,6 +39,15 @@ class TPUAccelerator(Accelerator): def __init__(self, trainer, cluster_environment=None): + """ + Runs training using TPUs (colab, single machine or pod) + + Example:: + + # default + trainer = Trainer(accelerator=TPUAccelerator()) + + """ super().__init__(trainer, cluster_environment) self.start_method = None self.mp_queue = None @@ -270,8 +279,6 @@ def early_stopping_should_stop(self, pl_module): def save_spawn_weights(self, model): """ Dump a temporary checkpoint after ddp ends to get weights out of the process - :param model: - :return: """ if self.trainer.is_global_zero: path = os.path.join(self.trainer.default_root_dir, '__temp_weight_distributed_end.ckpt') @@ -282,8 +289,6 @@ def load_spawn_weights(self, original_model): """ Load the temp weights saved in the process To recover the trained model from the ddp process we load the saved weights - :param model: - :return: """ loaded_model = original_model