diff --git a/CHANGELOG.md b/CHANGELOG.md
index fee8f94f226b9..efe0c55888b0a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -318,6 +318,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 - Fixed a bug where an infinite recursion would be triggered when using the `BaseFinetuning` callback on a model that contains a `ModuleDict` ([#8170](https://github.com/PyTorchLightning/pytorch-lightning/pull/8170))
 
 
+- Fixed NCCL error when selecting non-consecutive device ids ([#8165](https://github.com/PyTorchLightning/pytorch-lightning/pull/8165))
+
+
 - Fixed `log_gpu_memory` metrics not being added to `logging` when nothing else is logged ([#8174](https://github.com/PyTorchLightning/pytorch-lightning/pull/8174))
diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py
index b855d100b1f12..8efbac4b3b5e2 100644
--- a/pytorch_lightning/plugins/training_type/ddp.py
+++ b/pytorch_lightning/plugins/training_type/ddp.py
@@ -323,8 +323,12 @@ def pre_dispatch(self):
     def post_dispatch(self) -> None:
         self.cluster_environment.teardown()
 
-    def barrier(self, *args, **kwargs):
-        if torch_distrib.is_available() and torch_distrib.is_initialized():
+    def barrier(self, *args, **kwargs) -> None:
+        if not torch_distrib.is_initialized():
+            return
+        if _TORCH_GREATER_EQUAL_1_8 and torch.distributed.get_backend() == "nccl":
+            torch_distrib.barrier(device_ids=self.determine_ddp_device_ids())
+        else:
             torch_distrib.barrier()
 
     def broadcast(self, obj: object, src: int = 0) -> object:
diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py
index b61f9a6052630..cdf53b4854a10 100644
--- a/pytorch_lightning/plugins/training_type/ddp_spawn.py
+++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py
@@ -309,8 +309,12 @@ def __recover_child_process_weights(self, best_path, last_path):
             ckpt = pl_load(last_path, map_location=lambda storage, loc: storage)
             self.lightning_module.load_state_dict(ckpt)
 
-    def barrier(self, *args, **kwargs):
-        if torch_distrib.is_initialized():
+    def barrier(self, *args, **kwargs) -> None:
+        if not torch_distrib.is_initialized():
+            return
+        if _TORCH_GREATER_EQUAL_1_8 and torch.distributed.get_backend() == "nccl":
+            torch_distrib.barrier(device_ids=self.determine_ddp_device_ids())
+        else:
             torch_distrib.barrier()
 
     def broadcast(self, obj: object, src: int = 0) -> object:
diff --git a/tests/plugins/test_ddp_plugin.py b/tests/plugins/test_ddp_plugin.py
index d236dc145d96c..61c5d70191db2 100644
--- a/tests/plugins/test_ddp_plugin.py
+++ b/tests/plugins/test_ddp_plugin.py
@@ -11,7 +11,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from unittest import mock
+
 import torch
+from torch.nn.parallel import DistributedDataParallel
 
 from pytorch_lightning import Trainer
 from pytorch_lightning.plugins import DDPPlugin
@@ -46,3 +49,30 @@ def test_ddp_with_2_gpus():
     assert model.device == torch.device("cpu")
     cuda_memory = torch.cuda.memory_allocated()
     assert cuda_memory < model.start_cuda_memory
+
+
+class BarrierModel(BoringModel):
+
+    def setup(self, stage=None):
+        assert not isinstance(self.trainer.accelerator.model, DistributedDataParallel)
+        self.trainer.accelerator.barrier("barrier before model is wrapped")
+
+    def on_train_start(self):
+        assert isinstance(self.trainer.accelerator.model, DistributedDataParallel)
+        self.trainer.accelerator.barrier("barrier after model is wrapped")
+
+
+@RunIf(min_gpus=4, special=True)
+@mock.patch("torch.distributed.barrier")
+def test_ddp_barrier_non_consecutive_device_ids(barrier_mock, tmpdir):
+    """ Test correct usage of barriers when device ids do not start at 0 or are not consecutive. """
+    model = BoringModel()
+    gpus = [1, 3]
+    trainer = Trainer(
+        default_root_dir=tmpdir,
+        max_steps=1,
+        gpus=gpus,
+        accelerator="ddp",
+    )
+    trainer.fit(model)
+    barrier_mock.assert_any_call(device_ids=[gpus[trainer.local_rank]])
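For context, a minimal standalone sketch of the behaviour this patch relies on (an illustration, not part of the patch): since torch 1.8, `torch.distributed.barrier` accepts a `device_ids` argument telling the NCCL backend which GPU each rank's communicator lives on. Without it, NCCL assumes the device index equals the local rank, which breaks when the selected GPUs are non-consecutive, e.g. `gpus=[1, 3]`. The `selected_gpus` parameter and the `barrier_for_selected_gpus` helper below are assumptions standing in for what `determine_ddp_device_ids()` derives from the Trainer's `gpus` flag.

```python
# Sketch only; assumes torch >= 1.8 and an already-initialized process group.
# Mirrors the patched barrier(), with `selected_gpus` standing in for the
# device ids returned by determine_ddp_device_ids().
import torch.distributed as dist

def barrier_for_selected_gpus(selected_gpus):
    if not dist.is_initialized():
        return
    if dist.get_backend() == "nccl":
        # Pin the barrier to the GPU this rank actually uses. Without
        # `device_ids`, NCCL defaults to device == local rank (0, 1, ...),
        # which is wrong for non-consecutive selections such as [1, 3].
        dist.barrier(device_ids=[selected_gpus[dist.get_rank()]])
    else:
        dist.barrier()
```

On a single node, rank and local rank coincide, which is why the test above can assert `barrier_mock.assert_any_call(device_ids=[gpus[trainer.local_rank]])`.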